自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

實(shí)戰(zhàn)派 | Java項(xiàng)目中玩轉(zhuǎn)Redis6.0客戶端緩存!

數(shù)據(jù)庫(kù) Redis
首先介紹一下今天要使用到的工具Lettuce,它是一個(gè)可伸縮線程安全的redis客戶端。多個(gè)線程可以共享同一個(gè)RedisConnection,利用nio框架Netty來(lái)高效地管理多個(gè)連接。

哈嘍大家好啊,我是Hydra。

在前面的文章中,我們介紹了Redis6.0中的新特性客戶端緩存client-side caching,通過(guò)telnet連接模擬客戶端,測(cè)試了三種客戶端緩存的工作模式,這篇文章我們就來(lái)點(diǎn)硬核實(shí)戰(zhàn),看看客戶端緩存在java項(xiàng)目中應(yīng)該如何落地。

鋪墊

首先介紹一下今天要使用到的工具Lettuce,它是一個(gè)可伸縮線程安全的redis客戶端。多個(gè)線程可以共享同一個(gè)RedisConnection,利用nio框架Netty來(lái)高效地管理多個(gè)連接。

放眼望向現(xiàn)在常用的redis客戶端開發(fā)工具包,雖然能用的不少,但是目前率先擁抱redis6.0,支持客戶端緩存功能的卻不多,而lettuce就是其中的領(lǐng)跑者。

我們先在項(xiàng)目中引入最新版本的依賴,下面正式開始實(shí)戰(zhàn)環(huán)節(jié):

<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.8.RELEASE</version>
</dependency>

實(shí)戰(zhàn)

在項(xiàng)目中應(yīng)用lettuce,開啟并使用客戶端緩存功能,只需要下面這一段代碼:

public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建 RedisClient 連接信息
RedisURI redisURI= RedisURI.builder()
.withHost("127.0.0.1")
.withPort(6379)
.build();
RedisClient client = RedisClient.create(redisURI);
StatefulRedisConnection<String, String> connect = client.connect();

Map<String, String> map = new HashMap<>();
CacheFrontend<String,String> frontend=ClientSideCaching.enable(CacheAccessor.forMap(map),
connect, TrackingArgs.Builder.enabled().noloop());

String key="user";
while (true){
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(10);
}
}

上面的代碼主要完成了幾項(xiàng)工作:

  • 通過(guò)RedisURI配置redis連接的標(biāo)準(zhǔn)信息,并建立連接。
  • 創(chuàng)建用于充當(dāng)本地緩存的Map,開啟客戶端緩存功能,建立一個(gè)緩存訪問(wèn)器CacheFrontend。
  • 在循環(huán)中使用CacheFrontend,不斷查詢同一個(gè)key對(duì)應(yīng)的值并打印。

啟動(dòng)上面的程序,控制臺(tái)會(huì)不斷的打印user對(duì)應(yīng)的緩存,在啟動(dòng)一段時(shí)間后,我們?cè)谄渌目蛻舳诵薷膗ser對(duì)應(yīng)的值,運(yùn)行的結(jié)果如下:

可以看到,在其他客戶端修改了key所對(duì)應(yīng)的值后,打印結(jié)果也發(fā)生了變化。但是到這里,我們也不知道lettuce是不是真的使用了客戶端緩存,雖然結(jié)果正確,但是說(shuō)不定是它每次都重新執(zhí)行了get命令呢?

所以我們下面來(lái)看看源碼,分析一下具體的代碼執(zhí)行流程。

分析

在上面的代碼中,最關(guān)鍵的類就是CacheFrontend了,我們?cè)賮?lái)仔細(xì)看一下上面具體實(shí)例化時(shí)的語(yǔ)句:

CacheFrontend<String,String> frontend=ClientSideCaching.enable(
CacheAccessor.forMap(map),
connect,
TrackingArgs.Builder.enabled().noloop()
);

首先調(diào)用了ClientSideCaching的enable()方法,我們看一下它的源碼:

解釋一下傳入的3個(gè)參數(shù):

  • CacheAccessor:一個(gè)定義對(duì)客戶端緩存進(jìn)行訪問(wèn)接口,上面調(diào)用它的forMap方法返回的是一個(gè)MapCacheAccessor,它的底層使用的我們自定義的Map來(lái)存放本地緩存,并且提供了get、put、evict等方法操作Map。
  • StatefulRedisConnection:使用到的redis連接。
  • TrackingArgs:客戶端緩存的參數(shù)配置,使用noloop后不會(huì)接收當(dāng)前連接修改key后的通知。

