时间:2022-10-17 11:21:14 | 栏目:JAVA代码 | 点击:次
为了理解根本原因,首先回顾一下Compare and swap的概念。Compare and Swap (CAS)在无锁算法中是一种常见的技术。能够保证并发修改共享数据时,一个线程将共享内存修改后,另一线程尝试对共享内存的修改会失败。
我们每次更新时,通过两种信息来实现:要更新的值及原始值。首先Compare and swap 会比较原始值和当前获取到的值。如果相等,那么将值更新为要设置的值。
当执行campare and swap会出现失败的情况。例如,一个线程先读取共享内存数据值A,随后因某种原因,线程暂时挂起,同时另一个线程临时将共享内存数据值先改为B,随后又改回为A。随后挂起线程恢复,并通过CAS比较,最终比较结果将会无变化。这样会通过检查,这就是ABA问题。 在CAS比较前会读取原始数据,随后进行原子CAS操作。这个间隙之间由于并发操作,最终可能会带来问题。
public class Account { private AtomicInteger balance; private AtomicInteger transactionCount; private ThreadLocal<Integer> currentThreadCASFailureCount; public Account() { this.balance = new AtomicInteger(0); this.transactionCount = new AtomicInteger(0); this.currentThreadCASFailureCount = new ThreadLocal<>(); this.currentThreadCASFailureCount.set(0); } public int getBalance() { return balance.get(); } public int getTransactionCount() { return transactionCount.get(); } public int getCurrentThreadCASFailureCount() { return Optional.ofNullable(currentThreadCASFailureCount.get()).orElse(0); } public boolean withdraw(int amount) { int current = getBalance(); maybeWait(); boolean result = balance.compareAndSet(current, current - amount); if (result) { transactionCount.incrementAndGet(); } else { int currentCASFailureCount = currentThreadCASFailureCount.get(); currentThreadCASFailureCount.set(currentCASFailureCount + 1); } return result; } private void maybeWait() { if ("thread1".equals(Thread.currentThread().getName())) { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } public boolean deposit(int amount) { int current = balance.get(); boolean result = balance.compareAndSet(current, current + amount); if (result) { transactionCount.incrementAndGet(); } else { int currentCASFailureCount = currentThreadCASFailureCount.get(); currentThreadCASFailureCount.set(currentCASFailureCount + 1); } return result; } }
@Test public void abaProblemTest() throws InterruptedException { final int defaultBalance = 50; final int amountToWithdrawByThread1 = 20; final int amountToWithdrawByThread2 = 10; final int amountToDepositByThread2 = 10; Assert.assertEquals(0, account.getTransactionCount()); Assert.assertEquals(0, account.getCurrentThreadCASFailureCount()); account.deposit(defaultBalance); Assert.assertEquals(1, account.getTransactionCount()); Thread thread1 = new Thread(() -> { // this will take longer due to the name of the thread Assert.assertTrue(account.withdraw(amountToWithdrawByThread1)); // thread 1 fails to capture ABA problem Assert.assertNotEquals(1, account.getCurrentThreadCASFailureCount()); }, "thread1"); Thread thread2 = new Thread(() -> { Assert.assertTrue(account.deposit(amountToDepositByThread2)); Assert.assertEquals(defaultBalance + amountToDepositByThread2, account.getBalance()); // this will be fast due to the name of the thread Assert.assertTrue(account.withdraw(amountToWithdrawByThread2)); // thread 1 didn't finish yet, so the original value will be in place for it Assert.assertEquals(defaultBalance, account.getBalance()); Assert.assertEquals(0, account.getCurrentThreadCASFailureCount()); }, "thread2"); thread1.start(); thread2.start(); thread1.join(); thread2.join(); // compareAndSet operation succeeds for thread 1 Assert.assertEquals(defaultBalance - amountToWithdrawByThread1, account.getBalance()); //but there are other transactions Assert.assertNotEquals(2, account.getTransactionCount()); // thread 2 did two modifications as well Assert.assertEquals(4, account.getTransactionCount()); }
static class Stack { private AtomicReference<Node> top = new AtomicReference<>(); static class Node { String value; Node next; public Node (String value) { this.value = value; } } //出栈 public Node pop(int time) { Node newTop; Node oldTop; do { oldTop = top.get(); if (oldTop == null) { return null; } newTop = oldTop.next; try { //休眠一段时间,模拟ABA问题 TimeUnit.SECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } while (!top.compareAndSet(oldTop, newTop)); return oldTop; } public void push (Node node) { Node oldTop; do { oldTop = top.get(); node.next = oldTop; } while (!top.compareAndSet(oldTop, node)); } public AtomicReference<Node> getTop() { return top; } } @Test public void testStack() throws Exception{ Stack stack = new Stack(); Stack.Node a = new Stack.Node("A"); Stack.Node b = new Stack.Node("B"); // 初始化栈结构 stack.push(b); stack.push(a); // ABA 测试 Thread t1 = new Thread(() -> { stack.pop(2); }); Stack.Node c = new Stack.Node("C"); Stack.Node d = new Stack.Node("D"); Thread t2 = new Thread(() -> { stack.pop(0); stack.pop(0); stack.push(d); stack.push(c); stack.push(a); }); // t1.start(); t2.start(); TimeUnit.SECONDS.sleep(5); Stack.Node top = stack.getTop().get(); do { System.out.println(top.value); top = top.next; } while (top != null); }