sparksql读写hbase 内置过滤器的使用  

  1         //写入hbase(hfile方式)
  2         org.apache.hadoop.hbase.client.Connection conn = null;
  3         try {
  4              SparkLog.debug("开始读取hbase信息...");
  5             if (StringUtils.isNotBlank(type) && type.equalsIgnoreCase("hbase")) {
  6                 SparkLog.debug("==========================================");
  7                 String hbasetable = dict.getStringItem("table", "");
  8                 String hbase_site_path = dict.getStringItem("path_site", "");
  9                 String hfile_path = dict.getStringItem("hfile_path", "");
 10                 Configuration  conf = ss.sparkContext().hadoopConfiguration(); 11                 
 12                 if (StringUtils.isBlank(hbase_site_path)) {
 13                     SparkLog.warn("参数配置错误,未配置hbase-site信息!");
 14                 }
 15                 if (StringUtils.isNotBlank(hbase_site_path)) {
 16                     hbase_site_path = hbase_site_path + (hbase_site_path.contains("hbase-site.xml") ? "" : "/hbase-site.xml");
 17                     conf.addResource(new Path(hbase_site_path));
 18                 }
 19                 
 20                 SparkLog.debug("读取hbase信息完成");
 21                 conf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "60000");
 22                 if (!P_Spark.delHDFSDir(hfile_path)) {
 23                     return TCResult.newFailureResult("SPARK_ERROR", "删除旧文件失败");
 24                 }
 25                 conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 1024);
 26                 SparkLog.debug(conf);
 27                 
 28                 SparkLog.debug("创建hbase的链接...");
 29                 // 创建hbase的链接,利用默认的配置文件,实际上读取的hbase的master地址
 30                 conn = ConnectionFactory.createConnection(conf);
 31                 
 32                 SparkLog.debug("开始生成hfile文件...");
 33                 //必须按照key排序
 34                 data.flatMapToPair(new PairFlatMapFunction<Row, ImmutableBytesWritable, KeyValue>() {
 35 
 36                 private static final long serialVersionUID = -8033772725296906227L;
 37     
 38                 @Override
 39                 public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(Row s) throws Exception {
 40                         byte[] rowkey = Bytes.toBytes((new SimpleDateFormat("yyyyMMddHHmmss.sss")).format(System.currentTimeMillis())+Math.random()*100); 
 41                         List<Tuple2<ImmutableBytesWritable,KeyValue>> cols = new ArrayList<>();
 42                         byte[] family = Bytes.toBytes("fm");
 43                         
 44                         String hostname =s.get(0)==null ? "":s.getString(0);
 45                         String request_date =s.get(1)==null ? "":s.getString(1);
 46                         String post_id=s.get(2)==null ? "":Integer.toString(s.getInt(2));
 47                         String title=s.get(3)==null ? "":s.getString(3);
 48                         String author=s.get(4)==null ? "":s.getString(4);
 49                         String country=s.get(5)==null ? "":s.getString(5);
 50                         String category=s.get(6)==null ? "":s.getString(6);
 51                         
 52 
 53                         //String filds[] =s.schema().fieldNames();//获取列名
 54 
 55 
 56                         //列名需要排序,按顺序加入
 57                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "author".getBytes(),  Bytes.toBytes(author))));
 58                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "category".getBytes(),  Bytes.toBytes(category))));
 59                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "country".getBytes(),  Bytes.toBytes(country))));
 60                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "hostname".getBytes(),  Bytes.toBytes(hostname))));
 61                          cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "post_id".getBytes(),  Bytes.toBytes(post_id))));
 62                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "request_date".getBytes(),  Bytes.toBytes(request_date))));
 63                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "title".getBytes(),  Bytes.toBytes(title))));
 64                         return cols.iterator();
 65                 } 
 66             }).sortByKey().saveAsNewAPIHadoopFile(hfile_path, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf);
 67                 SparkLog.debug("生成hfile文件成功");
 68                 LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
 69                 // 根据表名获取表
 70                 SparkLog.debug("根据表名获取表...");
 71                 Table table = conn.getTable(TableName.valueOf(hbasetable));
 72                 SparkLog.debug(table.toString());
 73 
 74                 // 获取hbase表的region分布
 75                 SparkLog.debug("获取hbase表的region分布...");
 76                 RegionLocator regionLocator = conn.getRegionLocator(TableName
 77                         .valueOf(hbasetable));
 78                 // 创建一个hadoop的mapreduce的job
 79                 Job job = Job.getInstance(conf);
 80                 // 设置job名称
 81                 job.setJobName("DumpFile");
 82                 // 此处最重要,需要设置文件输出的key,因为我们要生成HFil,所以outkey要用ImmutableBytesWritable
 83                 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 84                 // 输出文件的内容KeyValue
 85                 job.setMapOutputValueClass(KeyValue.class);
 86                 // 配置HFileOutputFormat2的信息
 87                 HFileOutputFormat2.configureIncrementalLoad(job, table,
 88                         regionLocator);
 89 
 90                 // 开始导入
 91                 SparkLog.debug("开始导入...");
 92                 load.doBulkLoad(new Path(hfile_path), conn.getAdmin(), table,
 93                         regionLocator);
 94                 // load.doBulkLoad(new Path(path),new , table,regionLocator);
 95                 // load.doBulkLoad(new Path(path), (HTable)table);这个目前也可用
 96                 table.close();
 97 
 98             }
 99 