向redis服務(wù)端發(fā)送開啟tracking的命令后,繼續(xù)向下調(diào)用create()方法:

這個(gè)過(guò)程中實(shí)例化了一個(gè)重要對(duì)象,它就是實(shí)現(xiàn)了RedisCache接口的DefaultRedisCache對(duì)象,實(shí)際向redis執(zhí)行查詢時(shí)的get請(qǐng)求、寫入的put請(qǐng)求,都是由它來(lái)完成。

實(shí)例化完成后,繼續(xù)向下調(diào)用同名的create()方法:

在這個(gè)方法中,實(shí)例化了ClientSideCaching對(duì)象,注意一下傳入的兩個(gè)參數(shù),通過(guò)前面的介紹也很好理解它們的分工:

  • 當(dāng)本地緩存存在時(shí),直接從CacheAccessor中讀取。
  • 當(dāng)本地緩存不存在時(shí),使用RedisCache從服務(wù)端讀取。

需要額外注意一下的是返回前的兩行代碼,先看第一句(行號(hào)114的那行)。

這里向RedisCache添加了一個(gè)監(jiān)聽,當(dāng)監(jiān)聽到類型為invalidate的作廢消息時(shí),拿到要作廢的key,傳遞給消費(fèi)者。一般情況下,keys中只會(huì)有一個(gè)元素。

消費(fèi)時(shí)會(huì)遍歷當(dāng)前ClientSideCaching的消費(fèi)者列表invalidationListeners:

而這個(gè)列表中的所有,就是在上面的第二行代碼中(行號(hào)115的那行)添加的,看一下方法的定義:

而實(shí)際傳入的方法引用則是下面MapCacheAccessor的evict()方法,也就是說(shuō),當(dāng)收到key作廢的消息后,會(huì)移除掉本地緩存Map中緩存的這個(gè)數(shù)據(jù)。

客戶端緩存的作廢邏輯我們梳理清楚了,再來(lái)看看它是何時(shí)寫入的,直接看ClientSideCaching的get()方法:

可以看到,get方法會(huì)先從本地緩存MapCacheAccessor中嘗試獲取,如果取到則直接返回,如果沒(méi)有再使用RedisCache讀取redis中的緩存,并將返回的結(jié)果存入到MapCacheAccessor中。

圖解

源碼看到這里,是不是基本邏輯就串聯(lián)起來(lái)了,我們?cè)佼媰蓮垐D來(lái)梳理一下這個(gè)流程。先看get的過(guò)程:

再來(lái)看一下通知客戶端緩存失效的過(guò)程:

怎么樣,配合這兩張圖再理解一下,是不是很完美?

其實(shí)也不是…回憶一下我們之前使用兩級(jí)緩存Caffeine+Redis時(shí),當(dāng)時(shí)使用的通知機(jī)制,會(huì)在修改redis緩存后通知所有主機(jī)修改本地緩存,修改成為最新的值。目前的lettuce看來(lái),顯然不滿足這一功能,只能做到作廢刪除緩存但是不會(huì)主動(dòng)更新。

擴(kuò)展

那么,如果想實(shí)現(xiàn)本地客戶端緩存的實(shí)時(shí)更新,我們應(yīng)該如何在現(xiàn)在的基礎(chǔ)上進(jìn)行擴(kuò)展呢?仔細(xì)想一下的話,思路也很簡(jiǎn)單:

  • 首先,移除掉lettuce的客戶端緩存本身自帶的作廢消息監(jiān)聽器
  • 然后,添加我們自己的作廢消息監(jiān)聽器

回顧一下上面源碼分析的圖,在調(diào)用DefaultRedisCache的addInvalidationListener()方法時(shí),其實(shí)是調(diào)用的是StatefulRedisConnection的addListener()方法,也就是說(shuō),這個(gè)監(jiān)聽器其實(shí)是添加在redis連接上的。

如果我們?cè)倏匆幌逻@個(gè)方法源碼的話,就會(huì)發(fā)現(xiàn),在它的附近還有一個(gè)對(duì)應(yīng)的removeListener()方法,一看就是我們要找的東西,準(zhǔn)備用它來(lái)移除消息監(jiān)聽。

