Redisson 全面解析:從使用方法到工作原理的深度探索
Redisson是基于原生redis操作指令上進(jìn)一步的封裝,屏蔽了redis數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn)細(xì)節(jié),開(kāi)發(fā)可以像操作普通java對(duì)象一樣使用redis,而本文將針對(duì)Redisson中各種使用的數(shù)據(jù)結(jié)構(gòu)和工具包使用及其實(shí)現(xiàn)進(jìn)行詳盡的分析,希望對(duì)你有幫助。
一、詳解Redisson基本數(shù)據(jù)類(lèi)型
1. Redisson前置配置說(shuō)明
使用redisson的方式比較簡(jiǎn)單,我們首先需要引入redisson的依賴(lài)包:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.23.5</version>
</dependency>
然后我們指明redis的ip、端口等配置即可:
spring.redis.host=localhost
spring.redis.port=6379
有了上述配置后,我們就可以快速完成redisson客戶端配置:
@Configuration
public class RedissonConfig {
@Autowired
private RedisProperties redisProperties;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "",
redisProperties.getPort() + "");
config.useSingleServer().setAddress(redisUrl);
return Redisson.create(config);
}
}
后續(xù)在進(jìn)行使用的時(shí)候,我們直接注入對(duì)應(yīng)的客戶端依賴(lài)即可:
@Autowired
private RedissonClient redissonClient;
2. 以bucket維度操作字符串
和我們第一次使用redis一樣,我們先用redisson完成一個(gè)字符串的鍵值對(duì)存儲(chǔ),對(duì)應(yīng)的使用例子如下所示,我們只需拿到對(duì)應(yīng)的test-key的bucket即可進(jìn)行讀寫(xiě)操作:
//生成 test-key 的bucket
RBucket<Object> bucket = redissonClient.getBucket("test-key");
//查看對(duì)應(yīng)的bucket是否存在
if (ObjUtil.isEmpty(bucket.get())) {
//基于set指令進(jìn)行插入
bucket.set("test-value");
//嘗試通過(guò)get獲取值
Object value = bucket.get();
log.info("value:{}", value);
}
對(duì)于RBucket對(duì)象的set和get操作本質(zhì)上都是基于redis字符串操作指令set和get的一層封裝,在我們調(diào)用getBucket獲取對(duì)應(yīng)key的bucket的時(shí)候,redisson會(huì)基于當(dāng)前客戶端的連接信息和bucket鍵進(jìn)行一次封裝得到一個(gè)test-key的bucket對(duì)象:
對(duì)應(yīng)的我們給出getBucket的底層實(shí)現(xiàn),可以看到邏輯操作就是封裝維護(hù)如下這份信息:
- 編碼器和解碼器codec,默認(rèn)情況下是Kryo5Codec
- 執(zhí)行命令的commandExecutor,該對(duì)象記錄redis客戶端的基本信息。
- name也就是我們要操作的key的信息,也就是字符串key。
public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
this.codec = codec;
this.commandExecutor = commandExecutor;
if (name == null) {
throw new NullPointerException("name can't be null");
}
setName(name);
}
然后就是執(zhí)行set指令了,我們都知道redisson是基于Netty封裝的redis操作工具,所以在進(jìn)行redis操作時(shí)涉及了大量?jī)?yōu)秀的異步讀寫(xiě)涉及,我們以上文set操作為例,實(shí)際上其底層執(zhí)行時(shí)做了如下幾件事:
- 基于傳入的key,也就是我們的test-key定位到slot地址。
- 獲取到上一步封裝的編碼器codec。
- 本次執(zhí)行是set請(qǐng)求,所以如果我們采用主從模式進(jìn)行部署,這一步是會(huì)從主庫(kù)獲取連接信息,因?yàn)槲覀兙团渲昧艘慌_(tái)redis,所以默認(rèn)直接從默認(rèn)庫(kù)獲取連接。
- 基于連接信息發(fā)送指令。
- 完成操作后歸還連接。
這些步驟完成后,操作結(jié)果會(huì)被封裝為Future對(duì)象,如果需要直到執(zhí)行結(jié)果,我們調(diào)用get即可知曉處理情況:
對(duì)應(yīng)的我們也給出set的源碼入口,如筆者所說(shuō)其底層就是一個(gè)set操作的異步調(diào)用setAsync,通過(guò)該回調(diào)會(huì)得到一個(gè)RFuture對(duì)象,通過(guò)get即可獲取結(jié)果:
@Override
public void set(V value) {
//基于setAsync提交異步set操作,然后通過(guò)get獲取執(zhí)行結(jié)果
get(setAsync(value));
}
對(duì)應(yīng)的我們步入setAsync可以看到它會(huì)拿著我們上一步初始化所得來(lái)的key名稱(chēng)、編碼器、set操作指令對(duì)象以及編碼后的value值通過(guò)commandExecutor進(jìn)行異步寫(xiě)入到redis服務(wù)端:
@Override
public RFuture<Void> setAsync(V value) {
//......
//基于各種信息通過(guò)commandExecutor進(jìn)行異步提交
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.SET, getRawName(), encode(value));
}
我們?cè)俅尾饺爰纯蓙?lái)到第一個(gè)核心步驟,通過(guò)key獲取到slot,因?yàn)槲覀儾渴鸾Y(jié)構(gòu)是單體,所以source拿到的是默認(rèn)值0,然后調(diào)用async正式執(zhí)行異步寫(xiě)操作:
@Override
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
//定位slot
NodeSource source = getNodeSource(key);
//執(zhí)行異步寫(xiě)
return async(false, source, codec, command, params, false, false);
}
步入async即可看到我們的最核心的步驟了,該方法內(nèi)部會(huì)通過(guò)RedisExecutor執(zhí)行execute方法,大體就是執(zhí)行了上圖所說(shuō)的:
- 獲取編碼器
- 基于讀寫(xiě)請(qǐng)求獲取連接,注意獲取連接的操作是異步的
- 得到連接后調(diào)用sendCommand發(fā)送set請(qǐng)求,其內(nèi)部本質(zhì)上就是基于netty所封裝的socketChannel執(zhí)行set操作。
- 完成寫(xiě)操作后釋放連接
public void execute() {
//......
//1. 獲取編碼器
codec = getCodec(codec);
//2.基于讀寫(xiě)請(qǐng)求獲取連接,注意獲取連接的操作是異步的
CompletableFuture<RedisConnection> connectionFuture = getConnection();
//......
//3. 得到連接后調(diào)用sendCommand發(fā)送set請(qǐng)求
connectionFuture.whenComplete((connection, e) -> {
//......
sendCommand(attemptPromise, connection);
//......
});
attemptPromise.whenComplete((r, e) -> {
//完成操作后釋放連接
releaseConnection(attemptPromise, connectionFuture);
checkAttemptPromise(attemptPromise, connectionFuture);
});
}
3. 以Java API風(fēng)格操作redis列表
列表操作就是對(duì)于redis列表的封裝,可以看到redisson給出的操作函數(shù)完全按照java開(kāi)發(fā)的習(xí)慣命名:
RList<Object> list = redissonClient.getList("list");
//循環(huán)添加元素
for (int i = 0; i < 10; i++) {
list.add(i);
}
//移除索引0位置的元素
list.remove(0);
getList和上述bucket操作類(lèi)似這里就不多追贅述,這里我們就看看add的實(shí)現(xiàn)細(xì)節(jié),本質(zhì)上它就是異步調(diào)用redis的RPUSH指令將元素追加到列表末尾,整體流程原理和上述set操作差不多,這里就不多做贅述了:
對(duì)應(yīng)的我們也給出底層源碼的核心部分的介紹:
@Override
public boolean add(V e) {
return get(addAsync(e));
}
@Override
public RFuture<Boolean> addAsync(V e) {
//異步執(zhí)行rpush指令將元素追加到末尾
return addAsync(e, RPUSH_BOOLEAN);
}
4. 以Java API格式操作字典
映射集也就是我們java中常說(shuō)的map,redisson底層使用的就是redis的dict字典,對(duì)應(yīng)示例如下所示,注意這個(gè)put方法,每次操作后它會(huì)有一個(gè)返回值,即如果這個(gè)key存在于redis中,那么本次put擦咯做結(jié)束后就會(huì)返回覆蓋前的值,就像下面這段代碼一樣,第二次put操作后就會(huì)返回value1:
RMap<String, String> hashMap = redissonClient.getMap("hashMap");
//使用put操作,如果這個(gè)key存在則返回這個(gè)key原有的value值
String res = hashMap.put("key1", "value1");
log.info("before res:{}", res);
res = hashMap.put("key1", "value2");
log.info("after res:{}", res);
這里我們也給出put的核心實(shí)現(xiàn),對(duì)應(yīng)的核心代碼就是RedissonMap中的putAsync方法,大體邏輯是進(jìn)行key和value的檢查之后,調(diào)用putOperationAsync生成一個(gè)異步put操作的任務(wù)并得到一個(gè)future,最后封裝成mapWriterFuture返回:
@Override
public RFuture<V> putAsync(K key, V value) {
//進(jìn)行鍵值對(duì)檢查
checkKey(key);
checkValue(value);
//基于putOperationAsync執(zhí)行鍵值對(duì)插入操作
RFuture<V> future = putOperationAsync(key, value);
if (hasNoWriter()) {
return future;
}
//返回結(jié)果
return mapWriterFuture(future, new MapWriterTask.Add(key, value));
}
所以來(lái)到putOperationAsync即可看到這段核心代碼的實(shí)現(xiàn),本質(zhì)上為了保證返回覆蓋前的值,redis用到的lua腳本,該腳本的執(zhí)行流程為:
- 調(diào)用hget判斷key是否存在若存在用v記錄這個(gè)值。
- 調(diào)用hset進(jìn)行鍵值對(duì)設(shè)置。
- 返回v即覆蓋前的值。
對(duì)應(yīng)的我們也給出這段源代碼示例:
protected RFuture<V> putOperationAsync(K key, V value) {
String name = getRawName(key);
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return v",
Collections.singletonList(name), encodeMapKey(key), encodeMapValue(value));
}
5. 詳解redisson自實(shí)現(xiàn)的阻塞隊(duì)列
我們?cè)賮?lái)個(gè)阻塞隊(duì)列的例子,整體使用也和java的阻塞隊(duì)列差不多:
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("blockingQueue");
//添加元素
blockingQueue.put("element");
//取出元素
String value = blockingQueue.take();
log.info("value:{}", value);
實(shí)際上隊(duì)列的實(shí)現(xiàn)也是基于redis的列表,通過(guò)rpush實(shí)現(xiàn)入隊(duì),lpop實(shí)現(xiàn)出隊(duì):
對(duì)應(yīng)我們也給出入隊(duì)的代碼核心實(shí)現(xiàn)印證這一點(diǎn):
@Override
public RFuture<Void> putAsync(V e) {
//使用rpush模擬入隊(duì)
return addAsync(e, RedisCommands.RPUSH_VOID);
}
用blpop實(shí)現(xiàn)出隊(duì)操作:
@Override
public RFuture<V> takeAsync() {
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0);
}
6. 詳解redisson自實(shí)現(xiàn)延遲隊(duì)列
在上文中我們給出阻塞隊(duì)列的概念,實(shí)際上redisson在此基礎(chǔ)上更進(jìn)一步的封裝做出了一個(gè)延遲隊(duì)列的設(shè)計(jì),如下面這段示例,該代碼會(huì)在5s后提交給blockingQueue一個(gè)element元素,通過(guò)blockingQueue的take方法即可實(shí)現(xiàn)5s后準(zhǔn)時(shí)出去元素:
//創(chuàng)建延遲隊(duì)列
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("blockingQueue");
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
//添加元素
delayedQueue.offer("element", 5, TimeUnit.SECONDS);
//取出元素
long begin = System.currentTimeMillis();
String value = blockingQueue.take();
long end = System.currentTimeMillis();
log.info("value:{} cost:{}ms", value, end - begin);
對(duì)應(yīng)的我們也給出這段代碼示例的輸出結(jié)果,可以看到阻塞隊(duì)列必須等到5s左右才能得到元素:
2025-01-14 10:52:27.134 INFO 17684 --- [ main] com.sharkChili.TestRunner : value:element cost:5034ms
其實(shí)現(xiàn)原理也很簡(jiǎn)單,上述代碼我們指明了隊(duì)列名稱(chēng)為blockingQueue,在使用offer進(jìn)行延遲提交本質(zhì)上就是通過(guò)lua腳本實(shí)現(xiàn)元素延遲提交,其工作內(nèi)容為:
- 基于我們給定的名稱(chēng)blockingQueue生成一個(gè)有序集合redisson_delay_queue_timeout:{blockingQueue}告知element元素的超時(shí)時(shí)間。
- 基于我們給定的名稱(chēng)blockingQueue生成列表redisson_delay_queue:{blockingQueue}一個(gè)編碼后的元素值element。
- 到有序集合redisson_delay_queue:{blockingQueue}中查看第一個(gè)元素是否是當(dāng)前元素,如果是則通過(guò)publish發(fā)送一個(gè)給redisson_delay_queue_channel:{blockingQueue}這個(gè)topic告知元素提交的到期時(shí)間。
對(duì)應(yīng)的我們給出offer底層的實(shí)現(xiàn),可以看到該方法通過(guò)我們傳入的時(shí)間得到一個(gè)超時(shí)后的時(shí)間,然后封裝成lua腳本,也就是我們上面所說(shuō)的含義提交到redis服務(wù)端:
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
//......
//計(jì)算超時(shí)后的時(shí)間
long delayInMs = timeUnit.toMillis(delay);
long timeout = System.currentTimeMillis() + delayInMs;
//生成隨機(jī)數(shù)構(gòu)成一個(gè)唯一的lua腳本
byte[] random = getServiceManager().generateIdArray(8);
//基于隨機(jī)數(shù)生成lua腳本
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"local value = struct.pack('Bc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);"
//提交到超時(shí)隊(duì)列redisson_delay_queue_timeout:{blockingQueue}記錄元素value插入的時(shí)間為ARGV[1],即入?yún)⒅械膖imeout
+ "redis.call('zadd', KEYS[2], ARGV[1], value);"
//提交到元素隊(duì)列redisson_delay_queue:{blockingQueue}當(dāng)前元素值為element
+ "redis.call('rpush', KEYS[3], value);"
//從redisson_delay_queue_timeout:{blockingQueue}獲取第一個(gè)元素,如果是當(dāng)前元素則通過(guò)redisson_delay_queue_channel:{blockingQueue}這個(gè)channel發(fā)布元素的到期時(shí)間為ARGV[1],即入?yún)⒅械膖imeout
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
+ "if v[1] == value then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;",
//這個(gè)list代表keys列表,getRawName是blockingqueue、timeout就是redisson_delay_queue_timeout:{blockingQueue}、queueName就是redisson_delay_queue:{blockingQueue}、channel就是基于redisson_delay_queue_channel:{blockingQueue}
Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),
//代表arg timeout即超時(shí)的時(shí)間,random是隨機(jī)數(shù)、e就是我們本次插入的編碼后的element
timeout, random, encode(e));
}
基于上述的執(zhí)行腳本,我們的延遲隊(duì)列在初始化時(shí)會(huì)創(chuàng)建一個(gè)QueueTransferTask,從上一步發(fā)布到redisson_delay_queue_channel:{blockingQueue}的信息,這個(gè)QueueTransferTask會(huì)監(jiān)聽(tīng)到元素的到期時(shí)間然后生成一個(gè)定時(shí)任務(wù),到點(diǎn)后執(zhí)行如下邏輯:
- 從redisson_delay_queue_timeout:{blockingQueue}這個(gè)超時(shí)隊(duì)列中獲取到期的元素。
- 將元素值提交到blockingQueue中。
- 將本次延遲提交的元素從redisson_delay_queue_timeout:{blockingQueue}、redisson_delay_queue:{blockingQueue}中移除。
由此一次完整的元素提交就成功了:
對(duì)應(yīng)的我們給出延遲隊(duì)列的初始化代碼,它會(huì)進(jìn)行各種隊(duì)列初始化的任務(wù)提交工作,整體步驟為:
- 基于傳入的blockingQueue生成channel、列表、超時(shí)隊(duì)列。
- 它會(huì)創(chuàng)建一個(gè)lua腳本,內(nèi)容就是上面所說(shuō)的延遲提交入隊(duì)列然后移除延遲提交的任務(wù)信息。
- 調(diào)用schedule啟動(dòng)task。
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
//基于傳入的blockingQueue生成channel、列表、超時(shí)隊(duì)列-
channelName = prefixName("redisson_delay_queue_channel", getRawName());
queueName = prefixName("redisson_delay_queue", getRawName());
timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());
QueueTransferTask task = new QueueTransferTask(commandExecutor.getServiceManager()) {
@Override
protected RFuture<Long> pushTaskAsync() {
//基于初始化的channel、元素列表、延遲隊(duì)列信息生成lua提交
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredValues > 0 then "
+ "for i, v in ipairs(expiredValues) do "
+ "local randomId, value = struct.unpack('Bc0Lc0', v);"
+ "redis.call('rpush', KEYS[1], value);"
+ "redis.call('lrem', KEYS[3], 1, v);"
+ "end; "
+ "redis.call('zrem', KEYS[2], unpack(expiredValues));"
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
+ "if v[1] ~= nil then "
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.asList(getRawName(), timeoutSetName, queueName),
System.currentTimeMillis(), 100);
}
//初始化channel的topic為 channelName
@Override
protected RTopic getTopic() {
return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
}
};
//調(diào)用schedule提交這個(gè)task
queueTransferService.schedule(queueName, task);
//......
}
對(duì)應(yīng)我們步入這個(gè)schedule方法即可看到,封裝的task啟動(dòng)后會(huì)執(zhí)行會(huì)監(jiān)聽(tīng)redisson_delay_queue_channel:{blockingqueue}得到元素的到期時(shí)間并基于這個(gè)時(shí)間到點(diǎn)執(zhí)行提交隊(duì)列的lua腳本:
public void start() {
//獲取到上一步初始化的channel即redisson_delay_queue_channel:{blockingqueue}
RTopic schedulerTopic = getTopic();
//......
//訂閱這個(gè)channel收到消息后,基于對(duì)應(yīng)的startTime即延遲提交元素的到期時(shí)間通過(guò)scheduleTask執(zhí)行上述的lua腳本將元素提交至blockingqueue中
messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
@Override
public void onMessage(CharSequence channel, Long startTime) {
scheduleTask(startTime);
}
});
}
如下以來(lái)我們只需通過(guò)阻塞隊(duì)列的task方法就可以等到元素到期后取出,完成邏輯閉環(huán)。
二、更多關(guān)于Redisson
1. 詳解Redisson 中的原子類(lèi)
因?yàn)閞edis執(zhí)行用戶指令是單線程的,所以針對(duì)key執(zhí)行INCR即可實(shí)現(xiàn)元素自增,所以redisson也利用到這一點(diǎn)封裝了一個(gè)原子類(lèi),對(duì)應(yīng)的使用示例如下:
RAtomicLong atomicLong = redissonClient.getAtomicLong("atomicLong");
atomicLong.incrementAndGet();
log.info("atomicLong = {}", atomicLong.get());
2. 詳解redisson中的發(fā)布訂閱模型
對(duì)應(yīng)發(fā)布訂閱模型,redisson也做了很好的封裝時(shí),使用時(shí)的api也非常方便,如下所示,通過(guò)publish即可發(fā)布消息,通過(guò)addListener即可得到對(duì)應(yīng)的channel和message:
CountDownLatch countDownLatch = new CountDownLatch(2);
//訂閱topic消息
new Thread(() -> {
RTopic topic = redissonClient.getTopic("topic");
topic.addListener(String.class, (c, m) -> {
log.info("c:{},m:{}", c, m);
});
countDownLatch.countDown();
}).start();
//發(fā)布消息到topic
new Thread(() -> {
RTopic topic = redissonClient.getTopic("topic");
topic.publish("hello redssion");
countDownLatch.countDown();
}).start();
countDownLatch.await();
log.info("finish");
三、小結(jié)
本文演示了redisson幾個(gè)常用的數(shù)據(jù)結(jié)構(gòu)以及一些簡(jiǎn)單并發(fā)流程工具使用示例和底層源碼分析,希望對(duì)你有幫助。