Spring自帶分布式鎖你用過嗎?
環(huán)境:SpringBoot2.7.12
本篇文章將會(huì)為大家介紹有關(guān)spring integration提供的分布式鎖功能。
1. 簡介
Spring Integration 是一個(gè)框架,用于構(gòu)建事件驅(qū)動(dòng)的應(yīng)用程序。在 Spring Integration 中,LockRegistry 是一個(gè)接口,用于管理分布式鎖。分布式鎖是一種同步機(jī)制,用于確保在分布式系統(tǒng)中的多個(gè)節(jié)點(diǎn)之間對(duì)共享資源的互斥訪問。
LockRegistry及相關(guān)子接口(如:RenewableLockRegistry) 接口的主要功能:
- 獲取鎖:當(dāng)應(yīng)用程序需要訪問共享資源時(shí),它可以通過 LockRegistry 獲取一個(gè)鎖。
- 釋放鎖:當(dāng)應(yīng)用程序完成對(duì)共享資源的訪問后,它應(yīng)該釋放鎖,以便其他應(yīng)用程序可以獲取它(第一點(diǎn)中提到,并沒有提供直接釋放鎖的操作,而是內(nèi)部自動(dòng)完成)。
- 續(xù)期:提供續(xù)期機(jī)制,以便在需要時(shí)延長鎖的持有時(shí)間。
常見的 LockRegistry 實(shí)現(xiàn)包括基于數(shù)據(jù)庫、ZooKeeper 和 Redis 的實(shí)現(xiàn)。
公共依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
2. 基于數(shù)據(jù)庫分布式鎖
引入依賴
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
配置
spring:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/spring_lock?serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&useSSL=false
username: root
password: xxxooo
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimumIdle: 10
maximumPoolSize: 200
---
spring:
integration:
jdbc:
initialize-schema: always
# 基于數(shù)據(jù)庫需要執(zhí)行初始化腳本
schema: classpath:schema-mysql.sql
注冊核心Bean對(duì)象
@Bean
public DefaultLockRepository defaultLockRepository(DataSource dataSource) {
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
// 這里根據(jù)你的業(yè)務(wù)需要,配置表前綴,默認(rèn):IN_
lockRepository.setPrefix("T_") ;
return lockRepository ;
}
// 注冊基于數(shù)據(jù)庫的分布式鎖
@Bean
public JdbcLockRegistry jdbcLockRegistry(DefaultLockRepository lockRepository) {
return new JdbcLockRegistry(lockRepository) ;
}
測試用例
@Test
public void testLock() throws Exception
int len = 10 ;
CountDownLatch cdl = new CountDownLatch(len) ;
CountDownLatch waiter = new CountDownLatch(len) ;
Thread[] ts = new Thread[len] ;
for (int i = 0; i < len; i++) {
ts[i] = new Thread(() -> {
waiter.countDown() ;
System.out.println(Thread.currentThread().getName() + " - 準(zhǔn)備獲取鎖") ;
try {
waiter.await() ;
} catch (InterruptedException e1) {
e1.printStackTrace();
}
// 獲取鎖
Lock lock = registry.obtain("drug_store_key_001") ;
lock.lock() ;
System.out.println(Thread.currentThread().getName() + " - 獲取鎖成功") ;
try {
try {
TimeUnit.SECONDS.sleep(2) ;
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
// 釋放鎖
lock.unlock() ;
cdl.countDown() ;
System.out.println(Thread.currentThread().getName() + " - 鎖釋放成功") ;
}
}, "T - " + i) ;
}
for (int i = 0; i < len; i++) {
ts[i].start() ;
}
cdl.await() ;
}
數(shù)據(jù)庫
圖片
鎖的實(shí)現(xiàn)JdbcLock,該對(duì)象實(shí)現(xiàn)了java.util.concurrent.locks.Lock,所以該鎖是支持重入等操作的。
配置鎖獲取失敗后的重試間隔,默認(rèn)值100ms
JdbcLockRegistry jdbcLockRegistry = new JdbcLockRegistry(lockRepository);
// 定義鎖對(duì)象時(shí)設(shè)置當(dāng)獲取鎖失敗后重試間隔時(shí)間。
jdbcLockRegistry.setIdleBetweenTries(Duration.ofMillis(200)) ;
鎖續(xù)期
jdbcLockRegistry.renewLock("drug_store_key_001");
3. 基于Redis分布式鎖
引入依賴
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置
spring:
redis:
host: localhost
port: 6379
password: xxxooo
database: 8
lettuce:
pool:
maxActive: 8
maxIdle: 100
minIdle: 10
maxWait: -1
測試用例
測試代碼與上面基于JDBC的一樣,只需要修改調(diào)用加鎖的代碼即可
Lock lock = redisLockRegistry.obtain("001") ;
設(shè)置鎖的有效期,默認(rèn)是60s
// 第三個(gè)參數(shù)設(shè)置了key的有效期,這里改成10s
RedisLockRegistry redisLockRegistry = new RedisLockRegistry(connectionFactory, registryKey, 10000) ;
注意:redis key的有效期設(shè)置為10s,如果你的業(yè)務(wù)執(zhí)行超過了10s,那么程序?qū)?huì)報(bào)錯(cuò)。并沒有redission watch dog機(jī)制。
Exception in thread "T - 0" java.lang.IllegalStateException: Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised.
at org.springframework.integration.redis.util.RedisLockRegistry$RedisLock.unlock(RedisLockRegistry.java:450)
at com.pack.SpringIntegrationDemoApplicationTests.lambda$1(SpringIntegrationDemoApplicationTests.java:83)
at java.lang.Thread.run(Thread.java:748)
如果10s過期后key自動(dòng)刪除后,其它線程是否能立馬獲取到鎖呢?如果是單節(jié)點(diǎn)中其它現(xiàn)在也不能獲取鎖,必須等上一個(gè)線程結(jié)束后才可以,這是因?yàn)樵趦?nèi)部還維護(hù)了一個(gè)ReentrantLock鎖,在獲取分布式鎖前要先獲取本地的一個(gè)鎖。
private abstract class RedisLock implements Lock {
private final ReentrantLock localLock = new ReentrantLock();
public final void lock() {
this.localLock.lock();
while (true) {
try {
if (tryRedisLock(-1L)) {
return;
}
} catch (InterruptedException e) {
} catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
}
}
}
注意:不管是基于數(shù)據(jù)庫還是Redis都要先獲取本地的鎖
Spring Cloud Task就使用到了Spring Integration中的鎖基于數(shù)據(jù)庫的。
總結(jié):Spring Integration 的分布式鎖為開發(fā)者提供了一種在分布式系統(tǒng)中實(shí)現(xiàn)可靠同步的有效方法。通過合理選擇和使用這些鎖實(shí)現(xiàn),可以確保對(duì)共享資源的訪問在多個(gè)節(jié)點(diǎn)之間保持協(xié)調(diào)一致,從而提高系統(tǒng)的整體可靠性和性能。
完畢?。。?/p>