不過(guò)再仔細(xì)看看,這個(gè)方法是要傳參數(shù)的啊,我們明顯不知道現(xiàn)在里面已經(jīng)存在的PushListener有什么,所以沒(méi)法直接使用,那么無(wú)奈只能再接著往下看看這個(gè)pushHandler是什么玩意…

通過(guò)注釋可以知道,這個(gè)PushHandler就是一個(gè)用來(lái)操作PushListener的處理工具,雖然我們不知道具體要移除的PushListener是哪一個(gè),但是驚喜的是,它提供了一個(gè)getPushListeners()方法,可以獲取當(dāng)前所有的監(jiān)聽器。

這樣一來(lái)就簡(jiǎn)單了,我上來(lái)直接清除掉這個(gè)集合中的所有監(jiān)聽器,問(wèn)題就迎刃而解了~

不過(guò),在StatefulRedisConnectionImpl中的pushHandler是一個(gè)私有對(duì)象,也沒(méi)有對(duì)外進(jìn)行暴露,想要操作起來(lái)還是需要費(fèi)上一點(diǎn)功夫的。下面,我們就在分析的結(jié)果上進(jìn)行代碼的修改。

魔改

首先,我們需要自定義一個(gè)工具類,它的主要功能是操作監(jiān)聽器,所以就命名為L(zhǎng)istenerChanger好了。它要完成的功能主要有三個(gè):

  • 移除原有的全部消息監(jiān)聽。
  • 添加新的自定義消息監(jiān)聽。
  • 更新本地緩存MapCacheAccessor中的數(shù)據(jù)。

首先定義構(gòu)造方法,需要傳入StatefulRedisConnection和CacheAccessor作為參數(shù),在后面的方法中會(huì)用到,并且創(chuàng)建一個(gè)RedisCommands,用于后面向redis服務(wù)端發(fā)送get命令請(qǐng)求。

public class ListenerChanger<K, V> {
private StatefulRedisConnection<K, V> connection;
private CacheAccessor<K, V> mapCacheAccessor;
private RedisCommands<K, V> command;
public ListenerChanger(StatefulRedisConnection<K, V> connection,
CacheAccessor<K, V> mapCacheAccessor) {
this.connection = connection;
this.mapCacheAccessor = mapCacheAccessor;
this.command = connection.sync();
}

//其他方法先省略……
}

移除監(jiān)聽

前面說(shuō)過(guò),pushHandler是一個(gè)私有對(duì)象,我們無(wú)法直接獲取和操作,所以只能先使用反射獲得。PushHandler中的監(jiān)聽器列表存儲(chǔ)在一個(gè)CopyOnWriteArrayList中,我們直接使用迭代器移除掉所有內(nèi)容即可。

public void removeAllListeners() {
try {
Class connectionClass = StatefulRedisConnectionImpl.class;
Field pushHandlerField = connectionClass.getDeclaredField("pushHandler");
pushHandlerField.setAccessible(true);
PushHandler pushHandler = (PushHandler) pushHandlerField.get(this.connection);

CopyOnWriteArrayList<PushListener> pushListeners
= (CopyOnWriteArrayList) pushHandler.getPushListeners();
Iterator<PushListener> it = pushListeners.iterator();
while (it.hasNext()) {
PushListener listener = it.next();
pushListeners.remove(listener);
}
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}

添加監(jiān)聽

這里我們模仿DefaultRedisCache中addInvalidationListener()方法的寫法,添加一個(gè)監(jiān)聽器,除了最后處理的代碼基本一致。對(duì)于監(jiān)聽到的要作廢的keys集合,另外啟動(dòng)一個(gè)線程更新本地?cái)?shù)據(jù)。

public void addNewListener() {
this.connection.addListener(new PushListener() {
@Override
public void onPushMessage(PushMessage message) {
if (message.getType().equals("invalidate")) {
List<Object> content = message.getContent(StringCodec.UTF8::decodeKey);
List<K> keys = (List<K>) content.get(1);
System.out.println("modifyKeys:"+keys);

// start a new thread to update cacheAccessor
new Thread(()-> updateMap(keys)).start();
}
}
});
}

本地更新

使用RedisCommands重新從redis服務(wù)端獲取最新的數(shù)據(jù),并更新本地緩存mapCacheAccessor中的數(shù)據(jù)。

private void updateMap(List<K> keys){
for (K key : keys) {
V newValue = this.command.get(key);
System.out.println("newValue:"+newValue);
mapCacheAccessor.put(key, newValue);
}
}

