package cn.nest;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * Small demonstrations of throttling concurrent work with a {@link Semaphore}.
 *
 * <ul>
 *   <li>{@link #main} splits a list into fixed-size batches and processes each batch in its own
 *       pool task, with at most 5 tasks holding a permit at a time.</li>
 *   <li>{@link #main1} writes a long sequence of numbers to a file from many pool tasks, with at
 *       most 10 tasks holding a permit at a time.</li>
 *   <li>{@link #main2} prints the index ranges produced by the batching arithmetic.</li>
 * </ul>
 */
public class MySemaphore2 {

    /**
     * Number of elements actually processed by the batch tasks started from {@link #main}.
     * Written by several pool threads, so every access goes through the synchronized helpers
     * {@link #addProcessed(int)} and {@link #processedCount()}.
     */
    static int sum = 0;

    /** Pause taken by each batch task after it has processed its elements: one minute. */
    private static final long REST_MILLIS = 1000L * 60;

    /**
     * Processes a list of 35 integers in batches of 10, one pool task per batch, and reports the
     * elapsed time and the number of elements processed.
     *
     * @param args unused
     * @throws IOException never thrown by this method; kept for signature compatibility
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     pool to terminate
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // Data to process.
        final ArrayList<Integer> list = new ArrayList<>();
        final int dataSize = 35;
        for (int i = 0; i < dataSize; i++) {
            list.add(i);
        }
        System.out.println("list: " + list);
        System.out.println("list size: " + list.size());

        // At most 5 batch tasks may hold a permit at the same time.
        final Semaphore semaphore = new Semaphore(5);
        final ExecutorService exec = Executors.newCachedThreadPool();
        final long start = System.currentTimeMillis();

        // Number of elements handled per task, and the resulting number of tasks.
        final int onceNum = 10;
        final int executeNum = batchCount(dataSize, onceNum);
        System.out.println("num: " + executeNum);

        for (int i = 0; i < executeNum; i++) {
            // The loop variable is not effectively final, so copy it for the lambda.
            final int batchIndex = i;
            // execute() rather than submit(): an unexpected exception in a task is then reported
            // by the worker thread instead of being silently stored in a discarded Future.
            exec.execute(() -> runBatch(semaphore, list, batchIndex, onceNum));
        }
        exec.shutdown();

        // Wait, without an upper bound, until every task has finished.
        while (!exec.awaitTermination(1, TimeUnit.SECONDS)) {
            // keep waiting
        }
        System.out.println("-----------------所有的子线程都结束了!");

        final long end = System.currentTimeMillis();
        System.out.println("consumer time : " + (end - start)); // elapsed milliseconds
        System.out.println("sum: " + processedCount()); // total number of elements processed
    }

    /**
     * Writes the numbers 0..9,999,999 (one per line, in whatever order the tasks run) followed by
     * an {@code ---END---} line to {@code c:\temp\stonefeng\stream.txt}, using one pool task per
     * number and a semaphore with 10 permits, then prints the elapsed milliseconds.
     *
     * <p>NOTE(review): a cached thread pool starts a new thread whenever no idle one is
     * available, so submitting this many blocking tasks can create a very large number of
     * threads; confirm this is acceptable for the intended experiment.
     *
     * @param args unused
     * @throws IOException if the file cannot be opened, written or closed
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     pool to terminate
     */
    public static void main1(String[] args) throws IOException, InterruptedException {
        // Backslashes must be escaped; unescaped, "\t" would be a tab character.
        final File stream = new File("c:\\temp\\stonefeng\\stream.txt");
        final long start = System.currentTimeMillis();

        try (OutputStream os = new FileOutputStream(stream);
             OutputStreamWriter writer = new OutputStreamWriter(os)) {
            final Semaphore semaphore = new Semaphore(10);
            final ExecutorService exec = Executors.newCachedThreadPool();
            try {
                for (int i = 0; i < 10000000; i++) {
                    final int num = i;
                    exec.execute(() -> writeNumber(semaphore, writer, num));
                }
                exec.shutdown();
                exec.awaitTermination(1, TimeUnit.HOURS);
            } finally {
                // No-op when the pool already terminated; otherwise (timeout or failure above)
                // interrupt the remaining tasks so they do not outlive the writer.
                exec.shutdownNow();
            }
            writer.write("---END---\n");
        }

        System.out.println("所有的子线程都结束了！");
        final long end = System.currentTimeMillis();
        System.out.println(end - start);
    }

    /**
     * Prints how many batches of 1000 are needed for 3500 elements and the index range
     * {@code start~end} (end exclusive, clamped to the total) of every batch.
     *
     * @param args unused
     */
    public static void main2(String[] args) {
        final int total = 3500;
        final int size = 1000;
        final int num = batchCount(total, size);
        System.out.println("num: " + num);
        for (int i = 0; i < num; i++) {
            System.out.println(size * i + "~" + batchEnd(i, size, total));
        }
    }

    /**
     * Returns the number of batches needed to cover {@code total} elements, i.e. the ceiling of
     * {@code total / batchSize}.
     *
     * @param total number of elements; values below 1 yield 0 batches
     * @param batchSize maximum number of elements per batch
     * @return the number of batches
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    static int batchCount(int total, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        if (total <= 0) {
            return 0;
        }
        // Integer division truncates, so add one batch only when there is a remainder.
        return total / batchSize + (total % batchSize == 0 ? 0 : 1);
    }

    /**
     * Returns the exclusive end index of the given batch, never exceeding {@code total}.
     *
     * @param batchIndex zero-based batch number
     * @param batchSize maximum number of elements per batch
     * @param total number of elements overall
     * @return {@code min(total, (batchIndex + 1) * batchSize)}
     */
    static int batchEnd(int batchIndex, int batchSize, int total) {
        // Computed in long so that the multiplication cannot overflow before clamping.
        return (int) Math.min((long) total, ((long) batchIndex + 1) * batchSize);
    }

    /**
     * Prints every element of one batch, prefixed with the current thread's name.
     *
     * @param data the full data list
     * @param batchIndex zero-based batch number
     * @param batchSize maximum number of elements per batch
     * @return the number of elements printed (less than {@code batchSize} for a final partial
     *     batch)
     */
    static int processBatch(ArrayList<Integer> data, int batchIndex, int batchSize) {
        final int from = batchIndex * batchSize;
        final int to = batchEnd(batchIndex, batchSize, data.size());
        int processed = 0;
        for (int j = from; j < to; j++) {
            System.out.println(Thread.currentThread().getName() + " --: " + data.get(j));
            processed++;
        }
        return processed;
    }

    /** Body of one batch task of {@link #main}: acquire a permit, process, rest, release. */
    private static void runBatch(
            Semaphore semaphore, ArrayList<Integer> data, int batchIndex, int batchSize) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // No permit was obtained, so there is nothing to release.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        }
        try {
            addProcessed(processBatch(data, batchIndex, batchSize));
            System.out.println(Thread.currentThread().getName() + " 休息一分钟");
            Thread.sleep(REST_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Always hand the permit back, even if processing failed.
            semaphore.release();
        }
    }

    /** Body of one task of {@link #main1}: write a single number line while holding a permit. */
    private static void writeNumber(Semaphore semaphore, OutputStreamWriter writer, int num) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // No permit was obtained, so there is nothing to release.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        }
        try {
            writer.write(String.valueOf(num) + "\n");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }

    /** Adds {@code count} to {@link #sum} under the class lock. */
    private static synchronized void addProcessed(int count) {
        sum += count;
    }

    /** Reads {@link #sum} under the class lock. */
    private static synchronized int processedCount() {
        return sum;
    }
}