SynchronousQueue的疑点
SynchronousQueue的疑问
缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool采用的便是这种策略。
这是网上找的关于SynchronousQueue的一段话,我的疑问有2个:
1. 他说的“缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们”,这个不保持它们是什么意思,SynchronousQueue这个缓冲队列到底里面会存放任务吗,什么情况会存放,又什么时候被取出
2. 他说的“如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败”,这句话反过来理解,如果存在可用于立即运行任务的线程,那就会成功把任务加入SynchronousQueue吗?既然有空闲的线程,直接让线程运行任务不就完了吗,还需要把任务加入SynchronousQueue吗?
先在web论坛发了没人理,不知道是不是发错了地方,原帖见http://bbs.****.net/topics/391852458,知道的也可以在那边留言,一起结贴
------解决思路----------------------
我的理解SynchronousQueue只要没有消费掉就不会产生。SynchronousQueue 的作用你可以理解为hadoff,可以看作线程的调度。
A blocking queue in which each put must wait for a take, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to take it; you cannot add an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued thread is trying to add to the queue; if there are no queued threads then no element is being added and the head is null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.
至于你说的保存任务,是在ThreadPoolExecutor 还是在SynchronousQueue.
有任务的时候是存在workQueue。 SynchronousQueue是BlockingQueue<Runnable>实现类
其实每一个任务包装成一个Runnable类型的对象,执行 Runnable的run()方法。其次加入到线程池中也就是ThreadPoolExecutor中,主要的执行方法就是上面的execute(Runnable)方法。
任务满了之后就调用
具体还是看看源码吧,个人见解。
------解决思路----------------------
用SynchronousQueue得明白SynchronousQueue的特性,它是一种阻塞队列,意味着一个put操作必须等待另一个take操作完成。它并不是一个容器,可以认为它是一个快速交换信息的通道。
缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool采用的便是这种策略。
这是网上找的关于SynchronousQueue的一段话,我的疑问有2个:
1. 他说的“缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们”,这个不保持它们是什么意思,SynchronousQueue这个缓冲队列到底里面会存放任务吗,什么情况会存放,又什么时候被取出
2. 他说的“如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败”,这句话反过来理解,如果存在可用于立即运行任务的线程,那就会成功把任务加入SynchronousQueue吗?既然有空闲的线程,直接让线程运行任务不就完了吗,还需要把任务加入SynchronousQueue吗?
先在web论坛发了没人理,不知道是不是发错了地方,原帖见http://bbs.****.net/topics/391852458,知道的也可以在那边留言,一起结贴
------解决思路----------------------
我的理解SynchronousQueue只要没有消费掉就不会产生。SynchronousQueue 的作用你可以理解为hadoff,可以看作线程的调度。
A blocking queue in which each put must wait for a take, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to take it; you cannot add an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued thread is trying to add to the queue; if there are no queued threads then no element is being added and the head is null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.
至于你说的保存任务,是在ThreadPoolExecutor 还是在SynchronousQueue.
有任务的时候是存在workQueue。 SynchronousQueue是BlockingQueue<Runnable>实现类
/**
* The queue used for holding tasks and handing off to worker
* threads. Note that when using this queue, we do not require
* that workQueue.poll() returning null necessarily means that
* workQueue.isEmpty(), so must sometimes check both. This
* accommodates special-purpose queues such as DelayQueues for
* which poll() is allowed to return null even if it may later
* return non-null when delays expire.
*/
private final BlockingQueue<Runnable> workQueue;
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current <tt>RejectedExecutionHandler</tt>.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
* for execution
* @throws NullPointerException if command is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize
------解决思路----------------------
!addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING
------解决思路----------------------
poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
其实每一个任务包装成一个Runnable类型的对象,执行 Runnable的run()方法。其次加入到线程池中也就是ThreadPoolExecutor中,主要的执行方法就是上面的execute(Runnable)方法。
任务满了之后就调用
/**
* Rechecks state after queuing a task. Called from execute when
* pool state has been observed to change after queuing a task. If
* the task was queued concurrently with a call to shutdownNow,
* and is still present in the queue, this task must be removed
* and rejected to preserve shutdownNow guarantees. Otherwise,
* this method ensures (unless addThread fails) that there is at
* least one live thread to handle this task
* @param command the task
*/
private void ensureQueuedTaskHandled(Runnable command) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try {
int state = runState;
if (state != RUNNING && workQueue.remove(command))
reject = true;
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
t = addThread(null);
} finally {
mainLock.unlock();
}
if (reject)
reject(command);
else if (t != null)
t.start();
}
/**
* Invokes the rejected execution handler for the given command.
*/
void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
具体还是看看源码吧,个人见解。
------解决思路----------------------
用SynchronousQueue得明白SynchronousQueue的特性,它是一种阻塞队列,意味着一个put操作必须等待另一个take操作完成。它并不是一个容器,可以认为它是一个快速交换信息的通道。