Lucene管理IndexReader和IndexWriter的最佳实践
lucene管理IndexReader和IndexWriter的最佳实践
实例化IndexReader需要加载索引文件,所以实例化它是非常耗资源的。
IndexReader是线程安全的,通常一个索引目录,我们只实例化一个IndexReader就够了。
当索引数据足够大(G的数量级)的时候,一般把索引资源按照某种规则散列到多个文件目录里(如:index-0,index-1,index-2.... 或者 blog,posts....),当然这些文件目录应该放在同一个根目录下---这时,最好的方式就是用一个Pool去维护这些IndexReader:保证一个文件目录只有一个实例,且不同的IndexReader可以根据名字动态的组合。
StandardIndexReaderPool.java
IndexWriter也需要Pool来管理
StandardIndexWriterPool.java
人家说的是多个索引目录
实例化IndexReader需要加载索引文件,所以实例化它是非常耗资源的。
IndexReader是线程安全的,通常一个索引目录,我们只实例化一个IndexReader就够了。
当索引数据足够大(G的数量级)的时候,一般把索引资源按照某种规则散列到多个文件目录里(如:index-0,index-1,index-2.... 或者 blog,posts....),当然这些文件目录应该放在同一个根目录下---这时,最好的方式就是用一个Pool去维护这些IndexReader:保证一个文件目录只有一个实例,且不同的IndexReader可以根据名字动态的组合。
StandardIndexReaderPool.java
package com.qiu.search.pool.impl;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.FSDirectory;
import org.springframework.util.Assert;

import com.dukuai.search.exception.PoolException;
import com.dukuai.search.pool.IndexReaderPool;
import com.dukuai.search.util.IndexResourceUtil;
import com.spinn3r.log5j.Logger;

/**
 * Implementation of {@link IndexReaderPool} that keeps at most one live {@link IndexReader} per index
 * directory name.
 *
 * <p>
 * Readers are refreshed via {@link IndexReader#reopen()} each time they are requested. A reader that has been
 * superseded is not closed immediately, because other threads may still be searching with it; it is parked in a
 * "stale" map and closed once it has been retired for {@link #STALE_INDEXREADER_SURVIVAL_TIME} milliseconds.
 */
public class StandardIndexReaderPool implements IndexReaderPool {

    private static final Logger LOG = Logger.getLogger(StandardIndexReaderPool.class);

    /** How long (in milliseconds) a superseded IndexReader is kept open before it is closed: 5s. */
    private static final int STALE_INDEXREADER_SURVIVAL_TIME = 5000;

    private String name = null;

    /** Path of the root directory that contains all index directories. */
    private String indexRootDirectory = null;

    /** Names (without path) of the index directories to pre-load in {@link #init()}. */
    private List<String> indexDirNameList = null;

    /** Live readers keyed by index directory name; every value is a fully opened IndexReader. */
    private final Map<String, IndexReader> indexReaderMap = new ConcurrentHashMap<String, IndexReader>();

    /**
     * Readers waiting to be closed, keyed by the (unique) time in milliseconds at which they were retired.
     * After a reopen produces a new reader, the old one may still be in use by other threads, so it is placed
     * here and released only after the survival time has elapsed.
     */
    private final Map<Long, IndexReader> staleIndexReadersMap = new ConcurrentHashMap<Long, IndexReader>();

    @Override
    public void setIndexDirNameList(List<String> indexDirNameList) {
        this.indexDirNameList = indexDirNameList;
    }

    /**
     * Pre-loads a reader for every configured index directory name. Directories for which no reader can be
     * created (see {@link #createIndexReader(String)}) are skipped.
     *
     * @throws PoolException if opening any reader fails with an {@link IOException}
     */
    public void init() {
        LOG.info("%s begin initialize", getName());
        for (String indexDirName : indexDirNameList) {
            try {
                IndexReader indexReader = createIndexReader(indexDirName);
                if (indexReader != null) {
                    indexReaderMap.put(indexDirName, indexReader);
                }
            } catch (IOException e) {
                // A failure during initialization is fatal: rethrow so start-up is aborted.
                throw new PoolException(e);
            }
        }
        LOG.info("%s initialization complete", getName());
    }

