張開(kāi)濤:應(yīng)用級(jí)緩存之緩存使用模式實(shí)踐—
前面已經(jīng)介紹了Java緩存的使用。對(duì)于我們來(lái)說(shuō)如果有人總結(jié)一些緩存使用模式/模板的話,我們?cè)谑褂脮r(shí)直接照著模式寫即可。而實(shí)際確實(shí)已經(jīng)有總結(jié)好的模式,主要分兩大類:Cache-Aside和Cache-As-SoR(Read-through、Write-through、Write-behind)。
首先,同步兩個(gè)名詞。
- SoR(system-of-record):記錄系統(tǒng),或者可以叫做數(shù)據(jù)源,即實(shí)際存儲(chǔ)原始數(shù)據(jù)的系統(tǒng)。
- Cache:緩存,是SoR的快照數(shù)據(jù),Cache的訪問(wèn)速度比SoR要快,放入Cache的目的是提升訪問(wèn)速度,減少回源到SoR的次數(shù)。
- 回源:即回到數(shù)據(jù)源頭獲取數(shù)據(jù),Cache沒(méi)有***時(shí),需要從SoR讀取數(shù)據(jù),這叫做回源。
本文主要以Guava Cache和Ehcache3.x作為實(shí)踐框架來(lái)講解。
一、Cache-Aside
Cache-Aside即業(yè)務(wù)代碼圍繞著Cache寫,是由業(yè)務(wù)代碼直接維護(hù)緩存,示例代碼如下所示。
讀場(chǎng)景,先從緩存獲取數(shù)據(jù),如果沒(méi)有***,則回源到SoR并將源數(shù)據(jù)放入緩存供下次讀取使用。
- //1、先從緩存中獲取數(shù)據(jù)
- value = myCache.getIfPresent(key);
- if(value == null) {
- //2.1、如果緩存沒(méi)有***,則回源到SoR獲取源數(shù)據(jù)
- value = loadFromSoR(key);
- //2.2、將數(shù)據(jù)放入緩存,下次即可從緩存中獲取數(shù)據(jù)
- myCache.put(key, value);
- }
寫場(chǎng)景,先將數(shù)據(jù)寫入SoR,寫入成功后立即將數(shù)據(jù)同步寫入緩存。
- //1、先將數(shù)據(jù)寫入SoR
- writeToSoR(key,value);
- //2、執(zhí)行成功后立即同步寫入緩存
- myCache.put(key, value);
或者先將數(shù)據(jù)寫入SoR,寫入成功后將緩存數(shù)據(jù)過(guò)期,下次讀取時(shí)再加載緩存。
- //1、先將數(shù)據(jù)寫入SoR
- writeToSoR(key,value);
- //2、失效緩存,然后下次讀時(shí)再加載緩存
- myCache.invalidate(key);
Cache-Aside適合使用AOP模式去實(shí)現(xiàn),可以參考筆者的博客《Spring Cache抽象詳解》去實(shí)現(xiàn)。
對(duì)于Cache-Aside可能存在并發(fā)更新情況,即如果多個(gè)應(yīng)用實(shí)例同時(shí)更新,那么緩存怎么辦?
● 如果是用戶維度的數(shù)據(jù)(如訂單數(shù)據(jù)、用戶數(shù)據(jù)),則出現(xiàn)這種幾率非常小,因?yàn)椴l(fā)的情況很少,可以不考慮這個(gè)問(wèn)題,加上過(guò)期時(shí)間來(lái)解決即可。
● 對(duì)于如商品這種基礎(chǔ)數(shù)據(jù),可以考慮使用canal訂閱binlog進(jìn)行增量更新分布式緩存,這樣不會(huì)存在緩存數(shù)據(jù)不一致的情況,但是,緩存更新會(huì)存在延遲。而本地緩存根據(jù)不一致容忍度設(shè)置合理的過(guò)期時(shí)間。
● 讀服務(wù)場(chǎng)景,可以考慮使用一致性哈希,將相同的操作負(fù)載均衡到同一個(gè)實(shí)例,從而減少并發(fā)幾率?;蛘咴O(shè)置比較短的過(guò)期時(shí)間,可參考“第17章 京東商品詳情頁(yè)服務(wù)閉環(huán)實(shí)踐”。
二、Cache-As-SoR
Cache-As-SoR即把Cache看作為SoR,所有操作都是對(duì)Cache進(jìn)行,然后Cache再委托給SoR進(jìn)行真實(shí)的讀/寫。即業(yè)務(wù)代碼中只看到Cache的操作,看不到關(guān)于SoR相關(guān)的代碼。有三種實(shí)現(xiàn):read-through、write-through、write-behind。
1. Read-Through
Read-Through,業(yè)務(wù)代碼首先調(diào)用Cache,如果Cache不***由Cache回源到SoR,而不是業(yè)務(wù)代碼(即由Cache讀SoR)。使用Read-Through模式,需要配置一個(gè)CacheLoader組件用來(lái)回源到SoR加載源數(shù)據(jù)。Guava Cache和Ehcache 3.x都支持該模式。
Guava Cache實(shí)現(xiàn)
- LoadingCache<Integer,Result<Category>> getCache =
- CacheBuilder.newBuilder()
- .softValues()
- .maximumSize(5000).expireAfterWrite(2, TimeUnit.MINUTES)
- .build(new CacheLoader<Integer,Result<Category>>() {
- @Override
- public Result<Category> load(final Integer sortId) throwsException {
- return categoryService.get(sortId);
- }
- });
在build Cache時(shí),傳入一個(gè)CacheLoader用來(lái)加載緩存,操作流程如下。
- 應(yīng)用業(yè)務(wù)代碼直接調(diào)用getCache.get(sortId)。
- 首先查詢Cache,如果緩存中有,則直接返回緩存數(shù)據(jù)。
- 如果緩存沒(méi)有***,則委托給CacheLoader,CacheLoader會(huì)回源到SoR查詢?cè)磾?shù)據(jù)(返回值必須不為null,可以包裝為Null對(duì)象),然后寫入緩存。
使用CacheLoader后有幾個(gè)好處。
● 應(yīng)用業(yè)務(wù)代碼更簡(jiǎn)潔了,不需要像Cache-Aside模式那樣緩存查詢代碼和SoR代碼交織在一起。如果緩存使用邏輯散落在多處,則使用這種方式很簡(jiǎn)單的消除了重復(fù)代碼。
● 解決Dog-pile effect,即當(dāng)某個(gè)緩存失效時(shí),又有大量相同的請(qǐng)求沒(méi)***緩存,從而同時(shí)請(qǐng)求到后端,導(dǎo)致后端壓力太大,此時(shí)限制一個(gè)請(qǐng)求去拿即可。
- if (firstCreateNewEntry) {//***個(gè)請(qǐng)求加載緩存的線程去SoR加載源數(shù)據(jù)
- try {
- synchronized (e) {
- returnloadSync(key, hash, loadingValueReference, loader);
- }
- } finally{
- statsCounter.recordMisses(1);
- }
- } else {//其他并發(fā)線程等待“***個(gè)線程”加載的數(shù)據(jù)
- return waitForLoadingValue(e, key,valueReference);
- }
- Guava Cache還支持get(K key, Callable<? extends V> valueLoader)方法,傳入一個(gè)Callable實(shí)例,當(dāng)緩存沒(méi)***時(shí),會(huì)調(diào)用Callable#call來(lái)查詢SoR加載源數(shù)據(jù)。
- Ehcache 3.x實(shí)現(xiàn)
- CacheManager cacheManager = CacheManagerBuilder. newCacheManagerBuilder(). build(true);
- org.ehcache.Cache<String, String> myCache =cacheManager. createCache ("myCache",
- CacheConfigurationBuilder.newCacheConfigurationBuilder(String.class,String.class,
- ResourcePoolsBuilder.newResourcePoolsBuilder().heap(100,MemoryUnit.MB))
- .withDispatcherConcurrency(4)
- .withExpiry(Expirations.timeToLiveExpiration(Duration.of(10,TimeUnit.SECONDS)))
- .withLoaderWriter(newDefaultCacheLoaderWriter<String, String> () {
- @Override
- public String load(String key) throws Exception {
- return readDB(key);
- }
- @Override
- public Map<String, String> loadAll(Iterable<? extendsString> keys) throws BulkCacheLoadingException, Exception {
- return null;
- }
- }));
Ehcache 3.x使用CacheLoaderWriter來(lái)實(shí)現(xiàn),通過(guò)load(K key)和loadAll(Iterable keys)分別來(lái)加載單個(gè)KEY和批量KEY。Ehcache 3.1沒(méi)有自己去解決Dog-pile effect。
2. Write-Through
Write-Through,稱之為穿透寫模式/直寫模式,業(yè)務(wù)代碼首先調(diào)用Cache寫(新增/修改)數(shù)據(jù),然后由Cache負(fù)責(zé)寫緩存和寫SoR,而不是業(yè)務(wù)代碼。使用Write-Through模式需要配置一個(gè)CacheWriter組件用來(lái)回寫SoR。Guava Cache沒(méi)有提供支持。Ehcache 3.x支持該模式。Ehcache需要配置一個(gè)CacheLoaderWriter,CacheLoaderWriter知道如何去寫SoR。當(dāng)Cache需要寫(新增/修改)數(shù)據(jù)時(shí),首先調(diào)用CacheLoaderWriter來(lái)同步(立即)到SoR,成功后會(huì)更新緩存。
- CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder().build(true);
- org.ehcache.Cache<String, String> myCache =cacheManager.createCache ("myCache",
- CacheConfigurationBuilder.newCacheConfigurationBuilder(String.class,String.class,
- ResourcePoolsBuilder.newResourcePoolsBuilder().heap(100,MemoryUnit.MB))
- .withDispatcherConcurrency(4)
- .withExpiry(Expirations.timeToLiveExpiration(Duration.of(10,TimeUnit.SECONDS)))
- .withLoaderWriter(newDefaultCacheLoaderWriter<String, String> () {
- @Override
- public void write(String key, String value) throws Exception{
- //write
- }
- @Override
- public void writeAll(Iterable<? extends Map.Entry<? extendsString, ? extends String>> entries) throws BulkCacheWritingException,Exception {
- for(Object entry: entries) {
- //batch write
- }
- }
- @Override
- public void delete(Stringkey) throws Exception {
- //delete
- }
- @Override
- public void deleteAll(Iterable<? extends String>keys) throws BulkCacheWritingException, Exception {
- for(Object key :keys) {
- //batch delete
- }
- }
- }).build());
Ehcache 3.x還是使用CacheLoaderWriter來(lái)實(shí)現(xiàn),通過(guò)write(String key, String value)、writeAll(Iterable> entries)和delete(String key)、deleteAll(Iterable keys)分別來(lái)支持單個(gè)寫、批量寫和單個(gè)刪除、批量刪除操作。
操作流程如下。
- 當(dāng)我們調(diào)用myCache.put("e","123")或者myCache.putAll(map)時(shí),寫緩存。
- 首先,Cache會(huì)將寫操作立即委托給CacheLoaderWriter#write和#writeAll,然后由CacheLoaderWriter負(fù)責(zé)立即去寫SoR。
- 當(dāng)寫SoR成功后,再寫入Cache。
3. Write-Behind
Write-Behind,也叫Write-Back,稱之為回寫模式,不同于Write-Through是同步寫SoR和Cache,Write-Behind是異步寫。異步之后可以實(shí)現(xiàn)批量寫、合并寫、延時(shí)和限流。
(1) 異步寫
- CacheManager cacheManager = CacheManagerBuilder. newCacheManagerBuilder()
- .using(PooledExecutionServiceConfigurationBuilder
- .newPooledExecutionServiceConfigurationBuilder()
- .pool("writeBehindPool", 1, 5)
- .build())
- .build(true);
- org.ehcache.Cache<String, String> myCache =cacheManager. createCache ("myCache",
- CacheConfigurationBuilder.newCacheConfigurationBuilder(String.class,String.class,
- ResourcePoolsBuilder.newResourcePoolsBuilder().heap(100,MemoryUnit.MB))
- .withDispatcherConcurrency(4)
- .withExpiry(Expirations.timeToLiveExpiration(Duration.of(10,TimeUnit.SECONDS)))
- .withLoaderWriter(new DefaultCacheLoaderWriter<String,String >() {
- @Override
- public void write(String key, String value) throws Exception{
- //write
- }
- @Override
- public void delete(String key) throws Exception {
- //delete
- }
- })
- .add(WriteBehindConfigurationBuilder
- .newUnBatchedWriteBehindConfiguration()
- .queueSize(5)
- .concurrencyLevel(2
- .useThreadPool("writeBehindPool")
- .build()));
幾個(gè)重要配置如下。
- hreadPool:使用PooledExecutionServiceConfigurationBuilder配置線程池;然后WriteBehindConfigurationBuilder通過(guò)useThreadPool配置使用哪一個(gè)線程池;
- WriteBehindConfigurationBuilder:配置WriteBehind策略;
- CacheLoaderWriter:配置WriteBehind如何操作SoR。
WriteBehindConfigurationBuilder會(huì)進(jìn)行如下幾個(gè)配置。
- newUnBatchedWriteBehindConfiguration:表示不進(jìn)行批量處理,那么所有批量操作都將會(huì)轉(zhuǎn)換成單個(gè)操作,即CacheLoaderWriter只需要實(shí)現(xiàn)write和delete即可。
- queueSize(int size):因?yàn)椴僮魇钱惒交貙慡oR,需要將操作先放入寫操作等待隊(duì)列,因此,使用queue size定義寫操作等待隊(duì)列***大小,即線程池隊(duì)列大小。內(nèi)部使用NonBatchingLocalHeapWriteBehindQueue。
- concurrencyLevel(int concurrency):配置使用多少個(gè)并發(fā)線程和隊(duì)列進(jìn)行WriteBehind。因?yàn)槲覀冎粋魅胍粋€(gè)線程池,這是如何實(shí)現(xiàn)該模式的呢?首先看如下代碼片段。
- for (int i = 0; i < writeBehindConcurrency; i++) {
- if (config.getBatchingConfiguration()== null) {
- this.stripes.add(newNonBatchingLocalHeapWriteBehindQueue<K, V>(executionService,defaultThreadPool, config, cacheLoaderWriter));
- } else {
- this.stripes.add(newBatchingLocalHeapWriteBehindQueue<K, V>(executionService, defaultThreadPool,config, cacheLoaderWriter));
- }
- }
可以看到會(huì)創(chuàng)建concurrencyLevel個(gè)隊(duì)列NonBatchingLocalHeapWriteBehindQueue,其又通過(guò)如下代碼片段創(chuàng)建線程池和線程池隊(duì)列。
- this.executorQueue = new LinkedBlockingQueue<Runnable>(config.getMaxQueueSize());
- if (config.getThreadPoolAlias() == null) {
- this.executor= executionService.getOrderedExecutor(defaultThreadPool, executorQueue);
- } else {
- this.executor= executionService.getOrderedExecutor(config. getThreadPoolAlias(), executorQueue);
- }
- ● CacheLoaderWriter:此處我們只配置了write和delete,而writeAll和deleteAll將會(huì)把批量操作委托給write和delete。
- PooledExecutionService#getOrderedExecutor方法會(huì)創(chuàng)建PartitionedOrderedExecutor實(shí)例。
- PartitionedOrderedExecutor(BlockingQueue<Runnable> queue,ExecutorService executor) {
- this.delegate= new PartitionedUnorderedExecutor(queue, executor, 1);
- }
其使用maxWorkers=1創(chuàng)建了PartitionedUnorderedExecutor,然后Partitioned UnorderedExecutor通過(guò)this.runnerPermit = newSemaphore(maxWorkers)來(lái)控制并發(fā),即maxWorkers=1就實(shí)現(xiàn)了一個(gè)并發(fā)。
因此,Ehcache實(shí)際能寫的***隊(duì)列大小為concurrency level *queue size。
因?yàn)閮?nèi)部使用線程池去寫,因此就實(shí)現(xiàn)了異步寫,又因?yàn)槭褂昧岁?duì)列,因此控制了總的吞吐量(此處有注意根據(jù)實(shí)際場(chǎng)景給線程池配置Rejected Policy),接下來(lái)看下如何實(shí)現(xiàn)批量寫。
(2) 批量寫
- .withLoaderWriter(new DefaultCacheLoaderWriter<String,String>() {
- @Override
- publicvoid writeAll(Iterable<? extends Map.Entry<? extends String,? extends String>> entries) throws BulkCacheWritingException,Exception {
- for(Objectentry : entries) {
- //batchwrite
- }
- }
- @Override
- publicvoid deleteAll(Iterable<? extends String> keys) throws BulkCacheWritingException,Exception {
- for(Objectkey : keys) {
- //batchdelete
- }
- }
- })
- .add(WriteBehindConfigurationBuilder
- .newBatchedWriteBehindConfiguration(3,TimeUnit.SECONDS, 2)
- .queueSize(5)
- .concurrencyLevel(1)
- .enableCoalescing()
- .useThreadPool("writeBehindPool")
- .build()));
和上一個(gè)示例不同的地方是使用了newBatchedWriteBehindConfiguration進(jìn)行批量配置。
● newBatchedWriteBehindConfiguration(longmaxDelay, TimeUnit maxDelayUnit, int batchSize):設(shè)置批處理大小和***延遲。batchSize用于定義批處理大小,當(dāng)寫操作數(shù)量等于批處理大小時(shí),將把這一批數(shù)據(jù)發(fā)給CacheLoaderWriter進(jìn)行處理。Ehcache使用BatchingLocalHeapWriteBehindQueue實(shí)現(xiàn)批量隊(duì)列,其中操作批量的代碼如下。
- if (openBatch.add(operation)) {//往batch里添加操作,添加的數(shù)量等于批處理大小時(shí)
- submit(openBatch);//異步提交批處理操作
- openBatch= null;
- }
因此,Ehcache實(shí)際能寫的***隊(duì)列大小為concurrency level * queue size * batch size。
maxDelay用于配置未完成的批處理***延遲,比如,我們?cè)O(shè)置批處理大小為3,而我們實(shí)際只寫入了兩個(gè)數(shù)據(jù),當(dāng)寫第3個(gè)數(shù)據(jù)時(shí),會(huì)觸發(fā)提交批處理操作。但是,如果我們不寫第3個(gè),那么將造成這2個(gè)數(shù)據(jù)一直等待,我們可以設(shè)置maxDelay,當(dāng)超時(shí)時(shí)也會(huì)將這兩個(gè)數(shù)據(jù)提交批處理。
● enableCoalescing:是否需要合并寫,即對(duì)于相同的Key只記錄***一次數(shù)據(jù)。
● CacheLoaderWriter:write和delete會(huì)轉(zhuǎn)換為writeAll和deleteAll,即批處理。
三、Copy Pattern
有兩種Copy Pattern,Copy-On-Read(在讀時(shí)復(fù)制)和Copy-On-Write(在寫時(shí)復(fù)制),對(duì)于Guava Cache和Ehcache中堆緩存都是基于引用的,這樣如果有人拿到緩存數(shù)據(jù)并修改了它,則可能發(fā)生不可預(yù)測(cè)的問(wèn)題,筆者就見(jiàn)過(guò)因?yàn)檫@種情況造成數(shù)據(jù)錯(cuò)誤。Guava Cache沒(méi)有提供支持,Ehcache 3.x提供了支持。
- public interface Copier<T> {
- TcopyForRead(T obj); //Copy-On-Read,比如myCache.get()
- TcopyForWrite(T obj); //Copy-On-Write,比如myCache.put()
- }
通過(guò)如下方法來(lái)配置Key和Value的Copier。
- CacheConfigurationBuilder.withKeyCopier()
- CacheConfigurationBuilder.withValueCopier()
【本文是51CTO專欄作者張開(kāi)濤的原創(chuàng)文章,作者微信公眾號(hào):開(kāi)濤的博客( kaitao-1234567)】