100         } catch (Throwable e) {
101             return TCResult.newFailureResult("SPARK_ERROR", e);
102         } finally {
103             try {
104                 conn.close();
105             } catch (Throwable e) {
106                 return TCResult.newFailureResult("SPARK_ERROR", e);
107             }
108         }

 1         //读取hbase
 2         Configuration  conf = sc.hadoopConfiguration(); 
3
if (null != hbase_site_path) { 4 hbase_site_path = hbase_site_path.contains("hbase-site.xml") ? hbase_site_path : hbase_site_path 5 + "/hbase-site.xml"; 6 conf.addResource(new Path(hbase_site_path)); 7 } else { 8 if(zn_parent == null || zn_parent.equals("")){ 9 zn_parent="/hbase"; 10 } 11 conf.set("hbase.zookeeper.quorum", quorum); 12 conf.set("hbase.zookeeper.property.clientPort", zkport); 13 conf.set("zookeeper.znode.parent", zn_parent); 14 15 } 16 conf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "60000"); 17 JavaRDD<String> javardd = null; 18 try { 19 conf.set(TableInputFormat.INPUT_TABLE, tablename); 20 JavaPairRDD<ImmutableBytesWritable, Result> hbRDD = sc 21 .newAPIHadoopRDD(conf, TableInputFormat.class, 22 ImmutableBytesWritable.class, Result.class); 23 24 javardd = hbRDD.values().map(new Function<Result, String>(){ 25 26 /** 27 * 28 */ 29 private static final long serialVersionUID = 1L; 30 31 @Override 32 public String call(Result r) throws Exception { 33 // TODO 自动生成的方法存根 34 String s = ""; 35 for (Cell cell : r.rawCells()) { 36 s += "Rowkey:" 37 + Bytes.toString(CellUtil.cloneRow(cell)) 38 + ",column=" 39 + Bytes.toString(CellUtil.cloneFamily(cell)) 40 + ":" 41 + Bytes.toString( 42 CellUtil.cloneQualifier(cell)) 43 .replaceAll("Quilifier:", "") 44 + ",timestamp=" + cell.getTimestamp() 45 + ",value:" 46 + Bytes.toString(CellUtil.cloneValue(cell)); 47 } 48 return s; 49 } 50 51 }); 52 SparkLog.debug("hbase table records num = " + javardd.count()); 53 54 } catch (Throwable e) { 55 return TCResult.newFailureResult("SPARK_ERROR",e); 56 }
//hbase读取方式2(saveAsNewAPIHadoopDataset && put方式)
String hbasetable = dict.getStringItem("table", "");
                String hbase_site_path = dict.getStringItem("path_site", "");
                //String hfile_path = dict.getStringItem("hfile_path", "");
                Configuration  conf = ss.sparkContext().hadoopConfiguration();
                
                
                if (StringUtils.isBlank(hbase_site_path)) {
                    SparkLog.warn("参数配置错误,未配置hbase-site信息!");
                }
                if (StringUtils.isNotBlank(hbase_site_path)) {
                    hbase_site_path = hbase_site_path + (hbase_site_path.contains("hbase-site.xml") ? "" : "/hbase-site.xml");
                    conf.addResource(new Path(hbase_site_path));
                }
                
                SparkLog.debug("读取hbase信息完成");
                conf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "60000");
                conf.set(TableOutputFormat.OUTPUT_TABLE, hbasetable);


                Job job = Job.getInstance(conf);
                // 设置job名称
                job.setJobName("DumpFile");
                // 此处最重要,需要设置文件输出的key,因为我们要生成HFil,所以outkey要用ImmutableBytesWritable
                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                // 输出文件的内容Result
                job.setMapOutputValueClass(Put.class);
                
                job.setOutputFormatClass((TableOutputFormat.class));
                 
                data.mapToPair(new PairFunction<Row, ImmutableBytesWritable, Put>() {

                private static final long serialVersionUID = -8033772725296906227L;
    
                @Override
                public Tuple2<ImmutableBytesWritable, Put> call(Row s) throws Exception {
                        byte[] rowkey = Bytes.toBytes((new SimpleDateFormat("yyyyMMddHHmmss.sss")).format(System.currentTimeMillis())+Math.random()*100); 
                    
                        byte[] family = Bytes.toBytes("fm");
                        
                        String hostname =s.get(0)==null ? "":s.getString(0);
                        String request_date =s.get(1)==null ? "":s.getString(1);
                        String post_id=s.get(2)==null ? "":Integer.toString(s.getInt(2));
                        String title=s.get(3)==null ? "":s.getString(3);
                        String author=s.get(4)==null ? "":s.getString(4);
                        String country=s.get(5)==null ? "":s.getString(5);
                        String category=s.get(6)==null ? "":s.getString(6);
                        

                        Put put = new Put(rowkey);
                
                        put.addImmutable(family,Bytes.toBytes("hostname"),Bytes.toBytes(hostname));
                        put.addImmutable(family,Bytes.toBytes("request_date"),Bytes.toBytes(request_date));
                        put.addImmutable(family,Bytes.toBytes("post_id"),Bytes.toBytes(post_id));
                        put.addImmutable(family,Bytes.toBytes("title"),Bytes.toBytes(title));
                        put.addImmutable(family,Bytes.toBytes("author"),Bytes.toBytes(author));
                        put.addImmutable(family,Bytes.toBytes("country"),Bytes.toBytes(country));
                        put.addImmutable(family,Bytes.toBytes("category"),Bytes.toBytes(category));
                         
                       return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(rowkey),put);

                      
                } 
            }).saveAsNewAPIHadoopDataset(job.getConfiguration());
 1            //创建hbase表结构
 2             org.apache.hadoop.hbase.client.Connection conn = null;
 3         try {
 4 
 5             //创建hbase的链接,利用默认的配置文件,实际上读取的hbase的master地址
 6             conn = ConnectionFactory.createConnection(conf);
 7             // 根据表名获取表
 8             HBaseAdmin admin =(HBaseAdmin) conn.getAdmin();
 9 
10             if (!admin.tableExists(tablename)) {  
11                   SparkLog.info("Table Not Exists! Create Table"); 
12                   HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tablename));
13                   String fms[] =Col_family.split(",");
14                   for(int i=0;i<fms.length;i++){
15                       tableDesc.addFamily(new HColumnDescriptor(fms[i].getBytes()));
16                   }
17                   admin.createTable(tableDesc); 
18                 }else{
19                      SparkLog.info("Table  Exists!  Do not Create Table"); 
20                 }  
21         }catch (Throwable e) {
22             return TCResult.newFailureResult("SPARK_ERROR", e);
23         } finally {
24             try {
25                 conn.close();
26             } catch (Throwable e) {
27                 return TCResult.newFailureResult("SPARK_ERROR", e);
28             }
29         }
 1 // 添加一条数据  
 2 public static void addRow(String tableName, String rowKey, String columnFamily, String column, String value)   
 3         throws IOException {  
 4     // 建立一个数据库的连接  
 5     Connection conn = ConnectionFactory.createConnection(conf);  
 6     // 获取表  
 7     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 8     // 通过rowkey创建一个put对象  
 9     Put put = new Put(Bytes.toBytes(rowKey));  
10     // 在put对象中设置列族、列、值  
11     put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));  
12     // 插入数据,可通过put(List<Put>)批量插入  
13     table.put(put);  
14     // 关闭资源  
15     table.close();  
16     conn.close();  
17 }
// 通过rowkey获取一条数据  
public static void getRow(String tableName, String rowKey) throws IOException {  
    // 建立一个数据库的连接  
    Connection conn = ConnectionFactory.createConnection(conf);  
    // 获取表  
    HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
    // 通过rowkey创建一个get对象  
    Get get = new Get(Bytes.toBytes(rowKey));  
    // 输出结果  
    Result result = table.get(get);  
    for (Cell cell : result.rawCells()) {  
        System.out.println(  
                "行键:" + new String(CellUtil.cloneRow(cell)) + "	" +  
                "列族:" + new String(CellUtil.cloneFamily(cell)) + "	" +   
                "列名:" + new String(CellUtil.cloneQualifier(cell)) + "	" +   
                "值:" + new String(CellUtil.cloneValue(cell)) + "	" +  
                "时间戳:" + cell.getTimestamp());  
    }  
    // 关闭资源  
    table.close();  
    conn.close();  
}
 1 // 全表扫描  
 2     public static void scanTable(String tableName) throws IOException {  
 3         // 建立一个数据库的连接  
 4         Connection conn = ConnectionFactory.createConnection(conf);  
 5         // 获取表  
 6         HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 7         // 创建一个扫描对象  
 8         Scan scan = new Scan();  
 9         // 扫描全表输出结果  
10         ResultScanner results = table.getScanner(scan);  
11         for (Result result : results) {  
12             for (Cell cell : result.rawCells()) {  
13                 System.out.println(  
14                         "行键:" + new String(CellUtil.cloneRow(cell)) + "	" +  
15                         "列族:" + new String(CellUtil.cloneFamily(cell)) + "	" +   
16                         "列名:" + new String(CellUtil.cloneQualifier(cell)) + "	" +   
17                         "值:" + new String(CellUtil.cloneValue(cell)) + "	" +  
18                         "时间戳:" + cell.getTimestamp());  
19             }  
20         }  
21         // 关闭资源  
22         results.close();  
23         table.close();  
24         conn.close();  
25 }
 1 // 删除一条数据  
 2 public static void delRow(String tableName, String rowKey) throws IOException {  
 3     // 建立一个数据库的连接  
 4     Connection conn = ConnectionFactory.createConnection(conf);  
 5     // 获取表  
 6     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 7     // 删除数据  
 8     Delete delete = new Delete(Bytes.toBytes(rowKey));  
 9     table.delete(delete);  
10     // 关闭资源  
11     table.close();  
12     conn.close();  
13 }
 1 // 删除多条数据  
 2 public static void delRows(String tableName, String[] rowkeys) throws IOException {  
 3     // 建立一个数据库的连接  
 4     Connection conn = ConnectionFactory.createConnection(conf);  
 5     // 获取表  
 6     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 7     // 删除多条数据  
 8     List<Delete> list = new ArrayList<Delete>();  
 9     for (String row : rowkeys) {  
10         Delete delete = new Delete(Bytes.toBytes(row));  
11         list.add(delete);  
12     }  
13     table.delete(list);  
14     // 关闭资源  
15     table.close();  
16     conn.close();  
17 }
 1 // 删除列族  
 2 public static void delColumnFamily(String tableName, String columnFamily) throws IOException {  
 3     // 建立一个数据库的连接  
 4     Connection conn = ConnectionFactory.createConnection(conf);  
 5     // 创建一个数据库管理员  
 6     HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();  
 7     // 删除一个表的指定列族  
 8     hAdmin.deleteColumn(tableName, columnFamily);  
 9     // 关闭资源  
10     conn.close();  
11 }
 1 // 删除数据库表  
 2 public static void deleteTable(String tableName) throws IOException {  
 3     // 建立一个数据库的连接  
 4     Connection conn = ConnectionFactory.createConnection(conf);  
 5     // 创建一个数据库管理员  
 6     HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();  
 7     if (hAdmin.tableExists(tableName)) {  
 8         // 失效表  
 9         hAdmin.disableTable(tableName);  
10         // 删除表  
11         hAdmin.deleteTable(tableName);  
12         System.out.println("删除" + tableName + "表成功");  
13         conn.close();  
14     } else {  
15         System.out.println("需要删除的" + tableName + "表不存在");  
16         conn.close();  
17         System.exit(0);  
18     }  
19 }
 1 // 追加插入(将原有value的后面追加新的value,如原有value=a追加value=bc则最后的value=abc)  
 2 public static void appendData(String tableName, String rowKey, String columnFamily, String column, String value)   
 3         throws IOException {  
 4     // 建立一个数据库的连接  
 5     Connection conn = ConnectionFactory.createConnection(conf);  
 6     // 获取表  
 7     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 8     // 通过rowkey创建一个append对象  
 9     Append append = new Append(Bytes.toBytes(rowKey));  
10     // 在append对象中设置列族、列、值  
11     append.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));  
12     // 追加数据  
13     table.append(append);  
14     // 关闭资源  
15     table.close();  
16     conn.close();  
17 }
 1 // 符合条件后添加数据(只能针对某一个rowkey进行原子操作)  
 2 public static boolean checkAndPut(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, String valueCheck, String columnFamily, String column, String value) throws IOException {  
 3     // 建立一个数据库的连接  
 4     Connection conn = ConnectionFactory.createConnection(conf);  
 5     // 获取表  
 6     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 7     // 设置需要添加的数据  
 8     Put put = new Put(Bytes.toBytes(rowKey));  
 9     put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));  
