【搜索引擎】BerkeleyDB实现行列数据库

【搜索引擎】BerkeleyDB实现队列数据库

在使用爬虫爬取URL时,我们总会要使用到队列这一数据结构,在示例里,使用java写一个队列类可以解决此问题,但这种队列存储的数据只能存储在内存中,一旦断电,所有数据全部清空,下次又得重来。所以,这种队列不能用于解决问题,我们必须实现一个能够持久化数据的队列。

下面是我用Berkeley DB实现的一个队列,BerkeleyDB是一个内存嵌入式数据库,当内存中存储的数据大于它缓冲区大小时,它就会把数据自动持久化到磁盘中。

Berkeley DB使用键值对存储方式,所以我就用了Java里的BigInteger作为key,而URL作为value进行存储。key采用递增方式,BigInteger可以实现在1后面加上几千个0,完全符合URL数量庞大的要求。

队列会维护两个队头和队尾两个BigInteger,分别保存此时队列的头部值和尾部值;出队操作时,会删除头部保存的数据,并将头部值加1;入队时,会将数据添加进来,并将尾部值加1;size用于返回队列长度;此外,还有first、current、next、prev、last等几个游标操作,用于遍历队列。

队列的实现基于我以前封装的MyBerkeleyDB类,只有几个简单的API,使用起来很方便,这也算是代码重用了。下面是代码:

package com.nosql;

import java.math.BigInteger;

/*********************************
 * 使用BerkeleyDB封装了一些数据库操作
 * 包括设置缓冲区,设置编码,设置数据可库
 * 路径,存储键值对,根据键查找值,关闭数
 * 据库等操作。
 * @author Administrator
 *********************************/
public class MyBerkeleyDBQueue {

    private MyBerkeleyDB database;                      //数据库
    private static final BigInteger 
        BigIntegerIncrement = BigInteger.valueOf(1);    //key值的递增值

    private BigInteger head;            //队列头部
    private BigInteger tail;            //队列尾部
    private BigInteger current;         //用于遍历数据库的当前位置

    private static final String headString = "head";
    private static final String tailString = "tail";

    public MyBerkeleyDBQueue(){
        database = new MyBerkeleyDB();
    }

    //初始化数据库
    public void open(String dbName){
        database.setEnvironment(database.getPath(),database.getChacheSize());
        database.open(dbName);//打开数据库
        head = (BigInteger) database.get("head");
        tail = (BigInteger) database.get("tail");
        if(head == null||tail == null){
            head = BigInteger.valueOf(0);
            tail = BigInteger.valueOf(-1);
            database.put(headString, head);
            database.put(tailString, tail);
        }
        current = BigInteger.valueOf(head.longValue());
    }

    //设置编码
    public void setCharset(String charset)
    {
        database.setCharset(charset);
    }

    //设置路径
    public void setPath(String path){
        database.setPath(path);
    }

    //设置缓冲区大小
    public boolean setChacheSize(long size){
        return database.setChacheSize(size);
    }

    //入队
    public void enQueue(Object value){
        if(value == null)
            return;
        tail = tail.add(MyBerkeleyDBQueue.BigIntegerIncrement);
        database.put(tailString, tail);
        database.put(tail, value);  //入队,队尾加1
    }

    //出队
    public Object deQueue(){
        Object value = database.del(head);  //获取队头元素并将其删除
        if(value != null){
            head = head.add(BigIntegerIncrement);
            database.put(headString, head);
        }
        return value;
    }

    //队头值
    public Object head(){
        return head;
    }

    //队尾值
    public Object tail(){
        return tail;
    }

    //关闭
    public void close()
    {
        this.database.close();
    }

    //获取数据库存储数据的大小
    public long size()
    {
        return database.size()-2;
    }

    //得到当前的游标值
    public Object current(){
        return database.get(current);
    }
    //得到第一个游标值
    public Object first(){
        current = BigInteger.valueOf(head.longValue());
        return current();
    }
    //得到第一个游标值
    public Object last(){
        current = BigInteger.valueOf(tail.longValue());
        return current();
    }

    //得到下一个游标值
    public Object next(){
        if(current.compareTo(tail)<0){
            current = current.add(BigIntegerIncrement);
            return current();
        }
        return null;
    }
    //得到上一个游标值
    public Object prev(){
        if(current.compareTo(head)>0)
        {
            current = current.divide(BigIntegerIncrement);
            return current();
        }
        return null;
    }
}

队头值和队尾值是通过String/BigInteger这一键值对存储的,而URL和key是采用BigInteger/String键值对存储的(为了重用,代码里全是Object,这里说明一下是为了更好的理解),所以size函数返回队列长度时会减去一个2,就是队头值和队尾值。

下面一个功能测试文件:

package com.test;

import java.math.BigInteger;

import com.nosql.MyBerkeleyDBQueue;

public class Test_MyBerkeleyDBQueue {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        MyBerkeleyDBQueue queue = new MyBerkeleyDBQueue();
        queue.setPath("WebRoot\\data\\db\\queue");
        queue.open("queue");

        System.out.println("头:"+queue.head());
        System.out.println("尾:"+queue.tail());
        System.out.println("大小"+queue.size());
        System.out.println("===================");

        for(int i=0;i<10;i++){
            queue.enQueue(i);
            System.out.println("头:"+queue.head());
            System.out.println("尾:"+queue.tail());
            System.out.println("大小:"+queue.size());
            System.out.println("===================");
        }

        //游标测试
        System.out.println("第一个元素:"+queue.first());
        System.out.println("最后一个元素:"+queue.last());
        long size1 = queue.size()-1;//减掉第一个元素
        System.out.println(queue.first());//将游标重置为0
        while(size1-- >0){
            System.out.println(queue.next());
        }

        System.out.println("===================");
        System.out.println("大小:"+queue.size());
        System.out.println("===================");
        long size = queue.size()+3; //出队的数量多于队列元素总数量时,会输出null
        for(int i=0;i<size;i++){
            System.out.println("删除:"+queue.deQueue());
            System.out.println("头:"+queue.head());
            System.out.println("尾:"+queue.tail());
            System.out.println("大小"+queue.size());
            System.out.println("===================");
        }

        queue.close();
    }

}