一种简略无锁队列的实现
一种简单无锁队列的实现
下面是测试代码:
Disruptor是内存无锁并发框架,基于一个环数组作为缓冲,详见Disruptor-1.0。
下面是自己设计的一个简易版,目前没有发现存在冲突或错误的测试用例。大家可以一起测试下。
package tianshui.lockfree.queue; import java.io.Serializable; import java.lang.reflect.Array; import java.util.concurrent.atomic.AtomicInteger; /** * 并发生产者和消费者数不能多于queue的length,默认是2^16 * @author 天水 * @date 2013-4-9 上午11:42:10 */ public class RingBuffer<T> implements Serializable { /** * */ private static final long serialVersionUID = 6976960108708949038L; private volatile AtomicInteger head; private volatile AtomicInteger tail; private int length; final T EMPTY = null; private volatile T[] queue; public RingBuffer(Class<T> type, int length){ this.head = new AtomicInteger(0); this.tail = new AtomicInteger(0); this.length = length == 0 ? 2 << 16 : length; // 默认2^16 this.queue = (T[]) Array.newInstance(type, this.length); } public void enQueue(T t){ if(t == null) t= (T) new Object(); // 阻塞 -- 避免多生成者循环生产同一个节点 while(this.getTail() - this.getHead() >= this.length); int ctail = this.tail.getAndIncrement(); while(this.queue[this.getTail(ctail)] != EMPTY); // 自旋 this.queue[this.getTail(ctail)] = t; } public T deQueue(){ T t = null; // 阻塞 -- 避免多消费者循环消费同一个节点 while(this.head.get() >= this.tail.get()); int chead = this.head.getAndIncrement(); while(this.queue[this.getHead(chead)] == EMPTY); // 自旋 t = this.queue[this.getHead(chead)]; this.queue[this.getHead(chead)] = EMPTY; return t; } public int getHead(int index){ return index & (this.length - 1); } public int getTail(int index) { return index & (this.length - 1); } public int getHead() { return head.get() & (this.length - 1); } public int getTail() { return tail.get() & (this.length - 1); } public T[] getQueue() { return queue; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } }
下面是测试代码:
package tianshui.lockfree.queue; import java.util.concurrent.atomic.AtomicInteger; /** * @author 天水 * @date 2013-4-9 下午04:13:29 */ public class TestBuffer { public static AtomicInteger index = new AtomicInteger(0); public static void main(String[] args){ int tCount = 10; // thread count int length = 0; // buffer length -> 2^16 final RingBuffer<Integer> buffer = new RingBuffer<Integer>(Integer.class, length); // provider Runnable pr = new Runnable(){ @Override public void run() { while(true){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } int tindex = index.getAndIncrement(); buffer.enQueue(tindex); System.out.println("buffer enQueue: " + tindex); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }; // consumer Runnable cr = new Runnable(){ @Override public void run() { while(true){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } Integer cindex = buffer.deQueue(); System.out.println("buffer deQueue: " + cindex); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Thread[] pt = new Thread[tCount]; Thread[] ct = new Thread[tCount]; for(int i=0; i<tCount; i++){ ct[i] = new Thread(cr); ct[i].start(); } for(int i=0; i<tCount; i++){ pt[i] = new Thread(pr); pt[i].start(); } } }
- 1楼MN11201117昨天 12:01
- 求证算法是否能实现?