10     // 当判断条件为真时添加数据  
11     boolean result = table.checkAndPut(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck),   
12             Bytes.toBytes(columnCheck), Bytes.toBytes(valueCheck), put);  
13     // 关闭资源  
14     table.close();  
15     conn.close();  
16       
17     return result;  
18 }
 1 // 符合条件后刪除数据(只能针对某一个rowkey进行原子操作)  
 2 public static boolean checkAndDelete(String tableName, String rowKey, String columnFamilyCheck, String columnCheck,   
 3         String valueCheck, String columnFamily, String column) throws IOException {  
 4     // 建立一个数据库的连接  
 5     Connection conn = ConnectionFactory.createConnection(conf);  
 6     // 获取表  
 7     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 8     // 设置需要刪除的delete对象  
 9     Delete delete = new Delete(Bytes.toBytes(rowKey));  
10     delete.addColumn(Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck));  
11     // 当判断条件为真时添加数据  
12     boolean result = table.checkAndDelete(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck),   
13             Bytes.toBytes(valueCheck), delete);  
14     // 关闭资源  
15     table.close();  
16     conn.close();  
17   
18     return result;  
19 }
 1 // 计数器(amount为正数则计数器加,为负数则计数器减,为0则获取当前计数器的值)  
 2 public static long incrementColumnValue(String tableName, String rowKey, String columnFamily, String column, long amount)   
 3         throws IOException {  
 4     // 建立一个数据库的连接  
 5     Connection conn = ConnectionFactory.createConnection(conf);  
 6     // 获取表  
 7     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 8     // 计数器  
 9     long result = table.incrementColumnValue(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamily), Bytes.toBytes(column), amount);  