    /**
     * Returns the reader for the given index directory name, refreshing it if the index has changed, or
     * creating it on first use.
     *
     * @param indexDirName name of the index directory (not a path); must contain text
     * @return the current reader, or {@code null} if no reader could be created for that directory
     */
    public IndexReader getIndexReader(String indexDirName) {
        Assert.hasText(indexDirName, "this indexDirName must not be empty");
        IndexReader indexReader = indexReaderMap.get(indexDirName);
        if (indexReader != null) {
            return refreshIndexReader(indexDirName, indexReader);
        }
        synchronized (indexReaderMap) {
            // Re-check under the lock so that only one thread opens the reader for a directory.
            if (!indexReaderMap.containsKey(indexDirName)) {
                try {
                    indexReader = createIndexReader(indexDirName);
                } catch (CorruptIndexException e) {
                    LOG.error("CorruptIndexException while creating IndexReader of %s,the root cause is %s",
                            indexDirName, e.getMessage());
                } catch (IOException e) {
                    LOG.error("IOException while creating IndexReader of %s,%s", indexDirName, e.getMessage());
                }
                if (indexReader != null) {
                    indexReaderMap.put(indexDirName, indexReader);
                }
            }
        }
        return indexReaderMap.get(indexDirName);
    }

    /**
     * Refreshes the reader of the given directory so that it sees newly committed index data. If the reopen
     * yields a new reader instance, it replaces the old one in the pool and the old one is retired.
     *
     * @param indexDirName name of the index directory
     * @param indexReader the reader the caller observed; used only as a fallback when the pool no longer
     *        holds a reader for {@code indexDirName}
     * @return the newest reader registered for {@code indexDirName}
     */
    private synchronized IndexReader refreshIndexReader(String indexDirName, IndexReader indexReader) {
        try {
            closeStaleIndexReaders(staleIndexReadersMap);
            // The caller read its reference outside this lock, so another thread may already have replaced
            // it. Always reopen the reader that is registered right now; otherwise the registered reader
            // would be overwritten below without ever being retired and closed.
            IndexReader currentIndexReader = indexReaderMap.get(indexDirName);
            if (currentIndexReader == null) {
                currentIndexReader = indexReader;
            }
            LOG.debug("hashCode of indexReader is %s", currentIndexReader.hashCode());
            IndexReader newIndexReader = currentIndexReader.reopen();
            if (newIndexReader != currentIndexReader) {
                // The current reader is now an old version. It may still be in use by other threads, so
                // park it and close it later instead of closing it here.
                retireIndexReader(currentIndexReader);
                LOG.debug("hashCode of oldIndexReader is %s", currentIndexReader.hashCode());
                indexReaderMap.put(indexDirName, newIndexReader);
                LOG.debug("hashCode of newIndexReader is %s", newIndexReader.hashCode());
            }
        } catch (Exception e) {
            LOG.error("Exception while getting IndexReader of %s,the root cause is %s", indexDirName,
                    e.getMessage());
        }
        // Return whatever is registered now, which is the newest reader.
        return indexReaderMap.get(indexDirName);
    }

    /**
     * Parks a superseded reader under a unique retirement timestamp. Must be called while holding this
     * object's monitor. If another reader was retired in the same millisecond the key is bumped forward, so
     * that the earlier entry is not overwritten (and thereby leaked unclosed).
     */
    private void retireIndexReader(IndexReader staleIndexReader) {
        long retiredAt = System.currentTimeMillis();
        while (staleIndexReadersMap.containsKey(retiredAt)) {
            retiredAt++;
        }
        staleIndexReadersMap.put(retiredAt, staleIndexReader);
    }

    /**
     * Closes and removes every stale reader that was retired at least
     * {@link #STALE_INDEXREADER_SURVIVAL_TIME} milliseconds ago.
     *
     * @param staleIndexReadersMap the stale readers keyed by retirement time
     */
    private void closeStaleIndexReaders(Map<Long, IndexReader> staleIndexReadersMap) {
        closeStaleIndexReaders(staleIndexReadersMap, false);
    }

    /**
     * Closes and removes stale readers.
     *
     * @param staleIndexReadersMap the stale readers keyed by retirement time
     * @param force when {@code true}, close every entry regardless of how long ago it was retired
     */
    private void closeStaleIndexReaders(Map<Long, IndexReader> staleIndexReadersMap, boolean force) {
        final long now = System.currentTimeMillis();
        Iterator<Entry<Long, IndexReader>> entryIterator = staleIndexReadersMap.entrySet().iterator();
        while (entryIterator.hasNext()) {
            Entry<Long, IndexReader> entry = entryIterator.next();
            if (!force && (now - entry.getKey()) < STALE_INDEXREADER_SURVIVAL_TIME) {
                continue;
            }
            IndexReader staleIndexReader = entry.getValue();
            try {
                staleIndexReader.close();
                LOG.debug("a stale IndexReader whose hashCode is %s has bean closed",
                        staleIndexReader.hashCode());
            } catch (IOException e) {
                LOG.error("IOException while colsing IndexReader,%s", e.getMessage());
            } finally {
                // Drop the entry even if close failed so it is not retried forever.
                entryIterator.remove();
                LOG.debug("delete a stale IndexReader from pool,hashCode:" + staleIndexReader.hashCode());
            }
        }
    }

