Flink实践-读取hbase
讲述Flink操作hbase,非结构化数据存储。
hbase简介
Apache Hbase,大数据下不可不提的一个存储数据库,可能有些用redis,mongdb等,但是Hbase作为非结构化的数据库,可以存储多个版本数据,列族和列的设计,让大量数据的存储成为可能,非常适合存储大量数据,有些非结构化数据。并且在实践中,能够支撑起高吞吐量。
hbase的设计是可不删除数据数据的,更新数据也必须整条更新(可能说法有误),hbase的读取是顺序读取。更多参考可见官网。
hbase的安装
下面介绍在本地搭建一个hbase库,首先从官网下载自己环境所兼容的hbase版本号,安装在linux版本下,以下是在ubuntu-14.04版本安装的。
1,下载hbase-1.0.3,
2,配置文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35hbase-env.sh
添加java环境:export JAVA_HOME=/home/**/programs/jdk/jdk1.8.0_74
hbase-site.xml
添加如下配置信息:
<configuration>
<property>
<name>hbase.rootdir</name>
<value>/***/hbase-1.0.3/hbaseData</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>127.0.0.1</value>
</property>
<property>
<name>hbase.rest.port</name>
<value>8080</value>
</property>
<property>
<name>hbase.rest.readonly</name>
<value>true</value>
</property>
<property>
<name>hbase.rest.authentication.type</name>
<value>kerberos</value>
</property>
<property>
<name>hbase.rest.authentication.kerberos.principal</name>
<value>HTTP/_HOST@HADOOP.LOCALDOMAIN</value>
</property>
<property>
<name>hbase.rest.authentication.kerberos.keytab</name>
<value>$KEYTAB</value>
</property>
</configuration>
3,启动hbase,在安装目录下输入命令,start-hbase.sh,这需要开启ssh。
4,输入 hbase shell进入hbase的命令行,
5,一些hbase命令
1 | 修改表的存储期限 |
Flink读取hbase表
Flink操作hbase表与连接数据库操作一致,需要先进行连接,再进行操作。而一般在大数据环境下,没有像mysql那样有权限设置,角色设置,以保证数据库的安全性,大数据是分布式存储,一般能够连接master则可以获取到数据,这使得数据会不安全;但是,这可以通过隔离ip地址范围,限制访问。
大数据的数据安全一直是讨论的热点,对于权限的设置,目前也只能限制访问或者端口,做不到角色的配置。
Flink操作hbase,需经连接,获得表,读取行,返回值等步骤。
1,建立连接,需提供hbase的zookeeper地址,1
2
3
4
5
6lazy val conn = {
val conf = new Configuration()
conf.set("hbase.zookeeper","127.0.0.1:2181")
#如果是集群,可以更改
ConnectionFactory.createConnection(conf, executor)
}
2,读取某张表,1
2
3
4
5
6
7
8private def open[T](tableName: String)(f: Table => T): T = {
val table = conn.getTable(TableName.valueOf(tableName))
try {
f(table)
} finally {
table.close()
}
}
3,读取数据,读取一行,1
2
3
4
5
6
7
8
9
10
11
12
13
14def getRow(tableName: String, rowKey: String): Map[String, String] = {
val get = new Get(rowKey)
val result = open(tableName) { table =>
table.get(get)
}
Option(result.getFamilyMap(COLUMN_FAMILY)) match {
case Some(row) =>
val rowConv = row.asScala.map { case (k, v) =>
(k: String, v: String)
}.toMap
rowConv
case None => None.toMap
}
}
4,写入数据进表,1
2
3
4
5
6
7
8
9
10
11
12def write(tableName: String, rows: Iterable[Row]): Int = {
open(tableName) { table =>
val results = Array.fill[Object](rows.size)(null)
table.batch(rows.toSeq.asJava, results)
results.filter(_ != null).size
}
}
def write(tableName: String, row: Put): Unit = {
open(tableName) { table =>
table.put(row)
}
}
5,拷贝表,需建立拷贝的表名,1
2
3
4
5
6
7
8
9
10
11
12
13
14def copyHbaseTableToNew(tableName:String,newTableName:String):Unit={
val yestodayRecommed = fullScanWithRowkey(tableName)
val puts =yestodayRecommed.map{ x =>
val rowkey = x._1
val put = new Put(rowkey.getBytes)
val colums = x._2.keys
colums.foreach { colum =>
val value = x._2.get(colum).get
put.addColumn("cf", colum, value)
}
put
}
write(newTableName,puts)
}
经验总结
hbase虽然可以支持高吞吐,但是Flink是流式,如果对非常实时即ms秒需解决或者监控更新的话,也是可以的,国外已经试验过每秒50w次可承受,随着不断优化,支撑的并发或吞吐量更高;
但是,对于实时性要求不高的场景,使用Flink处理流式数据,则采用窗口处理机制比较高效,不会造成太多的读写。