Java 8 parallel streams - avoid the shared thread pool

TL;DR: Java 8's parallel streams use a shared thread pool, which can seriously harm performance. Work around this by running your stream calls inside their own thread pools.

The problem

Java 8 parallel streams allow us to execute tasks concurrently with relative ease:

myList.parallelStream().map(obj -> longRunningOperation())

However, there is a big problem with this: behind the scenes the JVM uses a common fork/join pool which is shared across all parallel streams. By default this pool is sized to the number of processors, roughly one thread per core. Let's say you have a 16-core machine: effectively you only get about 16 threads, however many parallel streams you run. For CPU-intensive tasks this makes sense, because your machine can only actually execute 16 threads at once, but in the real world tasks are not purely CPU-intensive. Take this example:

myList.parallelStream()
   .map(this::retrieveFromA)
   .map(this::processUsingB)
   .forEach(this::saveToC)

myList.parallelStream()
   .map(this::retrieveFromD)
   .map(this::processUsingE)
   .forEach(this::saveToD)

Both streams will be largely IO-bound, waiting on other systems. However, as both streams use the same (small) thread pool, they will be stuck waiting on each other. This is bad, very bad, and it can be demonstrated. Let's take an example with one stream:

final List<Integer> firstRange = buildIntRange();
firstRange.parallelStream().forEach((number) -> {
   try {
      // do something slow
      Thread.sleep(5);
   } catch (InterruptedException e) { }
});


During the execution I took a thread dump. Here are the relevant threads (on my MacBook):

ForkJoinPool.commonPool-worker-1  
ForkJoinPool.commonPool-worker-2  
ForkJoinPool.commonPool-worker-3  
ForkJoinPool.commonPool-worker-4  
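
The number of commonPool workers in a dump like this follows from the common pool's parallelism setting. As a rough sketch (the class and method names here are illustrative and not taken from the gist), you can inspect that setting directly. On default settings the reported parallelism is typically one less than the processor count, since the thread that starts the stream also takes part in the work:

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    /** Number of processors the JVM reports as available. */
    static int availableProcessors() {
        return Runtime.getRuntime().availableProcessors();
    }

    /** Target parallelism of the common pool shared by all parallel streams. */
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    " + availableProcessors());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

The java.util.concurrent.ForkJoinPool.common.parallelism system property can override the default, but it changes the size for every parallel stream in the JVM at once.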

Now we'll execute two parallel streams in parallel (if you're not a native English speaker, I apologise!):

Runnable firstTask = () -> {  
   firstRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
   });
};

Runnable secondTask = () -> {  
   secondRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
   });
};

// run both tasks, each on its own thread
new Thread(firstTask).start();
new Thread(secondTask).start();


Let's look at the thread dump this time:

ForkJoinPool.commonPool-worker-1  
ForkJoinPool.commonPool-worker-2  
ForkJoinPool.commonPool-worker-3  
ForkJoinPool.commonPool-worker-4  

As you can see, it's the same. Both streams are sharing the same 4 threads.
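
If you would rather not take a thread dump, the same observation can be reproduced by recording thread names from inside the streams. This is a minimal sketch under my own assumptions: the class name, the "caller-" thread names and the range size of 200 are made up for illustration. Every recorded name should be either one of the two calling threads (which also help with the work) or a common pool worker:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SharedPoolDemo {

    /** Runs a slow parallel stream and records which threads processed its elements. */
    static void runSlowStream(List<Integer> range, Set<String> threadNames) {
        range.parallelStream().forEach(number -> {
            threadNames.add(Thread.currentThread().getName());
            try {
                Thread.sleep(5); // stand-in for slow IO
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /** Runs two parallel streams at the same time and returns every thread name seen. */
    static Set<String> threadNamesForTwoStreams() {
        List<Integer> firstRange = IntStream.range(0, 200).boxed().collect(Collectors.toList());
        List<Integer> secondRange = IntStream.range(0, 200).boxed().collect(Collectors.toList());
        Set<String> threadNames = ConcurrentHashMap.newKeySet();

        Thread first = new Thread(() -> runSlowStream(firstRange, threadNames), "caller-1");
        Thread second = new Thread(() -> runSlowStream(secondRange, threadNames), "caller-2");
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the streams", e);
        }
        return threadNames;
    }

    public static void main(String[] args) {
        threadNamesForTwoStreams().stream().sorted().forEach(System.out::println);
    }
}
```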

A workaround

As I mentioned, the JVM uses the fork/join pool behind the scenes, and hidden in the documentation for ForkJoinTask.fork() we see this:

Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()
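
To make the "pool the current task is running in" part concrete, here is a small sketch (the class and method names are hypothetical) that uses the static ForkJoinTask.inForkJoinPool() and ForkJoinTask.getPool() methods to ask, from inside a submitted task, which pool the code is running in:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class PoolContextDemo {

    /** True if the calling thread is a fork/join worker thread. */
    static boolean callerIsInPool() {
        return ForkJoinTask.inForkJoinPool();
    }

    /** Submits a task to a private pool and reports whether the task saw that pool as its own. */
    static boolean taskRunsInsideOwnPool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.submit(
                    () -> ForkJoinTask.inForkJoinPool() && ForkJoinTask.getPool() == pool
            ).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Caller in a fork/join pool: " + callerIsInPool());
        System.out.println("Task ran in its own pool:   " + taskRunsInsideOwnPool());
    }
}
```

Anything forked from inside such a task lands in that same pool rather than the common one, which is the behaviour the rest of this post leans on.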

Let's try something...

ForkJoinPool forkJoinPool = new ForkJoinPool(3);  
forkJoinPool.submit(() -> {  
    firstRange.parallelStream().forEach((number) -> {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) { }
    });
});

ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);  
forkJoinPool2.submit(() -> {  
    secondRange.parallelStream().forEach((number) -> {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
        }
    });
});


Now let's look at the thread dump:

ForkJoinPool-1-worker-1  
ForkJoinPool-1-worker-2  
ForkJoinPool-1-worker-3  
ForkJoinPool-1-worker-4  
ForkJoinPool-2-worker-1  
ForkJoinPool-2-worker-2  
ForkJoinPool-2-worker-3  
ForkJoinPool-2-worker-4

Because we created our own thread pools, we avoid the shared thread pool, and we can even allocate more threads than processor cores if we so wish:

ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>);
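
Putting it together, here is a hedged sketch of how the workaround might look in a complete program. The class name, the helper method and the range size are my own placeholders. It waits for the stream with get() and shuts the pool down afterwards, two details the snippets above leave out. Note that this leans on the fork behaviour quoted earlier rather than on anything the Stream documentation spells out, so treat it as a workaround and not a guarantee:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DedicatedPoolDemo {

    /** Runs a slow parallel stream inside a private pool and returns the thread names used. */
    static Set<String> runInDedicatedPool(int numThreads) {
        List<Integer> range = IntStream.range(0, 200).boxed().collect(Collectors.toList());
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(numThreads);
        try {
            // The stream is started from a worker of our pool, so its subtasks stay in it.
            pool.submit(() ->
                range.parallelStream().forEach(number -> {
                    threadNames.add(Thread.currentThread().getName());
                    try {
                        Thread.sleep(5); // stand-in for slow IO
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                })
            ).get(); // block until the whole stream has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the stream", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("stream task failed", e.getCause());
        } finally {
            pool.shutdown(); // private pools are not cleaned up for us
        }
        return threadNames;
    }

    public static void main(String[] args) {
        // 32 is an arbitrary figure for an IO-bound workload, not a recommendation.
        runInDedicatedPool(32).stream().sorted().forEach(System.out::println);
    }
}
```

Creating a pool per call is fine for a demo. In a real service you would more likely create one pool per downstream system and reuse it.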