    /**
     * Closes every reader held by the pool, including retired readers that are still waiting to be closed,
     * and empties the pool.
     */
    public synchronized void destroy() {
        for (Entry<String, IndexReader> entry : indexReaderMap.entrySet()) {
            try {
                entry.getValue().close();
            } catch (IOException e) {
                LOG.info("IOException while closing IndexReader whose indexDirName is %s", entry.getKey());
            }
        }
        indexReaderMap.clear();
        // Retired readers still hold file handles; release them too instead of leaking them.
        closeStaleIndexReaders(staleIndexReadersMap, true);
        LOG.info("%s destroyed", getName());
    }

    /**
     * Opens an {@link IndexReader} for the given index directory name.
     *
     * @param indexDirName name of the index directory
     * @return the reader, or {@code null} when the directory is reported empty by
     *         {@code IndexResourceUtil.isEmptyIndexDir} or does not exist as a directory; callers must
     *         check for {@code null}
     * @throws CorruptIndexException if the index is corrupt
     * @throws IOException if the index cannot be read
     */
    private IndexReader createIndexReader(String indexDirName) throws CorruptIndexException, IOException {
        File indexFile = new File(IndexResourceUtil.getDirPath(indexRootDirectory, indexDirName));
        if (IndexResourceUtil.isEmptyIndexDir(indexFile)) {
            LOG.warn("%s is empty,no index resource", indexDirName);
            return null;
        }
        // Only open the index when the directory actually exists.
        if (indexFile.exists() && indexFile.isDirectory()) {
            return IndexReader.open(FSDirectory.getDirectory(indexFile));
        }
        return null;
    }

    /** @return the number of live readers currently held by the pool */
    public int size() {
        return indexReaderMap.size();
    }

    @Override
    public String toString() {
        return (new ToStringBuilder(this).append("name", getName()).append("indexRootDirectory", indexRootDirectory)
                .append("size", size()).append("indexReader Set", indexReaderMap.keySet())).toString();
    }

    public String getName() {
        return name;
    }

    /** Injected by Spring. */
    public void setIndexRootDirectory(String indexRootDirectory) {
        this.indexRootDirectory = indexRootDirectory;
    }

    public void setName(String name) {
        this.name = name;
    }
}
IndexWriter也需要Pool来管理
StandardIndexWriterPool.java
package com.dukuai.search.pool.impl;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexWriter;

import com.dukuai.search.exception.PoolException;
import com.dukuai.search.pool.IndexWriterPool;
import com.dukuai.search.util.IndexResourceUtil;
import com.dukuai.search.util.MetisUtil;
import com.spinn3r.log5j.Logger;

/**
 * Implementation of <code>IndexWriterPool</code> that holds one {@link IndexWriter} per index directory name.
 * All operations that create, replace or remove writers synchronize on the internal writer map.
 */
public class StandardIndexWriterPool implements IndexWriterPool {

    private static final Logger LOG = Logger.getLogger();

    /** Default number of segments to merge down to when optimizing. */
    private static final int DEFAULT_MAX_NUM_SEGMENTS = 2;

    private String indexRootDirectory = null;

    private String name = null;

    /**
     * Number of segments left after optimization. A larger number makes optimization faster but its effect
     * less pronounced.
     */
    private int maxNumSegments = DEFAULT_MAX_NUM_SEGMENTS;

    /**
     * Writers keyed by index directory name. Declared final because it is also the lock object for every
     * structural operation on the pool.
     */
    private final Map<String, IndexWriter> indexWriterMap = new ConcurrentHashMap<String, IndexWriter>();

    private List<String> indexDirNameList = null;

    @Override
    public void setIndexDirNameList(List<String> indexDirNameList) {
        this.indexDirNameList = indexDirNameList;
    }

