Java8 lamda表达式中的parallel

概览

Java8为集合流处理提供了并发支持。并且就算集合不是线程安全的,你也可以在不修改集合的状态下,对集合进行并发汇总操作。但是这里面仍有一些限制和需要注意的情况,同时你也需要根据数据的实际情况来决定是否需要并发处理。

并发申明

你可以通过并行或者串行的方式执行流。默认情况下所有的流都是串行的,如果需要并行,可以通过Collection.parallelStream或者调用某个operation的BaseStream.parallel

double average = roster
    .parallelStream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .mapToInt(Person::getAge)
    .average()
    .getAsDouble()

上面的例子是并发求取所有男性成员的平均年龄。

并发Reduction

默认串行reduction操作:

Map<Person.Sex, List<Person>> byGender =
    roster
        .stream()
        .collect(
            Collectors.groupingBy(Person::getGender));

上面的例子将所有成员按性别分组

下面是并行reduction:

ConcurrentMap<Person.Sex, List<Person>> byGender =
    roster
        .parallelStream()
        .collect(
            Collectors.groupingByConcurrent(Person::getGender));

可以看到不光是parallelStream的区别,还有groupingByConcurrent。要确定collect操作是否是并发,要满足以下三个条件:

注意:上面用了ConcurrentMap和groupingByConcurrent,都是因为性能的原因。因为他们能更好的适配parallelStream带来更高的性能。

排序

流处理元素的顺序取决于它是并行还是串行的,并且还有中间操作的影响。默认情况下串行的流是按照集合顺序执行的,并行的情况顺序无法保证,但是在迭代时你可以使用forEachOrdered来代替forEach强制并行流按照顺序遍历,但是这会损耗性能,跟并行的初衷相违。

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 };
List<Integer> listOfIntegers =
    new ArrayList<>(Arrays.asList(intArray));

System.out.println("listOfIntegers:");
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("listOfIntegers sorted in reverse order:");
Comparator<Integer> normal = Integer::compare;
Comparator<Integer> reversed = normal.reversed(); 
Collections.sort(listOfIntegers, reversed);  
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
     
System.out.println("Parallel stream");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
    
System.out.println("Another parallel stream:");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
     
System.out.println("With forEachOrdered:");
listOfIntegers
    .parallelStream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

print:

listOfIntegers:
1 2 3 4 5 6 7 8
listOfIntegers sorted in reverse order:
8 7 6 5 4 3 2 1
Parallel stream:
3 4 1 6 2 5 7 8
Another parallel stream:
6 3 1 5 7 8 4 2
With forEachOrdered:
8 7 6 5 4 3 2 1

副作用

流的collect操作对并发情况有良好的的支持,但是像forEachpeek这种操作,不适用于并发流,他们可能被并发的在多个线程处理多次,并且他们的结果无法估计,还会产生副作用。除了两个遍历操作,像System.out这种返回void的调用,在lambda表达式中除了带来副作用没有任何意义。还要注意像filtermap操作的lambda表达式参数,需要确保他们不会产生任何副作用。

惰性

流的中间操作都不是立即执行的,他们再被申明过后,都会等待最后一个终端操作(比如之前的forEach和collect)的出现,才开始被调用。因为这个特性,在申明流的中间操作时,需要特别注意不要有干扰和状态化的lambda表达式出现。

错误的干扰

下面的例子是一个错误的干扰性的中间操作申明:

try {
    List<String> listOfStrings =
        new ArrayList<>(Arrays.asList("one", "two"));
         
    // This will fail as the peek operation will attempt to add the
    // string "three" to the source after the terminal operation has
    // commenced. 
             
    String concatenatedString = listOfStrings
        .stream()
        
        // Don't do this! Interference occurs here.
        .peek(s -> listOfStrings.add("three"))
        
        .reduce((a, b) -> a + " " + b)
        .get();
                 
    System.out.println("Concatenated string: " + concatenatedString);
         
} catch (Exception e) {
    System.out.println("Exception caught: " + e.toString());
}

上面的例子在peek操作中给集合源添加了一个元素。按照之前的惰性说明,流操作的执行的开始是在终端操作出现之后。当流开始处理,这时候peek操作修改源数据,就会抛出ConcurrentModificationException

不该出现的状态化lambda表达式

List<Integer> serialStorage = new ArrayList<>();
     
System.out.println("Serial stream:");
listOfIntegers
    .stream()
    
    // Don't do this! It uses a stateful lambda expression.
    .map(e -> { serialStorage.add(e); return e; })
    
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
     
serialStorage
    .stream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("Parallel stream:");
List<Integer> parallelStorage = Collections.synchronizedList(
    new ArrayList<>());
listOfIntegers
    .parallelStream()
    
    // Don't do this! It uses a stateful lambda expression.
    .map(e -> { parallelStorage.add(e); return e; })
    
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
     
parallelStorage
    .stream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

e -> { parallelStorage.add(e); return e; }在这里是一个状态化的lambda表达式,在parallel的情况下他的执行结果可能是难以预料的:

Serial stream:
8 7 6 5 4 3 2 1
8 7 6 5 4 3 2 1
Parallel stream:
8 7 6 5 4 3 2 1
1 3 6 2 4 5 8 7

注意,这里使用了Collections.synchronizedList来包装并发的List,如果没有这个逻辑,直接申明一个ArrayList来接收并发流生成的list,那么可能会产生多个线程同时修改一个对象的情况,结果可能像这样:

Parallel stream:
8 7 6 5 4 3 2 1
null 3 5 4 7 8 1 2