【高并發(fā)】ReadWriteLock怎么和緩存扯上關(guān)系了?
作者個人研發(fā)的在高并發(fā)場景下,提供的簡單、穩(wěn)定、可擴展的延遲消息隊列框架,具有精準的定時任務和延遲隊列處理功能。自開源半年多以來,已成功為十幾家中小型企業(yè)提供了精準定時調(diào)度方案,經(jīng)受住了生產(chǎn)環(huán)境的考驗。為使更多童鞋受益,現(xiàn)給出開源框架地址:https://github.com/sunshinelyz/mykit-delay
寫在前面
在實際工作中,有一種非常普遍的并發(fā)場景:那就是讀多寫少的場景。在這種場景下,為了優(yōu)化程序的性能,我們經(jīng)常使用緩存來提高應用的訪問性能。因為緩存非常適合使用在讀多寫少的場景中。而在并發(fā)場景中,Java SDK中提供了ReadWriteLock來滿足讀多寫少的場景。本文我們就來說說使用ReadWriteLock如何實現(xiàn)一個通用的緩存中心。
本文涉及的知識點有:
文章已收錄到:
https://github.com/sunshinelyz/technology-binghe
https://gitee.com/binghe001/technology-binghe
讀寫鎖
說起讀寫鎖,相信小伙伴們并不陌生??傮w來說,讀寫鎖需要遵循以下原則:
- 一個共享變量允許同時被多個讀線程讀取到。
- 一個共享變量在同一時刻只能被一個寫線程進行寫操作。
- 一個共享變量在被寫線程執(zhí)行寫操作時,此時這個共享變量不能被讀線程執(zhí)行讀操作。
這里,需要小伙伴們注意的是:讀寫鎖和互斥鎖的一個重要的區(qū)別就是:讀寫鎖允許多個線程同時讀共享變量,而互斥鎖不允許。所以,在高并發(fā)場景下,讀寫鎖的性能要高于互斥鎖。但是,讀寫鎖的寫操作是互斥的,也就是說,使用讀寫鎖時,一個共享變量在被寫線程執(zhí)行寫操作時,此時這個共享變量不能被讀線程執(zhí)行讀操作。
讀寫鎖支持公平模式和非公平模式,具體是在ReentrantReadWriteLock的構(gòu)造方法中傳遞一個boolean類型的變量來控制。
- public ReentrantReadWriteLock(boolean fair) {
- sync = fair ? new FairSync() : new NonfairSync();
- readerLock = new ReadLock(this);
- writerLock = new WriteLock(this);
- }
另外,需要注意的一點是:在讀寫鎖中,讀鎖調(diào)用newCondition()會拋出UnsupportedOperationException異常,也就是說:讀鎖不支持條件變量。
緩存實現(xiàn)
這里,我們使用ReadWriteLock快速實現(xiàn)一個緩存的通用工具類,總體代碼如下所示。
- public class ReadWriteLockCache<K,V> {
- private final Map<K, V> m = new HashMap<>();
- private final ReadWriteLock rwl = new ReentrantReadWriteLock();
- // 讀鎖
- private final Lock r = rwl.readLock();
- // 寫鎖
- private final Lock w = rwl.writeLock();
- // 讀緩存
- public V get(K key) {
- r.lock();
- try { return m.get(key); }
- finally { r.unlock(); }
- }
- // 寫緩存
- public V put(K key, V value) {
- w.lock();
- try { return m.put(key, value); }
- finally { w.unlock(); }
- }
- }
可以看到,在ReadWriteLockCache中,我們定義了兩個泛型類型,K代表緩存的Key,V代表緩存的value。在ReadWriteLockCache類的內(nèi)部,我們使用Map來緩存相應的數(shù)據(jù),小伙伴都都知道HashMap并不是線程安全的類,所以,這里使用了讀寫鎖來保證線程的安全性,例如,我們在get()方法中使用了讀鎖,get()方法可以被多個線程同時執(zhí)行讀操作;put()方法內(nèi)部使用寫鎖,也就是說,put()方法在同一時刻只能有一個線程對緩存進行寫操作。
這里需要注意的是:無論是讀鎖還是寫鎖,鎖的釋放操作都需要放到finally{}代碼塊中。
在以往的經(jīng)驗中,有兩種向緩存中加載數(shù)據(jù)的方式,一種是:項目啟動時,將數(shù)據(jù)全量加載到緩存中,一種是在項目運行期間,按需加載所需要的緩存數(shù)據(jù)。
接下來,我們就分別來看看全量加載緩存和按需加載緩存的方式。
全量加載緩存
全量加載緩存相對來說比較簡單,就是在項目啟動的時候,將數(shù)據(jù)一次性加載到緩存中,這種情況適用于緩存數(shù)據(jù)量不大,數(shù)據(jù)變動不頻繁的場景,例如:可以緩存一些系統(tǒng)中的數(shù)據(jù)字典等信息。整個緩存加載的大體流程如下所示。
將數(shù)據(jù)全量加載到緩存后,后續(xù)就可以直接從緩存中讀取相應的數(shù)據(jù)了。
全量加載緩存的代碼實現(xiàn)比較簡單,這里,我就直接使用如下代碼進行演示。
- public class ReadWriteLockCache<K,V> {
- private final Map<K, V> m = new HashMap<>();
- private final ReadWriteLock rwl = new ReentrantReadWriteLock();
- // 讀鎖
- private final Lock r = rwl.readLock();
- // 寫鎖
- private final Lock w = rwl.writeLock();
- public ReadWriteLockCache(){
- //查詢數(shù)據(jù)庫
- List<Field<K, V>> list = .....;
- if(!CollectionUtils.isEmpty(list)){
- list.parallelStream().forEach((f) ->{
- m.put(f.getK(), f.getV);
- });
- }
- }
- // 讀緩存
- public V get(K key) {
- r.lock();
- try { return m.get(key); }
- finally { r.unlock(); }
- }
- // 寫緩存
- public V put(K key, V value) {
- w.lock();
- try { return m.put(key, value); }
- finally { w.unlock(); }
- }
- }
按需加載緩存
按需加載緩存也可以叫作懶加載,就是說:需要加載的時候才會將數(shù)據(jù)加載到緩存。具體來說:就是程序啟動的時候,不會將數(shù)據(jù)加載到緩存,當運行時,需要查詢某些數(shù)據(jù),首先檢測緩存中是否存在需要的數(shù)據(jù),如果存在,則直接讀取緩存中的數(shù)據(jù),如果不存在,則到數(shù)據(jù)庫中查詢數(shù)據(jù),并將數(shù)據(jù)寫入緩存。后續(xù)的讀取操作,因為緩存中已經(jīng)存在了相應的數(shù)據(jù),直接返回緩存的數(shù)據(jù)即可。
這種查詢緩存的方式適用于大多數(shù)緩存數(shù)據(jù)的場景。
我們可以使用如下代碼來表示按需查詢緩存的業(yè)務。
- class ReadWriteLockCache<K,V> {
- private final Map<K, V> m = new HashMap<>();
- private final ReadWriteLock rwl = new ReentrantReadWriteLock();
- private final Lock r = rwl.readLock();
- private final Lock w = rwl.writeLock();
- V get(K key) {
- V v = null;
- //讀緩存
- r.lock();
- try {
- v = m.get(key);
- } finally{
- r.unlock();
- }
- //緩存中存在,返回
- if(v != null) {
- return v;
- }
- //緩存中不存在,查詢數(shù)據(jù)庫
- w.lock();
- try {
- //再次驗證緩存中是否存在數(shù)據(jù)
- v = m.get(key);
- if(v == null){
- //查詢數(shù)據(jù)庫
- v=從數(shù)據(jù)庫中查詢出來的數(shù)據(jù)
- m.put(key, v);
- }
- } finally{
- w.unlock();
- }
- return v;
- }
- }
這里,在get()方法中,首先從緩存中讀取數(shù)據(jù),此時,我們對查詢緩存的操作添加了讀鎖,查詢返回后,進行解鎖操作。判斷緩存中返回的數(shù)據(jù)是否為空,不為空,則直接返回數(shù)據(jù);如果為空,則獲取寫鎖,之后再次從緩存中讀取數(shù)據(jù),如果緩存中不存在數(shù)據(jù),則查詢數(shù)據(jù)庫,將結(jié)果數(shù)據(jù)寫入緩存,釋放寫鎖。最終返回結(jié)果數(shù)據(jù)。
這里,有小伙伴可能會問:為啥程序都已經(jīng)添加寫鎖了,在寫鎖內(nèi)部為啥還要查詢一次緩存呢?
這是因為在高并發(fā)的場景下,可能會存在多個線程來競爭寫鎖的現(xiàn)象。例如:第一次執(zhí)行g(shù)et()方法時,緩存中的數(shù)據(jù)為空。如果此時有三個線程同時調(diào)用get()方法,同時運行到 w.lock()代碼處,由于寫鎖的排他性。此時只有一個線程會獲取到寫鎖,其他兩個線程則阻塞在w.lock()處。獲取到寫鎖的線程繼續(xù)往下執(zhí)行查詢數(shù)據(jù)庫,將數(shù)據(jù)寫入緩存,之后釋放寫鎖。
此時,另外兩個線程競爭寫鎖,某個線程會獲取到鎖,繼續(xù)往下執(zhí)行,如果在w.lock()后沒有v = m.get(key); 再次查詢緩存的數(shù)據(jù),則這個線程會直接查詢數(shù)據(jù)庫,將數(shù)據(jù)寫入緩存后釋放寫鎖。最后一個線程同樣會按照這個流程執(zhí)行。
這里,實際上第一個線程已經(jīng)查詢過數(shù)據(jù)庫,并且將數(shù)據(jù)寫入緩存了,其他兩個線程就沒必要再次查詢數(shù)據(jù)庫了,直接從緩存中查詢出相應的數(shù)據(jù)即可。所以,在w.lock()后添加v = m.get(key); 再次查詢緩存的數(shù)據(jù),能夠有效的減少高并發(fā)場景下重復查詢數(shù)據(jù)庫的問題,提升系統(tǒng)的性能。
讀寫鎖的升降級
關(guān)于鎖的升降級,小伙伴們需要注意的是:在ReadWriteLock中,鎖是不支持升級的,因為讀鎖還未釋放時,此時獲取寫鎖,就會導致寫鎖永久等待,相應的線程也會被阻塞而無法喚醒。
雖然不支持鎖升級,但是ReadWriteLock支持鎖降級,例如,我們來看看官方的ReentrantReadWriteLock示例,如下所示。
- class CachedData {
- Object data;
- volatile boolean cacheValid;
- final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
- void processCachedData() {
- rwl.readLock().lock();
- if (!cacheValid) {
- // Must release read lock before acquiring write lock
- rwl.readLock().unlock();
- rwl.writeLock().lock();
- try {
- // Recheck state because another thread might have
- // acquired write lock and changed state before we did.
- if (!cacheValid) {
- data = ...
- cacheValid = true;
- }
- // Downgrade by acquiring read lock before releasing write lock
- rwl.readLock().lock();
- } finally {
- rwl.writeLock().unlock(); // Unlock write, still hold read
- }
- }
- try {
- use(data);
- } finally {
- rwl.readLock().unlock();
- }
- }
- }}
數(shù)據(jù)同步問題
首先,這里說的數(shù)據(jù)同步指的是數(shù)據(jù)源和數(shù)據(jù)緩存之間的數(shù)據(jù)同步,說的再直接一點,就是數(shù)據(jù)庫和緩存之間的數(shù)據(jù)同步。
這里,我們可以采取三種方案來解決數(shù)據(jù)同步的問題,如下圖所示
超時機制
這個比較好理解,就是在向緩存寫入數(shù)據(jù)的時候,給一個超時時間,當緩存超時后,緩存的數(shù)據(jù)會自動從緩存中移除,此時程序再次訪問緩存時,由于緩存中不存在相應的數(shù)據(jù),查詢數(shù)據(jù)庫得到數(shù)據(jù)后,再將數(shù)據(jù)寫入緩存。
采用這種方案需要注意緩存的穿透問題,有關(guān)緩存穿透、擊穿、雪崩的知識,小伙伴們可以參見《【高并發(fā)】面試官:講講什么是緩存穿透?擊穿?雪崩?如何解決?》
定時更新緩存
這種方案是超時機制的增強版,在向緩存中寫入數(shù)據(jù)的時候,同樣給一個超時時間。與超時機制不同的是,在程序后臺單獨啟動一個線程,定時查詢數(shù)據(jù)庫中的數(shù)據(jù),然后將數(shù)據(jù)寫入緩存中,這樣能夠在一定程度上避免緩存的穿透問題。
實時更新緩存
這種方案能夠做到數(shù)據(jù)庫中的數(shù)據(jù)與緩存的數(shù)據(jù)是實時同步的,可以使用阿里開源的Canal框架實現(xiàn)MySQL數(shù)據(jù)庫與緩存數(shù)據(jù)的實時同步。也可以使用我個人開源的mykit-data框架哦(推薦使用)~~
本文轉(zhuǎn)載自微信公眾號「冰河技術(shù)」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系冰河技術(shù)公眾號。