    /**
     * Initializes the pool by pre-loading an <code>IndexWriter</code> for every configured directory name.
     *
     * @throws PoolException if a writer cannot be created
     */
    public void init() {
        LOG.info("%s begin initialize", getName());
        synchronized (indexWriterMap) {
            for (String indexDirName : indexDirNameList) {
                indexWriterMap.put(indexDirName, createIndexWriter(indexDirName));
            }
        }
        LOG.info("%s initialization complete", getName());
    }

    /**
     * Returns the writer for the given index directory, creating it on first use. An IndexWriter may be
     * shared by several threads, but only one writer may be opened per index directory, which is why
     * creation happens under the pool lock.
     *
     * @param indexDirName name of the index directory
     * @return the writer registered for {@code indexDirName}
     * @throws PoolException if a new writer has to be created and creation fails
     */
    public IndexWriter getIndexWriter(String indexDirName) {
        if (!indexWriterMap.containsKey(indexDirName)) {
            synchronized (indexWriterMap) {
                // Re-check under the lock so two threads never open two writers on one directory.
                if (!indexWriterMap.containsKey(indexDirName)) {
                    indexWriterMap.put(indexDirName, createIndexWriter(indexDirName));
                    LOG.info("added a new IndexWriter whose name is %s to pool,the pool size:%s", indexDirName,
                            size());
                }
            }
        }
        return indexWriterMap.get(indexDirName);
    }

    /**
     * Creates a new IndexWriter for the given directory. Every caller in this class invokes it while holding
     * the lock on the writer map, so no additional locking is done here.
     *
     * @param indexDirName name of the index directory
     * @return a new {@link IndexWriter}; the index is created from scratch when the directory is reported
     *         empty by {@code IndexResourceUtil.isEmptyIndexDir}
     * @throws PoolException wrapping whatever exception prevented the writer from being opened
     */
    private IndexWriter createIndexWriter(String indexDirName) {
        final String indexDirPath = getIndexDirPath(indexDirName);
        boolean create = IndexResourceUtil.isEmptyIndexDir(indexDirPath);
        try {
            return new IndexWriter(indexDirPath, new StandardAnalyzer(), create, IndexWriter.MaxFieldLength.LIMITED);
        } catch (Exception e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new PoolException(e);
        }
    }

