玩轉(zhuǎn)Redis!非常強(qiáng)大的Redisson分布式集合,少寫(xiě)60%代碼
環(huán)境:SpringBoot2.7.18+Redis6.2.6+Redisson3.18.0
1. 簡(jiǎn)介
Redisson 是 Redis Java 客戶端和實(shí)時(shí)數(shù)據(jù)平臺(tái)。它為使用 Redis 提供了更方便、更簡(jiǎn)單的方法。Redisson 對(duì)象提供了關(guān)注點(diǎn)分離功能,可讓你專注于數(shù)據(jù)建模和應(yīng)用邏輯。
在Java中,為我們提供了豐富的集合類(lèi),如List、Set、Map等,這些集合類(lèi)在單機(jī)應(yīng)用或單個(gè)JVM進(jìn)程中是非常強(qiáng)大和有效的工具。然而,在分布式系統(tǒng)下,數(shù)據(jù)需要在多個(gè)JVM進(jìn)程或節(jié)點(diǎn)之間共享和同步。為實(shí)現(xiàn)這一目標(biāo)Redisson提供了許多分布式集合實(shí)現(xiàn),如RList、RSet、RMap等,這些集合類(lèi)能夠在多個(gè)Redis節(jié)點(diǎn)之間無(wú)縫地共享數(shù)據(jù)。通過(guò)使用Redisson,開(kāi)發(fā)者可以像使用傳統(tǒng)Java集合類(lèi)一樣,在分布式系統(tǒng)中進(jìn)行數(shù)據(jù)的增刪改查操作,而無(wú)需擔(dān)心數(shù)據(jù)在不同節(jié)點(diǎn)之間的同步和一致性問(wèn)題。
2. 實(shí)戰(zhàn)案例
2.1 Map集合
基于 Redis 的 Java 分布式 Map 對(duì)象實(shí)現(xiàn)了 ConcurrentMap 接口。該對(duì)象是完全線程安全的。
- RMap類(lèi)關(guān)系
public interface RMap<K, V> extends ConcurrentMap<K, V>, ...{}
接下來(lái)所有操作都是基于RedissonClient對(duì)象
@Resource
private RedissonClient redisson ;
- 同步存數(shù)據(jù)
RMap<String, User> map = redisson.getMap("user-list");
User preValue = map.put("1", new User(2L, "張三2", 22)) ;
User value = map.putIfAbsent("2", new User(2L, "李四", 33));
- 快速存數(shù)據(jù)
如果你不需要返回上一個(gè)值(舊值)建議使用相應(yīng)fast*方法
RMap<String, User> map = redisson.getMap("user-list");
map.fastPut("1", new User(2L, "張三2", 22));
map.fastPutIfAbsent("2", new User(2L, "李四", 33));
map.fastRemove("1") ;
以上操作不會(huì)返回對(duì)應(yīng)key之前的舊值。
- 異步存數(shù)據(jù)
RFuture<User> f1 = map.putAsync("1", new User(2L, "張三2", 22)) ;
RFuture<Boolean> f2 = map.fastPutAsync("2", new User(2L, "李四", 33)) ;
RFuture<Long> f3 = map.fastRemoveAsync("2") ;
以上操作對(duì)應(yīng)Redis數(shù)據(jù)結(jié)構(gòu)。
HASH數(shù)據(jù)結(jié)構(gòu)
- Map集合中key綁定Lock
上面得知,Map保存的數(shù)據(jù)是hash數(shù)據(jù)結(jié)構(gòu),我們可以將每一個(gè)key綁定到對(duì)應(yīng)的Lock/ReadWriteLock/Semaphore/CountDownLatch。
RMap<String, User> map = redisson.getMap("user-list") ;
RLock lock = map.getLock(key) ;
lock.lock() ;
try {
System.out.printf("當(dāng)前線程: %s, 當(dāng)前時(shí)間: %d%n", Thread.currentThread().getName(), System.currentTimeMillis()) ;
TimeUnit.SECONDS.sleep(3) ;
} finally {
lock.unlock() ;
}
- 本地緩存
用于加快讀取操作速度,避免網(wǎng)絡(luò)往返。它在 Redisson 端緩存地圖條目,執(zhí)行讀取操作的速度是普通實(shí)現(xiàn)的 45 倍。支持本地緩存的地圖對(duì)象實(shí)現(xiàn)了RLocalCachedMap,它擴(kuò)展了 java.util.concurrent.ConcurrentMap 接口。該對(duì)象是完全線程安全的。
// 配置緩存策略
final LocalCachedMapOptions<String, User> LOCAL_CACHE = LocalCachedMapOptions.<String, User>defaults()
// 緩存大小
.cacheSize(200)
// 緩存模式
.storeMode(StoreMode.LOCALCACHE_REDIS)
// 刪除策略
.evictionPolicy(EvictionPolicy.LRU) ;
// 獲取指定key本地緩存
RLocalCachedMap<String,User> localCachedMap = redisson.getLocalCachedMap("user-list", LOCAL_CACHE) ;
User user = localCachedMap.get("1") ;
本地緩存實(shí)例對(duì)象同樣支持fast*及異步方式,這里不再贅述。
- 事件監(jiān)聽(tīng)
Redisson 允許為每個(gè) RMap 對(duì)象綁定監(jiān)聽(tīng)器,RMap 對(duì)象允許跟蹤數(shù)據(jù)上的跟蹤事件。如下表,監(jiān)聽(tīng)類(lèi)及事件
圖片
如下示例:
RMap<String, User> map = redisson.getMap("user-list");
int deletedListener = map.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
// ...
}
});
int expredListener = map.addListener(new ExpiredObjectListener() {
@Override
public void onExpired(String name) {
// ...
}
});
int putListener = map.addListener(new MapPutListener() {
@Override
public void onPut(String name) {
// ...
}
});
int removeListener = map.addListener(new MapRemoveListener() {
@Override
public void onRemove(String name) {
// ...
}
});
// 刪除監(jiān)聽(tīng)器
map.removeListener(listenerId) ; // removeListener, putListener ...
以上是關(guān)于Map集合的常用操作。
2.2 Set集合
基于 Redis 的 Java Set 對(duì)象實(shí)現(xiàn)了 java.util.Set 接口。該對(duì)象完全線程安全。通過(guò)元素狀態(tài)比較保持元素的唯一性。Redis 將集合大小限制為 4 294 967 295 個(gè)元素。Redis 使用序列化狀態(tài)檢查值的唯一性,而不是值的 hashCode()/equals() 方法。
- RSet類(lèi)關(guān)系
public interface RSet<V> extends Set<V>,...{}
- 基本操作
RSet<User> set = redisson.getSet("user-set");
set.add(new User(1L, "張三", 33)) ;
set.add(new User(2L, "李四", 55)) ;
Redis中存儲(chǔ)使用的數(shù)據(jù)結(jié)構(gòu):
RSet使用Set集合。與RMap一樣,RSet也支持同步異步方式操作數(shù)據(jù)。
RFuture<Boolean> f1 = set.addAsync(new User(1L, "張三", 33)) ;
RFuture<Boolean> f2 = set.addAsync(new User(2L, "李四", 55)) ;
- 綁定Lock操作
RSet<User> set = redisson.getSet("user-set") ;
RLock lock = set.getLock(new User(1L, "張三", 33)) ;
lock.lock() ;
try {
// ...
} finally {
lock.unlock() ;
}
- 刪除策略
當(dāng)前的Redis實(shí)現(xiàn)沒(méi)有設(shè)置值刪除功能。因此,過(guò)期的數(shù)據(jù)會(huì)被org.redisson.eviction.EvictionScheduler清除。它一次性刪除300個(gè)過(guò)期條目。如果clean task每次刪除300項(xiàng),它將每秒執(zhí)行一次(最小執(zhí)行延遲)。但如果當(dāng)前的過(guò)期值小于前一個(gè),則執(zhí)行延遲將增加1.5倍。
RSetCache<User> set = redisson.getSetCache("user-set") ;
set.add(new User(3L, "陰陽(yáng)路", 66), 180L, TimeUnit.SECONDS) ;
- 事件監(jiān)聽(tīng)
與Map一樣Set也有對(duì)應(yīng)的事件監(jiān)聽(tīng),詳細(xì)查看Map中對(duì)應(yīng)的說(shuō)明。
- Set排序
基于 Redis 的 Java 分布式 SortedSet 實(shí)現(xiàn)了 java.util.SortedSet 接口。該對(duì)象線程安全。它使用比較器對(duì)元素進(jìn)行排序并保持唯一性。對(duì)于字符串?dāng)?shù)據(jù)類(lèi)型,建議使用 LexSortedSet 對(duì)象,以提高性能。
RSortedSet<Integer> set = redisson.getSortedSet("set-sort") ;
// 這里不可以寫(xiě)成lambda表達(dá)式:(o1, o2) -> Integer.compare(o1, o2)
set.trySetComparator(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1 > o2 ? 1 : (o1 < o2 ? -1 : 0) ;
}
}) ;
set.add(3) ;
set.add(1) ;
set.add(2) ;
redis中生成如下2個(gè)key:
圖片
set-sort對(duì)應(yīng)的值:
圖片
2.3 List集合
基于 Redis 的 Java 分布式 List 對(duì)象實(shí)現(xiàn)了 java.util.List 接口。它按插入順序保存元素。它有 Async、Reactive 和 RxJava3 接口。Redis 限制列表大小為 4 294 967 295 個(gè)元素。
- RList類(lèi)關(guān)系
public interface RList<V> extends List<V>, ... {}
- 基本操作
RList<User> list = redisson.getList("user-list");
User user = new User(1L, "張三", 10);
list.add(user) ;
User ret = list.get(0) ;
System.out.println("ret = " + ret) ;
list.remove(user) ;
- 事件監(jiān)聽(tīng)
RList<User> list = redisson.getList("user-list") ;
list.addListener(new ExpiredObjectListener() {
@Override
public void onExpired(String name) {
// ...
}
}) ;
// 其它事件
/**
* DeletedObjectListener
* ListAddListener
* ListInsertListener
* ListSetListener
* ListRemoveListener
* ListTrimListener
*/
2.4 Queue隊(duì)列
基于 Redis 的 Java 分布式無(wú)界隊(duì)列對(duì)象,實(shí)現(xiàn)了 java.util.Queue 接口。該對(duì)象是完全線程安全的。它有 Async、Reactive 和 RxJava3 接口。
- RQueue類(lèi)關(guān)系
public interface RQueue<V> extends Queue<V>, ... {}
- 基本操作
RQueue<User> queue = redisson.getQueue("user-queue");
queue.add(new User()) ;
// 獲取但不刪除
User u1 = queue.peek() ;
// 獲取并刪除
User u2 = queue.poll() ;
redis使用的數(shù)據(jù)結(jié)構(gòu):
圖片
- 事件監(jiān)聽(tīng)
RQueue<User> queue = redisson.getQueue("user-queue") ;
queue.addListener(new ExpiredObjectListener() {
@Override
public void onExpired(String name) {
// ...
}
}) ;
// 其它事件
/**
* ListAddListener
* ListInsertListener
* ListRemoveListener
*/
2.5 阻塞隊(duì)列
基于Redis 的Java 分布式無(wú)界BlockingQueue對(duì)象,實(shí)現(xiàn)了 java.util.concurrent.BlockingQueue接口。該對(duì)象是完全線程安全的。它有 Async、Reactive 和 RxJava3 接口。
- 類(lèi)關(guān)系
public interface RBlockingQueue<V> extends BlockingQueue<V>, ... {}
- 基本操作
RBlockingQueue<User> queue = redisson.getBlockingQueue("user-blockqueue");
queue.offer(new User(1L, "哈哈", 22)) ;
// queue.offer(new User(2L, "嘿嘿", 33)) ;
User u1 = queue.peek() ;
User u2 = queue.poll() ;
// 這里會(huì)被阻塞,最多等待10s隊(duì)列中有元素則直接返回
User u3 = queue.poll(10, TimeUnit.SECONDS) ;
對(duì)應(yīng)redis使用的數(shù)據(jù)結(jié)構(gòu):
圖片
2.6 有界阻塞隊(duì)列
大致使用用途上面一致:
RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("user-capacity-queue");
// 設(shè)置容量大小
queue.trySetCapacity(2);
queue.offer(new User(1L, "張三", 20));
queue.offer(new User(2L, "李四", 10));
Redisson提供了很多分布式的隊(duì)列實(shí)現(xiàn),如還有雙端隊(duì)列,優(yōu)先級(jí)隊(duì)列等,這里就不一一展示了。