Stream原理解析

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> Stream原理解析

1. Stream的优势

Stream原理解析

图中4种stream接口继承自 BaseStream,其中 IntStream, LongStream, DoubleStream对应三种基本类型( int, long, double,注意不是包装类型), Stream对应所有剩余类型的 Stream视图。为不同数据类型设置不同 Stream接口,可以

  • 提高性能,
  • 增加特定接口函数。
  • 你可能会奇怪为什么不把 IntStream等设计成 Stream的子接口?毕竟这接口中的方法名大部分是一样的。答案是这些方法的名字虽然相同,但是返回类型不同,如果设计成父子接口关系,这些方法将不能共存,因为Java不允许只有返回类型不同的方法重载。

    2. Stream的原理和特性

    无存储。stream不是一种数据结构,它只是某种数据源的一个视图,数据源可以是一个数组,Java容器或I/O channel等。

    为函数式编程而生。对stream的任何修改都不会修改背后的数据源,比如对stream执行过滤操作并不会删除被过滤的元素,而是会产生一个不包含被过滤元素的新stream。

    惰式执行。stream上的操作并不会立即执行,只有等到用户真正需要结果的时候才会执行。

    可消费性。stream只能被“消费”一次,一旦遍历过就会失效,就像容器的迭代器那样,想要再次遍历必须重新生成。

    怎么得到Stream

    1. Stream接口的静态工厂方法

    1.of方法:有两个重载方法,一个接受变长参数,一个接口单一值。

    
    Stream integerStream = Stream.of(1, 2, 3, 5);
    Stream stringStream = Stream.of("taobao");
    // generator方法:生成一个无限长度的Stream,其元素的生成是通过给定的Supplier
    Stream.generate(new Supplier() {
        @Override
        public Double get() {
            return Math.random();
        }
    });
    Stream.generate(() - Math.random());
    Stream.generate(Math::random);
    

    三条语句的作用都是一样的,只是使用了lambda表达式和方法引用的语法来简化代码。每条语句其实都是生成一个无限长度的 Stream,其中值是随机的。这个无限长度 Stream是懒加载,一般这种无限长度的 Stream都会配合 Stream limit()方法来用。

    1. iterate方法:也是生成无限长度的 Stream,和generator不同的是,其元素的生成是重复对给定的种子值(seed)调用用户指定函数来生成的。其中包含的元素可以认为是: seed f(seed), f(f(seed))无限循环
    
    Stream.iterate(1, item - item + 1).limit(10).forEach(System.out::println);
    

    这段代码就是先获取一个无限长度的正整数集合的Stream,然后取出前10个打印。千万记住使用limit方法,不然会无限打印下去。

    2. Collection接口的默认方法

  • 调用`Collection.stream()`或者`Collection.parallelStream()`方法
  • 调用`Arrays.stream(T[] array)`方法
  • 3. stream方法使用

  • **中间操作总是会惰式执行**,调用中间操作只会生成一个标记了该操作的新stream,仅此而已。中间操作又可以分为无状态的(Stateless)和有状态的(Stateful),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序是有状态操作,在读取所有元素之前并不能确定排序结果;
  • **结束操作会触发实际计算**,计算发生时会把所有中间操作积攒的操作以pipeline的方式执行,这样可以减少迭代次数。计算完成之后stream就会失效。结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,比如找到第一个满足条件的元素。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。下图汇总了Stream接口的部分常见方法:Stream原理解析
    区分中间操作和结束操作最简单的方法,就是看方法的返回值,返回值为stream的大都是中间操作,否则是结束操作。
  • forEach()

    
    StreamString stream = Stream.of("stream", "foreach", "method", "test");
    stream.forEach(System.out::println);
    

    filter()

    Stream原理解析

    ``

    
    ListString beginningWithNumbers
                    = Stream.of("a", "1abc", "abc1")
                    .filter(value - isDigit(value.charAt(0)))
                    .collect(toList());
    assertEquals(Arrays.asList("1abc"), beginningWithNumbers);
    

    若要重构遗留代码,for循环中的if条件语句就是一个很强的信号,可用 filter方法替代。

    map()

    Stream原理解析
    
    ListString collected = Stream.of("a", "b", "hello")
                    .map(String::toUpperCase)
                    .collect(toList());
    assertEquals(Arrays.asList("A", "B", "HELLO"), collected);
    

    若要重构遗留代码,for循环有一个函数需要将一种类型的值转换成另外一种类型,map操作就可以使用该函数。

    flatMap()

    Stream原理解析
    
    ListInteger together = Stream.of(Arrays.asList(1, 2), Arrays.asList(3, 4))
                    .flatMap(Collection::stream)
                    .collect(toList());
    assertEquals(Arrays.asList(1, 2, 3, 4), together);
    

    reduce()

    
    Optional reduce(BinaryOperator accumulator)
    
    T reduce(T identity, BinaryOperator accumulator)
    
    U U reduce(U identity, BiFunctionU,? super T,U accumulator, BinaryOperatorU combiner)
    

    虽然函数定义越来越长,但语义不曾改变,多的参数只是为了指明初始值(参数 identity),或者是指定并行执行时多个部分结果的合并方式(参数 combiner)。 reduce()最常用的场景就是从一堆值中生成一个值。示例1:从一组单词中找出最长的单词。这里“大”的含义就是“长”。

    
    // 找出最长的单词
    StreamString stream = Stream.of("reduce", "optional", "test", "stream");
    OptionalString longest = stream.reduce((s1, s2) - s1.length() = s2.length() ? s1 : s2);
    //OptionalString longest = stream.max(Comparator.comparingInt(String::length));
    assertEquals("optional", longest.get());
    

    示例2:数字求和

    Stream原理解析
    
    int sum = Stream.of(1, 2, 3, 4)
                    .reduce(0, (acc, element) - acc + element);
    assertEquals(10, sum);
    

    示例3:求字符串数组长度之和

    
    // 求单词长度之和
    StreamString stream = Stream.of("a", "bb", "ccc", "dddd");
    int lengthSum = stream.reduce(0,            // 初始值 // (1)
        (sum, str) - sum+str.length(),         // 累加器 // (2)
        (a, b) - a+b);                         // 部分和拼接器,并行执行时才会用到 // (3)
    // int lengthSum = stream.mapToInt(String::length).sum();
    assertEquals(10, lengthSum);
    

    collect()

    
    ListSaleOrderLineDO newSaleOrderLines = saleOrderLines.stream()
                .map(oldSaleOrderLine - {
                    SaleOrderLineDO saleOrderLine = new SaleOrderLineDO();
                    saleOrderLine.setId(oldSaleOrderLine.getId());
                    saleOrderLine.setBuyerId(oldSaleOrderLine.getBuyerId());
                    saleOrderLine.setReceiveQuantity(oldSaleOrderLine.getQuantity());
                    saleOrderLine.setGmtModified(new Date());
                    saleOrderLine.setModifier(modifier);
                    return saleOrderLine;
                }).collect(Collectors.toList());
    
    MapLong, Integer purchaseSendQuantityMap = realQuantities.stream()
                .collect(Collectors.toMap(RealQuantity::getSubLineId, RealQuantity::getSendQuantity));
    

    4. Stream Pipelines

    
    ListString strings = Arrays.asList("a", "bb", "ccc", "abcd");
    
    ListString startWithAList = new ArrayList();
    for (String string: strings) {
        if(string.startsWith("a")){
            startWithAList.add(string); // 1. filter(), 保留以A开头的字符串
        }
    }
    
    ListInteger lengths = new ArrayList();
    for (String string : startWithAList) {
        lengths.add(string.length());   // 2. mapToInt(), 转换成长度
    }
            
    int maxLength = 0;
    for(Integer length : lengths){
        maxLength = Math.max(length, maxLength);   // 3. max(), 保留最长的长度
    }
    assertEquals(4, maxLength);
    

    具体说来,就是调用 filter()方法后立即执行,选出所有以A开头的字符串并放到一个列表list1中,之后让list1传递给 mapToInt()方法并立即执行,生成的结果放到list2中,最后遍历list2找出最大的数字作为最终结果。程序的执行流程如如所示:虽然这样做实现起来非常简单直观,但有两个明显的弊端:

    Stream原理解析
    
    int maxLength = 0;
            ListString strings = Arrays.asList("a", "bb", "ccc", "abcd");
            for(String str : strings){
                if(str.startsWith("a")){                    // 1. filter(), 保留以A开头的字符串
                    int len = str.length();                 // 2. mapToInt(), 转换成长度
                    maxLength = Math.max(len, maxLength);   // 3. max(), 保留最长的长度
                }
            }
    assertEquals(4, maxLength);
    

    采用这种方式我们不但减少了迭代次数,也避免了存储中间结果,显然这就是流水线,因为我们把三个操作放在了一次迭代当中。只要我们事先知道用户意图,总是能够采用上述方式实现跟Stream API等价的功能,但问题是 Stream类库的设计者并不知道用户的意图是什么。如何在无法假设用户行为的前提下实现流水线,是类库的设计者要考虑的问题。采用Stream的方式的代码如下。

    
    int maxLength = Stream.of("a", "bb", "ccc", "abcd")
                    .filter(value - value.startsWith("a"))
                    .mapToInt(String::length)
                    .max()
                    .orElse(0);
    assertEquals(4, maxLength);
    
    Stream原理解析

    5. Stream流水线解决方案

  • 用户的操作如何记录?
  • 操作如何叠加?
  • 叠加之后的操作如何执行?
  • 执行后的结果(如果有)在哪里?

    以下代码为例:

  • 
    Stream.of("onE", "twO", "threE", "fouR")
    //    .parallel()
            .filter(e - e.length()  3)
            .map(String::toLowerCase)
            .sorted()
            .map(String::toUpperCase)
            .forEach(System.out::println);
    

    操作如何记录?

    Stream原理解析由于篇幅原因, DoublePipeline没在图中展示, DoublePipeline IntPipeline LongPipeline这三个类专门为三种基本类型而定制的,跟 ReferencePipeline是并列关系。图中Head用于表示第一个 Stage,即调用调用诸如 Collection.stream()方法产生的 Stage,很显然这个 Stage里不包含任何操作; StatelessOp StatefulOp分别表示无状态和有状态的 Stage,对应于无状态和有状态的中间操作。

    Stream流水线组织结构示意图如下:Stream原理解析图中通过 Stream.of()方法得到 Head Stage,紧接着调用一系列的中间操作,不断产生新的 Stream。这些 Stream对象以双向链表的形式组织在一起,构成整个流水线,由于每个 Stage都记录了前一个 Stage和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作。这就是 Stream记录操作的方式。

    对于 AbstractPipeline,有两个构造函数,分别生成 head intermediate pipeline stage

    
    /**
     * 构造函数:生成Pipeline的头
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     * {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    AbstractPipeline(Spliterator? source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags  1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }
    
    /**
     * 构造函数:将中间stage连接到当前pipeline尾部
     *
     * @param previousStage the upstream pipeline stage
     * @param opFlags the operation flags for the new stage, described in
     * {@link StreamOpFlag}
     */
    AbstractPipeline(AbstractPipeline?, E_IN, ? previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;
        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }
    

    操作如何叠加?

    方法名作用|------

    有了上面的协议,相邻 Stage之间调用就很方便了,每个 Stage都会将自己的操作封装到一个 Sink里,前一个 Stage只需调用后一个 Stage accept()方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作, Sink begin() end()方法也是必须实现的。比如 Stream.sorted()是一个有状态的中间操作,其对应的 Sink.begin()方法可能创建一个乘放结果的容器,而 accept()方法负责将元素添加到该容器,最后 end()负责对容器进行排序。对于短路操作, Sink.cancellationRequested()也是必须实现的,比如 Stream.findFirst()是短路操作,只要找到一个元素, cancellationRequested()就应该返回 true,以便调用者尽快结束查找。 Sink的四个接口方法常常相互协作,共同完成计算任务。实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法。

    有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的 Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一种可能的 Sink.accept()方法流程是这样的:

    
    void accept(U u){
        1. 使用当前Sink包装的回调函数处理u
        2. 将处理结果传递给流水线下游的Sink
    }
    

    Sink接口的其他几个方法也是按照这种[处理-转发]的模型实现。下面我们结合具体例子看看 Stream的中间操作是如何将自身的操作包装成 Sink以及 Sink是如何将处理结果转发给下一个 Sink的。先看 Stream.map()方法:

    
    // java.util.stream.ReferencePipeline#map
    // 调用该方法将产生一个新的Stream
    public final R StreamR map(Function? super P_OUT, ? extends R mapper) {
       Objects.requireNonNull(mapper);
       return new StatelessOpP_OUT, R(this, StreamShape.REFERENCE,
                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
           @Override /*opWrapSink()方法返回由回调函数包装而成Sink,由wrapSink()触发*/
           SinkP_OU opWrapSink(int flags, SinkR downstream) {
               return new Sink.ChainedReferenceP_OUT, R(downstream) {
                   @Override
                   public void accept(P_OUT u) {
                       R r = mapper.apply(u);    // 1. 使用当前Sink包装的回调函数mapper处理u
                       downstream.accept(r);    // 2. 将处理结果传递给流水线下游的Sink
                   }
               };
           }
       };
    }
    

    上述代码看似复杂,其实逻辑很简单,就是将回调函数 mapper包装到一个 Sink当中。由于 Stream.map()是一个无状态的中间操作,所以 map()方法返回了一个 StatelessOp内部类对象(一个新的 Stream),调用这个新 Stream opWrapSink()方法将得到一个包装了当前回调函数的 Sink

    
    // java.util.stream.Sink.ChainedReference: 244行
    static abstract class ChainedReferenceT, E_OUT implements SinkT {
        protected final Sink? super E_OU downstream;
    
        public ChainedReference(Sink? super E_OU downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }
    
        @Override
        public void begin(long size) {
            downstream.begin(size);
        }
    
        @Override
        public void end() {
            downstream.end();
        }
    
        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
    

    再来看一个复杂一点的例子。 Stream.sorted()方法将对 Stream中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质, sorted()方法是如何将操作封装成 Sink的呢? sorted()一种可能封装的 Sink代码如下:

    
    // java.util.stream.SortedOps.RefSortingSink:371行
    // Stream.sorted()方法用到的Sink实现,由java.util.stream.SortedOps.OfRef#opWrapSink:133行 触发
    class RefSortingSinkT extends AbstractRefSortingSinkT {
        private ArrayList list;// 存放用于排序的元素
        RefSortingSink(Sink? super  downstream, Comparator? super  comparator) {
            super(downstream, comparator);
        }
        @Override
        public void begin(long size) {
            ...
            // 1. 创建一个存放排序元素的列表
            list = (size = 0) ? new ArrayList((int) size) : new ArrayList();
        }
        @Override
        public void end() {
            list.sort(comparator);// 3. 只有元素全部接收之后才能开始排序
            downstream.begin(list.size());
            if (!cancellationWasRequested) {// 下游Sink不包含短路操作
                list.forEach(downstream::accept);// 将处理结果传递给流水线下游的Sink
            }
            else {// 4. 下游Sink包含短路操作
                for (T t : list) {// 每次都调用cancellationRequested()询问是否可以结束处理。
                    if (downstream.cancellationRequested()) break;
                    downstream.accept(t);// 将处理结果传递给流水线下游的Sink
                }
            }
            downstream.end();
            list = null;
        }
        @Override
        public void accept(T t) {
            list.add(t);// 2. 使用当前Sink包装动作处理t,只是简单的将元素添加到中间列表当中
        }
    }
    

    上述代码完美的展现了 Sink的四个接口方法是如何协同工作的:首先 beging()方法告诉 Sink参与排序的元素个数,方便确定中间结果容器的的大小;

    之后通过 accept()方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素;

    最后 end()方法告诉 Sink所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的 Sink

    如果下游的 Sink是短路操作,将结果传递给下游时不断询问下游 cancellationRequested()是否可以结束处理。

    下图为 Sink的构建图

    Stream原理解析

    叠加之后的操作如何执行?

    Stream原理解析结束操作之后不能再有别的操作,所以结束操作不会创建新的流水线阶段( Stage),直观的说就是流水线的链表不会在往后延伸了。结束操作会创建一个包装了自己操作的 Sink,这也是流水线中最后一个 Sink,这个 Sink只需要处理数据而不需要将结果传递给下游的 Sink(因为没有下游)。对于 Sink的[处理-转发]模型,结束操作的 Sink就是调用链的出口。

    我们再来考察一下上游的 Sink是如何找到下游 Sink的。一种可选的方案是在 PipelineHelper中设置一个 Sink字段,在流水线中找到下游 Stage并访问 Sink字段即可。但 Stream类库的设计者没有这么做,而是设置了一个 Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法来得到 Sink,该方法的作用是返回一个新的包含了当前Stage代表的操作以及能够将结果传递给 downstream Sink对象。为什么要产生一个新对象而不是返回一个 Sink字段?这是因为使用 opWrapSink()可以将当前操作与下游 Sink(上文中的 downstream参数)结合成新 Sink。试想只要从流水线的最后一个 Stage开始,不断调用上一个 Stage opWrapSink()方法直到最开始(不包括 stage0,因为 stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的 Sink,用代码表示就是这样:

    
    // java.util.stream.AbstractPipeline#wrapSink:513行
    // 从下游向上游不断包装Sink。
    // 如果最初传入的Sink代表结束操作,函数返回时就可以得到一个代表了流水线上所有操作的Sink。
    @Override
    @SuppressWarnings("unchecked")
    final P_IN SinkP_IN wrapSink(SinkE_OU sink) {
        Objects.requireNonNull(sink);
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth  0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (SinkP_IN) sink;
    }
    

    现在流水线上从开始到结束的所有的操作都被包装到了一个 Sink里,执行这个 Sink就相当于执行整个流水线,执行 Sink的代码如下:

    
    // java.util.stream.AbstractPipeline#copyInto:476行
    // 对spliterator代表的数据执行wrappedSink代表的操作
    
    @Override
    final P_IN void copyInto(SinkP_IN wrappedSink, SpliteratorP_IN spliterator) {
        Objects.requireNonNull(wrappedSink);
    
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知开始遍历
            spliterator.forEachRemaining(wrappedSink);// 迭代
            wrappedSink.end();// 通知遍历结束
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }
    

    上述代码首先调用 wrappedSink.begin()方法告诉 Sink数据即将到来,然后调用 spliterator.forEachRemaining()方法对数据进行迭代( Spliterator是容器的一种迭代器),最后调用 wrappedSink.end()方法通知 Sink数据处理结束。Stream原理解析

    执行后的结果在哪里?

    最后一个问题是流水线上所有操作都执行后,用户所需要的结果(如果有)在哪里?首先要说明的是不是所有的 Stream结束操作都需要返回结果,有些操作只是为了使用其副作用(Side-effects),比如使用 Stream.forEach()方法将结果打印出来就是常见的使用副作用的场景(事实上,除了打印之外其他场景都应避免使用副作用),对于真正需要返回结果的结束操作结果存在哪里呢?

    特别说明:副作用不应该被滥用,也许你会觉得在 Stream.forEach()里进行元素收集是个不错的选择,就像下面代码中那样,但遗憾的是这样使用的正确性和效率都无法保证,因为 Stream可能会并行执行。大多数使用副作用的地方都可以使用归约操作更安全和有效的完成。

    
    // 错误的收集方式
    ArrayListString results = new ArrayList();
    stream.filter(s - pattern.matcher(s).matches())
          .forEach(s - results.add(s));  // Unnecessary use of side-effects!
    // 正确的收集方式
    ListStringresults =
         stream.filter(s - pattern.matcher(s).matches())
                 .collect(Collectors.toList());  // No side-effects!
    

    回到流水线执行结果的问题上来,需要返回结果的流水线结果存在哪里呢?这要分不同的情况讨论,下表给出了各种有返回结果的 Stream结束操作。

    返回类型对应的结束操作|------

    对于表中返回 boolean或者 Optional的操作( Optional是存放一个值的容器)的操作,由于值返回一个值,只需要在对应的 Sink中记录这个值,等到执行结束时返回就可以了。

    对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定)。 collect(), reduce(), max(), min()都是归约操作,虽然 max() min()也是返回一个 Optional,但事实上底层是通过调用 reduce()方法实现的。

    对于返回是数组的情况,毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做 Node的数据结构中的。 Node是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。

    对于 collect这样的操作是需要拿到最终 end产生的结果. end产生的结果在最后一个 Sink中,这样的操作最终都会提供一个取出数据的 get方法.

    
    // java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOpE_OUT,R) 226行
    final R R evaluate(TerminalOpE_OUT, R terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
    
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
    // java.util.stream.ReduceOps.ReduceOp:684行
    private static abstract class ReduceOpT, R, S extends AccumulatingSinkT, R, S
            implements TerminalOpT, R {
        @Override
        public P_IN R evaluateSequential(PipelineHelper helper,
                                           SpliteratorP_IN spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }
    
        @Override
        public P_IN R evaluateParallel(PipelineHelper helper,
                                         SpliteratorP_IN spliterator) {
            return new ReduceTask(this, helper, spliterator).invoke().get();
        }
    }
    

    6. 参考文章

  • Streams API(I)
  • Streams API(II)
  • 深入理解Java Stream流水线
  • Java8 Stream原理深度解析
  •  

    原文始发于微信公众号(xiaogan的技术博客):Stream原理解析

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> Stream原理解析


      转载请注明: 好好学java Stream原理解析

     上一篇
    啪啪打脸!领导说——try-catch要放在循环体外! 啪啪打脸!领导说——try-catch要放在循环体外!
    **来源:Java中文社群     ** 今天给大家带来的是关于 try-catch 应该放在循环体外,还是放在循环体内的文章,我们将从性能和业务场景分析这两个方面来回答此问题。 很多人对 try-catch 有一定的误解,比如我们经常会把
    下一篇 
    Service层需要接口吗? Service层需要接口吗?
    链接:toutiao.com/i6882356844245975563 前几天刷头条又刷到了「Service层和Dao层真的有必要每个类都加上接口吗?」这个问题,之前简单回答了一波,给出的观点是「看情况」 现在结合我参与的项目