.NET:通过 CAS 来理解数据库乐观并发控制,顺便给出无锁的 RingBuffer。 背景 CAS 写着玩的 RingBuffer 备注

大多数企业开发人员都理解数据库乐观并发控制,不过很少有人听说过 CAS(我去年才听说这个概念),CAS 是多线程乐观并发控制策略的一种,一些无锁的支持并发的数据结构都会使用到 CAS,本文对比 CAS 和 数据库乐观并发控制,以此达到强化记忆的目的。

CAS

CAS = Compare And Swap

多线程环境下 this.i = this.i + 1 是没有办法保证线程安全的,因此就有了 CAS,CAS 可以保证上面代码的线程安全性,但是 CAS 并不会保证 Swap 的成功,只有 Compare 成功了才会 Swap,即:没有并发发生,即:在我读取和修改之间没有别人修改。另外说一点,如果 i 是局部变量,即:i = i + 1,那么这段代码是线程安全的,因为局部变量是线程独享的。

不明白 CAS 没关系,下面通过 CAS 的标准模式 和 一个简单的示例来理解 CAS。

CAS 的标准模式

伪代码

 1                     var localValue, currentValue;
 2                     do
 3                     {
 4                         localValue = this.
 5 
 6                         var newValue = 执行一些计算;
 7 
 8                         currentValue = Interlocked.CompareExchange(ref this.value, newValue, localValue);
 9                     } while (localValue != currentValue);

说明

把 this.value 看成是数据库数据,localValue 是某个用户读取的数据,newValue是用户想修改的值,这里有必要解释一下 CompareExchange 和 currentValue,它的内部实现代码是这样的(想想下面代码是线程安全的):

1 var currentValue = this.value
2 if(currentValue == localValue){
3    this.value = newValue;
4 }
5 return currentValue;

CompareExchange  用 sql 来类比就是:update xxx set value = newValue where value = localValue,只不过返回的值不同。通过 CompareExchange 的返回结果我们知道 CAS 是否成功了,即:是否出现并发了,即:是否在我读取和修改之间别人已经修改过了,如果是,可以选择重试。

累加示例

CAS 代码

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using System.Threading;
 7 
 8 namespace InterlockStudy
 9 {
10     class ConcurrentIncrease
11     {
12         public static void Test()
13         {
14             var sum = 0;
15 
16             var tasks = new Task[10];
17             for (var i = 1; i <= 10; i++)
18             {
19                 tasks[i - 1] = Task.Factory.StartNew((state) =>
20                 {
21                     int localSum, currentSum;
22                     do
23                     {
24                         localSum = sum;
25 
26                         Thread.Sleep(10);
27                         var value = (int)state;
28                         var newValue = localSum + value;
29 
30                         currentSum = Interlocked.CompareExchange(ref sum, newValue, localSum);
31                     } while (localSum != currentSum);
32                 }, i);
33             }
34 
35             Task.WaitAll(tasks);
36 
37             Console.WriteLine(sum);
38         }
39     }
40 }

数据库并发代码

 1         public static void Test13()
 2         {
 3             var tasks = new Task[10];
 4             for (var i = 1; i <= 10; i++)
 5             {
 6                 tasks[i - 1] = Task.Factory.StartNew((state) =>
 7                 {
 8                     int localSum, result;
 9                     do
10                     {
11                         using (var con = new SqlConnection(CONNECTION_STRING))
12                         {
13                             con.Open();
14                             var cmd = new SqlCommand("select sum from Tests where Id = 1", con);
15                             var reader = cmd.ExecuteReader();
16                             reader.Read();
17                             localSum = reader.GetInt32(0);
18 
19                             System.Threading.Thread.Sleep(10);
20                             var value = (int)state;
21                             var newValue = localSum + value;
22 
23                             cmd = new SqlCommand("update Tests set sum = " + newValue + " where sum = " + localSum + "", con);
24                             result = cmd.ExecuteNonQuery();
25                         }
26                     } while (result == 0);
27                 }, i);
28             }
29 
30             Task.WaitAll(tasks);
31         }
32     }

说明