10     // 关闭资源  
11     table.close();  
12     conn.close();  
13       
14     return result;  
15 }

HBase为筛选数据提供了一组过滤器,通过这个过滤器可以在HBase中数据的多个维度(行、列、数据版本)上进行对数据的筛选操作,也就是说过滤器最终能够筛选的数据能够细化到具体的一个存储单元格上(由行键、列名、时间戳定位)。通常来说,通过行键、值来筛选数据的应用场景较多。需要说明的是,过滤器会极大地影响查询效率。所以,在数据量较大的数据表中,应尽量避免使用过滤器。

下面介绍一些常用的HBase内置过滤器的用法:

1、RowFilter:筛选出匹配的所有的行。使用BinaryComparator可以筛选出具有某个行键的行,或者通过改变比较运算符(下面的例子中是CompareFilter.CompareOp.EQUAL)来筛选出符合某一条件的多条数据,如下示例就是筛选出行键为row1的一行数据。

1 // 筛选出匹配的所有的行  
2 Filter rf = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("row1")));   

2、PrefixFilter:筛选出具有特定前缀的行键的数据。这个过滤器所实现的功能其实也可以由RowFilter结合RegexComparator来实现,不过这里提供了一种简便的使用方法,如下示例就是筛选出行键以row为前缀的所有的行。

