Learning Java 8 with Me (3): Stream

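Stream is a concept introduced in Java 8. Its Javadoc describes it as "A sequence of elements supporting sequential and parallel aggregate operations". It is an interface that declares a set of stream operations along with a few static factory methods (note: as of Java 8, interfaces can contain static and default methods). Let's start with an example: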
    import java.util.Arrays;
    import java.util.List;

    // Simple value holder used by the example below.
    class Widget {

        String color;
        int    weight;

        public Widget(String color, int weight) {
            this.color = color;
            this.weight = weight;
        }
    }

    public void testInit() {
        List<Widget> widgets = Arrays.asList(new Widget("RED", 12),
                                             new Widget("YELLOW", 3),
                                             new Widget("BLUE", 7));

        // Sum the weights of all red widgets.
        int sum = widgets.stream()
                         .filter(w -> "RED".equalsIgnoreCase(w.color))
                         .mapToInt(w -> w.weight)
                         .sum();

        System.out.println(sum); // prints 12
    }


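In this code, widgets.stream() returns a sequential Stream. filter() takes a Predicate and keeps only the elements that match it. mapToInt() then maps each Widget to an int, here its weight field, producing an IntStream. Finally, sum() adds the values up.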
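
To make the intermediate types visible, the same pipeline can be written one step at a time. This is only an illustrative sketch: the class name PipelineSteps, the helper sample(), and the variable names are assumptions introduced here, not part of the original example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class PipelineSteps {

    static class Widget {
        final String color;
        final int weight;

        Widget(String color, int weight) {
            this.color = color;
            this.weight = weight;
        }
    }

    // Hypothetical helper: the same three widgets as in the example above.
    static List<Widget> sample() {
        return Arrays.asList(new Widget("RED", 12),
                             new Widget("YELLOW", 3),
                             new Widget("BLUE", 7));
    }

    static int redWeight(List<Widget> widgets) {
        // The lambdas passed to filter() and mapToInt() are instances of these functional interfaces.
        Predicate<Widget> isRed = w -> "RED".equalsIgnoreCase(w.color);
        ToIntFunction<Widget> toWeight = w -> w.weight;

        Stream<Widget> all = widgets.stream();        // sequential stream over the list
        Stream<Widget> reds = all.filter(isRed);      // intermediate operation, evaluated lazily
        IntStream weights = reds.mapToInt(toWeight);  // intermediate operation, evaluated lazily
        return weights.sum();                         // terminal operation, runs the pipeline
    }

    public static void main(String[] args) {
        System.out.println(redWeight(sample()));
    }
}
```

Nothing is computed until the terminal operation sum() is called; filter() and mapToInt() only describe the pipeline.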

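There is also the concept of a parallel stream. It is built on the Fork/Join framework introduced in Java 7: the work is split into subtasks (divide and conquer) and the partial results are combined at the end. Consider the following example: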
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    @Test
    public void testSort() throws IOException {
        List<Double> doubleList = new ArrayList<>();

        // Read one double per line; try-with-resources closes the reader even if parsing fails.
        try (BufferedReader reader = new BufferedReader(new FileReader("doubles.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                doubleList.add(Double.parseDouble(line));
            }
        }

        System.out.println("AvailableProcessors: "
                          + Runtime.getRuntime().availableProcessors());

        int LOOP = 5;

        // longTimeComputation(double) is a CPU-heavy helper whose body is not shown here.

        // Parallel runs
        for (int i = 0; i < LOOP; i++) {
            long start = System.currentTimeMillis();
            doubleList.stream()
                      .parallel()
                      .mapToDouble(d -> longTimeComputation(d))
                      .sum();
            long end = System.currentTimeMillis();

            System.out.println("Parallel Take : " + (end - start));
        }

        // Sequential runs
        for (int i = 0; i < LOOP; i++) {
            long start = System.currentTimeMillis();
            doubleList.stream()
                      .mapToDouble(d -> longTimeComputation(d))
                      .sum();
            long end = System.currentTimeMillis();

            System.out.println("Take : " + (end - start));
        }
    }

doubles.txt contains ten million random double values. The results (times in milliseconds) are as follows:
AvailableProcessors: 2
Parallel Take : 7124
Parallel Take : 6710
Parallel Take : 6383
Parallel Take : 6215
Parallel Take : 6771
Take : 12172
Take : 12039
Take : 12195
Take : 12557
Take : 12121
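
For readers who want to try a comparison like this without doubles.txt, here is a self-contained sketch. It rests on assumptions: slowComputation is a hypothetical stand-in for longTimeComputation (whose body is not shown), the data is generated in memory with a fixed seed, and the element count is much smaller than ten million, so the timings will not match the numbers above.

```java
import java.util.Random;
import java.util.stream.DoubleStream;

class ParallelTimingSketch {

    // Hypothetical stand-in for longTimeComputation: a small CPU-bound loop.
    static double slowComputation(double d) {
        double acc = d;
        for (int i = 0; i < 100; i++) {
            acc = Math.sqrt(acc * acc + 1.0);
        }
        return acc;
    }

    // Generates n pseudo-random doubles in [0, 1) from a fixed seed.
    static double[] randomDoubles(int n, long seed) {
        return new Random(seed).doubles(n).toArray();
    }

    static double sumSequential(double[] data) {
        return DoubleStream.of(data)
                           .map(ParallelTimingSketch::slowComputation)
                           .sum();
    }

    static double sumParallel(double[] data) {
        return DoubleStream.of(data)
                           .parallel()
                           .map(ParallelTimingSketch::slowComputation)
                           .sum();
    }

    public static void main(String[] args) {
        double[] data = randomDoubles(200_000, 42L);
        System.out.println("AvailableProcessors: "
                          + Runtime.getRuntime().availableProcessors());

        for (int i = 0; i < 3; i++) {
            long start = System.currentTimeMillis();
            double sum = sumParallel(data);
            long end = System.currentTimeMillis();
            System.out.println("Parallel Take : " + (end - start) + " (sum=" + sum + ")");
        }

        for (int i = 0; i < 3; i++) {
            long start = System.currentTimeMillis();
            double sum = sumSequential(data);
            long end = System.currentTimeMillis();
            System.out.println("Take : " + (end - start) + " (sum=" + sum + ")");
        }
    }
}
```

Printing the sum keeps the result in use and makes it easy to check that both variants agree. The two sums may differ in the last few digits, because floating-point addition is not associative and the parallel run combines partial sums in a different order.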

With two processors, the parallel stream is clearly faster than the sequential stream, taking roughly half the time. However, a parallel stream is not the best choice in every situation. According to the article "Think twice before using Java 8 parallel streams" (http://java.dzone.com/articles/think-twice-using-java-8), when a parallel stream processes tasks and one of them takes a very long time, the behavior can be surprising: "Sometimes all healthy tasks finish, sometimes few of them are stuck behind the slow one." I have not verified this yet; I am only noting it here.
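
One fact that may help when looking into this: by default, a parallel stream runs its work on the JVM-wide ForkJoinPool.commonPool(), with the calling thread taking part as well. Every parallel stream in the same JVM shares that pool. The sketch below does not reproduce the problem described in the article; it only records which threads process the elements. The class and method names are made up for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class CommonPoolSketch {

    // Runs a parallel stream and records the name of every thread that handled an element.
    static Set<String> workerThreadNames(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.range(0, elements)
                 .parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Caller thread: " + Thread.currentThread().getName());
        for (String name : workerThreadNames(100_000)) {
            System.out.println("Used thread: " + name);
        }
    }
}
```

Run from main, the output typically lists the caller thread next to a few common-pool worker threads. Which threads appear, and how many, varies from run to run and from machine to machine.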