    /**
     * Commits every writer in the pool; only committed documents become searchable. See
     * {@link IndexWriter#commit()}. A writer whose commit fails is closed and removed from the pool.
     */
    public void commit() {
        LOG.info("begin to commit all IndexWiters of pool,the pool size:%s", size());
        synchronized (indexWriterMap) {
            Iterator<Entry<String, IndexWriter>> iterator = indexWriterMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Entry<String, IndexWriter> entry = iterator.next();
                IndexWriter indexWriter = entry.getValue();
                try {
                    indexWriter.commit();
                } catch (Exception e) {
                    LOG.error("exception while commiting pending updates,indexDir:%s,exception:%s", entry.getKey(),
                            e.getMessage());
                    destoryIndexWriter(iterator, indexWriter);
                }
            }
        }
        LOG.info("%s IndexWiters had committed pending updates", size());
    }

    /**
     * Commits and then optimizes every writer in the pool to speed up searching. See
     * {@link IndexWriter#optimize} for caveats. A writer that fails is closed and removed from the pool.
     */
    public void optimize() {
        LOG.info("begin to optimize at %s", MetisUtil.getCurrentDisplayFormatTime());
        synchronized (indexWriterMap) {
            Iterator<Entry<String, IndexWriter>> iterator = indexWriterMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Entry<String, IndexWriter> entry = iterator.next();
                IndexWriter indexWriter = entry.getValue();
                try {
                    indexWriter.commit();
                    indexWriter.optimize(maxNumSegments);
                } catch (Exception e) {
                    LOG.error("Exception while optimizing %s,the root cause:%s", entry.getKey(), e.getMessage());
                    destoryIndexWriter(iterator, indexWriter);
                }
            }
        }
        LOG.info("end optimize at %s", MetisUtil.getCurrentDisplayFormatTime());
    }

    /**
     * Closes every {@link IndexWriter} and opens a fresh one for the same directory. According to the
     * original author, a long-lived writer does not promptly release the index file fragments produced while
     * indexing, even after those files are gone; reloading releases those file handles so the process does
     * not hold too many of them.
     *
     * @throws PoolException if a writer cannot be re-created
     */
    public void reload() {
        LOG.info("begin to reload %s at %s", name, MetisUtil.getCurrentDisplayFormatTime());
        // Names of the index directories whose writers have to be re-created.
        List<String> dirNamesToReload = new ArrayList<String>();
        synchronized (indexWriterMap) {
            Iterator<Entry<String, IndexWriter>> iterator = indexWriterMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Entry<String, IndexWriter> entry = iterator.next();
                dirNamesToReload.add(entry.getKey());
                IndexWriter indexWriter = entry.getValue();
                try {
                    indexWriter.commit();
                } catch (Exception e) {
                    LOG.error("Exception while commiting %s,the root cause:%s", entry.getKey(), e.getMessage());
                } finally {
                    destoryIndexWriter(iterator, indexWriter);
                }
            }
            for (String indexDirName : dirNamesToReload) {
                indexWriterMap.put(indexDirName, createIndexWriter(indexDirName));
            }
        }
        LOG.info("%s reload end at %s", name, MetisUtil.getCurrentDisplayFormatTime());
    }

    /**
     * Closes the given {@link IndexWriter} and removes the iterator's current entry from the pool. Close
     * failures are logged and do not prevent the removal.
     *
     * @param iterator iterator positioned on the entry that owns {@code indexWriter}
     * @param indexWriter the writer to close
     */
    private void destoryIndexWriter(Iterator<Entry<String, IndexWriter>> iterator, IndexWriter indexWriter) {
        try {
            indexWriter.close();
        } catch (CorruptIndexException e) {
            LOG.error("CorruptIndexException while closing indexWriter,the root cause:%s", e.getMessage());
        } catch (IOException e) {
            LOG.error("IOException while closing indexWriter,the root cause:%s", e.getMessage());
        }
        iterator.remove();
        LOG.info("destory a indexWriter,current pool's size:%s", size());
    }

    /**
     * Destroys the pool: commits, closes and removes every writer, releasing the resources they hold. The
     * pool object itself stays usable afterwards ({@link #size()} returns 0).
     */
    public void destroy() {
        synchronized (indexWriterMap) {
            Iterator<Entry<String, IndexWriter>> iterator = indexWriterMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Entry<String, IndexWriter> entry = iterator.next();
                IndexWriter indexWriter = entry.getValue();
                try {
                    indexWriter.commit();
                } catch (Exception e) {
                    LOG.error("Exception while closing %s,the root cause:%s", entry.getKey(), e.getMessage());
                } finally {
                    // Close exactly once and drop the entry, whether or not the commit succeeded.
                    destoryIndexWriter(iterator, indexWriter);
                }
            }
            LOG.info("%s destoryed", getName());
        }
    }

    /** Builds the path of an index directory by joining the root directory and the directory name. */
    private String getIndexDirPath(String indexDirName) {
        return (new StringBuffer(indexRootDirectory).append(File.separatorChar).append(indexDirName)).toString();
    }

    /** @return the number of writers currently held by the pool */
    public int size() {
        return this.indexWriterMap.size();
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        ToStringBuilder builder = new ToStringBuilder(this);
        builder.append("name", this.name);
        builder.append("indexRootDirectory", this.indexRootDirectory);
        builder.append("size", this.size());
        builder.append("IndexWriter Set", indexWriterMap.keySet());
        return builder.toString();
    }

    /** Injected by Spring. */
    public void setName(String name) {
        this.name = name;
    }

    public void setIndexRootDirectory(String indexRootDirectory) {
        this.indexRootDirectory = indexRootDirectory;
    }

    public void setMaxNumSegments(int maxNumSegments) {
        this.maxNumSegments = maxNumSegments;
    }
}
1 楼
swprogrammer
2010-01-14
朋友,能加你QQ我们聊下吗?我有些问题想请教下你。
我的QQ是136516594
我的QQ是136516594
2 楼
illu
2010-02-05
这篇文章真的很不错 我学到了很多
3 楼
千秋万世
2010-04-09
很好,非常感谢!
4 楼
千秋万世
2010-04-12
我使用了你的方法,不过一位仁兄说,看过epoll就知道我这种做法过时了。我是第一次设计这个,不知道他说的方式是否更加有效,博主如果清楚,望回复。
5 楼
luckaway
2010-04-12
epoll我不了解,更不知道原理
6 楼
千秋万世
2010-04-12
嗯,同样感谢!
7 楼
whiletrue
2012-03-20
indexReader和indexWriter都是线程安全的,用pool有什么意义?
8 楼
promzaid
2012-07-10
whiletrue 写道
indexReader和indexWriter都是线程安全的,用pool有什么意义?
人家说的是多个索引目录