编者注:本文为历史博文归档;涉及 JDK、框架与工具链版本请以当前官方文档为准。引用外链图片可能失效,阅读时请注意时效性。

Java Concurrency: AtomicReference

java.util.concurrent.atomic.AtomicReference 是一个旨在以线程安全的方式更新变量引用的类。为什么我们需要 AtomicReference 类?为什么不能简单地使用一个 volatile 变量?我们又该如何正确使用它?

为什么需要 AtomicReference?

对于我正在编写的工具,我需要检测是否从多个线程中调用了一个对象。为此,我使用了以下不可变类:

public class State {
    private final Thread thread;
    private final boolean accessedByMultipleThreads;

    public State(Thread thread, boolean accessedByMultipleThreads) {
        super();
        this.thread = thread;
        this.accessedByMultipleThreads = accessedByMultipleThreads;
    }

    public State() {
        super();
        this.thread = null;
        this.accessedByMultipleThreads = false;
    }

    public State update() {
        if (accessedByMultipleThreads) {
            return this;
        }
        if (thread == null) {
            return new State(Thread.currentThread(), accessedByMultipleThreads);
        }
        if (thread != Thread.currentThread()) {
            return new State(null, true);
        }
        return this;
    }

    public boolean isAccessedByMultipleThreads() {
        return accessedByMultipleThreads;
    }
}

您可以在 github 上下载所有示例的源代码。

逻辑说明

  1. 我将访问对象的第一个线程存储在 thread 变量中。
  2. 当另一个线程访问该对象时,我将 accessedByMultipleThreads 变量设置为 true,并将 thread 变量设置为 null
  3. accessedByMultipleThreadstrue 时,我不再更改状态。

我在每个对象中使用此类来检测它是否被多个线程访问。以下示例展示了在类 UpdateStateNotThreadSafe 中如何使用状态:

public class UpdateStateNotThreadSafe {
    private volatile State state = new State();

    public void update() {
        state = state.update();
    }

    public State getState() {
        return state;
    }
}

我将状态存储在 volatile 变量 state 中。我需要 volatile 关键字以确保线程始终看到当前值(此处有更详细的解释)。

测试 volatile 变量的线程安全性

为了检查使用 volatile 变量是否是线程安全的,我使用了以下测试:

import static org.junit.Assert.*;
import org.junit.Test;
import com.vmlens.tutorialAtomicReference.UpdateStateNotThreadSafe;
import com.vmlens.api.AllInterleavings;

public class TestNotThreadSafe {
    @Test
    public void test() throws InterruptedException {
        try (AllInterleavings allInterleavings = new AllInterleavings("TestNotThreadSafe");) {
            while (allInterleavings.hasNext()) {
                final UpdateStateNotThreadSafe object = new UpdateStateNotThreadSafe();
                Thread first = new Thread(() -> {
                    object.update();
                });
                Thread second = new Thread(() -> {
                    object.update();
                });
                first.start();
                second.start();
                first.join();
                second.join();
                assertTrue(object.getState().isAccessedByMultipleThreads());
            }
        }
    }
}

测试结果与分析

我需要两个线程来测试使用 volatile 变量是否是线程安全的。我在测试中启动了这两个线程,然后等待直到两个线程都结束。在两个线程都停止之后,我检查标志 accessedByMultipleThreads 是否为 true

为了测试所有线程交织(Thread Interleavings),我们使用来自 vmlensAllInterleavings 类,将完整的测试放在 while 循环中,对所有线程交织进行迭代。运行测试,我看到以下错误:

java.lang.AssertionError:
    at org.junit.Assert.fail(Assert.java:91)
    at org.junit.Assert.assertTrue(Assert.java:43)
    at org.junit.Assert.assertTrue(Assert.java:54)

vmlens 报告显示出了什么问题:

incorectupdate.png

问题在于,对于特定线程而言,交织两个线程首先读取状态。因此,一个线程将覆盖另一个线程的结果。这就是典型的读 - 改 - 写(Read-Modify-Write)竞争条件。

如何使用 AtomicReference?

为了解决这种竞争状况,我使用 AtomicReference 中的 compareAndSet 方法。

compareAndSet 机制

compareAndSet 方法接受两个参数:期望的当前值(expected)和新的值(update)。该方法自动检查当前值是否等于期望值。

  • 如果是,则该方法将值更新为新值并返回 true
  • 如果不是,则该方法使当前值保持不变并返回 false

使用此方法的想法是让 compareAndSet 在我们计算新值时检查当前值是否被另一个线程更改。如果没有,我们可以安全地更新当前值。否则,我们需要使用更改后的当前值重新计算新值。

代码实现

下面显示了如何使用该 compareAndSet 方法自动更新状态:

import java.util.concurrent.atomic.AtomicReference;

public class UpdateStateWithCompareAndSet {
    private final AtomicReference<State> state = new AtomicReference<State>(new State());

    public void update() {
        State current = state.get();
        State newValue = current.update();
        while (!state.compareAndSet(current, newValue)) {
            current = state.get();
            newValue = current.update();
        }
    }

    public State getState() {
        return state.get();
    }
}

现在,我使用 AtomicReference 来存储状态。要更新状态,流程如下:

  1. 首先需要获取当前值(state.get())。
  2. 然后,计算新值(current.update())。
  3. 尝试使用 compareAndSet 更新 AtomicReference
  4. 如果更新成功,完成操作。
  5. 如果不是,需要再次获取当前值,并重新计算新值。
  6. 再次尝试更新。

我需要一个 while 循环,因为 compareAndSet 可能会失败多次(由于其他线程的竞争)。

JDK 8 简化写法

正如 Grzegorz Borczuch 在对本文的评论中指出的那样,自从 JDK 1.8 以来,AtomicReference 中提供了一种更容易使用的方法来实现相同的结果:updateAndGet。此方法在内部使用带有 while 循环的 compareAndSet 来更新 AtomicReference

总结

使用 volatile 变量会导致竞争情况,因为针对某个线程的特定线程交织会覆盖其他线程的计算。通过使用 AtomicReference 类中的 compareAndSet 方法,我们可以规避这种竞争条件。我们自动检查当前值是否与开始计算时相同。如果是,我们可以安全地更新当前值;否则,我们需要使用更改后的当前值重新计算新值。


说明:本文涉及的多线程测试工具 vmlens 及 JDK 版本特性(如 JDK 1.8 引入的 updateAndGet)请以当前官方文档为准。在现代 JDK 版本中,AtomicReference 及其更新方法已成为标准并发工具包的一部分。