1. 概述

在本文中,我们将研究 java.util.concurrent 包中的两个重要构造类:LongAdderLongAccumulator

两者均专为多线程环境设计,具有极高的效率。它们都采用了巧妙的策略来实现无锁(lock-free)操作,同时保持线程安全。

2. LongAdder

考虑这样一种场景:我们需要频繁递增某个数值。在高并发环境下,使用 AtomicLong 可能会成为性能瓶颈。这是因为 AtomicLong 基于比较并交换(CAS, Compare-And-Swap)操作,在竞争激烈时会导致大量的 CPU 周期浪费在重试上。

相比之下,LongAdder 使用了一种巧妙的技巧来减少线程间的争用(contention)。

当我们需要递增 LongAdder 实例时,需调用 increment() 方法。其内部实现维护了一组可按需增长的计数器(内部称为 cells)。当更多线程同时调用 increment() 时,内部数组会动态扩容。数组中的每个单元格都可以被独立更新,从而显著减少了争用。因此,LongAdder 是从多个线程递增计数器的非常有效的方法。

让我们创建 LongAdder 类的实例,并从多个线程中对其进行更新:

LongAdder counter = new LongAdder();
ExecutorService executorService = Executors.newFixedThreadPool(8);

int numberOfThreads = 4;
int numberOfIncrements = 100;

Runnable incrementAction = () -> IntStream
  .range(0, numberOfIncrements)
  .forEach(i -> counter.increment());

for (int i = 0; i < numberOfThreads; i++) {
    executorService.execute(incrementAction);
}

在调用 sum() 方法之前,无法直接获取 LongAdder 中计数器的最终结果。该方法会遍历内部数组的所有值,并对这些值求和以返回正确结果。需要注意的是,sum() 方法的调用开销可能较大:

assertEquals(counter.sum(), numberOfIncrements * numberOfThreads);

有时,在调用 sum() 之后,我们希望清除与 LongAdder 实例相关联的所有状态,以便从头开始计数。我们可以使用 sumThenReset() 方法来实现:

assertEquals(counter.sumThenReset(), numberOfIncrements * numberOfThreads);
assertEquals(counter.sum(), 0);

请注意,随后对 sum() 方法的调用返回零,表示状态已成功重置。

3. LongAccumulator

LongAccumulator 也是一个非常有趣的类,它使我们可以在许多场景下实现无锁算法。例如,它可以根据提供的 LongBinaryOperator 来累积结果,这与 Stream API 中的 reduce() 操作类似。

可以通过将 LongBinaryOperator 及其初始值提供给构造函数来创建 LongAccumulator 实例。重要的是要记住,如果我们为 LongAccumulator 提供一个交换函数(commutative function),且累加顺序无关紧要,它将能正常工作。

LongAccumulator accumulator = new LongAccumulator(Long::sum, 0L);

我们创建了一个 LongAccumulator,该累加器会将新值添加到已有的累加值中。我们将 LongAccumulator 的初始值设置为零,因此在第一次调用 accumulate() 方法时,previousValue 将为零。

让我们从多个线程中调用 accumulate() 方法:

int numberOfThreads = 4;
int numberOfIncrements = 100;

Runnable accumulateAction = () -> IntStream
  .rangeClosed(0, numberOfIncrements)
  .forEach(accumulator::accumulate);

for (int i = 0; i < numberOfThreads; i++) {
    executorService.execute(accumulateAction);
}

请注意,我们将数字作为参数传递给 accumulate() 方法。该方法将调用我们定义的 sum() 函数(即 Long::sum)。

LongAccumulator 底层使用比较并交换(CAS)实现,这导致了以下语义:

  1. 首先,它执行定义为 LongBinaryOperator 的操作。
  2. 然后检查 previousValue 是否已更改。
  3. 如果已更改,则使用新值再次执行该动作。
  4. 如果没有更改,它将成功更新存储在累加器中的值。

现在我们可以断言,所有线程中所有迭代值的总和为 20200(即 4 个线程 × (0 到 100 的累加和 5050)):

assertEquals(accumulator.get(), 20200);

4. 总结

在本快速教程中,我们了解了 LongAdderLongAccumulator,并展示了如何使用这两种构造来实现高效且无锁的解决方案。

所有这些示例和代码段的实现都可以在 github 项目中找到。这是一个 Maven 项目,因此应该很容易直接导入和运行。

说明:本文涉及的 LongAdderLongAccumulator 类自 Java 8 引入,适用于 Java 8 及以上版本。