hbase二级索引Observer
hbase二级索引Observer
使用时必须保证线程安全:
package coprocessor; import java.io.IOException; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; public class AISObserver extends BaseRegionObserver { private HTablePool pool = null; HTableInterface table1 = null; private final byte[] tablename = Bytes.toBytes("zchx_ais_index"); private final byte[] family = Bytes.toBytes("info"); private final byte[] q = Bytes.toBytes("index"); public void start(CoprocessorEnvironment env) throws IOException { // this.ShipAtable_index = new HTable(env.getConfiguration(), "zchx_ais_index"); pool = new HTablePool(env.getConfiguration(), 10); table1 = pool.getTable(tablename); table1.setAutoFlush(false); } @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, boolean writeToWAL) throws IOException { byte[] row = put.getRow(); String rowkey = Bytes.toString(row); if(rowkey.length()==24){ StringBuilder sb = new StringBuilder(); sb.append(rowkey.substring(10, 24)); String type = rowkey.substring(9,10); sb.append(rowkey.substring(0, 9)); Put secPut = new Put(Bytes.toBytes(sb.toString())); secPut.add(family,q,row); if(type.equals("a")){ table1.put(secPut); } // this.table.flushCommits(); sb = null; } } @Override public void stop(CoprocessorEnvironment env) throws IOException { pool.close(); } }