// 筛选匹配行键的前缀成功的行  
Filter pf = new PrefixFilter(Bytes.toBytes("row"));  

3、KeyOnlyFilter:这个过滤器唯一的功能就是只返回每行的行键,值全部为空,这对于只关注于行键的应用场景来说非常合适,这样忽略掉其值就可以减少传递到客户端的数据量,能起到一定的优化作用。

// 返回所有的行键,但值全是空  
Filter kof = new KeyOnlyFilter();  

4、RandomRowFilter:按照一定的几率(<=0会过滤掉所有的行,>=1会包含所有的行)来返回随机的结果集,对于同样的数据集,多次使用同一个RandomRowFilter会返回不同的结果集,对于需要随机抽取一部分数据的应用场景,可以使用此过滤器。

// 随机选出一部分的行  
Filter rrf = new RandomRowFilter((float) 0.8);     

5、InclusiveStopFilter:扫描的时候,我们可以设置一个开始行键和一个终止行键,默认情况下,这个行键的返回是前闭后开区间,即包含起始行,但不包含终止行。如果我们想要同时包含起始行和终止行,那么可以使用此过滤器。

// 包含了扫描的上限在结果之内  
Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1"));   

6、FirstKeyOnlyFilter:如果想要返回的结果集中只包含第一列的数据,那么这个过滤器能够满足要求。它在找到每行的第一列之后会停止扫描,从而使扫描的性能也得到了一定的提升。

