分布式鎖常見的解決方案,你知道幾種
前言
1、什么是分布式鎖
要介紹分布式鎖,首先要知道與分布式鎖相對應的是線程鎖、進程鎖。
1.線程鎖
主要用來給方法、代碼塊加鎖。當某個方法或代碼使用鎖,在同一時刻僅有一個線程執(zhí)行該方法或該代碼段。線程鎖只在同一JVM中有效果,因為線程鎖的實現在根本上是依靠線程之間共享內存實現的,比如Synchronized、Lock等。
2.進程鎖
為了控制多個進程訪問某個共享資源,因為進程具有獨立性,各個進程無法訪問其他進程的資源,因此無法通過synchronized等線程鎖實現進程鎖。
3.分布式鎖
當在分布式系統(tǒng)中,一個實例往往具有多個節(jié)點。這時候就面臨多個進程對同一資源資源的訪問。因此jvm級別的鎖滿足不了需求了。這個時候就需要用分布式鎖控制訪問的資源
2、分布式鎖的特點
1、互斥性:任意時刻,只能有一個客戶端獲取鎖,不能同時有兩個客戶端獲取到鎖。
2、安全性:鎖只能被持有該鎖的客戶端刪除,不能由其它客戶端刪除。
3、死鎖:獲取鎖的客戶端因為某些原因(如down機等)而未能釋放鎖,其它客戶端再也無法獲取到該鎖。
4、容錯:當部分節(jié)點(redis節(jié)點等)down機時,客戶端仍然能夠獲取鎖和釋放鎖。
3、常見分布式鎖的解決方案
1、基于數據庫
2、基于ZooKeeper
3、etcd
3、基于redis(推薦)
4、數據庫
兩種實現
- 唯一索引
- 排它鎖
1、基于表實現的分布式鎖
CREATE TABLE `methodLock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '鎖定的方法名',
`node_info` varchar(64) NOT NULL DEFAULT '' COMMENT '結點信息/線程信息',
`count` int NOT NULL DEFAULT 0 COMMENT '鎖的次數,實現可重入',
`desc` varchar(1024) NOT NULL DEFAULT '備注信息',
`update_time` timestamp NOT NULL DEFAULT now() ON UPDATE now() COMMENT '保存數據時間,自動生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='鎖定中的方法';
`當我們想要鎖住某個方法時,執(zhí)行以下SQL:
insert into methodLock(method_name,node_info,desc) values ('method_name','node_info','desc')
因為我們對method_name做了唯一性約束,這里如果有多個請求同時提交到數據庫的話,數據庫會保證只有一個操作可以成功,那么我們就可以認為操作成功的那個線程獲得了該方法的鎖,可以執(zhí)行方法體內容。
當方法執(zhí)行完畢之后,想要釋放鎖的話,需要執(zhí)行以下Sql:
delete from methodLock where method_name ='method_name'
2、借助數據庫的排他鎖
排他鎖又稱寫鎖、獨占鎖,如果事務T對數據A加上排他鎖后,則其他事務不能再對A加任何類型的封鎖。獲準排他鎖的事務既能讀數據,又能修改數據在查詢語句后面增加FOR UPDATE,MySQL 就會對查詢結果中的每行都加排他鎖,當沒有其他線程對查詢結果集中的任何一行使用排他鎖時,可以成功申請排他鎖,否則會被阻塞。
SELECT ... FOR UPDATE;
偽代碼實現
//阻塞試獲取鎖
//事務保證原子性
@Transactional
public void lock(){
if(select * from methodLock where method_name='xxx' for update ==>有數據){
//有數據,表示資源已經被加鎖,需要判斷是否是重入
if(current==resultNodeInfo){
//是自己加的鎖,增加count,表示可重入
update methodLock set count=count+1 where method_name=xxx'
return true;
}else{
return false;
}
}
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’);
return true;
}
//非阻塞
public bool trylock(){
long endTimeout=System.currentTimeMills()+timeout;
while(true){
if(mysqlLock.lock()){
return true;
}
//判斷是否超時,如果超時。枷鎖失敗
if(endTimeout<System.currentTimeMills()){
return false;
}
}
}
//釋放鎖
@Transactional
public void unlock(){
if(select * from methodLock where method_name='xxx' for update ==>有數據){
//有數據,表示資源已經被加鎖,需要判斷是否是重入
if(current==resultNodeInfo){
//判斷是否是自己加鎖,如果是自己的鎖,需要解鎖
if(count>1){
update count=count-1;
}else{
//鎖沒有重入
delete from methodLock where method_name ='method_name'
}
//是自己加的鎖,增加count,表示可重入
update methodLock set count=count+1 where method_name=xxx'
return true;
}else{
//不是自己的鎖不釋放
return false;
}
}else{
//沒有數據,表示資源未被加鎖,無需釋放
return true;
}
}
3、缺點
- 依賴數據庫的可用性,數據庫掛掉,導致業(yè)務不可用
- 鎖沒有失效時間,解鎖一旦失敗,鎖記錄會一直在數據庫,其他線程則不能獲取鎖,或者啟動定時任務循環(huán)遍歷鎖,長時間未被釋放的,認定為超時,直接刪除
- 多個線程搶鎖時,搶鎖失敗的線程會拋異常,如果需要再次獲取鎖,需要再次觸發(fā)業(yè)務請求?;蛘咝枰约簩崿FCAS搶鎖
5、借助redison來實現redis分布式鎖
5.1、Redison分布式鎖實現原理
6、SpringBoot整合Redisson
6.1、添加maven依賴
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.0</version>
</dependency>
6.2、自定義配置類
單機模式為例
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.ReadMode;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class RedissonConfig {
@Resource
private RedisProperties redisProperties;
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config config = new Config();
String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");
config.useSingleServer()
.setDatabase(0)
.setAddress(redisUrl)
.setPassword(redisProperties.getPassword());
config.setLockWatchdogTimeout(9000);
return Redisson.create(config);
}
}
7、分布式鎖常見api說明
public interface RRLock {
//----------------------Lock接口方法-----------------------
/**
* 加鎖 鎖的有效期默認30秒
*/
void lock();
/**
* tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失?。存i已被其他線程獲?。瑒t返回false .
*/
boolean tryLock();
/**
* tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,只不過區(qū)別在于這個方法在拿不到鎖時會等待一定的時間,
* 在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。
*
* @param time 等待時間
* @param unit 時間單位 小時、分、秒、毫秒等
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 解鎖
*/
void unlock();
/**
* 中斷鎖 表示該鎖可以被中斷 假如A和B同時調這個方法,A獲取鎖,B為獲取鎖,那么B線程可以通過
* Thread.currentThread().interrupt(); 方法真正中斷該線程
*/
void lockInterruptibly();
//----------------------RLock接口方法-----------------------
/**
* 加鎖 上面是默認30秒這里可以手動設置鎖的有效時間
*
* @param leaseTime 鎖有效時間
* @param unit 時間單位 小時、分、秒、毫秒等
*/
void lock(long leaseTime, TimeUnit unit);
/**
* 這里比上面多一個參數,多添加一個鎖的有效時間
*
* @param waitTime 等待時間
* @param leaseTime 鎖有效時間
* @param unit 時間單位 小時、分、秒、毫秒等
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* 檢驗該鎖是否被線程使用,如果被使用返回True
*/
boolean isLocked();
/**
* 檢查當前線程是否獲得此鎖(這個和上面的區(qū)別就是該方法可以判斷是否當前線程獲得此鎖,而不是此鎖是否被線程占有)
* 這個比上面那個實用
*/
boolean isHeldByCurrentThread();
/**
* 中斷鎖 和上面中斷鎖差不多,只是這里如果獲得鎖成功,添加鎖的有效時間
* @param leaseTime 鎖有效時間
* @param unit 時間單位 小時、分、秒、毫秒等
*/
void lockInterruptibly(long leaseTime, TimeUnit unit);
}
8、如何使用
@Autowired
private RedissonClient redissonClient;
public String test(String id) {
RLock lock = redissonClient.getLock(id);
try {
// 獲取鎖等待時間2秒,鎖過期時間30秒
// 當leaseTime參數值設置為:-1時,鎖自動續(xù)約機制生效
boolean b = lock.tryLock(2, 30, TimeUnit.SECONDS);
if (b) {
log.info("獲取鎖成功,id={}", id);
Thread.sleep(10000);
log.info("業(yè)務處理結束...");
} else {
log.warn("獲取鎖失敗,id={}", id);
}
} catch (InterruptedException e) {
log.error("獲取鎖異常", e);
} finally {
// 僅允許鎖的持有者線程解鎖
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
return id;
}
9、實戰(zhàn)數據防重
9.1、先新建一個網關服務,項目結構如下,用來實現轉發(fā),負載均衡
9.2、新建t_person_test表
CREATE TABLE `t_person_test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`phone` varchar(30) DEFAULT NULL,
`num` int(11) DEFAULT NULL,
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `t_person_test_id_uindex` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1900802051 DEFAULT CHARSET=utf8
9.3、新建一個名為redisson-demo的項目,項目結構如下
9.4、application.properties
server.port=8384
spring.application.name=demo
spring.datasource.url=jdbc:mysql://10.1x.xx.x:3306/attendance_saas?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&serverTimezone=GMT
spring.datasource.username=test
spring.datasource.password=Test123.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
# 服務注冊到nacos
spring.cloud.nacos.discovery.server-addr=10.x.x.2x0:8848
spring.cloud.nacos.discovery.namespace=12d825d5-165b-489c-beef-c72944cac9d2
spring.cloud.nacos.discovery.password=nacos
spring.cloud.nacos.discovery.username=nacos
# redis配置
spring.redis.port=6379
spring.redis.host=1x.x.xx.xx
9.5、對應的實體類
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
@TableName("t_person_test")
public class PersonTestEntity {
private Integer id;
private String phone;
private Integer num;
}
9.6、對應的mapper
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.redissondemo.entity.PersonTestEntity;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
public interface PersonTestMapper extends BaseMapper<PersonTestEntity> {
@Update("update t_person_test set num=num-1 where phone=#{phone}")
int updateNum(@Param("phone") String phone);
@Select("select * from t_person_test where phone=#{phone}")
PersonTestEntity selectPersonByPhone(@Param("phone") String phone);
}
9.7、對應的控制器RedissonController
import com.example.redissondemo.entity.PersonTestEntity;
import com.example.redissondemo.mapper.PersonTestMapper;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@Slf4j
public class RedissonController {
@Resource
private RedissonClient redissonClient;
@Resource
private PersonTestMapper personTestMapper;
/**
* 常規(guī)插入代碼代碼-大多數人寫的
* @return
*/
@PostMapping("/insert")
public int insert(){
String phone="15797638118";
PersonTestEntity build = PersonTestEntity.builder()
.num(10)
.phone(phone).build();
PersonTestEntity testEntity = personTestMapper.selectPersonByPhone(phone);
if(null!=testEntity){
throw new RuntimeException("數據已存在");
}
int insert = personTestMapper.insert(build);
return insert;
}
/**
* 使用分布式鎖的插入代碼-保證數據防重
* @return
*/
@PostMapping("/insertLock")
public int insertLock(){
String phone="15797638118";
PersonTestEntity build = PersonTestEntity.builder()
.num(10)
.phone(phone).build();
RLock lock = redissonClient.getLock("insertKey");
int insert = 0;
try {
lock.lock();
PersonTestEntity testEntity = personTestMapper.selectPersonByPhone(phone);
if(null!=testEntity){
throw new RuntimeException("數據已存在");
}
insert = personTestMapper.insert(build);
} catch (RuntimeException e) {
log.error("insertLock方法拋出異常",e);
} finally {
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
return insert;
}
/**
* 常規(guī)的更新代碼-大多數人寫的-字段更新結果超出預期
* @return
*/
@PostMapping("/update")
public int update(){
int update = 0;
String phone="15797638118";
PersonTestEntity testEntity = personTestMapper.selectPersonByPhone(phone);
if(testEntity!=null&&testEntity.getNum()>0){
update = personTestMapper.updateNum(phone);
}else {
log.info("num已經等于0");
}
return update;
}
/**
* 使用分布式鎖的更新代碼-保證數據在預期內相減
* @return
*/
@PostMapping("/updateLock")
public int updateLock(){
int insert = 0;
String phone="15797638118";
RLock lock = redissonClient.getLock("updateKey");
try {
lock.unlock();
PersonTestEntity testEntity = personTestMapper.selectPersonByPhone(phone);
if(testEntity!=null&&testEntity.getNum()>0){
insert = personTestMapper.updateNum(phone);
}else {
log.info("num已經等于0");
}
} catch (Exception e) {
log.error("updateLock方法拋出異常",e);
} finally {
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
return insert;
}
}
9.8、將Redisson-demo項目啟動兩個實例,端口號8383,8384
9.9、jmter壓力測試,新建一個線程組,指定線程數為10,循環(huán)數次為2。相當于20個線程
8、新建一個http請求,這個我們借助spring cloud gateway 來實現負載均衡,先測試插入接口
10、測試結果
10.1、insert接口
- 先查詢數據庫內容嗎,數據為空,
執(zhí)行jmter的20個線程結果
查詢到有10數據入庫,顯然不對,我們的代碼期望不符合
10.2、insertLock接口
執(zhí)行jmter的20個線程結果
數據庫中只有一條數據,分布式鎖生效
10.3update接口
我們將jemter接口換成/demo/update,線程數任然是20個,執(zhí)行前的結果如果下,num字段為10
num字段被更新成-8,顯然不符合預期
10.3updateLock接口
jmter執(zhí)行前
jmter執(zhí)行后,多次測試num字段符合預期
11、分布式鎖與本地事物
問題代碼
@Transactional
public void update(int id) {
boolean lock = redisLock.lock(id);
if (!lock) {
throw new RuntimeException("當前人數過多,請稍后再試");
}
/*
業(yè)務代碼在該區(qū)域
*/
redisLock.unlock(id);
}
@Transactional是spring的aop實現,會在update方法之前開啟事務,之后再加鎖,當鎖住的代碼執(zhí)行完成后,再提交事務,因此鎖住的代碼塊執(zhí)行是在事務之內執(zhí)行的,可以推斷在代碼塊執(zhí)行完時,事務還未提交,鎖已經被釋放,此時其他線程拿到鎖之后進行鎖住的代碼塊,讀取的庫存數據不是最新的。先拿到鎖的線程修改的數據,可能被覆蓋。
解決方案:
@Transactional一定要的分布式鎖內,或者直接拋棄事物。