0%

HBase 读写 Java API

Content here


转载来源

1
2
3
作者:牧梦者
链接:https://www.cnblogs.com/swordfall/p/10301707.html
来源:博客园

HBase 读写的方式概况

主要分为:

  • 纯 Java API 读写 HBase 的方式
  • Spark 读写 HBase 的方式
  • Flink 读写 HBase 的方式
  • HBase 通过 Phoenix 读写的方式

第一种方式是 HBase 自身提供的比较原始的高效操作方式,而第二、第三则分别是 Spark、Flink 集成 HBase 的方式,最后一种是第三方插件 Phoenix 集成的 JDBC 方式,Phoenix 集成的 JDBC 操作方式也能在 Spark、Flink 中调用

注意:使用HBase2.1.2版本,以下代码都是基于该版本开发的。

连接 HBase

这里我们采用静态方式连接HBase,不同于2.1.2之前的版本,无需创建HBase线程池,HBase2.1.2提供的代码已经封装好,只需创建调用即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 声明静态配置
*/
static Configuration conf = null;
static Connection conn = null;
static {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03");
conf.set("hbase.zookeeper.property.client", "2181");
try{
conn = ConnectionFactory.createConnection(conf);
}catch (Exception e){
e.printStackTrace();
}
}

创建 HBase 的表

创建HBase表,是通过Admin来执行的,表和列簇则是分别通过TableDescriptorBuilder和ColumnFamilyDescriptorBuilder来构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 创建只有一个列簇的表
* @throws Exception
*/
public static void createTable() throws Exception{
Admin admin = conn.getAdmin();
if (!admin.tableExists(TableName.valueOf("test"))){
TableName tableName = TableName.valueOf("test");
//表描述器构造器
TableDescriptorBuilder tdb = TableDescriptorBuilder.newBuilder(tableName);
//列族描述器构造器
ColumnFamilyDescriptorBuilder cdb = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("user"));
//获得列描述器
ColumnFamilyDescriptor cfd = cdb.build();
//添加列族
tdb.setColumnFamily(cfd);
//获得表描述器
TableDescriptor td = tdb.build();
//创建表
admin.createTable(td);
}else {
System.out.println("表已存在");
}
//关闭连接
}

HBase 表添加数据

通过put api来添加数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 添加数据(多个rowKey,多个列族)
* @throws Exception
*/
public static void insertMany() throws Exception{
Table table = conn.getTable(TableName.valueOf("test"));
List<Put> puts = new ArrayList<Put>();
Put put1 = new Put(Bytes.toBytes("rowKey1"));
put1.addColumn(Bytes.toBytes("user"), Bytes.toBytes("name"), Bytes.toBytes("wd"));

Put put2 = new Put(Bytes.toBytes("rowKey2"));
put2.addColumn(Bytes.toBytes("user"), Bytes.toBytes("age"), Bytes.toBytes("25"));

Put put3 = new Put(Bytes.toBytes("rowKey3"));
put3.addColumn(Bytes.toBytes("user"), Bytes.toBytes("weight"), Bytes.toBytes("60kg"));

Put put4 = new Put(Bytes.toBytes("rowKey4"));
put4.addColumn(Bytes.toBytes("user"), Bytes.toBytes("sex"), Bytes.toBytes("男"));

puts.add(put1);
puts.add(put2);
puts.add(put3);
puts.add(put4);
table.put(puts);
table.close();
}

删除 HBase 的列簇或列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 根据rowKey删除一行数据、或者删除某一行的某个列簇,或者某一行某个列簇某列
* @param tableName
* @param rowKey
* @throws Exception
*/
public static void deleteData(TableName tableName, String rowKey, String rowKey, String columnFamily, String columnName) throws Exception{
Table table = conn.getTable(tableName);
Delete delete = new Delete(Bytes.toBytes(rowKey));
//①根据rowKey删除一行数据
table.delete(delete);

//②删除某一行的某一个列簇内容
delete.addFamily(Bytes.toBytes(columnFamily));

//③删除某一行某个列簇某列的值
delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
table.close();
}

更新 HBase 表的列

使用Put api直接替换掉即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 根据RowKey , 列簇, 列名修改值
* @param tableName
* @param rowKey
* @param columnFamily
* @param columnName
* @param columnValue
* @throws Exception
*/
public static void updateData(TableName tableName, String rowKey, String columnFamily, String columnName, String columnValue) throws Exception{
Table table = conn.getTable(tableName);
Put put1 = new Put(Bytes.toBytes(rowKey));
put1.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));
table.put(put1);
table.close();
}

HBase 查询

HBase查询分为get、scan、scan和filter结合。filter过滤器又分为RowFilter(rowKey过滤器)、SingleColumnValueFilter(列值过滤器)、ColumnPrefixFilter(列名前缀过滤器)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* 根据rowKey查询数据
* @param tableName
* @param rowKey
* @throws Exception
*/
public static void getResult(TableName tableName, String rowKey) throws Exception{
Table table = conn.getTable(tableName);
//获得一行
Get get = new Get(Bytes.toBytes(rowKey));
Result set = table.get(get);
Cell[] cells = set.rawCells();
for (Cell cell: cells){
System.out.println(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::" +
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
table.close();
}

//过滤器 LESS < LESS_OR_EQUAL <= EQUAL = NOT_EQUAL <> GREATER_OR_EQUAL >= GREATER > NO_OP 排除所有

/**
* @param tableName
* @throws Exception
*/
public static void scanTable(TableName tableName) throws Exception{
Table table = conn.getTable(tableName);

//①全表扫描
Scan scan1 = new Scan();
ResultScanner rscan1 = table.getScanner(scan1);

//②rowKey过滤器
Scan scan2 = new Scan();
//str$ 末尾匹配,相当于sql中的 %str ^str开头匹配,相当于sql中的str%
RowFilter filter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("Key1$"));
scan2.setFilter(filter);
ResultScanner rscan2 = table.getScanner(scan2);

//③列值过滤器
Scan scan3 = new Scan();
//下列参数分别为列族,列名,比较符号,值
SingleColumnValueFilter filter3 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"),
CompareOperator.EQUAL, Bytes.toBytes("spark"));
scan3.setFilter(filter3);
ResultScanner rscan3 = table.getScanner(scan3);

//列名前缀过滤器
Scan scan4 = new Scan();
ColumnPrefixFilter filter4 = new ColumnPrefixFilter(Bytes.toBytes("name"));
scan4.setFilter(filter4);
ResultScanner rscan4 = table.getScanner(scan4);

//过滤器集合
Scan scan5 = new Scan();
FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
SingleColumnValueFilter filter51 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"),
CompareOperator.EQUAL, Bytes.toBytes("spark"));
ColumnPrefixFilter filter52 = new ColumnPrefixFilter(Bytes.toBytes("name"));
list.addFilter(filter51);
list.addFilter(filter52);
scan5.setFilter(list);
ResultScanner rscan5 = table.getScanner(scan5);

for (Result rs : rscan){
String rowKey = Bytes.toString(rs.getRow());
System.out.println("row key :" + rowKey);
Cell[] cells = rs.rawCells();
for (Cell cell: cells){
System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "::"
+ Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::"
+ Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
System.out.println("-------------------------------------------");
}
}

闲扯

现工作使用的是HBase2.1版本的集群,Java 客户端连接 HBase 与之前不一样了