繼續(xù)項目實戰(zhàn),集成Redis分布式鎖
今天我們就來把基于Redis實現(xiàn)的分布式鎖,集成到我們的項目中,分布式鎖歷來都受到大家的關注。不管是工作中、面試中,分布式鎖永遠是個不老的話題,也希望大家能掌握此技能,便于大家日后能"升官發(fā)財"。
分布式鎖
背景
為什么要有分布式鎖呢?不是已經(jīng)有synchronized、ReantrantLock等相關鎖了嗎?
是的,我們在開發(fā)應用的時候,如果需要對某一個共享變量進行多線程同步訪問的時候,可以使用我們學到的鎖進行處理,并且可以完美的運行,毫無Bug!
注意:這是單機應用,后來業(yè)務發(fā)展,需要做集群,一個應用需要部署到幾臺機器上然后做負載均衡 :
上圖可以看到,變量A存在三個服務器內(nèi)存中(這個變量A主要體現(xiàn)是在一個類中的一個成員變量,是一個有狀態(tài)的對象),如果不加任何控制的話,變量A同時都會在分配一塊內(nèi)存,三個請求發(fā)過來同時對這個變量操作,顯然結(jié)果是不對的!即使不是同時發(fā)過來,三個請求分別操作三個不同內(nèi)存區(qū)域的數(shù)據(jù),變量A之間不存在共享,也不具有可見性,處理的結(jié)果也是不對的!
如果我們業(yè)務中確實存在這個場景的話,我們就需要一種方法解決這個問題!
為了保證一個方法或?qū)傩栽诟卟l(fā)情況下的同一時間只能被同一個線程執(zhí)行,在傳統(tǒng)單體應用單機部署的情況下,可以使用并發(fā)處理相關的功能進行互斥控制。但是,隨著業(yè)務發(fā)展的需要,原單體單機部署的系統(tǒng)被演化成分布式集群系統(tǒng)后,由于分布式系統(tǒng)多線程、多進程并且分布在不同機器上,這將使原單機部署情況下的并發(fā)控制鎖策略失效,單純的應用并不能提供分布式鎖的能力。為了解決這個問題就需要一種跨機器的互斥機制來控制共享資源的訪問,這就是分布式鎖要解決的問題!
分布鎖的基本原理
分布式環(huán)境下,多臺機器上多個進程對同一個共享資源(數(shù)據(jù)、文件等)進行操作,如果不做互斥,就有可能出現(xiàn)“余額扣成負數(shù)”,或者“商品超賣”的情況。
為了解決這個問題,需要分布式鎖服務。首先,來看一下分布式鎖應該具備哪些條件。
- 互斥性:在任意時刻,對于同一個鎖,只有一個客戶端能持有,從而保證一個共享資源同一時間只能被一個客戶端操作;
- 安全性:即不會形成死鎖,當一個客戶端在持有鎖的期間崩潰而沒有主動解鎖的情況下,其持有的鎖也能夠被正確釋放,并保證后續(xù)其它客戶端能加鎖;
- 可用性:當提供鎖服務的節(jié)點發(fā)生宕機等不可恢復性故障時,“熱備” 節(jié)點能夠接替故障的節(jié)點繼續(xù)提供服務,并保證自身持有的數(shù)據(jù)與故障節(jié)點一致。
- 對稱性:對于任意一個鎖,其加鎖和解鎖必須是同一個客戶端,即客戶端 A 不能把客戶端 B 加的鎖給解了。
目前市面上,分布式鎖的實現(xiàn)方案大致有三種:
- 數(shù)據(jù)庫樂觀鎖;
- 基于分布式緩存實現(xiàn)的鎖服務,典型代表有 Redis 和基于 Redis 的 RedLock;
- 基于分布式一致性算法實現(xiàn)的鎖服務,典型代表有 ZooKeeper、Chubby 和 ETCD。
項目中,使用的最多的是后兩種,其實每種方案都各有利弊。
預防死鎖
我們看下面這個典型死鎖場景。
一個客戶端獲取鎖成功,但是在釋放鎖之前崩潰了,此時該客戶端實際上已經(jīng)失去了對公共資源的操作權,但卻沒有辦法請求解鎖(刪除 Key-Value 鍵值對),那么,它就會一直持有這個鎖,而其它客戶端永遠無法獲得鎖。
我們的解決方案是:在加鎖時為鎖設置過期時間,當過期時間到達,Redis 會自動刪除對應的 Key-Value,從而避免死鎖。需要注意的是,這個過期時間需要結(jié)合具體業(yè)務綜合評估設置,以保證鎖的持有者能夠在過期時間之內(nèi)執(zhí)行完相關操作并釋放鎖。
另外,前面已經(jīng)說了,在實際項目中我們都是使用后兩種方案,所以我們重點在后兩種方案上。說明 此文章是基于前面我們搞的項目繼續(xù)開展,同時把Redis已經(jīng)集成到項目中了,所以此文中分布式鎖是基于Redis的實現(xiàn)。
基于Redis實現(xiàn)分布式鎖
先說一下使用Redis實現(xiàn)方案的思路:
- setnx +expire+delete
- setnx+lua
- set key value px milliseconds nx
簡單版
創(chuàng)建分布鎖
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- import java.util.UUID;
- import java.util.concurrent.TimeUnit;
- @Component
- public class DistributedLockV1 {
- private Logger logger = LoggerFactory.getLogger(DistributedLockV1.class);
- @Resource
- private RedisTemplate redisTemplate;
- public boolean lock(String businessKey) {
- boolean result = false;
- String uniqueValue = UUID.randomUUID().toString();
- try {
- //@see <a href="http://redis.io/commands/setnx">Redis Documentation: SETNX</a>
- result = redisTemplate.opsForValue().setIfAbsent(businessKey, uniqueValue);
- if (!result) {
- return false;
- }
- //設置key的有效期
- redisTemplate.expire(businessKey, 10, TimeUnit.SECONDS);
- return result;
- } catch (Exception ex) {
- logger.error("獲取鎖失敗", ex);
- }
- return result;
- }
- public void unlock(String businessKey) {
- try {
- //delete
- redisTemplate.delete(businessKey);
- } catch (Exception ex) {
- logger.error("釋放鎖失敗", ex);
- }
- }
- }
就這樣,一個簡單的分布式鎖就實現(xiàn)了,但是這里會存在問題,問題也不是一定會出現(xiàn),在特定的時刻還是會出現(xiàn)的。
下面,我們就來把這個分布式鎖應用到用戶賬戶余額扣減的功能中。
我們來創(chuàng)建一張用戶賬戶表,表中主要有userId和余額:
- CREATE TABLE `user_account` (
- `id` bigint NOT NULL AUTO_INCREMENT,
- `user_id` bigint DEFAULT NULL,
- `balance` decimal(10,2) DEFAULT NULL,
- `create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
- `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
然后創(chuàng)建UserAccountRepository接口。
- import com.tian.user.entity.UserAccount;
- import org.springframework.data.jpa.repository.JpaRepository;
- import org.springframework.data.jpa.repository.Modifying;
- import org.springframework.data.jpa.repository.Query;
- import org.springframework.stereotype.Repository;
- import java.math.BigDecimal;
- @Repository
- public interface UserAccountRepository extends JpaRepository<UserAccount, Long> {
- UserAccount findByUserId(Long userId);
- //通過userId更新余額
- @Modifying
- @Query("update UserAccount u set u.balance=?1 where u.userId=?2")
- void updateBalanceByUserId(BigDecimal balance, Long userId);
- }
創(chuàng)建UserAccountService和實現(xiàn)類,并實現(xiàn)其扣減方法:
- import com.tian.user.entity.UserAccount;
- import java.math.BigDecimal;
- public interface UserAccountService {
- /**
- * 扣減余額
- * @param userId 當前用戶userId
- * @param balance 當前需要減余額
- * @return 是否扣減成功
- */
- boolean reduceBalance(Long userId, BigDecimal balance);
- }
- import com.tian.user.entity.UserAccount;
- import com.tian.user.lock.DistributedLockV1;
- import com.tian.user.repository.UserAccountRepository;
- import com.tian.user.service.UserAccountService;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
- import javax.annotation.Resource;
- import java.math.BigDecimal;
- import java.util.Date;
- @Service
- @Transactional(rollbackFor = Exception.class)
- public class UserAccountServiceImpl implements UserAccountService {
- private Logger logger= LoggerFactory.getLogger(getClass());
- @Resource
- private UserAccountRepository userAccountRepository;
- @Resource
- private DistributedLockV1 distributedLockV1;
- @Override
- public boolean reduceBalance(Long userId, BigDecimal balance) {
- try {
- //把該賬戶給鎖住,使用userId作為key。
- boolean lock = distributedLockV1.lock(userId.toString());
- //獲取鎖失敗,則直接返回扣減失敗
- if (!lock) {
- return false;
- }
- UserAccount userAccount = userAccountRepository.findByUserId(userId);
- BigDecimal currBalance = userAccount.getBalance();
- //校驗余額是否足夠扣減
- if (currBalance.compareTo(balance) > 0) {
- BigDecimal newBalance = currBalance.subtract(balance);
- //扣減余額
- userAccountRepository.updateBalanceByUserId(newBalance, userId);
- return true;
- }
- }catch(Exception ex){
- logger.error("余額扣減失敗", ex);
- } finally {
- //釋放鎖
- distributedLockV1.unlock(userId.toString());
- }
- return false;
- }
- }
到此,簡單版的分布式鎖,以及如何使用,這里就已經(jīng)搞完了。下面我們來理一下思路:
1.使用setnx(set not exist),就是如何set的這個key在redis不存在就返回true,否則返回false。
2.對已經(jīng)set的key設置有效期,使用expire設置有效期。
3.校驗我們的可用余額是否足夠扣減,不夠就直接結(jié)束并使用delete刪除redis中的key。
4.扣減余額,更新數(shù)據(jù)庫余額值。
5.刪除key,delete redis中key。
那么,問題來了,第一步、第二步都成功了。但假如第三步查詢余額、扣減余額耗時20秒了,上面我們對Redis中key的有效期設置的10秒,也就是超時了,key過期了,并且在10秒到20秒之間又有其他線程來獲取到鎖了,然后此時把其他線程拿到的鎖給刪了,把其他線程的鎖給解了。此時,不就亂了嗎?
這也是面試中常被問使用redis做分布式鎖,業(yè)務超時了怎么辦?
升級版
我們可以把每次key對應的value返回,當釋放鎖的時候,判斷當前key對應的value是否是當前手里持有的value。
然后,我們針對上面的進行修改一版。
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- import java.util.UUID;
- import java.util.concurrent.TimeUnit;
- @Component
- public class DistributedLockV1 {
- private Logger logger = LoggerFactory.getLogger(DistributedLockV1.class);
- @Resource
- private RedisTemplate redisTemplate;
- public String lockV2(String businessKey) {
- boolean result = false;
- String uniqueValue = UUID.randomUUID().toString();
- try {
- result = redisTemplate.opsForValue().setIfAbsent(businessKey, uniqueValue);
- if (!result) {
- return null;
- }
- redisTemplate.expire(businessKey, 100, TimeUnit.SECONDS);
- return uniqueValue;
- } catch (Exception ex) {
- logger.error("獲取鎖失敗", ex);
- }
- return null;
- }
- public void unlockV2(String businessKey, String businessValue) {
- try {
- Object value = redisTemplate.opsForValue().get(businessKey);
- if (value == null) {
- return false;
- }
- //當前key在redis中value和當前線程手里持有的是否一致
- if (!businessValue.equals(value)) {
- //不一致,證明被其他線程獲取了
- logger.info("key={}釋放鎖失敗嗎,該鎖已被其他線程獲取",businessKey);
- return false;
- }
- redisTemplate.delete(businessKey);
- logger.info("key={}釋放鎖成功",businessKey);
- } catch (Exception ex) {
- logger.error("釋放鎖失敗", ex);
- }
- }
- }
這里比簡單版多了一個判斷,判斷持有鎖的線程是否為當前線程。盡管使用隨機字符串的 value來判斷是否為當前線程,但是在釋放鎖時(delete方法),還是無法做到原子操作,比如進程 A 執(zhí)行完業(yè)務邏輯,在準備釋放鎖時,恰好這時候進程 A 的鎖自動過期時間到了,而另一個進程 B 獲得鎖成功,然后 B 還沒來得及執(zhí)行,進程 A 就執(zhí)行了 delete(key) ,釋放了進程 B 的鎖.... ,因此需要配合 Lua 腳本釋放鎖。
setnx+Lua腳本Lua 是一種輕量小巧的腳本語言,用標準 C 語言編寫并以源代碼形式開放, 其設計目的是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。
Lua 提供了交互式編程模式。我們可以在命令行中輸入程序并立即查看效果。
lua腳本優(yōu)點:
- 減少網(wǎng)絡開銷:原先多次請求的邏輯放在 redis 服務器上完成。使用腳本,減少了網(wǎng)絡往返時延
- 原子操作:Redis會將整個腳本作為一個整體執(zhí)行,中間不會被其他命令插入(想象為事務)
- 復用:客戶端發(fā)送的腳本會永久存儲在Redis中,意味著其他客戶端可以復用這一腳本而不需要使用代碼完成同樣的邏輯
在resources目錄下創(chuàng)建一個redis-lock.lua文件。填入內(nèi)容:
- if redis.call('get', KEYS[1]) == ARGV[1]
- then
- return redis.call('del', KEYS[1])
- else
- return 0
- end
這段代碼的意思。就是通過key獲得其在redis中value,然后使用當前線程手里的value與之對比,一樣則刪除redis這個key。刪除返回1,否則返回0表示什么沒做。
Redis鎖代碼塊如下:
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.core.io.ClassPathResource;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.data.redis.core.script.DefaultRedisScript;
- import org.springframework.scripting.support.ResourceScriptSource;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.UUID;
- import java.util.concurrent.TimeUnit;
- @Component
- public class DistributedLockV1 {
- private Logger logger = LoggerFactory.getLogger(DistributedLockV1.class);
- @Resource
- private RedisTemplate redisTemplate;
- private DefaultRedisScript<Long> script;
- @PostConstruct
- public void init() {
- script = new DefaultRedisScript<Long>();
- script.setResultType(Long.class);
- script.setScriptSource(new ResourceScriptSource(new ClassPathResource("redis-lock.lua")));
- }
- public String lockV3(String key) {
- String value = UUID.randomUUID().toString().replace("-", "");
- /*
- * setIfAbsent <=> SET key value [NX] [XX] [EX <seconds>] [PX [millseconds]]
- * set expire time 5 mins
- */
- Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 10000, TimeUnit.MILLISECONDS);
- if (flag) {
- return value;
- }
- return null;
- }
- public void unlockV3(String key, String value) {
- /** 業(yè)務邏輯處理完畢,釋放鎖 **/
- String lockValue = (String) redisTemplate.opsForValue().get(key);
- if (lockValue != null && lockValue.equals(value)) {
- System.out.println("lockValue========:" + lockValue);
- List<String> keys = new ArrayList<>();
- keys.add(key);
- Object execute = redisTemplate.execute(script, keys, lockValue);
- System.out.println("execute執(zhí)行結(jié)果,1表示執(zhí)行del,0表示未執(zhí)行 ===== " + execute);
- logger.info("{} 解鎖成功,結(jié)束處理業(yè)務", key);
- return;
- }
- logger.info("key={}釋放鎖失敗", key);
- }
- }
最后我們再次執(zhí)行罰款扣減,日志輸出:
- lockValue========:199740e62c184a6a9897f9c95e720b4d
- execute執(zhí)行結(jié)果,1表示執(zhí)行del,0表示未執(zhí)行 ===== 1
- 2021-03-09 19:03:51.592 INFO 6692 --- [nio-8080-exec-4] com.tian.user.lock.DistributedLockV1 : 1 解鎖成功,結(jié)束處理業(yè)務
到此,setnx+Lua 這種方案我們已經(jīng)實現(xiàn)了。如果對此有懷疑的,是好事,建議創(chuàng)建多個線程去調(diào)用罰款扣減這個service方法,看看器是否會出現(xiàn)問題。
「注意」
setnx在redis較低的版本里是沒有的,后面才引入的。其實我們也可以使用set命令來解決setnx,另外還可以加過期時間,整體命令為
- set key value nx px xxx
value 最好是隨機字符串,這樣可以防止業(yè)務代碼執(zhí)行時間超過設置的鎖自動過期時間,而導致再次釋放鎖時出現(xiàn)釋放其他進程鎖的情況。
setnx 瑣最大的缺點就是它加鎖時只作用在一個 Redis 節(jié)點上,即使 Redis 通過 Sentinel(哨崗、哨兵) 保證高可用,如果這個 master 節(jié)點由于某些原因發(fā)生了主從切換,那么就會出現(xiàn)鎖丟失的情況,下面是個例子:
- 在 Redis 的 master 節(jié)點上拿到了鎖;
- 但是這個加鎖的 key 還沒有同步到 slave 節(jié)點;
- master 故障,發(fā)生故障轉(zhuǎn)移,slave 節(jié)點升級為 master節(jié)點;
- 上邊 master 節(jié)點上的鎖丟失。
有的時候甚至不單單是鎖丟失這么簡單,新選出來的 master 節(jié)點可以重新獲取同樣的鎖,出現(xiàn)一把鎖被拿兩次的場景。
由此可知,鎖被獲取兩次,肯定不能滿足安全性了。
盡管前兩種方案不是很如意,總是有些問題,但也有被部分企業(yè)采用,下面我們就來看基于Redis來實現(xiàn)分布式鎖更高級的版本。
高級版 Redisson + RedLock
Redisson 是 java 的 Redis 客戶端之一,是 Redis 官網(wǎng)推薦的 java 語言實現(xiàn)分布式鎖的項目。
Redisson 提供了一些 api 方便操作 Redis。因為本文主要以鎖為主,所以接下來我們主要關注鎖相關的類,以下是 Redisson 中提供的多樣化的鎖:
- 可重入鎖(Reentrant Lock)
- 公平鎖(Fair Lock)
- 聯(lián)鎖(MultiLock)
- 紅鎖(RedLock)
- 讀寫鎖(ReadWriteLock)
- 信號量(Semaphore) 等等
總之,管你了解不了解,反正 Redisson 就是提供了一堆鎖... 也是目前大部分公司使用 Redis 分布式鎖最常用的一種方式。
整體加鎖和解鎖的代碼結(jié)構如下:
- RLock lock = redissonClient.getLock("xxx");
- lock.lock();
- try {
- ...
- } finally {
- lock.unlock();
- }
其實,加鎖和解鎖的磁層也是使用Lua腳本來實現(xiàn)的,有興趣的朋友可以去翻看一下器底層源碼。
由于篇幅問題,還涉及到Redis集群,所以這里就給出加鎖和解鎖的流程圖,僅供參考。
「加鎖過程」
「解鎖過程」
總結(jié)
通常我們?yōu)榱藢崿F(xiàn) Redis 的高可用,一般都會搭建 Redis 的集群模式,比如給 Redis 節(jié)點掛載一個或多個 slave 從節(jié)點,然后采用哨兵模式進行主從切換。但由于 Redis 的主從模式是異步的,所以可能會在數(shù)據(jù)同步過程中,master 主節(jié)點宕機,slave 從節(jié)點來不及數(shù)據(jù)同步就被選舉為 master 主節(jié)點,從而導致數(shù)據(jù)丟失,大致過程如下:
- 用戶在 Redis 的 master 主節(jié)點上獲取了鎖;
- master 主節(jié)點宕機了,存儲鎖的 key 還沒有來得及同步到 slave 從節(jié)點上;
- slave 從節(jié)點升級為 master 主節(jié)點;
- 用戶從新的 master 主節(jié)點獲取到了對應同一個資源的鎖,同把鎖獲取兩次。
ok,然后為了解決這個問題,Redis 作者提出了 RedLock 算法,步驟如下(五步):
在下面的示例中,我們假設有 5 個完全獨立的 Redis Master 節(jié)點,他們分別運行在 5 臺服務器中,可以保證他們不會同時宕機。
獲取當前 Unix 時間,以毫秒為單位。
依次嘗試從 N 個實例,使用相同的 key 和隨機值獲取鎖。在步驟 2,當向 Redis 設置鎖時,客戶端應該設置一個網(wǎng)絡連接和響應超時時間,這個超時時間應該小于鎖的失效時間。例如你的鎖自動失效時間為 10 秒,則超時時間應該在 5-50 毫秒之間。這樣可以避免服務器端 Redis 已經(jīng)掛掉的情況下,客戶端還在死死地等待響應結(jié)果。如果服務器端沒有在規(guī)定時間內(nèi)響應,客戶端應該盡快嘗試另外一個 Redis 實例。
客戶端使用當前時間減去開始獲取鎖時間(步驟 1 記錄的時間)就得到獲取鎖使用的時間。當且僅當從大多數(shù)(這里是 3 個節(jié)點)的 Redis 節(jié)點都取到鎖,并且使用的時間小于鎖失效時間時,鎖才算獲取成功。
如果取到了鎖,key 的真正有效時間等于有效時間減去獲取鎖所使用的時間(步驟 3 計算的結(jié)果)。
如果因為某些原因,獲取鎖失敗(沒有在至少 N/2+1 個Redis實例取到鎖或者取鎖時間已經(jīng)超過了有效時間),客戶端應該在所有的 Redis 實例上進行解鎖(即便某些 Redis 實例根本就沒有加鎖成功)。
到這,基本看出來,只要是大多數(shù)的 Redis 節(jié)點可以正常工作,就可以保證 Redlock 的正常工作。這樣就可以解決前面單點 Redis 的情況下我們討論的節(jié)點掛掉,由于異步通信,導致鎖失效的問題。
但是細想后, Redlock 還是存在如下問題:
假設一共有5個Redis節(jié)點:A, B, C, D, E。設想發(fā)生了如下的事件序列:
- 客戶端1 成功鎖住了 A, B, C,獲取鎖成功(但 D 和 E 沒有鎖住)。
- 節(jié)點 C 崩潰重啟了,但客戶端1在 C 上加的鎖沒有持久化下來,丟失了。
- 節(jié)點 C 重啟后,客戶端2 鎖住了 C, D, E,獲取鎖成功。
- 這樣,客戶端1 和 客戶端2 同時獲得了鎖(針對同一資源)。
哎,還是不能解決故障重啟后帶來的鎖的安全性問題...
針對節(jié)點重后引發(fā)的鎖失效問題,Redis 作者又提出了 延遲重啟 的概念,大致就是說,一個節(jié)點崩潰后,不要立刻重啟他,而是等到一定的時間后再重啟,等待的時間應該大于鎖的過期時間,采用這種方式,就可以保證這個節(jié)點在重啟前所參與的鎖都過期,聽上去感覺 延遲重啟 解決了這個問題...
但是,還是有個問題,節(jié)點重啟后,在等待的時間內(nèi),這個節(jié)點對外是不工作的。那么如果大多數(shù)節(jié)點都掛了,進入了等待,就會導致系統(tǒng)的不可用,因為系統(tǒng)在過期時間內(nèi)任何鎖都無法加鎖成功。
總之,在 Redis 分布式鎖的實現(xiàn)上還有很多問題等待解決,我們需要認識到這些問題并清楚如何正確實現(xiàn)一個 Redis 分布式鎖,然后在工作中合理的選擇和正確的使用分布式鎖。
但為什么又說,有一部分企業(yè)采用Redis來實現(xiàn)分布式鎖呢?其實實現(xiàn)分布式鎖,從中間件上來選,也有 Zookeeper 可選,并且 Zookeeper 可靠性比 Redis 強太多,但是效率是低了點,如果并發(fā)量不是特別大,追求可靠性,那么肯定首選 Zookeeper。
關于分布式鎖的實現(xiàn)方案,沒有絕對的好與壞,沒有最好的方案,只有最適合你的業(yè)務的方案。
以下兩種方案僅供參考:
- 如果為了效率,建議使用Redis來實現(xiàn);
- 如果追求可靠性,建議使用Zookeeper來實現(xiàn)。
本文轉(zhuǎn)載自微信公眾號「Java后端技術全?!梗梢酝ㄟ^以下二維碼關注。轉(zhuǎn)載本文請聯(lián)系Java后端技術全棧公眾號。