Java多线程复制文件

java多线程方式复制文件

**这几日有一个数据迁移需求,是说从一个系统中将所有的图片按照新的规则迁移到新的系统中来。**

思路如下

1. 使用生产者模式将要下载的清单放到一个阻塞队列里面。 2. 执行消费者模式读取文件并进行处理。
其核心代码如下

生产者

//生产者 import java.util.concurrent.BlockingQueue; public class PRoducer implements Runnable { private BlockingQueue<FilesRes> queue; private FilesRes produce; public Producer(BlockingQueue<FilesRes> queue, FilesRes produce) { this.queue = queue; if (null != produce) this.produce = produce; else this.produce = null; } @Override public void run() { try { queue.put(produce); //当队列里满的话,会阻塞 } catch (InterruptedException e) { System.out.println(e.getMessage()); } } }

消费者

//消费者 import java.io.File; import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; public class Consumer implements Runnable { private BlockingQueue<FilesRes> queue; private static AtomicInteger atomic = new AtomicInteger(0);//初始化计数器 public Consumer(BlockingQueue<FilesRes> queue) { this.queue = queue; } @Override public void run() { try { FilesRes filesRes = queue.take(); //当队列里是空的话,会阻塞 String nameFiles = "E:/Source/CopyPhotossss/" ; File file = new File(filesRes.getUrl()); File ofile = new File(nameFiles+filesRes.getFileName()); FileUtils.copyFile(file, ofile, true); //使用的是commons-io工具包 } catch (InterruptedException e) { System.out.println(e.getMessage()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }

JavaBean:里面只是保存了一下文件的路径,在这个测试里面没有多少实际意义

public class FilesRes { private String fileName; private String url; private String idCard; /** * * @param fileName 文件名 * @param url 路径 * @param idCard 身份证 */ public FilesRes(String fileName, String url, String idCard) { super(); this.fileName = fileName; this.url = url; this.idCard = idCard; } ...get和set 方法省略 }

测试类:

import java.io.File; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class Tester { public static void main(String[] args) throws InterruptedException { long starTime=System.currentTimeMillis(); LinkedBlockingQueue<FilesRes> queue = new LinkedBlockingQueue<FilesRes>(10); //线程池 操作 ExecutorService service = Executors.newCachedThreadPool(); String sourceFile = "E:/Source/Photos"; File file = new File(sourceFile); File [] files = file.listFiles(); for (File filename : files) { FilesRes filesRes = new FilesRes(filename.getName(),filename.getAbsolutePath(),filename.getName()); service.submit(new Producer(queue, filesRes)); } //开始 for (int i = 0; i < files.length; i++) { service.submit(new Consumer(queue)); } service.shutdown(); while(true){//用于判断是否已经导完 if(service.isTerminated()){ System.out.println("文件已经迁移成功!"); long endTime=System.currentTimeMillis(); long time=endTime-starTime; System.out.println(time); break; } Thread.sleep(200); } } }

搞定,但是代码最后判断是否已经导入完毕的时候,使用了轮询方式去遍历不是很理想。查了写资料,虽然也有其他方式,但是比较麻烦。在研究的过程中,我发现了一个非常好用的disruptor架构提供的生产者和消费者模式有这个功能,很不错。 具体的使用可以看下面的连接,我就不再阐述了。

disruptor-3.3.2源码解析 (序列)http://www.myexception.cn/open-source/2036769.html (队列)http://www.myexception.cn/open-source/2036766.html (发布事件)http://www.myexception.cn/open-source/2036781.html (处理事件)http://www.myexception.cn/open-source/2036779.html (框架支持)http://www.myexception.cn/open-source/2036776.html

最后我把代码贴一些,有兴趣的人可以调一下

package disruptor; import java.io.File; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.IgnoreExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.WorkerPool; import com.lmax.disruptor.YieldingWaitStrategy; public class Demo3 { public static void main(String[] args) throws InterruptedException { long starTime=System.currentTimeMillis(); int BUFFER_SIZE=1024 * 1024 * 2; RingBuffer<FilesRes> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<FilesRes>() { @Override public FilesRes newInstance() { return new FilesRes(); } },BUFFER_SIZE,new YieldingWaitStrategy()); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); ExecutorService executor = Executors.newFixedThreadPool(2); WorkHandler<FilesRes> workHandlers=new FilesWorkHandlers(); WorkerPool<FilesRes> workerPool=new WorkerPool<FilesRes>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), workHandlers); workerPool.start(executor); String sourceFile = "E:/Source/Photos"; //下面这个生产8个数据,图简单就写到主线程算了 File file = new File(sourceFile); File [] files = file.listFiles(); for (File filename : files) { long seq=ringBuffer.next(); try{ FilesRes filesRes = ringBuffer.get(seq); filesRes.setFileName(filename.getName()); filesRes.setUrl(filename.getAbsolutePath()); filesRes.setIdCard(filename.getName()); }finally{ ringBuffer.publish(seq); System.out.println(seq); } } workerPool.drainAndHalt();//任务完成后关闭 线程会在这里堵塞 (太爱了,不用判断是否所有的线程都完毕了,哦也) executor.shutdown(); long endTime=System.currentTimeMillis(); long time=endTime-starTime; System.out.println(time); } } package disruptor; import java.io.File; import org.apache.commons.io.FileUtils; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; public class FilesWorkHandlers implements EventHandler<FilesRes>,WorkHandler<FilesRes>{ @Override public void onEvent(FilesRes filesRes) throws Exception { //System.out.println(filesRes.getUrl()); String nameFiles = "E:/Source/CopyPhoto2/" ; File file = new File(filesRes.getUrl()); File ofile = new File(nameFiles+filesRes.getFileName()); FileUtils.copyFile(file, ofile, true); } @Override public void onEvent(FilesRes arg0, long arg1, boolean arg2) throws Exception { // TODO Auto-generated method stub } }

怎么样是不是代码更加优雅简洁呢?