Content here
转载来源
1 2 3
| 作者:牧梦者 链接:https://www.cnblogs.com/swordfall/p/10301707.html 来源:博客园
|
HBase 读写的方式概况
主要分为:
- 纯 Java API 读写 HBase 的方式
- Spark 读写 HBase 的方式
- Flink 读写 HBase 的方式
- HBase 通过 Phoenix 读写的方式
第一种方式是 HBase 自身提供的比较原始的高效操作方式,而第二、第三则分别是 Spark、Flink 集成 HBase 的方式,最后一种是第三方插件 Phoenix 集成的 JDBC 方式,Phoenix 集成的 JDBC 操作方式也能在 Spark、Flink 中调用
注意:使用HBase2.1.2版本,以下代码都是基于该版本开发的。
连接 HBase
这里我们采用静态方式连接HBase,不同于2.1.2之前的版本,无需创建HBase线程池,HBase2.1.2提供的代码已经封装好,只需创建调用即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| /** * 声明静态配置 */ static Configuration conf = null; static Connection conn = null; static { conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03"); conf.set("hbase.zookeeper.property.client", "2181"); try{ conn = ConnectionFactory.createConnection(conf); }catch (Exception e){ e.printStackTrace(); } }
|
创建 HBase 的表
创建HBase表,是通过Admin来执行的,表和列簇则是分别通过TableDescriptorBuilder和ColumnFamilyDescriptorBuilder来构建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| /** * 创建只有一个列簇的表 * @throws Exception */ public static void createTable() throws Exception{ Admin admin = conn.getAdmin(); if (!admin.tableExists(TableName.valueOf("test"))){ TableName tableName = TableName.valueOf("test"); //表描述器构造器 TableDescriptorBuilder tdb = TableDescriptorBuilder.newBuilder(tableName); //列族描述器构造器 ColumnFamilyDescriptorBuilder cdb = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("user")); //获得列描述器 ColumnFamilyDescriptor cfd = cdb.build(); //添加列族 tdb.setColumnFamily(cfd); //获得表描述器 TableDescriptor td = tdb.build(); //创建表 admin.createTable(td); }else { System.out.println("表已存在"); } //关闭连接 }
|
HBase 表添加数据
通过put api来添加数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| /** * 添加数据(多个rowKey,多个列族) * @throws Exception */ public static void insertMany() throws Exception{ Table table = conn.getTable(TableName.valueOf("test")); List<Put> puts = new ArrayList<Put>(); Put put1 = new Put(Bytes.toBytes("rowKey1")); put1.addColumn(Bytes.toBytes("user"), Bytes.toBytes("name"), Bytes.toBytes("wd"));
Put put2 = new Put(Bytes.toBytes("rowKey2")); put2.addColumn(Bytes.toBytes("user"), Bytes.toBytes("age"), Bytes.toBytes("25"));
Put put3 = new Put(Bytes.toBytes("rowKey3")); put3.addColumn(Bytes.toBytes("user"), Bytes.toBytes("weight"), Bytes.toBytes("60kg"));
Put put4 = new Put(Bytes.toBytes("rowKey4")); put4.addColumn(Bytes.toBytes("user"), Bytes.toBytes("sex"), Bytes.toBytes("男"));
puts.add(put1); puts.add(put2); puts.add(put3); puts.add(put4); table.put(puts); table.close(); }
|
删除 HBase 的列簇或列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| /** * 根据rowKey删除一行数据、或者删除某一行的某个列簇,或者某一行某个列簇某列 * @param tableName * @param rowKey * @throws Exception */ public static void deleteData(TableName tableName, String rowKey, String rowKey, String columnFamily, String columnName) throws Exception{ Table table = conn.getTable(tableName); Delete delete = new Delete(Bytes.toBytes(rowKey)); //①根据rowKey删除一行数据 table.delete(delete); //②删除某一行的某一个列簇内容 delete.addFamily(Bytes.toBytes(columnFamily)); //③删除某一行某个列簇某列的值 delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName)); table.close(); }
|
更新 HBase 表的列
使用Put api直接替换掉即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| /** * 根据RowKey , 列簇, 列名修改值 * @param tableName * @param rowKey * @param columnFamily * @param columnName * @param columnValue * @throws Exception */ public static void updateData(TableName tableName, String rowKey, String columnFamily, String columnName, String columnValue) throws Exception{ Table table = conn.getTable(tableName); Put put1 = new Put(Bytes.toBytes(rowKey)); put1.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue)); table.put(put1); table.close(); }
|
HBase 查询
HBase查询分为get、scan、scan和filter结合。filter过滤器又分为RowFilter(rowKey过滤器)、SingleColumnValueFilter(列值过滤器)、ColumnPrefixFilter(列名前缀过滤器)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| /** * 根据rowKey查询数据 * @param tableName * @param rowKey * @throws Exception */ public static void getResult(TableName tableName, String rowKey) throws Exception{ Table table = conn.getTable(tableName); //获得一行 Get get = new Get(Bytes.toBytes(rowKey)); Result set = table.get(get); Cell[] cells = set.rawCells(); for (Cell cell: cells){ System.out.println(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } table.close(); }
//过滤器 LESS < LESS_OR_EQUAL <= EQUAL = NOT_EQUAL <> GREATER_OR_EQUAL >= GREATER > NO_OP 排除所有
/** * @param tableName * @throws Exception */ public static void scanTable(TableName tableName) throws Exception{ Table table = conn.getTable(tableName); //①全表扫描 Scan scan1 = new Scan(); ResultScanner rscan1 = table.getScanner(scan1); //②rowKey过滤器 Scan scan2 = new Scan(); //str$ 末尾匹配,相当于sql中的 %str ^str开头匹配,相当于sql中的str% RowFilter filter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("Key1$")); scan2.setFilter(filter); ResultScanner rscan2 = table.getScanner(scan2); //③列值过滤器 Scan scan3 = new Scan(); //下列参数分别为列族,列名,比较符号,值 SingleColumnValueFilter filter3 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"), CompareOperator.EQUAL, Bytes.toBytes("spark")); scan3.setFilter(filter3); ResultScanner rscan3 = table.getScanner(scan3); //列名前缀过滤器 Scan scan4 = new Scan(); ColumnPrefixFilter filter4 = new ColumnPrefixFilter(Bytes.toBytes("name")); scan4.setFilter(filter4); ResultScanner rscan4 = table.getScanner(scan4); //过滤器集合 Scan scan5 = new Scan(); FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL); SingleColumnValueFilter filter51 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"), CompareOperator.EQUAL, Bytes.toBytes("spark")); ColumnPrefixFilter filter52 = new ColumnPrefixFilter(Bytes.toBytes("name")); list.addFilter(filter51); list.addFilter(filter52); scan5.setFilter(list); ResultScanner rscan5 = table.getScanner(scan5); for (Result rs : rscan){ String rowKey = Bytes.toString(rs.getRow()); System.out.println("row key :" + rowKey); Cell[] cells = rs.rawCells(); for (Cell cell: cells){ System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "::" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } System.out.println("-------------------------------------------"); } }
|
闲扯
现工作使用的是HBase2.1版本的集群,Java 客户端连接 HBase 与之前不一样了