討論一下LRU緩存的實(shí)現(xiàn)算法
業(yè)務(wù)模型
讀、寫、刪的比例大致是7:3:1,至少要支持500w條緩存,平均每條緩存6k,要求設(shè)計(jì)一套性能比較好的緩存算法。
算法分析
不考慮MemCached,Velocity等現(xiàn)成的key-value緩存方案,也不考慮脫離.NET gc自己管理內(nèi)存,不考慮隨機(jī)讀取數(shù)據(jù)及順序讀取數(shù)據(jù)的場景,目前想到的有如下幾種LRU緩存方案
算法 |
分析 |
SortedDictionary |
.NET自帶的,內(nèi)部用二叉搜索樹(應(yīng)該不是普通樹,至少是做過優(yōu)化的樹)實(shí)現(xiàn)的,檢索為O(log n),比普通的Dictionay(O(1))慢一點(diǎn)。 |
Dictionary + PriorityQueue |
Dictionay可以保證檢索是O(1); |
Dictionay + Binary heap |
二叉堆也是優(yōu)先隊(duì)列,分析應(yīng)該同上,我沒有詳細(xì)評估。 |
b樹 |
查找,刪除,插入效率都很好,數(shù)據(jù)庫都用它,但實(shí)現(xiàn)復(fù)雜,寫一個(gè)沒有BUG的B樹幾乎不可能。有人提到stl:map是自頂向下的紅黑樹,查找,刪除,插入都是O(log n),但咱不懂c++,沒做詳細(xì)測試。 |
Dictionay + List |
Dict用來檢索; |
Dictionay + LinkedList |
Dict用來檢索; |
目前幾種方案在多線程下應(yīng)該都需要加鎖,不太好設(shè)計(jì)無鎖的方案,下面這個(gè)鏈接是一個(gè)支持多線程的方案,但原理至今沒搞特明白
A High Performance Multi-Threaded LRU Cache
http://www.codeproject.com/KB/recipes/LRUCache.aspx
用普通鏈表簡單實(shí)現(xiàn)LRU緩存
以下是最后一種方案的簡單實(shí)現(xiàn),大家討論下這個(gè)方案值不值得優(yōu)化,或者其它的哪個(gè)方案比較合適
- public class LRUCacheHelper
{ - readonly Dictionary
_dict; - readonly LinkedList
_queue = new LinkedList(); - readonly object _syncRoot = new object();
- private readonly int _max;
- public LRUCacheHelper(int capacity, int max) {
- _dict = new Dictionary
(capacity); - _max = max;
- }
- public void Add(K key, V value) {
- lock (_syncRoot) {
- checkAndTruncate();
- _queue.AddFirst(key); //O(1)
- _dict[key] = value; //O(1)
- }
- }
- private void checkAndTruncate() {
- lock (_syncRoot) {
- int count = _dict.Count; //O(1)
- if (count >= _max) {
- int needRemoveCount = count / 10;
- for (int i = 0; i < needRemoveCount; i++) {
- _dict.Remove(_queue.Last.Value); //O(1)
- _queue.RemoveLast(); //O(1)
- }
- }
- }
- }
- public void Delete(K key) {
- lock (_syncRoot) {
- _dict.Remove(key); //(1)
- _queue.Remove(key); // O(n)
- }
- }
- public V Get(K key) {
- lock (_syncRoot) {
- V ret;
- _dict.TryGetValue(key, out ret); //O(1)
- _queue.Remove(key); //O(n)
- _queue.AddFirst(key); //(1)
- return ret;
- }
- }
- }
_dict.TryGetValue(key, out ret); //O(1)
ret.Next.Previous = ret.Previous //O(1)
ret. Previous.Next. = ret.Next //O(1)
_queue.AddFirst(key); //O(1)
我改進(jìn)后的鏈表就差不多滿足需求了,
操作 |
基本操作 |
復(fù)雜度 |
讀取 |
Dict.Get Queue.Move |
O 1 O 1 |
刪除 |
Dict.Remove Queue.Remove |
O 1 O 1 |
增加 |
Dict.Add Queue.AddFirst |
O 1 O 1 |
截?cái)?/SPAN> |
Dict.Remove Queue.RemoveLast |
O k O k K表示截?cái)嗑彺嬖氐膫€(gè)數(shù) |
其中截?cái)嗟臅r(shí)候可以指定當(dāng)緩存滿的時(shí)候截?cái)喟俜种嗌俚淖钌偈褂玫木彺骓?xiàng)。
其它的就是多線程的時(shí)候鎖再看看怎么優(yōu)化,字典有線程安全的版本,就把.NET 3.0的讀寫鎖扣出來再把普通的泛型字典保證成ThreadSafelyDictionay就行了,性能應(yīng)該挺好的。
鏈表的話不太好用讀寫鎖來做線程同步,大不了用互斥鎖,但得考慮下鎖的粒度,Move,AddFirst,RemoveLast的時(shí)候只操作一兩個(gè)節(jié)點(diǎn),是不是想辦法只lock這幾個(gè)節(jié)點(diǎn)就行了,Truncate的時(shí)候因?yàn)橐坎僮骱芏喙?jié)點(diǎn),所以要上個(gè)大多鏈表鎖,但這時(shí)候怎么讓其它操作停止得考慮考慮,類似數(shù)據(jù)庫的表鎖和行鎖。
LRU緩存實(shí)現(xiàn)代碼
- public class DoubleLinkedListNode
{ - public T Value { get; set; }
程序初始化200w條緩存,然后不斷的加,每加到500w,截?cái)嗟?/SPAN>10分之一,然后繼續(xù)加。
測試模型中每秒鐘的讀和寫的比例是7:3,以下是依次在3個(gè)時(shí)間點(diǎn)截取的性能計(jì)數(shù)器圖。
圖1
圖2
圖3
內(nèi)存最高會(huì)達(dá)到1g,cpu也平均百分之90以上,但測試到后期會(huì)發(fā)現(xiàn)每隔一段時(shí)間,就會(huì)有一兩秒,吞吐量為0,如最后一張截圖,后來觀察發(fā)現(xiàn),停頓的那一兩秒是二代內(nèi)存在回收,等不停頓的時(shí)候# gen 2 collections就會(huì)加1,這個(gè)原因應(yīng)該是鏈表引起的,對鏈表中節(jié)點(diǎn)的添加和刪除是很耗費(fèi)GC的,因?yàn)闀?huì)頻繁的創(chuàng)建和銷毀對象。
LRU緩存后續(xù)改進(jìn)
1、 用游標(biāo)鏈表來代替普通的雙頭鏈表,程序起來就收工分配固定大小的數(shù)組,然后用數(shù)組的索引來做鏈表,省得每次添加和刪除節(jié)點(diǎn)都要GC的參與,這相當(dāng)于手工管理內(nèi)存了,但目前我還沒找到c#合適的實(shí)現(xiàn)。
2、 有人說鏈表不適合用在多線程環(huán)境中,因?yàn)閷︽湵淼拿總€(gè)操作都要加互斥鎖,連讀寫鎖都用不上,我目前的實(shí)現(xiàn)是直接用互斥鎖做的線程同步,每秒的吞吐量七八十萬,感覺lock也不是瓶頸,如果要改進(jìn)的話可以把Dictionary用ThreadSafelyDictionary來代替,然后鏈表還用互斥鎖(剛開始設(shè)想的鏈表操作只鎖要操作的幾個(gè)節(jié)點(diǎn)以降低并發(fā)沖突的想法應(yīng)該不可取,不嚴(yán)謹(jǐn))。
3、 還有一個(gè)地方就是把鎖細(xì)分以下,鏈表還用鏈表,但每個(gè)鏈表的節(jié)點(diǎn)是個(gè)HashSet,對HashSet的操作如果只有讀,寫,刪,沒有遍歷的話應(yīng)該不需要做線程同步(我感覺不用,因?yàn)?/SPAN>Set就是一個(gè)集合,一個(gè)線程往里插入,一個(gè)線程往里刪除,一個(gè)線程讀取應(yīng)該沒問題,頂多讀進(jìn)來的數(shù)據(jù)可能馬上就刪除了,而整個(gè)Set的結(jié)構(gòu)不會(huì)破壞)。然后新增數(shù)據(jù)的時(shí)候往鏈表頭頂Set里插入,讀取某個(gè)數(shù)據(jù)的時(shí)候把它所在的節(jié)點(diǎn)的Set里刪除該數(shù)據(jù),然后再鏈表頭的Set里插入一個(gè)數(shù)據(jù),這樣反復(fù)操作后,鏈表的最后一個(gè)節(jié)點(diǎn)的Set里的數(shù)據(jù)都是舊數(shù)據(jù)了,可以安全的刪除了,當(dāng)然這個(gè)刪除的時(shí)候應(yīng)該是要鎖整個(gè)鏈表的。每個(gè)Set應(yīng)該有個(gè)大小上限,比如20w,但set不能安全的遍歷,就不能得到當(dāng)前大小,所以添加、刪除Set的數(shù)據(jù)的時(shí)候應(yīng)該用Interlocked.Decrement()和 Interlocked.Increment()維護(hù)一個(gè)Count,一遍一個(gè)Set滿的時(shí)候,再到鏈表的頭新增一個(gè)Set節(jié)點(diǎn)。
LRU緩存性能測試腳本
- class Program {
- private static PerformanceCounter _addCounter;
- private static PerformanceCounter _getCounter;
- static void Main(string[] args) {
- SetupCategory();
- _addCounter = new PerformanceCounter("wawasoft.lrucache", "add/sec", false);
- _getCounter = new PerformanceCounter("wawasoft.lrucache", "get/sec", false);
- _addCounter.RawValue = 0;
- _getCounter.RawValue = 0;
- Random rnd = new Random();
- const int max = 500 * 10000;
- LRUCacheHelper<int, int> cache = new LRUCacheHelper<int, int>(200 * 10000, max);
- for (int i = 10000*100000 - 1; i >= 0; i--)
- {
- if(i % 10 > 7)
- {
- ThreadPool.QueueUserWorkItem(
- delegate
- {
- cache.Add(rnd.Next(0, 10000), 0);
- _addCounter.Increment();
- });
- }
- else
- {
- ThreadPool.QueueUserWorkItem(
- delegate
- {
- int pop = cache.Get(i);
- _getCounter.Increment();
- });
- }
- }
- Console.ReadKey();
- }
- private static void SetupCategory() {
- if (!PerformanceCounterCategory.Exists("wawasoft.lrucache")) {
- CounterCreationDataCollection CCDC = new CounterCreationDataCollection();
- CounterCreationData addcounter = new CounterCreationData();
- addcounter.CounterType = PerformanceCounterType.RateOfCountsPerSecond32;
- addcounter.CounterName = "add/sec";
- CCDC.Add(addcounter);
- CounterCreationData getcounter = new CounterCreationData();
- getcounter.CounterType = PerformanceCounterType.RateOfCountsPerSecond32;
- getcounter.CounterName = "get/sec";
- CCDC.Add(getcounter);
- PerformanceCounterCategory.Create("wawasoft.lrucache","lrucache",CCDC);
- }
- }
- }
【編輯推薦】