張開(kāi)濤:京東業(yè)務(wù)數(shù)據(jù)應(yīng)用級(jí)緩存示例
一、多級(jí)緩存API封裝
我們的業(yè)務(wù)數(shù)據(jù)如商品類(lèi)目、店鋪、商品基本信息都可以進(jìn)行適當(dāng)?shù)谋镜鼐彺?,以提升性能。?duì)于多實(shí)例的情況時(shí)不僅會(huì)使用本地緩存,還會(huì)使用分布式緩存,因此需要進(jìn)行適當(dāng)?shù)腁PI封裝以簡(jiǎn)化緩存操作。
1. 本地緩存初始化
- public class LocalCacheInitService extends BaseService {
- @Override
- publicvoid afterPropertiesSet() throws Exception {
- //商品類(lèi)目緩存
- Cache<String, Object> categoryCache =
- CacheBuilder.newBuilder()
- .softValues()
- .maximumSize(1000000)
- .expireAfterWrite(Switches.CATEGORY.getExpiresInSeconds()/ 2, TimeUnit.SECONDS)
- .build();
- addCache(CacheKeys.CATEGORY_KEY, categoryCache);
- }
- privatevoid addCache(String key, Cache<?, ?> cache) {
- localCacheService.addCache(key,cache);
- }
- }
本地緩存過(guò)期時(shí)間使用分布式緩存過(guò)期時(shí)間的一半,防止本地緩存數(shù)據(jù)緩存時(shí)間太長(zhǎng)造成多實(shí)例間的數(shù)據(jù)不一致。
另外,將緩存KEY前綴與本地緩存關(guān)聯(lián),從而匹配緩存KEY前綴就可以找到相關(guān)聯(lián)的本地緩存。
2. 寫(xiě)緩存API封裝
先寫(xiě)本地緩存,如果需要寫(xiě)分布式緩存,則通過(guò)異步更新分布式緩存。
- public void set(final String key, final Object value, final intremoteCacheExpiresInSeconds) throws RuntimeException {
- if (value== null) {
- return;
- }
- //復(fù)制值對(duì)象
- //本地緩存是引用,分布式緩存需要序列化
- //如果不復(fù)制的話,則假設(shè)之后數(shù)據(jù)改了將造成本地緩存與分布式緩存不一致
- final Object finalValue = copy(value);
- //如果配置了寫(xiě)本地緩存,則根據(jù)KEY獲得相關(guān)的本地緩存,然后寫(xiě)入
- if (writeLocalCache) {
- Cache localCache = getLocalCache(key);
- if(localCache != null) {
- localCache.put(key, finalValue);
- }
- }
- //如果配置了不寫(xiě)分布式緩存,則直接返回
- if (!writeRemoteCache) {
- return;
- }
- //異步更新分布式緩存
- asyncTaskExecutor.execute(() -> {
- try {
- redisCache.set(key,JSONUtils.toJSON(finalValue), remoteCacheExpiresInSeconds);
- } catch(Exception e) {
- LOG.error("updateredis cache error, key : {}", key, e);
- }
- });
- }
此處使用了異步更新,目的是讓用戶(hù)請(qǐng)求盡快返回。而因?yàn)橛斜镜鼐彺?,所以即使分布式緩存更新比較慢又產(chǎn)生了回源,也可以在本地緩存***。
3. 讀緩存API封裝
先讀本地緩存,本地緩存不***的再批量查詢(xún)分布式緩存,在查詢(xún)分布式緩存時(shí)通過(guò)分區(qū)批量查詢(xún)。
- private Map innerMget(List<String> keys, List<Class> types) throwsException {
- Map<String, Object> result = Maps.newHashMap();
- List<String> missKeys = Lists.newArrayList();
- List<Class> missTypes = Lists.newArrayList();
- //如果配置了讀本地緩存,則先讀本地緩存
- if(readLocalCache) {
- for(int i = 0; i < keys.size(); i++) {
- String key = keys.get(i);
- Class type = types.get(i);
- Cache localCache = getLocalCache(key);
- if(localCache != null) {
- Object value = localCache.getIfPresent(key);
- result.put(key, value);
- if (value == null) {
- missKeys.add(key);
- missTypes.add(type);
- }
- } else {
- missKeys.add(key);
- missTypes.add(type);
- }
- }
- }
- //如果配置了不讀分布式緩存,則返回
- if(!readRemoteCache) {
- returnresult;
- }
- finalMap<String, String> missResult = Maps.newHashMap();
- //對(duì)KEY分區(qū),不要一次性批量調(diào)用太大
- final List<List<String>>keysPage = Lists.partition(missKeys, 10);
- List<Future<Map<String, String>>> pageFutures = Lists.newArrayList();
- try {
- //批量獲取分布式緩存數(shù)據(jù)
- for(final List<String>partitionKeys : keysPage) {
- pageFutures.add(asyncTaskExecutor.submit(() -> redisCache.mget(partitionKeys)));
- }
- for(Future<Map<String,String>> future : pageFutures) {
- missResult.putAll(future.get(3000, TimeUnit.MILLISECONDS));
- }
- } catch(Exception e) {
- pageFutures.forEach(future -> future.cancel(true));
- throw e;
- }
- //合并result和missResult,此處實(shí)現(xiàn)省略
- return result;
- }
此處將批量讀緩存進(jìn)行了分區(qū),防止亂用批量獲取API。
二、NULL Cache
首先,定義NULL對(duì)象。
- private static final String NULL_STRING =new String();
當(dāng)DB沒(méi)有數(shù)據(jù)時(shí),寫(xiě)入NULL對(duì)象到緩存
- //查詢(xún)DB
- String value = loadDB();
- //如果DB沒(méi)有數(shù)據(jù),則將其封裝為NULL_STRING并放入緩存
- if(value == null) {
- value = NULL_STRING;
- }
- myCache.put(id, value);
讀取數(shù)據(jù)時(shí),如果發(fā)現(xiàn)NULL對(duì)象,則返回null,而不是回源到DB
- value = suitCache.getIfPresent(id);
- //DB沒(méi)有數(shù)據(jù),返回null
- if(value == NULL_STRING) {
- return null;
- }
通過(guò)這種方式可以防止當(dāng)KEY對(duì)應(yīng)的數(shù)據(jù)在DB不存在時(shí)頻繁查詢(xún)DB的情況。
三、強(qiáng)制獲取***數(shù)據(jù)
在實(shí)際應(yīng)用中,我們經(jīng)常需要強(qiáng)制更新數(shù)據(jù),此時(shí)就不能使用緩存數(shù)據(jù)了,可以通過(guò)配置ThreadLocal開(kāi)關(guān)來(lái)決定是否強(qiáng)制刷新緩存(refresh方法要配合CacheLoader一起使用)。
- if(ForceUpdater.isForceUpdateMyInfo()) {
- myCache.refresh(skuId);
- }
- String result = myCache.get(skuId);
- if(result == NULL_STRING) {
- return null;
- }
四、失敗統(tǒng)計(jì)
- private LoadingCache<String, AtomicInteger> failedCache =
- CacheBuilder.newBuilder()
- .softValues()
- .maximumSize(10000)
- .build(new CacheLoader<String, AtomicInteger>() {
- @Override
- public AtomicIntegerload(String skuId) throws Exception {
- return new AtomicInteger(0);
- }
- });
當(dāng)失敗時(shí),通過(guò)failedCache.getUnchecked(id).incrementAndGet()增加失敗次數(shù);當(dāng)成功時(shí),使用failedCache.invalidate(id)失效緩存。通過(guò)這種方式可以控制失敗重試次數(shù),而且又是內(nèi)存敏感緩存。當(dāng)內(nèi)存不足時(shí),可以清理該緩存騰出一些空間。
五、延遲報(bào)警
- private static LoadingCache<String, Integer> alarmCache =
- CacheBuilder.newBuilder()
- .softValues()
- .maximumSize(10000).expireAfterAccess(1, TimeUnit.HOURS)
- .build(new CacheLoader<String, Integer>() {
- @Override
- public Integer load(String key) throws Exception {
- return 0;
- }
- });
- //報(bào)警代碼
- Integer count = 0;
- if(redis != null) {
- StringcountStr = Objects.firstNonNull(redis.opsForValue().get(key), "0");
- count =Integer.valueOf(countStr);
- } else {
- count = alarmCache.get(key);
- }
- if(count % 5 == 0) { //5次報(bào)一次
- //報(bào)警
- }
- countcount = count + 1;
- if(redis != null) {
- redis.opsForValue().set(key,String.valueOf(count), 1, TimeUnit. HOURS);
- } else {
- alarmCache.put(key,count);
- }
如果一出問(wèn)題就報(bào)警,則存在報(bào)警量非常多或者假報(bào)警,因此,可以考慮N久報(bào)警了M次,才真正報(bào)警。此時(shí),也可以使用Cache來(lái)統(tǒng)計(jì)。本示例還加入了Redis分布式緩存記錄支持。
六、性能測(cè)試
筆者使用JMH 1.14進(jìn)行基準(zhǔn)性能測(cè)試,比如測(cè)試寫(xiě)。
- @Benchmark
- @Warmup(iterations = 10, time = 10, timeUnit =TimeUnit.SECONDS)
- @Measurement(iterations = 10, time = 10, timeUnit= TimeUnit.SECONDS)
- @BenchmarkMode(Mode.Throughput)
- @OutputTimeUnit(TimeUnit.SECONDS)
- @Fork(1)
- public void test_1_Write() {
- counterWritercounterWriter= counterWriter + 1;
- myCache.put("key"+ counterWriter, "value" + counterWriter);
- }
使用JMH時(shí)首先進(jìn)行JVM預(yù)熱,然后進(jìn)行度量,產(chǎn)生測(cè)試結(jié)果(本文使用吞吐量)。建議讀者按照需求進(jìn)行基準(zhǔn)性能測(cè)試來(lái)選擇適合自己的緩存框架。
【本文是51CTO專(zhuān)欄作者張開(kāi)濤的原創(chuàng)文章,作者微信公眾號(hào):開(kāi)濤的博客( kaitao-1234567)】