Java 并行数据处理和性能分析

并行流

并行流是一个把元素分成多个块的流,每个块用不同的线程处理。可以自动分区,让所有的处理器都忙起来。

假设要写一个方法,接受一个数量n做参数,计算1-n的和。可以这样实现:

  public long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
        .limit(n)
        .reduce(0L, Long::sum);
  }

也许可以使用parallel方法,简单地使用并行计算,提高程序性能:

  public long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
        .limit(n)
        .parallel()
        .reduce(0L, Long::sum);
  }

这样,流可能在内部被分成多个块,导致reduction操作可以在不同的块上互不依赖地并行地各自工作。最后,reduction操作组合每个子流的并行reductions的返回值,返回的结果就是整个流的结果。见下面的示意图

Java 并行数据处理和性能分析

实际上,调用parallel方法,流自身不会有任何变化。在内部,设置一个布尔类型的标记,标明你想在并行模式执行操作,接下来的操作都是并行的。

类似地,你也可以使用sequential方法,把并行流转成串行的。你也许认为可以组合这两个方法:

    stream.parallel()
      .filter(...)
      .sequential()
      .map(...)
      .parallel()
      .reduce();

但是,最后一次调用parallel或者sequential才会全局地影响管道。上面的例子,管道将被并行地执行。

配置并行流使用的线程池

并行流内部使用ForkJoinPool。默认地,线程数量等于处理器数量(Runtime.getRuntime().availableProcessors())。但是,可以修改系统属性java.util.concurrent.ForkJoinPool.common.parallelism,配置线程数量。

这是全局配置,所以,除非你认为对性能有帮助,否则不要修改。

测量流的性能

我们声称并行加法应该比串行的或者自己的迭代方法快。我们可以使用JMH测量一下。这是一个工具,使用基于注解的方法,可以为JVM程序增加

可靠的microbenchmarks。如果使用maven,可以这样引入:

    <dependency>
      <groupId>org.openjdk.jmh</groupId>
      <artifactId>jmh-core</artifactId>
      <version>1.21</version>
    </dependency>
    <dependency>
      <groupId>org.openjdk.jmh</groupId>
      <artifactId>jmh-generator-annprocess</artifactId>
      <version>1.21</version>
    </dependency>

第一个库是核心实现,第二个包含一个注解处理器,帮助生成JAR文件,通过它可以方便地运行你的benchmark。maven配置里还应该有下面的plugin:

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>benchmarks</finalName>
              <transformers>
                <transformer
                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.openjdk.jmh.Main</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>

程序代码如下

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

//测量平均时间
@BenchmarkMode(Mode.AverageTime)
//以毫秒为单位,打印benchmark结果
@OutputTimeUnit(TimeUnit.MILLISECONDS)
//执行两次,增加可靠性。堆空间是4Gb
@Fork(value = http://www.cppcns.com/ruanjian/java/2, jvmArgs = {"-Xms4G", "-Xmx4G"})
@State(Scope.Benchmark)
public class ParallelStreamBenchmark {
  private static final long N = 10_000_000L;

  @Benchmark
  public long sequentialSum() {
    return Stream.iterate(1L, i -> i + 1).limit(N)
        .reduce(0L, Long::sum);
  }
  
  //每次执行benchmark后,执行GC
  @TearDown(Level.Invocation)
  public void tearDown() {
    System.gc();
  }
}

Java 并行数据处理和性能分析

扫一扫手机访问