我们发现 CAS 版本的代码和数据库版本的代码出奇的相似,数据库的CAS操作是通过 update + where 来完成的。

写着玩的 RingBuffer

代码

  1 using System;
  2 using System.Collections.Generic;
  3 using System.Collections.Concurrent;
  4 using System.Linq;
  5 using System.Text;
  6 using System.Threading.Tasks;
  7 using System.Threading;
  8 
  9 namespace InterlockStudy
 10 {
 11     internal class Node<T>
 12     {
 13         public T Data { get; set; }
 14 
 15         public bool HasValue { get; set; }
 16     }
 17 
 18     class RingBuffer<T>
 19     {
 20         private readonly Node<T>[] _nodes;
 21         private long _tailIndex = -1;
 22         private long _headIndex = -1;
 23         private AutoResetEvent _readEvent = new AutoResetEvent(false);
 24         private AutoResetEvent _writeEvent = new AutoResetEvent(false);
 25 
 26         public RingBuffer(int maxSize)
 27         {
 28             _nodes = new Node<T>[maxSize];
 29 
 30             for (var i = 0; i < maxSize; i++)
 31             {
 32                 _nodes[i] = new Node<T>();
 33             }
 34         }
 35 
 36         public void EnQueue(T data)
 37         {
 38             while (true)
 39             {
 40                 if (this.TryEnQueue(data))
 41                 {
 42                     _readEvent.Set();
 43                     return;
 44                 }
 45 
 46                 _writeEvent.WaitOne();
 47             }
 48 
 49         }
 50 
 51         public T DeQueue()
 52         {
 53             while (true)
 54             {
 55                 T data;
 56                 if (this.TryDeQueue(out data))
 57                 {
 58                     _writeEvent.Set();
 59                     return data;
 60                 }
 61 
 62                 _readEvent.WaitOne();
 63             }
 64 
 65         }
 66 
 67         public bool TryEnQueue(T data)
 68         {
 69             long localTailIndex, newTailIndex, currentTailIndex;
 70             do
 71             {
 72                 localTailIndex = _tailIndex;
 73 
 74                 if (!this.CanWrite(localTailIndex))
 75                 {
 76                     return false;
 77                 }
 78 
 79                 newTailIndex = localTailIndex + 1;
 80 
 81                 if (_nodes[newTailIndex % _nodes.Length].HasValue)
 82                 {
 83                     return false;
 84                 }
 85 
 86                 currentTailIndex = Interlocked.CompareExchange(ref _tailIndex, newTailIndex, localTailIndex);
 87             }
 88             while (localTailIndex != currentTailIndex);
 89 
 90             _nodes[newTailIndex % _nodes.Length].Data = data;
 91             _nodes[newTailIndex % _nodes.Length].HasValue = true;
 92 
 93             return true;
 94         }
 95 
 96         public bool TryDeQueue(out T data)
 97         {
 98             long localHeadIndex, newHeadIndex, currentHeadIndex;
 99             do
100             {
101                 localHeadIndex = _headIndex;
102 
103                 if (!this.CanRead(localHeadIndex))
104                 {
105                     data = default(T);
106                     return false;
107                 }
108 
109                 newHeadIndex = localHeadIndex + 1;
110                 if (_nodes[newHeadIndex % _nodes.Length].HasValue == false)
111                 {
112                     data = default(T);
113                     return false;
114                 }
115 
116                 currentHeadIndex = Interlocked.CompareExchange(ref _headIndex, newHeadIndex, localHeadIndex);
117             }
118             while (localHeadIndex != currentHeadIndex);
119 
120             data = _nodes[newHeadIndex % _nodes.Length].Data;
121             _nodes[newHeadIndex % _nodes.Length].HasValue = false;
122 
123             return true;
124         }
125 
126         private bool CanWrite(long localTailIndex)
127         {
128             return localTailIndex - _headIndex < _nodes.Length;
129         }
130 
131         private bool CanRead(long localHeadIndex)
132         {
133             return _tailIndex - localHeadIndex > 0;
134         }
135     }
136 }

备注

仓促成文,如果有必要可以再写篇文章,希望大家多批评。