Issue 295: Faster Empty Streams

Author: Dr Heinz M. Kabutz
Date: 2021-11-26
Java Version: 17
Category: Performance

Abstract: Streams can make our code more readable, and therefore more maintainable. However, there is some overhead in setting up the stream, a cost that might become pronounced when the stream is empty. In this newsletter we will consider an idea of the EmptyStream to reduce that cost.

 

Welcome to the 295th edition of The Java(tm) Specialists' Newsletter, sent to you from the beautiful Island of Crete. Most mornings will find us running along Kalathas Beach, followed by a dip in the refreshing sea. Invariably I will turn to Helene and say: "You know, people pay thousands of Euros to come here on holiday." Of course she will roll her eyes, having heard this phrase almost every day for the past 18 months.

Faster Empty Streams

A few months ago some of our JCrete disorganizers popped down for a nano version of JCrete. On the last evening, we were sipping Mythos at Kalathas Beach, talking about Java. During one of our hikes, Marc Hoffmann had expressed his disappointment in Java Stream performance. He voiced it nicely in a recent tweet: "Extensive usage of streams (because "it's cool") is the THE primary performance bottleneck in computation heavy #Java applications. Sometimes 90% of heap allocation caused by stream instances and capturing lambdas."

My microbenchmarks had not shown a significant difference between a for-loop and a stream, so after the second Mythos I asked him to clarify his thoughts. The issue wasn't so much when the stream had entries, but rather when it was empty. With a for-loop, we would create an iterator, but then jump out of the loop immediately, without creating unnecessary objects. With streams, however, we would first create the entire pipeline before realizing we didn't need it, and then throw it away. Now that did make sense. One way to avoid that was to not use streams and live with what now looks like downright ugly structural code. The other approach was to always add a guard before creating a stream.

I expressed this issue to Oracle's Java Language Architect Brian Goetz (a must-follow on Twitter) and his suggestion was to perhaps look at changing Collection#stream() to return Stream.empty() if the collection did not contain any elements. That reduced object creation only by about 10%, because we would still build up the entire pipeline, only to throw it away. Another idea was to create a new implementation of Stream that simply returned this on methods like map() and filter(). That seemed to work much better, with some remarkable speedups for complex stream pipelines when the collection was actually empty.

I spent a couple of days working on the code, writing extensive unit tests, microbenchmarks, etc. Unfortunately the current OpenJDK unit tests do a lot of white-box testing. They downcast the stream instances to the package-private AbstractPipeline in order to call implementation-specific methods. Since I did not know the tests well enough, I thought it would be better to present my work-in-progress to the core JDK team. They gave me some excellent feedback.

Firstly, an EmptyStream should not simply return this from methods such as map() and filter(), since we should guard inside the stream against multiple operations being called on the same stream. This would mean we would need to create a new instance of our EmptyStream each time we call a new decorator function. Fortunately, escape analysis is rather good at getting rid of unnecessary objects for us, so the cost is not nearly as prohibitive as I had imagined.
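
The guard against reusing a stream can be seen with the ordinary JDK streams. Here is a minimal sketch of that rule; the class and method names are made up for illustration and are not part of the PR:

```java
import java.util.List;
import java.util.stream.Stream;

class SingleUseDemo {
  // Returns true if a second operation on an already-used stream is rejected
  static boolean secondUseRejected() {
    Stream<String> stream = List.of("a", "b").stream();
    stream.filter(s -> !s.isEmpty()); // first operation links a new stage
    try {
      stream.map(String::length); // second operation on the same instance
      return false;
    } catch (IllegalStateException expected) {
      return true;
    }
  }

  public static void main(String... args) {
    System.out.println(secondUseRejected());
  }
}
```

An EmptyStream that returned this from filter() would silently accept the second call, which is why each decorator call has to hand back a fresh instance.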

Secondly, it would be best to keep characteristics of the streams the same as before. This is something that I noticed in my own testing. For example, if we create a stream from an ArrayList, it should be ORDERED|SIZED|SUBSIZED. If instead, we create it from a ConcurrentSkipListSet, it should be CONCURRENT|DISTINCT|NONNULL|ORDERED|SORTED. It then gets challenging. If we make the stream unordered(), then the ArrayList becomes SIZED|SUBSIZED (this makes sense) and the ConcurrentSkipListSet becomes DISTINCT|SORTED, thus besides ORDERED, it strangely also drops the characteristics CONCURRENT|NONNULL. Ah, but not so fast. If the ConcurrentSkipListSet was constructed with a Comparator for the sorting of its elements, then we also drop the SORTED characteristic. As you can imagine, this occupied quite a few hours to reverse engineer out of the existing stream implementations.
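
To see where these characteristics come from, we can ask the source collections for their spliterators directly. The following is only a sketch: describe() and hasExplicitComparator() are helper names I made up, and it looks at the collection's spliterator rather than at what unordered() does further down the pipeline:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentSkipListSet;

class CharacteristicsDemo {
  private static final int[] FLAGS = {
      Spliterator.CONCURRENT, Spliterator.DISTINCT, Spliterator.IMMUTABLE,
      Spliterator.NONNULL, Spliterator.ORDERED, Spliterator.SIZED,
      Spliterator.SORTED, Spliterator.SUBSIZED};
  private static final String[] NAMES = {
      "CONCURRENT", "DISTINCT", "IMMUTABLE",
      "NONNULL", "ORDERED", "SIZED",
      "SORTED", "SUBSIZED"};

  // Lists the reported characteristics in alphabetical order, separated by |
  static String describe(Spliterator<?> spliterator) {
    StringJoiner joiner = new StringJoiner("|");
    for (int i = 0; i < FLAGS.length; i++) {
      if (spliterator.hasCharacteristics(FLAGS[i])) joiner.add(NAMES[i]);
    }
    return joiner.toString();
  }

  // getComparator() may only be called on SORTED spliterators;
  // it returns null when the elements use their natural order
  static boolean hasExplicitComparator(Spliterator<?> spliterator) {
    return spliterator.hasCharacteristics(Spliterator.SORTED)
        && spliterator.getComparator() != null;
  }

  public static void main(String... args) {
    System.out.println(describe(new ArrayList<String>().spliterator()));
    var natural = new ConcurrentSkipListSet<String>();
    System.out.println(describe(natural.spliterator()));
    var reversed = new ConcurrentSkipListSet<String>(Comparator.reverseOrder());
    System.out.println(hasExplicitComparator(reversed.spliterator()));
  }
}
```

The comparator check matters because the EmptyStream has to remember whether the source was sorted by a Comparator in order to decide later whether SORTED survives.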

Thirdly, the current implementation of streams creates the streams lazily. This means that the following actually works and prints: "hello world"

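import java.util.ArrayList;
import java.util.Collections;

public class HelloWorld {
  public static void main(String... args) {
    var list = new ArrayList<String>();
    var stream = list.stream(); // the list is still empty here
    Collections.addAll(list, "hello", " ", "world");
    stream.forEach(System.out::print); // binds to the list only now
  }
}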

Whilst I do know at least three speakers who would delight their audience with such nuggets, it is something that I have never done in my code, and certainly would flag as suspect if I saw it during a code review. If we create an EmptyStream as soon as we discover that the collection is empty, then the code above would not print anything. It is debatable which behavior is preferable, but what we cannot deny is that this is the current state of stream. With millions of streams used every day, we have to be fastidious not to change anything. That lazy behavior is different for streams that are either IMMUTABLE or CONCURRENT.
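
The difference in behavior can be sketched without touching the JDK. In the code below, eagerStream() is a hypothetical helper that decides at creation time whether the collection is empty, roughly what an eager EmptyStream would do; it is not the code from the PR:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EagerVersusLazyDemo {
  // Hypothetical helper: checks for emptiness when the stream is created
  static <T> Stream<T> eagerStream(Collection<T> collection) {
    return collection.isEmpty() ? Stream.empty() : collection.stream();
  }

  // Current behavior: the ArrayList stream sees elements added before the terminal operation
  static String lazy() {
    List<String> list = new ArrayList<>();
    Stream<String> stream = list.stream();
    Collections.addAll(list, "hello", " ", "world");
    return stream.collect(Collectors.joining());
  }

  // Eager check: the list was empty at creation time, so later additions are missed
  static String eager() {
    List<String> list = new ArrayList<>();
    Stream<String> stream = eagerStream(list);
    Collections.addAll(list, "hello", " ", "world");
    return stream.collect(Collectors.joining());
  }

  public static void main(String... args) {
    System.out.println("lazy:  \"" + lazy() + "\"");
    System.out.println("eager: \"" + eager() + "\"");
  }
}
```

The lazy variant yields "hello world" and the eager one an empty String, which is exactly the kind of observable change that makes such an optimization hard to get into the JDK.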

Despite the obstacles, I soldiered on and my final contribution consists of just under 1500 LOC of changes to the java.util.stream package, about 2500 LOC of unit tests and two microbenchmarks. You can see the PR here. However, please don't hold your breath for this one - it will almost certainly not be accepted soon (if ever). It is a very big change, and it would modify the behavior of the streams. Also, the current unit tests would need a lot of work to make them work with empty streams as well. My benefit was that I got to understand streams a bit better, and a little newsletter was born.

Before showing the EmptyStream implementation, I would suggest the following workaround. Whenever a stream is used in a hot method, and there is a more than 80% chance of that stream being empty, it would be best to begin the method with a guard, something like: if (list.isEmpty()) return; This way, we avoid the cost of setting up the pipeline, only to throw it all away again.
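
As a minimal sketch of such a guard, assuming a hot method that sums the lengths of some names (the method and its pipeline are invented for illustration):

```java
import java.util.List;

class GuardDemo {
  // Sums the lengths of all non-blank names
  static int totalLength(List<String> names) {
    // Guard: skip building the pipeline when there is nothing to process
    if (names.isEmpty()) return 0;
    return names.stream()
        .filter(name -> !name.isBlank())
        .mapToInt(String::length)
        .sum();
  }

  public static void main(String... args) {
    System.out.println(totalLength(List.of()));
    System.out.println(totalLength(List.of("ab", " ", "cde")));
  }
}
```

The guard has to return whatever the pipeline would have produced for an empty input, here 0. Whether it pays off depends on how often the list really is empty, so it is worth measuring before sprinkling guards everywhere.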

I will show the code for the EmptyStream, including code for the primitive streams at the end of the newsletter. It is quite long, and I don't expect you to read this in too much detail. However, before we close, there was one trick that saved me a lot of effort during testing. I wanted to make sure that none of the functional interfaces that could be invoked on an empty stream would ever be called. I thus wanted to throw an AssertionError if anyone ever tried to call any method. But how to do that? I started writing lambdas for each functional interface, but that became tedious. There are a lot. And then I remembered dynamic proxies. Here is the code that made it super easy for me to create assertion throwing functional interfaces:

protected final <T> T failing(Class<T> clazz) {
  return clazz.cast(
    Proxy.newProxyInstance(
      clazz.getClassLoader(),
      new Class<?>[]{clazz},
      (proxy, method, args) -> {
        throw new AssertionError();
      }));
}

We could then use this to create an arbitrary number of functional interfaces that would throw an AssertionError if any method was called. Read more about dynamic proxies in my free book.
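
Here is a sketch of how such a proxy might be used in a test. I have made failing() static and added the method name to the error message so that the example is self-contained; FailingProxyDemo and its helper methods are names invented for this illustration:

```java
import java.lang.reflect.Proxy;
import java.util.function.Predicate;
import java.util.stream.Stream;

class FailingProxyDemo {
  // Creates an instance of the given interface where every method call throws
  static <T> T failing(Class<T> clazz) {
    return clazz.cast(
        Proxy.newProxyInstance(
            clazz.getClassLoader(),
            new Class<?>[]{clazz},
            (proxy, method, args) -> {
              throw new AssertionError(method.getName() + "() was called");
            }));
  }

  // Runs an empty pipeline with a predicate that must never be invoked
  static long countEmpty() {
    @SuppressWarnings("unchecked")
    Predicate<String> neverCalled = failing(Predicate.class);
    return Stream.<String>empty().filter(neverCalled).count();
  }

  // Returns true if calling the proxy directly throws an AssertionError
  static boolean directCallFails() {
    @SuppressWarnings("unchecked")
    Predicate<String> neverCalled = failing(Predicate.class);
    try {
      neverCalled.test("x");
      return false;
    } catch (AssertionError expected) {
      return true;
    }
  }

  public static void main(String... args) {
    System.out.println(countEmpty());
    System.out.println(directCallFails());
  }
}
```

Note that every method goes through the handler, including toString() and hashCode(), so we should avoid printing or hashing such a proxy in a test.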

Kind regards

Heinz

P.S. A shout-out to Philippe Marschall, who started a similar project for empty streams.

And now for the long long code...

class Streams {
  private static final int DISTINCT = Spliterator.DISTINCT; // 0x1
  private static final int OPERATED_ON = 0x2;
  private static final int SORTED = Spliterator.SORTED; // 0x4
  private static final int CLOSED = 0x8;
  private static final int ORDERED = Spliterator.ORDERED; // 0x10
  private static final int PARALLEL = 0x20;
  private static final int SIZED = Spliterator.SIZED; // 0x40
  private static final int NONNULL = Spliterator.NONNULL; // 0x100
  private static final int IMMUTABLE = Spliterator.IMMUTABLE; // 0x400
  private static final int CONCURRENT = Spliterator.CONCURRENT; // 0x1000
  private static final int SUBSIZED = Spliterator.SUBSIZED; // 0x4000
  private static final int EXTRA_FLAGS = OPERATED_ON|CLOSED|PARALLEL;

  private static sealed class EmptyBaseStream {
    private int state = 0;

    public EmptyBaseStream(EmptyBaseStream input) {
      this.state = input.state & ~(OPERATED_ON);
    }

    public EmptyBaseStream(Spliterator<?> spliterator) {
      this.state = spliterator.characteristics();
    }

    protected final void checkIfOperatedOnOrClosed() {
      if ((state & (OPERATED_ON|CLOSED)) != 0) {
        throw new IllegalStateException(
            "stream has already been operated upon or closed"
        );
      }
    }

    protected final void checkIfOperatedOnOrClosedAndChangeState() {
      checkIfOperatedOnOrClosed();
      state |= OPERATED_ON;
    }

    protected final void stateDistinct() {
      state |= DISTINCT;
      state &= ~(SIZED|SUBSIZED|CONCURRENT|NONNULL|IMMUTABLE);
      if (hasComparator()) state &= ~SORTED;
    }

    protected final void stateDistinctPrimitiveStream() {
      state &= ~(SIZED|SUBSIZED|IMMUTABLE|SORTED);
    }

    protected void stateSorted() {
      state |= SORTED|ORDERED;
      state &= ~(CONCURRENT|NONNULL|IMMUTABLE);
    }

    protected final boolean isSorted() {
      return (state & SORTED) == SORTED;
    }

    protected boolean unorderedSame() {
      if ((state & ORDERED) == ORDERED) {
        state &= ~(CONCURRENT|NONNULL|ORDERED|IMMUTABLE);
        if (hasComparator()) state &= ~SORTED;
        state |= OPERATED_ON;
        return false;
      }
      return true;
    }

    protected final int stateBareCharacteristics() {
      return state & ~EXTRA_FLAGS;
    }

    public final void close() {
      // nothing to do
      state |= CLOSED;
    }

    public boolean isParallel() {
      return false;
    }

    protected final void checkParametersAndThenState(Object parameter) {
      Objects.requireNonNull(parameter);
      checkIfOperatedOnOrClosedAndChangeState();
    }

    protected final void checkParametersAndThenState(Object parameter1, Object parameter2) {
      Objects.requireNonNull(parameter1);
      Objects.requireNonNull(parameter2);
      checkIfOperatedOnOrClosedAndChangeState();
    }

    protected final void checkParametersAndThenState(Object parameter1, Object parameter2, Object parameter3) {
      Objects.requireNonNull(parameter1);
      Objects.requireNonNull(parameter2);
      Objects.requireNonNull(parameter3);
      checkIfOperatedOnOrClosedAndChangeState();
    }

    protected final void checkStateAndThenParameters(Object parameter) {
      // for some of the methods, we first check the state and then the parameter
      checkIfOperatedOnOrClosedAndChangeState();
      Objects.requireNonNull(parameter);
    }

    protected final <R> EmptyStream<R> nextEmptyStream(Object parameter) {
      checkParametersAndThenState(parameter);
      return new EmptyStream<>(this);
    }

    protected final EmptyIntStream nextEmptyIntStream(Object parameter) {
      checkParametersAndThenState(parameter);
      return new EmptyIntStream(this);
    }

    protected final EmptyLongStream nextEmptyLongStream(Object parameter) {
      checkParametersAndThenState(parameter);
      return new EmptyLongStream(this);
    }

    protected final EmptyDoubleStream nextEmptyDoubleStream(Object parameter) {
      checkParametersAndThenState(parameter);
      return new EmptyDoubleStream(this);
    }

    protected boolean hasComparator() {
      return false;
    }
  }


  /**
   * EmptyStream is an optimization to reduce object allocation
   * during stream creation for empty streams. Intermediate
   * methods such as filter() and map() return a new EmptyStream
   * rather than "this", so that a second operation on the same
   * instance is rejected. We have tried to mirror the behavior
   * of the previous Stream.empty() for spliterator
   * characteristics and parallel().
   *
   * @param <T> the type of the stream elements
   */
  static final class EmptyStream<T> extends EmptyBaseStream implements Stream<T> {
    private final Comparator<? super T> comparator;

    public EmptyStream(EmptyBaseStream input) {
      super(input);
      comparator = null;
    }

    public EmptyStream(Spliterator<T> spliterator) {
      super(spliterator);
      comparator = spliterator.hasCharacteristics(Spliterator.SORTED) ?
          spliterator.getComparator() : null;
    }

    @Override
    public Stream<T> filter(Predicate<? super T> predicate) {
      return nextEmptyStream(predicate);
    }

    @Override
    public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
      return nextEmptyStream(mapper);
    }

    @Override
    public IntStream mapToInt(ToIntFunction<? super T> mapper) {
      return nextEmptyIntStream(mapper);
    }

    @Override
    public LongStream mapToLong(ToLongFunction<? super T> mapper) {
      return nextEmptyLongStream(mapper);
    }

    @Override
    public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
      return nextEmptyDoubleStream(mapper);
    }

    @Override
    public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
      return nextEmptyStream(mapper);
    }

    @Override
    public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
      return nextEmptyIntStream(mapper);
    }

    @Override
    public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
      return nextEmptyLongStream(mapper);
    }

    @Override
    public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
      return nextEmptyDoubleStream(mapper);
    }

    @Override
    public <R> Stream<R> mapMulti(BiConsumer<? super T, ? super Consumer<R>> mapper) {
      return nextEmptyStream(mapper);
    }

    @Override
    public IntStream mapMultiToInt(BiConsumer<? super T, ? super IntConsumer> mapper) {
      return nextEmptyIntStream(mapper);
    }

    @Override
    public LongStream mapMultiToLong(BiConsumer<? super T, ? super LongConsumer> mapper) {
      return nextEmptyLongStream(mapper);
    }

    @Override
    public DoubleStream mapMultiToDouble(BiConsumer<? super T, ? super DoubleConsumer> mapper) {
      return nextEmptyDoubleStream(mapper);
    }

    @Override
    public Stream<T> distinct() {
      checkIfOperatedOnOrClosedAndChangeState();
      super.stateDistinct();
      return new EmptyStream<>(this);
    }

    @Override
    protected boolean hasComparator() {
      return comparator != null;
    }

    @Override
    public Stream<T> sorted() {
      checkIfOperatedOnOrClosedAndChangeState();
      super.stateSorted();
      return new EmptyStream<>(this);
    }

    @Override
    public Stream<T> sorted(Comparator<? super T> comparator) {
      checkStateAndThenParameters(comparator);
      return new EmptyStream<>(this);
    }

    @Override
    public Stream<T> peek(Consumer<? super T> action) {
      return nextEmptyStream(action);
    }

    @Override
    public Stream<T> limit(long maxSize) {
      if (maxSize < 0)
        throw new IllegalArgumentException(Long.toString(maxSize));
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyStream<>(this);
    }

    @Override
    public Stream<T> skip(long n) {
      if (n < 0)
        throw new IllegalArgumentException(Long.toString(n));
      if (n == 0) return this;
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyStream<>(this);
    }

    @Override
    public Stream<T> takeWhile(Predicate<? super T> predicate) {
      return nextEmptyStream(predicate);
    }

    @Override
    public Stream<T> dropWhile(Predicate<? super T> predicate) {
      return nextEmptyStream(predicate);
    }

    @Override
    public void forEach(Consumer<? super T> action) {
      checkStateAndThenParameters(action);
      // do nothing
    }

    @Override
    public void forEachOrdered(Consumer<? super T> action) {
      checkStateAndThenParameters(action);
      // do nothing
    }

    private static final Object[] EMPTY_ARRAY = {};

    @Override
    public Object[] toArray() {
      checkIfOperatedOnOrClosedAndChangeState();
      return EMPTY_ARRAY;
    }

    @Override
    public <A> A[] toArray(IntFunction<A[]> generator) {
      checkStateAndThenParameters(generator);
      return Objects.requireNonNull(generator.apply(0));
    }

    @Override
    public T reduce(T identity, BinaryOperator<T> accumulator) {
      checkParametersAndThenState(accumulator);
      return identity;
    }

    @Override
    public Optional<T> reduce(BinaryOperator<T> accumulator) {
      checkParametersAndThenState(accumulator);
      return Optional.empty();
    }

    @Override
    public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
      checkParametersAndThenState(accumulator, combiner);
      return identity;
    }

    @Override
    public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
      checkParametersAndThenState(supplier, accumulator, combiner);
      return supplier.get();
    }

    @Override
    public <R, A> R collect(Collector<? super T, A, R> collector) {
      checkParametersAndThenState(collector);
      return collector.finisher().apply(collector.supplier().get());
    }

    @Override
    public List<T> toList() {
      checkIfOperatedOnOrClosedAndChangeState();
      return List.of();
    }

    @Override
    public Optional<T> min(Comparator<? super T> comparator) {
      checkParametersAndThenState(comparator);
      return Optional.empty();
    }

    @Override
    public Optional<T> max(Comparator<? super T> comparator) {
      checkParametersAndThenState(comparator);
      return Optional.empty();
    }

    @Override
    public long count() {
      checkIfOperatedOnOrClosedAndChangeState();
      return 0L;
    }

    @Override
    public boolean anyMatch(Predicate<? super T> predicate) {
      checkParametersAndThenState(predicate);
      return false;
    }

    @Override
    public boolean allMatch(Predicate<? super T> predicate) {
      checkParametersAndThenState(predicate);
      return true;
    }

    @Override
    public boolean noneMatch(Predicate<? super T> predicate) {
      checkParametersAndThenState(predicate);
      return true;
    }

    @Override
    public Optional<T> findFirst() {
      checkIfOperatedOnOrClosedAndChangeState();
      return Optional.empty();
    }

    @Override
    public Optional<T> findAny() {
      checkIfOperatedOnOrClosedAndChangeState();
      return Optional.empty();
    }

    @Override
    public Iterator<T> iterator() {
      checkIfOperatedOnOrClosedAndChangeState();
      return Collections.emptyIterator();
    }

    @Override
    public Spliterator<T> spliterator() {
      checkIfOperatedOnOrClosedAndChangeState();
      if (isSorted())
        return new EmptySpliterator.OfRefSorted<>(stateBareCharacteristics(), comparator);
      else
        return new EmptySpliterator.OfRef<>(stateBareCharacteristics());
    }

    @Override
    public Stream<T> sequential() {
      return this;
    }

    @Override
    public Stream<T> parallel() {
      return StreamSupport.stream(spliterator(), true);
    }

    @Override
    public Stream<T> unordered() {
      checkIfOperatedOnOrClosed();
      if (super.unorderedSame()) return this;
      else return new EmptyStream<>(this);
    }

    @Override
    public Stream<T> onClose(Runnable closeHandler) {
      return StreamSupport.stream(
          spliterator(), isParallel()
      ).onClose(closeHandler);
    }
  }

  // TODO: https://github.com/marschall > 4. I made my `EmptyBaseStream`
  //  implement `BaseStream` and make `EmptyIntLongDoubleStream` extend
  //  from this class as `IntLongDoubleStream` all extend `BaseStream`.
  //  This allowed me to move the following methods up in the hierarchy
  //  `#isParallel` , `#onClose`, `#sequential`, `#parallel`, `#unordered`.
  static final class EmptyIntStream extends EmptyBaseStream implements IntStream {
    public EmptyIntStream(EmptyBaseStream input) {
      super(input);
    }

    public EmptyIntStream(Spliterator.OfInt spliterator) {
      super(spliterator);
    }

    @Override
    public IntStream filter(IntPredicate predicate) {
      return nextEmptyIntStream(predicate);
    }

    @Override
    public IntStream map(IntUnaryOperator mapper) {
      return nextEmptyIntStream(mapper);
    }

    @Override
    public <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
      return nextEmptyStream(mapper);
    }

    @Override
    public LongStream mapToLong(IntToLongFunction mapper) {
      return nextEmptyLongStream(mapper);
    }

    @Override
    public DoubleStream mapToDouble(IntToDoubleFunction mapper) {
      return nextEmptyDoubleStream(mapper);
    }

    @Override
    public IntStream flatMap(IntFunction<? extends IntStream> mapper) {
      return nextEmptyIntStream(mapper);
    }

    @Override
    public IntStream mapMulti(IntMapMultiConsumer mapper) {
      return nextEmptyIntStream(mapper);
    }

    @Override
    public IntStream distinct() {
      checkIfOperatedOnOrClosedAndChangeState();
      super.stateDistinctPrimitiveStream();
      return new EmptyIntStream(this);
    }

    @Override
    public IntStream sorted() {
      checkIfOperatedOnOrClosedAndChangeState();
      super.stateSorted();
      return new EmptyIntStream(this);
    }

    @Override
    public IntStream peek(IntConsumer action) {
      return nextEmptyIntStream(action);
    }

    @Override
    public IntStream limit(long maxSize) {
      if (maxSize < 0)
        throw new IllegalArgumentException(Long.toString(maxSize));
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyIntStream(this);
    }

    @Override
    public IntStream skip(long n) {
      if (n < 0)
        throw new IllegalArgumentException(Long.toString(n));
      if (n == 0) return this;
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyIntStream(this);
    }

    @Override
    public IntStream takeWhile(IntPredicate predicate) {
      return nextEmptyIntStream(predicate);
    }

    @Override
    public IntStream dropWhile(IntPredicate predicate) {
      return nextEmptyIntStream(predicate);
    }

    @Override
    public void forEach(IntConsumer action) {
      checkStateAndThenParameters(action);
      // do nothing
    }

    @Override
    public void forEachOrdered(IntConsumer action) {
      checkStateAndThenParameters(action);
      // do nothing
    }

    private static final int[] EMPTY_ARRAY = {};

    @Override
    public int[] toArray() {
      checkIfOperatedOnOrClosedAndChangeState();
      return EMPTY_ARRAY;
    }

    @Override
    public int reduce(int identity, IntBinaryOperator op) {
      checkParametersAndThenState(op);
      return identity;
    }

    @Override
    public OptionalInt reduce(IntBinaryOperator op) {
      checkParametersAndThenState(op);
      return OptionalInt.empty();
    }

    @Override
    public <R> R collect(Supplier<R> supplier, ObjIntConsumer<R> accumulator, BiConsumer<R, R> combiner) {
      checkParametersAndThenState(supplier, accumulator, combiner);
      return supplier.get();
    }

    @Override
    public OptionalInt min() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalInt.empty();
    }

    @Override
    public OptionalInt max() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalInt.empty();
    }

    @Override
    public long count() {
      checkIfOperatedOnOrClosedAndChangeState();
      return 0L;
    }

    @Override
    public boolean anyMatch(IntPredicate predicate) {
      checkParametersAndThenState(predicate);
      return false;
    }

    @Override
    public boolean allMatch(IntPredicate predicate) {
      checkParametersAndThenState(predicate);
      return true;
    }

    @Override
    public boolean noneMatch(IntPredicate predicate) {
      checkParametersAndThenState(predicate);
      return true;
    }

    @Override
    public OptionalInt findFirst() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalInt.empty();
    }

    @Override
    public OptionalInt findAny() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalInt.empty();
    }

    private static final PrimitiveIterator.OfInt EMPTY_ITERATOR =
        new PrimitiveIterator.OfInt() {
          @Override
          public int nextInt() {
            throw new NoSuchElementException();
          }

          @Override
          public boolean hasNext() {
            return false;
          }
        };


    @Override
    public PrimitiveIterator.OfInt iterator() {
      checkIfOperatedOnOrClosedAndChangeState();
      return EMPTY_ITERATOR;
    }

    @Override
    public Spliterator.OfInt spliterator() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptySpliterator.OfInt(stateBareCharacteristics());
    }

    @Override
    public IntStream sequential() {
      return this;
    }

    @Override
    public IntStream parallel() {
      return StreamSupport.intStream(spliterator(), true);
    }

    @Override
    public IntStream unordered() {
      checkIfOperatedOnOrClosed();
      if (super.unorderedSame()) return this;
      else return new EmptyIntStream(this);
    }

    @Override
    public IntStream onClose(Runnable closeHandler) {
      return StreamSupport.intStream(
          spliterator(), isParallel()
      ).onClose(closeHandler);
    }

    @Override
    public int sum() {
      checkIfOperatedOnOrClosedAndChangeState();
      return 0;
    }

    @Override
    public OptionalDouble average() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalDouble.empty();
    }

    @Override
    public IntSummaryStatistics summaryStatistics() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new IntSummaryStatistics();
    }

    @Override
    public LongStream asLongStream() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyLongStream(this);
    }

    @Override
    public DoubleStream asDoubleStream() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyDoubleStream(this);
    }

    @Override
    public Stream<Integer> boxed() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyStream<>(this);
    }
  }

  static final class EmptyLongStream extends EmptyBaseStream implements LongStream {
    public EmptyLongStream(EmptyBaseStream input) {
      super(input);
    }

    public EmptyLongStream(Spliterator.OfLong spliterator) {
      super(spliterator);
    }

    @Override
    public LongStream filter(LongPredicate predicate) {
      return nextEmptyLongStream(predicate);
    }

    @Override
    public LongStream map(LongUnaryOperator mapper) {
      return nextEmptyLongStream(mapper);
    }

    @Override
    public <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
      return nextEmptyStream(mapper);
    }

    @Override
    public IntStream mapToInt(LongToIntFunction mapper) {
      return nextEmptyIntStream(mapper);
    }

    @Override
    public DoubleStream mapToDouble(LongToDoubleFunction mapper) {
      return nextEmptyDoubleStream(mapper);
    }

    @Override
    public LongStream flatMap(LongFunction<? extends LongStream> mapper) {
      return nextEmptyLongStream(mapper);
    }

    @Override
    public LongStream mapMulti(LongMapMultiConsumer mapper) {
      return nextEmptyLongStream(mapper);
    }

    @Override
    public LongStream distinct() {
      checkIfOperatedOnOrClosedAndChangeState();
      super.stateDistinctPrimitiveStream();
      return new EmptyLongStream(this);
    }

    @Override
    public LongStream sorted() {
      checkIfOperatedOnOrClosedAndChangeState();
      super.stateSorted();
      return new EmptyLongStream(this);
    }

    @Override
    public LongStream peek(LongConsumer action) {
      return nextEmptyLongStream(action);
    }

    @Override
    public LongStream limit(long maxSize) {
      if (maxSize < 0)
        throw new IllegalArgumentException(Long.toString(maxSize));
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyLongStream(this);
    }

    @Override
    public LongStream skip(long n) {
      if (n < 0)
        throw new IllegalArgumentException(Long.toString(n));
      if (n == 0) return this;
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyLongStream(this);
    }

    @Override
    public LongStream takeWhile(LongPredicate predicate) {
      return nextEmptyLongStream(predicate);
    }

    @Override
    public LongStream dropWhile(LongPredicate predicate) {
      return nextEmptyLongStream(predicate);
    }

    @Override
    public void forEach(LongConsumer action) {
      checkStateAndThenParameters(action);
      // do nothing
    }

    @Override
    public void forEachOrdered(LongConsumer action) {
      checkStateAndThenParameters(action);
      // do nothing
    }

    private static final long[] EMPTY_ARRAY = {};

    @Override
    public long[] toArray() {
      checkIfOperatedOnOrClosedAndChangeState();
      return EMPTY_ARRAY;
    }

    @Override
    public long reduce(long identity, LongBinaryOperator op) {
      checkParametersAndThenState(op);
      return identity;
    }

    @Override
    public OptionalLong reduce(LongBinaryOperator op) {
      checkParametersAndThenState(op);
      return OptionalLong.empty();
    }

    @Override
    public <R> R collect(Supplier<R> supplier, ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner) {
      checkParametersAndThenState(supplier, accumulator, combiner);
      return supplier.get();
    }

    @Override
    public OptionalLong min() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalLong.empty();
    }

    @Override
    public OptionalLong max() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalLong.empty();
    }

    @Override
    public long count() {
      checkIfOperatedOnOrClosedAndChangeState();
      return 0L;
    }

    @Override
    public boolean anyMatch(LongPredicate predicate) {
      checkParametersAndThenState(predicate);
      return false;
    }

    @Override
    public boolean allMatch(LongPredicate predicate) {
      checkParametersAndThenState(predicate);
      return true;
    }

    @Override
    public boolean noneMatch(LongPredicate predicate) {
      checkParametersAndThenState(predicate);
      return true;
    }

    @Override
    public OptionalLong findFirst() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalLong.empty();
    }

    @Override
    public OptionalLong findAny() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalLong.empty();
    }

    private static final PrimitiveIterator.OfLong EMPTY_ITERATOR =
        new PrimitiveIterator.OfLong() {
          @Override
          public long nextLong() {
            throw new NoSuchElementException();
          }

          @Override
          public boolean hasNext() {
            return false;
          }
        };


    @Override
    public PrimitiveIterator.OfLong iterator() {
      checkIfOperatedOnOrClosedAndChangeState();
      return EMPTY_ITERATOR;
    }

    @Override
    public Spliterator.OfLong spliterator() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptySpliterator.OfLong(stateBareCharacteristics());
    }

    @Override
    public LongStream sequential() {
      return this;
    }

    @Override
    public LongStream parallel() {
      return StreamSupport.longStream(spliterator(), true);
    }

    @Override
    public LongStream unordered() {
      checkIfOperatedOnOrClosed();
      if (super.unorderedSame()) return this;
      else return new EmptyLongStream(this);
    }

    @Override
    public LongStream onClose(Runnable closeHandler) {
      return StreamSupport.longStream(
          spliterator(), isParallel()
      ).onClose(closeHandler);
    }

    @Override
    public long sum() {
      checkIfOperatedOnOrClosedAndChangeState();
      return 0;
    }

    @Override
    public OptionalDouble average() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalDouble.empty();
    }

    @Override
    public LongSummaryStatistics summaryStatistics() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new LongSummaryStatistics();
    }

    @Override
    public DoubleStream asDoubleStream() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyDoubleStream(this);
    }

    @Override
    public Stream<Long> boxed() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyStream<>(this);
    }
  }

  static final class EmptyDoubleStream extends EmptyBaseStream implements DoubleStream {
    public EmptyDoubleStream(EmptyBaseStream input) {
      super(input);
    }

    public EmptyDoubleStream(Spliterator.OfDouble spliterator) {
      super(spliterator);
    }

    @Override
    public DoubleStream filter(DoublePredicate predicate) {
      return nextEmptyDoubleStream(predicate);
    }

    @Override
    public DoubleStream map(DoubleUnaryOperator mapper) {
      return nextEmptyDoubleStream(mapper);
    }

    @Override
    public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
      return nextEmptyStream(mapper);
    }

    @Override
    public IntStream mapToInt(DoubleToIntFunction mapper) {
      return nextEmptyIntStream(mapper);
    }

    @Override
    public LongStream mapToLong(DoubleToLongFunction mapper) {
      return nextEmptyLongStream(mapper);
    }

    @Override
    public DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
      return nextEmptyDoubleStream(mapper);
    }

    @Override
    public DoubleStream mapMulti(DoubleMapMultiConsumer mapper) {
      return nextEmptyDoubleStream(mapper);
    }

    @Override
    public DoubleStream distinct() {
      checkIfOperatedOnOrClosedAndChangeState();
      super.stateDistinctPrimitiveStream();
      return new EmptyDoubleStream(this);
    }

    @Override
    public DoubleStream sorted() {
      checkIfOperatedOnOrClosedAndChangeState();
      super.stateSorted();
      return new EmptyDoubleStream(this);
    }

    @Override
    public DoubleStream peek(DoubleConsumer action) {
      checkParametersAndThenState(action);
      return new EmptyDoubleStream(this);
    }

    @Override
    public DoubleStream limit(long maxSize) {
      if (maxSize < 0)
        throw new IllegalArgumentException(Long.toString(maxSize));
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyDoubleStream(this);
    }

    @Override
    public DoubleStream skip(long n) {
      if (n < 0)
        throw new IllegalArgumentException(Long.toString(n));
      if (n == 0) return this;
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyDoubleStream(this);
    }

    @Override
    public DoubleStream takeWhile(DoublePredicate predicate) {
      return nextEmptyDoubleStream(predicate);
    }

    @Override
    public DoubleStream dropWhile(DoublePredicate predicate) {
      return nextEmptyDoubleStream(predicate);
    }

    @Override
    public void forEach(DoubleConsumer action) {
      checkStateAndThenParameters(action);
      // do nothing
    }

    @Override
    public void forEachOrdered(DoubleConsumer action) {
      checkStateAndThenParameters(action);
      // do nothing
    }

    private static final double[] EMPTY_ARRAY = {};

    @Override
    public double[] toArray() {
      checkIfOperatedOnOrClosedAndChangeState();
      return EMPTY_ARRAY;
    }

    @Override
    public double reduce(double identity, DoubleBinaryOperator op) {
      checkParametersAndThenState(op);
      return identity;
    }

    @Override
    public OptionalDouble reduce(DoubleBinaryOperator op) {
      checkParametersAndThenState(op);
      return OptionalDouble.empty();
    }

    @Override
    public <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner) {
      checkParametersAndThenState(supplier, accumulator, combiner);
      return supplier.get();
    }

    @Override
    public OptionalDouble min() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalDouble.empty();
    }

    @Override
    public OptionalDouble max() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalDouble.empty();
    }

    @Override
    public long count() {
      checkIfOperatedOnOrClosedAndChangeState();
      return 0L;
    }

    @Override
    public boolean anyMatch(DoublePredicate predicate) {
      checkParametersAndThenState(predicate);
      return false;
    }

    @Override
    public boolean allMatch(DoublePredicate predicate) {
      checkParametersAndThenState(predicate);
      return true;
    }

    @Override
    public boolean noneMatch(DoublePredicate predicate) {
      checkParametersAndThenState(predicate);
      return true;
    }

    @Override
    public OptionalDouble findFirst() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalDouble.empty();
    }

    @Override
    public OptionalDouble findAny() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalDouble.empty();
    }

    private static final PrimitiveIterator.OfDouble EMPTY_ITERATOR =
        new PrimitiveIterator.OfDouble() {
          @Override
          public double nextDouble() {
            throw new NoSuchElementException();
          }

          @Override
          public boolean hasNext() {
            return false;
          }
        };

    @Override
    public PrimitiveIterator.OfDouble iterator() {
      checkIfOperatedOnOrClosedAndChangeState();
      return EMPTY_ITERATOR;
    }

    @Override
    public Spliterator.OfDouble spliterator() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptySpliterator.OfDouble(stateBareCharacteristics());
    }

    @Override
    public DoubleStream sequential() {
      return this;
    }

    @Override
    public DoubleStream parallel() {
      return StreamSupport.doubleStream(spliterator(), true);
    }

    @Override
    public DoubleStream unordered() {
      checkIfOperatedOnOrClosed();
      if (super.unorderedSame()) return this;
      else return new EmptyDoubleStream(this);
    }

    @Override
    public DoubleStream onClose(Runnable closeHandler) {
      return StreamSupport.doubleStream(
          spliterator(), isParallel()
      ).onClose(closeHandler);
    }

    @Override
    public double sum() {
      checkIfOperatedOnOrClosedAndChangeState();
      return 0;
    }

    @Override
    public OptionalDouble average() {
      checkIfOperatedOnOrClosedAndChangeState();
      return OptionalDouble.empty();
    }

    @Override
    public DoubleSummaryStatistics summaryStatistics() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new DoubleSummaryStatistics();
    }

    @Override
    public Stream<Double> boxed() {
      checkIfOperatedOnOrClosedAndChangeState();
      return new EmptyStream<>(this);
    }
  }

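  // T is the element type, S the spliterator type returned by trySplit(),
  // and C the consumer type accepted by tryAdvance() and forEachRemaining().
  private abstract static class EmptySpliterator<T, S extends Spliterator<T>, C> {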
    private final int characteristics;

    EmptySpliterator(int characteristics) {
      this.characteristics = characteristics;
    }

    public S trySplit() {
      return null;
    }

    public boolean tryAdvance(C consumer) {
      Objects.requireNonNull(consumer);
      return false;
    }

    public void forEachRemaining(C consumer) {
      Objects.requireNonNull(consumer);
    }

    public long estimateSize() {
      return 0;
    }

    public int characteristics() {
      return characteristics;
    }

    private static final class OfRef<T>
        extends EmptySpliterator<T, Spliterator<T>, Consumer<? super T>>
        implements Spliterator<T> {
      OfRef(int characteristics) {
        super(characteristics);
      }
    }

    private static final class OfRefSorted<T>
        extends EmptySpliterator<T, Spliterator<T>, Consumer<? super T>>
        implements Spliterator<T> {
      private final Comparator<? super T> comparator;

      OfRefSorted(int characteristics, Comparator<? super T> comparator) {
        super(characteristics);
        if (!hasCharacteristics(SORTED))
          throw new IllegalArgumentException("Spliterator only for SORTED");
        this.comparator = comparator;
      }

      @Override
      public Comparator<? super T> getComparator() {
        return comparator;
      }
    }

    private static final class OfInt
        extends EmptySpliterator<Integer, Spliterator.OfInt, IntConsumer>
        implements Spliterator.OfInt {
      OfInt(int characteristics) {
        super(characteristics);
      }
    }

    private static final class OfLong
        extends EmptySpliterator<Long, Spliterator.OfLong, LongConsumer>
        implements Spliterator.OfLong {
      OfLong(int characteristics) {
        super(characteristics);
      }
    }

    private static final class OfDouble
        extends EmptySpliterator<Double, Spliterator.OfDouble, DoubleConsumer>
        implements Spliterator.OfDouble {
      OfDouble(int characteristics) {
        super(characteristics);
      }
    }
  }
}
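
The terminal operations in the listing above are meant to return the same values that the JDK's own empty streams return. As a rough sanity check, here is a minimal sketch that prints those reference values using only LongStream.empty() and DoubleStream.empty(). The class name EmptyPrimitiveStreamContract and its two helper methods are hypothetical names chosen for this illustration; they are not part of the newsletter's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.stream.DoubleStream;
import java.util.stream.LongStream;

// Hypothetical helper class, for illustration only.
final class EmptyPrimitiveStreamContract {

  // Runs a handful of terminal operations against the JDK's own empty
  // streams and records one "name=value" line per operation.
  static List<String> describe() {
    List<String> lines = new ArrayList<>();
    lines.add("count=" + LongStream.empty().count());
    lines.add("sum=" + LongStream.empty().sum());
    lines.add("min.isPresent=" + LongStream.empty().min().isPresent());
    lines.add("average.isPresent="
        + DoubleStream.empty().average().isPresent());
    // Predicates are never invoked on an empty stream, so the results
    // do not depend on what the lambdas return.
    lines.add("anyMatch=" + LongStream.empty().anyMatch(x -> true));
    lines.add("allMatch=" + LongStream.empty().allMatch(x -> false));
    lines.add("noneMatch=" + DoubleStream.empty().noneMatch(x -> true));
    // With no elements to fold in, reduce hands back the identity.
    lines.add("reduce=" + LongStream.empty().reduce(42L, Long::sum));
    lines.add("toArray.length=" + DoubleStream.empty().toArray().length);
    LongSummaryStatistics stats = LongStream.empty().summaryStatistics();
    lines.add("stats.count=" + stats.getCount());
    return lines;
  }

  // A stream may only be operated on once, so a second terminal
  // operation on the same instance should fail.
  static boolean secondTerminalCallFails() {
    LongStream stream = LongStream.empty();
    stream.count();
    try {
      stream.count();
      return false;
    } catch (IllegalStateException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    describe().forEach(System.out::println);
    System.out.println(
        "secondTerminalCallFails=" + secondTerminalCallFails());
  }
}
```

Swapping the JDK factory calls for the empty stream classes from the listing should, if the implementation is faithful, produce the same lines. That makes a comparison like this a cheap regression check.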
 


