HTable is not thread-safe when one instance is shared across threads
Scenario:
Several threads are started and all of them share a single HTable instance:
HTable errorTable = new HTable(conf, "tablename");  // one instance, shared by every thread below

@Test
public void insert_multithread() throws InterruptedException {
    for (int k = 1; k <= 10; k++) {
        final int j = k;
        new Thread(new Runnable() {
            public void run() {
                try {
                    // Configuration conf = new Configuration();
                    // conf.set("hbase.zookeeper.quorum", "192.168.1.160");
                    // HTable table = new HTable(conf,"wf:error");
                    errorTable.setAutoFlushTo(false);  // buffer puts on the client side
                    long t1 = System.currentTimeMillis();
                    for (int i = 0; i < 10000; i++) {
                        String uuid = UUID.randomUUID().toString().replaceAll("-", "").substring(0, 8);
                        Put put = new Put(Bytes.toBytes(uuid + "_" + "2015070" + j));
                        put.add(fBytes, Bytes.toBytes("stacktrace"),
                                Bytes.toBytes("java.io.IOException:file not found" + UUID.randomUUID().toString()));
                        // puts.add(put);
                        errorTable.put(put);
                        if (i % 10000 == 0) {
                            errorTable.flushCommits();
                        }
                    }
                    // table.close();
                    long t2 = System.currentTimeMillis();
                    System.out.println(Thread.currentThread() + ",t2-t1=" + (t2 - t1));
                } catch (IOException e) {
                }
            }
        }).start();
    }
    System.out.println("waiting.....");
    Thread.sleep(1000 * 60 * 10);
    System.out.println("completing.......");
}
Error:
java.util.ConcurrentModificationException
    at java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:953)
    at java.util.LinkedList$ListItr.remove(LinkedList.java:919)
    at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:319)
    at org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:965)
    at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1281)
    at com.cobub.hbase.HbaseTest$1.run(HbaseTest.java:88)
    at java.lang.Thread.run(Thread.java:745)
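Analysis: each Put is first cached in a client-side write buffer (BufferedMutatorImpl) and then submitted to the server for insertion. As each Put is submitted, it is also removed from the buffer through the iterator's remove(). The stack trace shows that the buffer is a plain LinkedList, so when several threads share one HTable, one thread can modify the list while another is still iterating over it, and the iterator throws ConcurrentModificationException. The relevant loop in AsyncProcess.submit: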
Iterator<? extends Row> it = rows.iterator();
while (it.hasNext()) {
    Row r = it.next();
    HRegionLocation loc = findDestLocation(r, posInList);
    if (loc == null) {
        // loc is null if there is an error such as meta not available.
        it.remove();
    } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
        Action<Row> action = new Action<Row>(r, ++posInList);
        setNonce(ng, r, action);
        retainedActions.add(action);
        addAction(loc, action, actionsByServer, ng);
        it.remove();
    }
}
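The failure mode can be reproduced without HBase. The following is a minimal sketch, not HBase code: the class name ComodificationDemo and the list writeBuffer are made up for illustration, and the "other thread" is simulated by a plain add() on the same thread so that the result is deterministic. It shows the same thing the stack trace shows: LinkedList's iterator refuses to remove() once the list has been changed behind its back.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedList;

class ComodificationDemo {
    // Returns true if remove() on a stale iterator raises
    // ConcurrentModificationException.
    static boolean staleIteratorRemoveFails() {
        // Hypothetical stand-in for the shared client-side write buffer.
        LinkedList<String> writeBuffer = new LinkedList<>();
        writeBuffer.add("put-1");
        writeBuffer.add("put-2");

        Iterator<String> it = writeBuffer.iterator();
        it.next();                 // the flushing thread picks up "put-1"
        writeBuffer.add("put-3");  // stands in for another thread calling put()
        try {
            it.remove();           // the iterator notices the list changed
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME raised: " + staleIteratorRemoveFails());
        // prints "CME raised: true"
    }
}
```

With real threads the interleaving is a matter of timing, which is why the exception in the test above shows up only on some runs or only in some threads.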
Solution:
Instantiate a separate HTable in each thread:
HTable table = new HTable(conf,"wf:error");
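To show the shape of the fix without depending on an HBase cluster, here is a rough sketch using only the JDK. BufferingWriter is a hypothetical stand-in for a non-thread-safe client such as HTable (an unsynchronized LinkedList drained through an iterator), and the AtomicInteger plays the role of the server. Neither is part of the HBase API. The point is only the ownership pattern: each worker creates, uses, and flushes its own writer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class PerThreadWriterDemo {
    // Hypothetical stand-in for a non-thread-safe buffering client.
    static class BufferingWriter {
        private final LinkedList<String> buffer = new LinkedList<>();
        private final AtomicInteger sink;  // thread-safe "server" row counter

        BufferingWriter(AtomicInteger sink) { this.sink = sink; }

        void put(String row) { buffer.add(row); }

        // Drain the buffer the same way the submit loop does: iterate and remove.
        void flush() {
            Iterator<String> it = buffer.iterator();
            while (it.hasNext()) {
                it.next();
                sink.incrementAndGet();
                it.remove();
            }
        }
    }

    // Starts the workers, waits for them, and returns the number of rows written.
    static int writeWithPerThreadWriters(int threads, int rowsPerThread) {
        AtomicInteger sink = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                // One writer per thread: the buffer is never shared.
                BufferingWriter writer = new BufferingWriter(sink);
                for (int i = 0; i < rowsPerThread; i++) {
                    writer.put("row-" + id + "-" + i);
                    if ((i + 1) % 100 == 0) {
                        writer.flush();
                    }
                }
                writer.flush();  // push whatever is still buffered
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread w : workers) {
            try {
                w.join();  // wait for completion instead of sleeping a fixed time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sink.get();
    }

    public static void main(String[] args) {
        System.out.println(writeWithPerThreadWriters(10, 10000));  // prints 100000
    }
}
```

In the HBase test, the equivalent change is to uncomment the three lines at the top of run() so that each thread builds its own HTable, and to close that table when the thread finishes.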