// 筛选出每行的第一个单元格  
Filter fkof = new FirstKeyOnlyFilter();     

7、ColumnPrefixFilter:它按照列名的前缀来筛选单元格,如果我们想要对返回的列的前缀加以限制的话,可以使用这个过滤器。

// 筛选出前缀匹配的列  
Filter cpf = new ColumnPrefixFilter(Bytes.toBytes("qual1"));    

8、ValueFilter:按照具体的值来筛选单元格的过滤器,这会把一行中值不能满足的单元格过滤掉,如下面的构造器,对于每一行的一个列,如果其对应的值不包含ROW2_QUAL1,那么这个列就不会返回给客户端。

// 筛选某个(值的条件满足的)特定的单元格  
Filter vf = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("ROW2_QUAL1")); 

9、ColumnCountGetFilter:这个过滤器在遇到一行的列数超过我们所设置的限制值的时候,结束扫描操作。

// 如果突然发现一行中的列数超过设定的最大值时,整个扫描操作会停止  
Filter ccf = new ColumnCountGetFilter(2);    

10、SingleColumnValueFilter:用一列的值决定这一行的数据是否被过滤,可对它的对象调用setFilterIfMissing方法,默认的参数是false。其作用是,对于咱们要使用作为条件的列,如果参数为true,这样的行将会被过滤掉,如果参数为false,这样的行会包含在结果集中。

