Abstract: If we have a List<Stream<T>>, each stream with sorted elements, how can we generate a sorted Stream<T>, taking one at a time from each stream? In this newsletter we show how to do that with the Stream API and by writing our own MergingSortedSpliterator.
Welcome to the 289th edition of The Java(tm) Specialists' Newsletter. I know, I know, 36 hours ago I sent you a newsletter after two months of radio silence, and here is another one? Well, when I see an interesting puzzle, I find myself unable to resist trying to solve it. And that's what happened this morning. So here we are - I hope you enjoy it :-)
As I rubbed the sleep out of my eyes this morning, I vaguely made out the shape of a tweet with my name on it. Antoine DESSAIGNE (@adessaigne) had tweeted "Hello @java folks 👋, If you have a List<Stream<T>>, each stream with sorted elements, how can you generate a sorted Stream<T>, taking one at a time from each stream? Managed to do that using iterators but is there a clever way to do it with Stream API? Thanks 👍"
None of the Java greats had proposed an easy or performant solution, so I grabbed my laptop and started typing. Antoine had apparently created his own solution using Iterators, and I had in mind a solution using Spliterators. A Spliterator can easily be converted to a Stream using StreamSupport.stream(). Having not seen Antoine's solution, I imagine that he did something along the same lines. Obviously we could just do:
```java
list.stream()
    .flatMap(Function.identity())
    .sorted();
```
In that case we would be sorting twice: once to sort each of the original streams and once more for the combined stream. I wanted to avoid that. However, the second sort would not be as expensive as the first. Timsort is efficient at sorting partially sorted lists. In addition, if we also parallelize the second sort, then internally it will do a divide and conquer, and there is a good chance that we would end up with chunks of items that are already mostly sorted.
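To make the "just sort it again" approach concrete, here is a minimal sketch. The class name FlatMapSortedDemo and the helper mergeBySorting() are made up for this illustration, and it assumes that the elements use their natural ordering.

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

class FlatMapSortedDemo {
  // Concatenates all the streams, then sorts the combined elements again.
  // Assumes natural ordering; sorted() fails at runtime for elements
  // that are not Comparable.
  static <T> Stream<T> mergeBySorting(List<Stream<T>> streams) {
    return streams.stream()
        .flatMap(Function.identity())
        .sorted();
  }

  public static void main(String... args) {
    List<Stream<Integer>> streams = List.of(
        Stream.of(1, 4, 7),
        Stream.of(2, 5, 8),
        Stream.of(0, 3, 6, 9)
    );
    List<Integer> merged = mergeBySorting(streams)
        .collect(Collectors.toList());
    System.out.println(merged); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}
```

Note that sorted() is a stateful operation, so it has to buffer all the elements before it can emit the first one.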
But more fun is to write our own MergingSortedSpliterator.
Some constraints are: the streams that are passed in have to have the SORTED characteristic, which also implies that their Spliterator should return the Comparator used for sorting. All the streams must have used the same Comparator. The Comparator does not have to be the same instance, but they must match equals(). Another constraint is that this Spliterator does not support parallel streams.
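As a rough illustration of the first constraint, the following sketch inspects what a stream's Spliterator reports about its ordering. The class SortedCharacteristicDemo and its describe() method are hypothetical names used only here. It relies on the documented contract that getComparator() returns null for natural ordering and throws IllegalStateException when the Spliterator is not SORTED.

```java
import java.util.*;
import java.util.stream.*;

class SortedCharacteristicDemo {
  // Describes what a stream's spliterator reports about its ordering
  static <T> String describe(Stream<T> stream) {
    Spliterator<T> spliterator = stream.spliterator();
    if (!spliterator.hasCharacteristics(Spliterator.SORTED))
      return "not SORTED";
    // Safe to call now; it would throw IllegalStateException otherwise
    Comparator<? super T> comparator = spliterator.getComparator();
    return comparator == null
        ? "SORTED, natural order"
        : "SORTED, custom comparator";
  }

  public static void main(String... args) {
    // A plain stream makes no promise about sorting
    System.out.println(describe(Stream.of(3, 1, 2)));

    // sorted() without arguments uses the natural order
    System.out.println(describe(Stream.of(3, 1, 2).sorted()));

    // A TreeSet built with a comparator hands it to its spliterator
    SortedSet<Integer> descending =
        new TreeSet<>(Comparator.<Integer>reverseOrder());
    descending.addAll(List.of(3, 1, 2));
    System.out.println(describe(descending.stream()));
  }
}
```

Only the last two streams would be accepted by a merger that insists on the SORTED characteristic, and they could not be merged with each other, since their comparators differ.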
The characteristics are a bitwise AND of the characteristics of all the contained spliterators, and estimateSize() returns the sum of their estimated sizes, using Long.MAX_VALUE if we overflow.
Note: We updated the class a bit by not reading the first element in the constructor. We also changed the spliterator to never be distinct, otherwise distinct() on the stream becomes a no-op. (suggestions by @tagir_valeev)
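The two rules above, plus the DISTINCT mask from the note, can be sketched in isolation like this. CharacteristicsDemo, mergedCharacteristics() and saturatingAdd() are names invented for this example; the saturating addition assumes that both estimates are non-negative, which is what estimateSize() promises.

```java
import java.util.*;

class CharacteristicsDemo {
  // Keeps only the characteristics that all spliterators share, then
  // clears DISTINCT, since merging can put equal elements from
  // different streams next to each other
  static <T> int mergedCharacteristics(List<Spliterator<T>> spliterators) {
    return spliterators.stream()
        .mapToInt(Spliterator::characteristics)
        .reduce((ch1, ch2) -> ch1 & ch2)
        .orElse(0) & ~Spliterator.DISTINCT;
  }

  // Adds two non-negative estimates, saturating at Long.MAX_VALUE.
  // An overflowing sum of two non-negative longs is always negative.
  static long saturatingAdd(long a, long b) {
    long sum = a + b;
    return sum < 0 ? Long.MAX_VALUE : sum;
  }

  public static void main(String... args) {
    // Both TreeSet spliterators are SORTED and DISTINCT on their own,
    // but the merged elements 1, 2, 2, 3 would contain a duplicate
    List<Spliterator<Integer>> spliterators = List.of(
        new TreeSet<Integer>(List.of(1, 2)).spliterator(),
        new TreeSet<Integer>(List.of(2, 3)).spliterator()
    );
    int merged = mergedCharacteristics(spliterators);
    System.out.println((merged & Spliterator.SORTED) != 0);   // true
    System.out.println((merged & Spliterator.DISTINCT) != 0); // false
    System.out.println(saturatingAdd(2, 3));                  // 5
    System.out.println(
        saturatingAdd(Long.MAX_VALUE, 1) == Long.MAX_VALUE);  // true
  }
}
```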
```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class MergingSortedSpliterator<T> implements Spliterator<T> {
  private final List<Spliterator<T>> spliterators;
  private final List<Iterator<T>> iterators;
  private final int characteristics;
  private final Object[] nextItem;
  private static final Object START_OF_STREAM = new Object();
  private static final Object END_OF_STREAM = new Object();
  private final Comparator<? super T> comparator;

  public MergingSortedSpliterator(Collection<Stream<T>> streams) {
    this.spliterators = streams.stream()
        .map(Stream::spliterator)
        .collect(Collectors.toList());
    if (!spliterators.stream().allMatch(
        spliterator -> spliterator.hasCharacteristics(SORTED)))
      throw new IllegalArgumentException("Streams must be sorted");
    // A null comparator means natural ordering, so we compare every
    // comparator against the first one rather than against null
    Comparator<? super T> comparator = spliterators.isEmpty()
        ? null : spliterators.get(0).getComparator();
    for (Spliterator<T> spliterator : spliterators) {
      Comparator<? super T> other = spliterator.getComparator();
      if (!Objects.equals(comparator, other))
        throw new IllegalArgumentException(
            "Mismatching comparators " + comparator + " and " + other);
    }
    this.comparator = Objects.requireNonNullElse(comparator,
        (Comparator<? super T>) Comparator.naturalOrder());
    this.characteristics = spliterators.stream()
        .mapToInt(Spliterator::characteristics)
        .reduce((ch1, ch2) -> ch1 & ch2)
        .orElse(0) & ~DISTINCT; // Mask out DISTINCT
    // setting up iterators
    this.iterators = spliterators.stream()
        .map(Spliterators::iterator)
        .collect(Collectors.toList());
    nextItem = new Object[streams.size()];
    Arrays.fill(nextItem, START_OF_STREAM);
  }

  private Object fetchNext(Iterator<T> iterator) {
    return iterator.hasNext() ? iterator.next() : END_OF_STREAM;
  }

  public boolean tryAdvance(Consumer<? super T> action) {
    Objects.requireNonNull(action, "action==null");
    if (nextItem.length == 0) return false;
    T smallest = null;
    int smallestIndex = -1;
    for (int i = 0; i < nextItem.length; i++) {
      Object o = nextItem[i];
      // lazily read the first element of each stream
      if (o == START_OF_STREAM)
        nextItem[i] = o = fetchNext(iterators.get(i));
      if (o != END_OF_STREAM) {
        T t = (T) o;
        // check the index, as smallest itself might legitimately be null
        if (smallestIndex == -1 || comparator.compare(t, smallest) < 0) {
          smallest = t;
          smallestIndex = i;
        }
      }
    }
    // smallest might be null if the stream contains nulls
    if (smallestIndex == -1) return false;
    nextItem[smallestIndex] = fetchNext(iterators.get(smallestIndex));
    action.accept(smallest);
    return true;
  }

  public Spliterator<T> trySplit() {
    // never split - parallel not supported
    return null;
  }

  public long estimateSize() {
    return spliterators.stream()
        .mapToLong(Spliterator::estimateSize)
        .reduce((ch1, ch2) -> {
          long result;
          if ((result = ch1 + ch2) < 0) result = Long.MAX_VALUE;
          return result;
        })
        .orElse(0);
  }

  public int characteristics() {
    return characteristics;
  }

  public Comparator<? super T> getComparator() {
    return comparator;
  }
}
```
We can then create a Stream from the Spliterator using StreamSupport.stream(new MergingSortedSpliterator<>(streams), false). The false means that the stream will be sequential and not parallel. As mentioned above, parallel is not supported.
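The same StreamSupport.stream() call works for any Spliterator, including one wrapped around a plain Iterator. As an illustration, here is a sketch of what an iterator-based merge might look like. I have not seen Antoine's code, so this is my own guess and not his solution. It also swaps in a PriorityQueue instead of the linear scan used in MergingSortedSpliterator, and it assumes natural ordering. The names IteratorMergeDemo, Head and merge() are invented for the example.

```java
import java.util.*;
import java.util.stream.*;

class IteratorMergeDemo {
  // Hypothetical helper: the current element of one stream together
  // with the iterator that supplies the rest of that stream
  private static final class Head<T> {
    final T value;
    final Iterator<T> rest;

    Head(T value, Iterator<T> rest) {
      this.value = value;
      this.rest = rest;
    }
  }

  static <T extends Comparable<? super T>> Stream<T> merge(
      List<Stream<T>> streams) {
    Iterator<T> merged = new Iterator<T>() {
      private PriorityQueue<Head<T>> queue; // filled on first use

      private PriorityQueue<Head<T>> queue() {
        if (queue == null) {
          queue = new PriorityQueue<Head<T>>(
              (a, b) -> a.value.compareTo(b.value));
          for (Stream<T> stream : streams) offer(stream.iterator());
        }
        return queue;
      }

      private void offer(Iterator<T> iterator) {
        if (iterator.hasNext())
          queue.add(new Head<>(iterator.next(), iterator));
      }

      public boolean hasNext() {
        return !queue().isEmpty();
      }

      public T next() {
        Head<T> head = queue().poll();
        if (head == null) throw new NoSuchElementException();
        offer(head.rest); // refill from the stream we just took from
        return head.value;
      }
    };
    // false = sequential, same as for the MergingSortedSpliterator
    return StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(merged, Spliterator.ORDERED),
        false);
  }

  public static void main(String... args) {
    List<Stream<Integer>> streams = List.of(
        Stream.of(1, 4, 7), Stream.of(2, 5, 8), Stream.of(0, 3, 6, 9));
    System.out.println(merge(streams).collect(Collectors.toList()));
    // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}
```

With k streams, the queue finds the next element in O(log k) rather than O(k), which should only start to matter when there are many streams.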
Here is a demo of the MergingSortedSpliterator at work:
```java
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class SortedStreamOfSortedStreams {
  private static final int SIZE = 5;

  public static void main(String... args) {
    List<Stream<Integer>> streams = List.of(
        generateSortedRandom(SIZE),
        generateSortedRandom(SIZE),
        generateSortedRandom(SIZE),
        generateSortedRandom(SIZE)
    );
    Stream<Integer> numbers = StreamSupport.stream(
        new MergingSortedSpliterator<>(streams), false
    );
    numbers.forEach(System.out::println);
  }

  private static Stream<Integer> generateSortedRandom(int size) {
    return ThreadLocalRandom.current().ints(size, 0, size * 4)
        .parallel()
        .sorted()
        .boxed();
  }
}
```
For example, we might see output like this:
0 0 2 4 4 5 6 6 7 10 10 11 12 15 16 17 18 18 19 19
I ran some performance tests and as I expected, my MergingSortedSpliterator is faster than a sorted flatMap. However, even faster, at least on my machine, is a parallel sorted flatMap. Here is a basic performance test:
```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class PerformanceTest {
  private static final int SIZE = 10_000_000;

  private static final List<Function<List<Stream<Integer>>,
      Stream<Integer>>> MERGERS =
      List.of(
          // sequential sorted flatMap
          s -> s.stream()
              .flatMap(Function.identity())
              .sorted(),
          // parallel sorted flatMap
          s -> s.stream()
              .flatMap(Function.identity())
              .parallel()
              .sorted(),
          // MergingSortedSpliterator
          s -> StreamSupport.stream(
              new MergingSortedSpliterator<>(s), false
          ));

  public static void main(String... args) {
    for (int i = 0; i < 10; i++) {
      test();
      System.out.println();
    }
  }

  private static void test() {
    MERGERS.forEach(merger -> {
      List<Stream<Integer>> streams = makeStreams();
      long time = System.nanoTime();
      try {
        Stream<Integer> numbers = merger.apply(streams);
        numbers.forEach(i -> {
        });
      } finally {
        time = System.nanoTime() - time;
        System.out.printf("time = %dms%n", (time / 1_000_000));
      }
    });
  }

  private static List<Stream<Integer>> makeStreams() {
    return Stream.generate(() -> generateSorted(SIZE))
        .limit(10).collect(Collectors.toList());
  }

  private static Stream<Integer> generateSorted(int size) {
    return IntStream.range(0, size).boxed();
  }
}
```
On my 1-8-2 MacBook Pro the best results were:
Sorted flatMap sequential: 7.8 seconds, 3.1 GB allocated
Sorted flatMap parallel: 2.1 seconds, 3.0 GB allocated
MergingSortedSpliterator: 5.2 seconds, 1.5 GB allocated
Thus the MergingSortedSpliterator would be faster than the sequential version and require fewer CPU cycles than the parallel version. Unless I was working with very large datasets, I would probably favour the sorted flatMap sequential version. The only catch is that we should make sure that the comparator is the same as for the original streams. My MergingSortedSpliterator takes care of that for us.
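To show what that catch looks like with the sorted flatMap approach, here is a small sketch with streams that were sorted in descending order. ComparatorCatchDemo and its merge() method are hypothetical names for this example only.

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

class ComparatorCatchDemo {
  // Re-sorts the concatenated streams with an explicit comparator, which
  // should be the one that the input streams were sorted with
  static <T> List<T> merge(List<Stream<T>> streams,
                           Comparator<? super T> comparator) {
    return streams.stream()
        .flatMap(Function.identity())
        .sorted(comparator)
        .collect(Collectors.toList());
  }

  public static void main(String... args) {
    // Streams can only be consumed once, so we supply fresh ones
    Supplier<List<Stream<Integer>>> descending = () -> List.of(
        Stream.of(9, 5, 1), Stream.of(8, 4, 0));

    // The matching comparator keeps the descending order
    System.out.println(
        merge(descending.get(), Comparator.<Integer>reverseOrder()));
    // prints [9, 8, 5, 4, 1, 0]

    // The natural order silently flips what the inputs had promised
    System.out.println(
        merge(descending.get(), Comparator.<Integer>naturalOrder()));
    // prints [0, 1, 4, 5, 8, 9]
  }
}
```

Nothing fails in the second call. We simply get a different order from the one the input streams were sorted in, which is why the mistake is easy to miss.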
Thanks Antoine for the challenge. I enjoy solving puzzles like this, especially if they have some practical application and are not just interview questions or homework assignments :-)
Kind regards
Heinz
We are always happy to receive comments from our readers. Feel free to send me a comment via email or discuss the newsletter in our JavaSpecialists Slack Channel.