至于為什么執(zhí)行這個(gè)方法時(shí)額外啟動(dòng)了一個(gè)新線程,是因?yàn)槲以跍y(cè)試中發(fā)現(xiàn),當(dāng)在PushListener的onPushMessage方法中執(zhí)行RedisCommands的get()方法時(shí),會(huì)一直取不到值,但是像這樣新啟動(dòng)一個(gè)線程就沒(méi)有問(wèn)題。

測(cè)試

下面,我們來(lái)寫一段測(cè)試代碼,來(lái)測(cè)試上面的改動(dòng)。

public static void main(String[] args) throws InterruptedException {
// 省略之前創(chuàng)建連接代碼……

Map<String, String> map = new HashMap<>();
CacheAccessor<String, String> mapCacheAccessor = CacheAccessor.forMap(map);
CacheFrontend<String, String> frontend = ClientSideCaching.enable(mapCacheAccessor,
connect,
TrackingArgs.Builder.enabled().noloop());

ListenerChanger<String, String> listenerChanger
= new ListenerChanger<>(connect, mapCacheAccessor);
// 移除原有的listeners
listenerChanger.removeAllListeners();
// 添加新的監(jiān)聽器
listenerChanger.addNewListener();

String key = "user";
while (true) {
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(30);
}
}

可以看到,代碼基本上在之前的基礎(chǔ)上沒(méi)有做什么改動(dòng),只是在創(chuàng)建完ClientSideCaching后,執(zhí)行了我們自己實(shí)現(xiàn)的ListenerChanger的兩個(gè)方法。先移除所有監(jiān)聽器、再添加新的監(jiān)聽器。下面我們以debug模式啟動(dòng)測(cè)試代碼,簡(jiǎn)單看一下代碼的執(zhí)行邏輯。

首先,在未執(zhí)行移除操作前,pushHandler中的監(jiān)聽器列表中有一個(gè)監(jiān)聽器:

移除后,監(jiān)聽器列表為空:

在添加完自定義監(jiān)聽器、并且執(zhí)行完第一次查詢操作后,在另外一個(gè)redis客戶端中修改user的值,這時(shí)PushListener會(huì)收到作廢類型的消息監(jiān)聽:

啟動(dòng)一個(gè)新線程,查詢r(jià)edis中user對(duì)應(yīng)的最新值,并放入cacheAccessor中:

當(dāng)循環(huán)中CacheFrontend的get()方法再被執(zhí)行時(shí),會(huì)直接從cacheAccessor中取到刷新后的值,不需要再次去訪問(wèn)redis服務(wù)端了:

總結(jié)

到這里,我們基于lettuce的客戶端緩存的基本使用、以及在這個(gè)基礎(chǔ)上進(jìn)行的魔改就基本完成了??梢钥吹?,lettuce客戶端已經(jīng)在底層封裝了一套比較成熟的API,能讓我們?cè)趯edis升級(jí)到6.0以后,開箱即用式地使用客戶端緩存這一新特性。在使用中,不需要我們關(guān)注底層原理,也不用做什么業(yè)務(wù)邏輯的改造,總的來(lái)說(shuō),使用起來(lái)還是挺香的。


責(zé)任編輯:姜華 來(lái)源: 碼農(nóng)參上
相關(guān)推薦

2020-05-20 14:45:38

緩存MySQL數(shù)據(jù)庫(kù)

2020-05-11 21:31:02

Redis 6.0緩存客戶端

2022-02-11 16:35:41

Redis6.0代碼命令

2022-04-28 13:58:41

Redis6客戶端服務(wù)端

2015-08-17 09:48:29

C#客戶端分布式緩存

2011-11-30 14:21:19

Java分布式緩存

2021-09-22 15:46:29

虛擬桌面瘦客戶端胖客戶端

2014-08-11 16:35:35

KafkaJava客戶端

2011-08-17 10:10:59

2022-09-23 08:02:42

Kafka消息緩存

2021-06-15 09:20:08

Redis數(shù)據(jù)類型

2011-03-02 14:36:24

Filezilla客戶端

2010-12-21 11:03:15

獲取客戶端證書

2011-03-24 13:00:31

配置nagios客戶端

2010-05-31 10:11:32

瘦客戶端

2011-10-26 13:17:05

2020-09-23 13:37:25

Redis6.0

2021-08-01 23:18:21

Redis Golang命令

2011-08-15 14:09:59

JavaHBase

2023-12-01 08:18:24

Redis網(wǎng)絡(luò)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)