sparksql读写hbase
内置过滤器的使用
 
// 将满足条件的列所在的行过滤掉  
SingleColumnValueFilter scvf = new SingleColumnValueFilter(    
•          Bytes.toBytes("colfam1"),     
•          Bytes.toBytes("qual2"),     
•          CompareFilter.CompareOp.NOT_EQUAL,     
•          new SubstringComparator("BOGUS"));    
scvf.setFilterIfMissing(true);  
sparksql读写hbase
内置过滤器的使用
 

11、SingleColumnValueExcludeFilter:这个过滤器与第10种过滤器唯一的区别就是,作为筛选条件的列,其行不会包含在返回的结果中。

12、SkipFilter:这是一种附加过滤器,其与ValueFilter结合使用,如果发现一行中的某一列不符合条件,那么整行就会被过滤掉。

// 发现某一行中的一列需要过滤时,整个行就会被过滤掉  
Filter skf = new SkipFilter(vf);  

13、WhileMatchFilter:使用这个过滤器,当遇到不符合设定条件的数据的时候,整个扫描结束。

// 当遇到不符合过滤器rf设置的条件时,整个扫描结束  
Filter wmf = new WhileMatchFilter(rf);  

14. FilterList:可以用于综合使用多个过滤器。其有两种关系: Operator.MUST_PASS_ONE表示关系AND,Operator.MUST_PASS_ALL表示关系OR,并且FilterList可以嵌套使用,使得我们能够表达更多的需求。

// 综合使用多个过滤器,AND和OR两种关系  
List<Filter> filters = new ArrayList<Filter>();    
filters.add(rf);    
filters.add(vf);    
FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL,filters);  

下面给出一个使用RowFilter过滤器的完整示例:

sparksql读写hbase
内置过滤器的使用
 
public class HBaseFilter {  
      
    private static final String TABLE_NAME = "table1";  
  
    public static void main(String[] args) throws IOException {  
        // 设置配置  
        Configuration conf = HBaseConfiguration.create();  
        conf.set("hbase.zookeeper.quorum", "localhost");  
        conf.set("hbase.zookeeper.property.clientPort", "2181");  
        // 建立一个数据库的连接  
        Connection conn = ConnectionFactory.createConnection(conf);  
        // 获取表  
        HTable table = (HTable) conn.getTable(TableName.valueOf(TABLE_NAME));  
        // 创建一个扫描对象  
        Scan scan = new Scan();  
        // 创建一个RowFilter过滤器  
        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("abc")));  
        // 将过滤器加入扫描对象  
        scan.setFilter(filter);  
        // 输出结果  
        ResultScanner results = table.getScanner(scan);  
        for (Result result : results) {  
            for (Cell cell : result.rawCells()) {  
                System.out.println(  
                        "行键:" + new String(CellUtil.cloneRow(cell)) + "	" +  
                        "列族:" + new String(CellUtil.cloneFamily(cell)) + "	" +   
                        "列名:" + new String(CellUtil.cloneQualifier(cell)) + "	" +   
                        "值:" + new String(CellUtil.cloneValue(cell)) + "	" +  
                        "时间戳:" + cell.getTimestamp());  
            }  
        }  
        // 关闭资源  
        results.close();  
        table.close();  
        conn.close();  
          
    }  
  
}   
sparksql读写hbase
内置过滤器的使用