美團(tuán)一面:項(xiàng)目中使用過Redis嗎?
引言
Redis,作為一種開源的、基于內(nèi)存且支持持久化的鍵值存儲系統(tǒng),以其卓越的性能、豐富靈活的數(shù)據(jù)結(jié)構(gòu)和高度可擴(kuò)展性在全球范圍內(nèi)廣受歡迎。Redis不僅提供了一種簡單直觀的方式來存儲和檢索數(shù)據(jù),更因其支持?jǐn)?shù)據(jù)結(jié)構(gòu)如字符串、哈希、列表、集合、有序集合等多種類型,使得其在眾多場景下表現(xiàn)出強(qiáng)大的適用性和靈活性。
Redis的核心特點(diǎn)包括:
- 高性能:基于內(nèi)存操作,讀寫速度極快,特別適用于對性能要求高的實(shí)時(shí)應(yīng)用。
- 數(shù)據(jù)持久化:支持RDB和AOF兩種持久化方式,確保即使在服務(wù)器重啟后也能恢復(fù)數(shù)據(jù)。
- 分布式的特性:通過主從復(fù)制、哨兵模式或集群模式,Redis可以輕松地構(gòu)建高可用和可擴(kuò)展的服務(wù)。
- 豐富的數(shù)據(jù)結(jié)構(gòu):提供了多種數(shù)據(jù)結(jié)構(gòu)支持,便于開發(fā)人員根據(jù)實(shí)際需求進(jìn)行數(shù)據(jù)建模和處理。
Redis的廣泛應(yīng)用跨越了多個(gè)行業(yè)和技術(shù)領(lǐng)域,諸如網(wǎng)站加速、緩存服務(wù)、會話管理、實(shí)時(shí)統(tǒng)計(jì)、排行榜、消息隊(duì)列、分布式鎖、社交網(wǎng)絡(luò)功能、限流控制等。本文將深入探討Redis在這些場景下的具體應(yīng)用方法及其背后的工作原理,旨在幫助開發(fā)者更好地理解和掌握Redis,以應(yīng)對各種復(fù)雜的業(yè)務(wù)需求,并充分發(fā)揮其潛能。同時(shí),我們也將關(guān)注如何在實(shí)踐中平衡Redis的性能、安全性、一致性等方面的挑戰(zhàn),為實(shí)際項(xiàng)目帶來更高的價(jià)值。
數(shù)據(jù)緩存
在高并發(fā)訪問的場景下,數(shù)據(jù)庫經(jīng)常成為系統(tǒng)的瓶頸。Redis因其內(nèi)存存儲、讀取速度快的特點(diǎn),常被用作數(shù)據(jù)庫查詢結(jié)果的緩存層,有效降低數(shù)據(jù)庫負(fù)載,提高整體系統(tǒng)的響應(yīng)速度。這也是我們使用場景頻率最高的一個(gè)。
通常我們選擇使用String類型來存儲數(shù)據(jù)庫查詢結(jié)果,如單個(gè)實(shí)體對象的JSON序列化形式。
@Service
public class ProductService {
@Autowired
private RedisTemplate<String, Product> redisTemplate;
// 使用@Cacheable注解進(jìn)行緩存
@Cacheable(value = "productCache", key = "#id")
public Product getProductById(String id) {
// 此處是從數(shù)據(jù)庫或其他數(shù)據(jù)源獲取商品的方法
// 在實(shí)際場景中,如果緩存命中,則不會執(zhí)行下面的數(shù)據(jù)庫查詢邏輯
return getProductFromDatabase(id);
}
}
而使用Redis作為緩存使用時(shí),有一些特別需要注意的事項(xiàng):
- 緩存穿透:當(dāng)查詢的數(shù)據(jù)在數(shù)據(jù)庫和緩存中均不存在時(shí),可能會導(dǎo)致大量的無效請求直接打到數(shù)據(jù)庫。可通過布隆過濾器預(yù)防緩存穿透。
- 緩存雪崩:若大量緩存在同一時(shí)刻失效,所有請求都會涌向數(shù)據(jù)庫,造成瞬時(shí)壓力過大??赏ㄟ^設(shè)置合理的過期時(shí)間分散、預(yù)加載或采用Redis集群等方式避免。
- 緩存一致性:當(dāng)數(shù)據(jù)庫數(shù)據(jù)發(fā)生變化時(shí),需要及時(shí)更新緩存,避免數(shù)據(jù)不一致??梢圆捎弥鲃痈虏呗裕ㄈ绫O(jiān)聽數(shù)據(jù)庫binlog)或被動更新策略(如在讀取時(shí)判斷數(shù)據(jù)新鮮度)。
而對于數(shù)據(jù)緩存,我們常使用的業(yè)務(wù)場景如熱點(diǎn)數(shù)據(jù)存儲、全頁緩存等。
會話管理
在說會話管理之前,我們來簡單介紹一下Spring Session。Spring Session 是 Spring Framework 的一個(gè)項(xiàng)目,旨在簡化分布式應(yīng)用程序中的會話管理。在傳統(tǒng)的基于 Servlet 的應(yīng)用程序中,會話管理是通過 HttpSession 接口實(shí)現(xiàn)的,但在分布式環(huán)境中,每個(gè)節(jié)點(diǎn)上的 HttpSession 不能簡單地共享,因此需要一種機(jī)制來管理會話并確保會話在集群中的一致性。
Spring Session 提供了一種簡單的方法來解決這個(gè)問題,它將會話數(shù)據(jù)從容器(如 Tomcat 或 Jetty)中分離出來,并存儲在外部數(shù)據(jù)存儲(如 Redis、MongoDB、JDBC 等)中。這樣,不同節(jié)點(diǎn)上的應(yīng)用程序?qū)嵗梢怨蚕硐嗤臅挃?shù)據(jù),實(shí)現(xiàn)分布式環(huán)境下的會話管理。
所以在Web應(yīng)用中,Redis用于會話管理時(shí),可以取代傳統(tǒng)基于服務(wù)器內(nèi)存或Cookie的會話存儲方案。通過將會話數(shù)據(jù)序列化后存儲為Redis中的鍵值對,實(shí)現(xiàn)跨多個(gè)服務(wù)器實(shí)例的會話共享。
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
<version>3.2.0</version>
</dependency>
然后我們在啟動類中,使用@EnableRedisHttpSession啟用Redis作為會話存儲。
@Configuration
@EnableRedisHttpSession
public class RedisSessionConfig {
@Bean
public RedisConnectionFactory connectionFactory() {
// 這里假設(shè)你已經(jīng)在application.properties或application.yml中配置了Redis的信息
// 根據(jù)實(shí)際情況填寫Redis服務(wù)器地址、端口等信息
return new LettuceConnectionFactory();
}
}
以上是一個(gè)簡單的Spring Session使用Redis進(jìn)行會話管理的示例代碼。通過這種方式,我們可以輕松地在分布式環(huán)境中管理會話,并確保會話數(shù)據(jù)的一致性和可靠性。如果需要了解一些具體的用法,請自行參考Spring Session。
排行榜與計(jì)分板
有序集合(Sorted Sets)是Redis的一種強(qiáng)大數(shù)據(jù)結(jié)構(gòu),可以用來實(shí)現(xiàn)動態(tài)排行榜,每個(gè)成員都有一個(gè)分?jǐn)?shù),按分?jǐn)?shù)排序。有序集合中的每一個(gè)成員都有一個(gè)分?jǐn)?shù)(score),成員依據(jù)其分?jǐn)?shù)進(jìn)行排序,且成員本身是唯一的。
當(dāng)需要給某個(gè)用戶增加積分或改變其排名時(shí),可以使用ZADD命令向有序集合中添加或更新成員及其分?jǐn)?shù)。例如,ZADD leaderboard score member,這里的ranking是有序集合的名稱,score是用戶的積分值,member是用戶ID。
查詢排行榜時(shí),可以使用ZRANGE命令獲取指定范圍內(nèi)的成員及其分?jǐn)?shù),例如,ZRANGE ranking 0 -1 WITHSCORES,這條命令會返回集合中所有的成員及其對應(yīng)的分?jǐn)?shù),按照分?jǐn)?shù)從低到高排序。
若要按照分?jǐn)?shù)從高到低顯示排行榜,使用ZREVRANGE命令,如ZREVRANGE ranking 0 -1 WITHSCORES。
@Service
public class RankingService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void addToRanking(String playerName, int score) {
redisTemplate.opsForZSet().add("ranking", playerName, score);
}
public List<RankingInfo> getRanking() {
List<RankingInfo> rankingInfos = new ArrayList<>();
Set<ZSetOperations.TypedTuple<String>> rankingSet = redisTemplate.opsForZSet().rangeWithScores("ranking", 0, -1);
for (ZSetOperations.TypedTuple<String> tuple : rankingSet) {
RankingInfo rankingInfo = new RankingInfo();
rankingInfo.setPlayerName(tuple.getValue());
rankingInfo.setScore(tuple.getScore().intValue());
rankingInfos.add(rankingInfo);
System.out.println("playerName: " + tuple.getValue() + ", score: " + tuple.getScore().intValue());
}
return rankingInfos;
}
}
我們模擬請求,往redis中填入一些數(shù)據(jù),在獲取排行榜:
圖片
在實(shí)際場景中,有序集合非常適合處理實(shí)時(shí)動態(tài)變化的排行榜數(shù)據(jù),比如京東的月度銷量榜單、商品按時(shí)間的上新排行榜等,因?yàn)樗母潞筒樵儾僮鞫际窃有缘?,并且能高效地支持按分?jǐn)?shù)排序的操作。
計(jì)數(shù)器與統(tǒng)計(jì)
Redis的原子性操作如INCR和DECR可以用于計(jì)數(shù),確保在高并發(fā)環(huán)境下的計(jì)數(shù)準(zhǔn)確性。比如在流量統(tǒng)計(jì)、電商網(wǎng)站商品的瀏覽量、視頻網(wǎng)站視頻的播放數(shù)贊等場景的應(yīng)用。
@Service
public class CounterService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void incrementLikeCount(String postId) {
redisTemplate.opsForValue().increment(postId + ":likes");
}
public void decrementLikeCount(String postId) {
redisTemplate.opsForValue().decrement(postId + ":likes");
}
public long getLikeCount(String postId) {
String value = redisTemplate.opsForValue().get(postId + ":likes");
return StringUtils.isBlank(value) ? 0 : Long.parseLong(value);
}
}
在使用Redis實(shí)現(xiàn)點(diǎn)贊,統(tǒng)計(jì)等功能時(shí)一定要考慮設(shè)置計(jì)數(shù)值的最大值或最小值限制,以及過期策略。
分布式鎖
分布式鎖
Redis的SETNX(設(shè)置并檢查是否存在)和EXPIRE命令組合可以實(shí)現(xiàn)分布式鎖,因其操作時(shí)原子性的,所以可以確保在分布式環(huán)境下同一資源只能被一個(gè)客戶端修改。
使用 Redis 實(shí)現(xiàn)分布式鎖通常會使用 Redis 的 SETNX 命令。這個(gè)命令用于設(shè)置一個(gè)鍵的值,如果這個(gè)鍵不存在的話,它會設(shè)置成功并返回 1,如果這個(gè)鍵已經(jīng)存在,則設(shè)置失敗并返回 0。結(jié)合 Redis 的 EXPIRE 命令,可以為這個(gè)鍵設(shè)置一個(gè)過期時(shí)間,確保即使獲取鎖的客戶端異常退出,鎖也會在一段時(shí)間后自動釋放。
@Component
public class DistributedLock {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public boolean acquireLock(String lockKey, String requestId, long expireTime) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTime);
return result != null && result;
}
public void releaseLock(String lockKey, String requestId) {
String value = redisTemplate.opsForValue().get(lockKey);
if (value != null && value.equals(requestId)) {
redisTemplate.delete(lockKey);
}
}
}
使用分布式鎖時(shí),務(wù)必確保在加鎖和解鎖操作之間處理完臨界區(qū)代碼,否則可能出現(xiàn)死鎖。并且要注意鎖定超時(shí)時(shí)間應(yīng)當(dāng)合理設(shè)置,以避免鎖定資源長時(shí)間無法釋放。
關(guān)于分布式鎖,推薦使用一些第三方的分布式鎖框架,例如Redisson
全局ID
在全局ID生成的場景中,我們可以使用 Redis 的原子遞增操作來實(shí)現(xiàn)。通過對 Redis 中的一個(gè)特定的 key 進(jìn)行原子遞增操作,可以確保生成的ID是唯一的。
@Component
public class UniqueIdGenerator {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public long generateUniqueId(String key) {
return redisTemplate.opsForValue().increment(key, 1);
}
}
庫存扣減
在扣減庫存的場景中,我們可以使用 Redis 的原子遞減操作來實(shí)現(xiàn)。將庫存數(shù)量存儲在 Redis 的一個(gè)特定key中(例如倉庫編碼:SKU),然后通過遞減操作來實(shí)現(xiàn)庫存的扣減。這樣可以保證在高并發(fā)情況下,庫存扣減的原子性。
@Component
public class StockService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**商品庫存的key*/
private static final String STOCK_PREFIX = "stock:%s:%s";
/**
* 扣減庫存
* @param warehouseCode
* @param productId
* @param quantity
* @return
*/
public boolean decreaseStock(String warehouseCode, String productId, long quantity) {
String key = String.format(STOCK_PREFIX, warehouseCode, productId);
Long stock = redisTemplate.opsForValue().decrement(key, quantity);
return stock >= 0;
}
}
秒殺
在秒殺場景中,使用Lua腳本。Lua 腳本可以在 Redis 服務(wù)器端原子性地執(zhí)行多個(gè)命令,這樣可以避免在多個(gè)命令之間出現(xiàn)競態(tài)條件。
我們使用Lua腳本來檢查庫存是否足夠并進(jìn)行扣減操作。如果庫存足夠,則減少庫存并返回 true;如果庫存不足,則直接返回 false。通過 Lua 腳本的原子性執(zhí)行,可以確保在高并發(fā)情況下,庫存扣減操作的正確性和一致性。
我們先定義一個(gè)扣減庫存的lua腳本,使用Lua腳本一次性執(zhí)行獲取庫存、判斷庫存是否充足以及扣減庫存這三個(gè)操作,確保了操作的原子性
-- 獲取Lua腳本參數(shù):商品ID和要購買的數(shù)量
local productId = KEYS[1]
local amount = tonumber(ARGV[1])
-- 獲取當(dāng)前庫存
local currentStock = tonumber(redis.call('GET', 'seckill:product:'..productId))
-- 判斷庫存是否充足
if currentStock <= 0 or currentStock < amount then
return 0
end
-- 扣減庫存
redis.call('DECRBY', 'seckill:product:'..productId, amount)
-- 返回成功標(biāo)志
return 1
然后在秒殺服務(wù)中使用Redis的DefaultRedisScript執(zhí)行l(wèi)ua腳本,完成秒殺
@Component
public class SeckillService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 初始化RedisScript對象
*/
private final DefaultRedisScript<Long> seckillScript = new DefaultRedisScript<>();
{
seckillScript.setLocation(new ClassPathResource("rate_limiter.lua"));
seckillScript.setResultType(Long.class);
}
public boolean seckillyLua(String productId, int amount){
// 設(shè)置Lua腳本參數(shù)
List<String> keys = Collections.singletonList(productId);
List<String> args = Collections.singletonList(Integer.toString(amount));
// 執(zhí)行Lua腳本
Long result = redisTemplate.execute(seckillScript, keys, args);
// 如果執(zhí)行結(jié)果為1,表示秒殺成功
return Objects.equals(result, 1L);
}
}
關(guān)于秒殺場景,我們也可以使用WATCH命令監(jiān)視庫存鍵,然后嘗試獲取并扣減庫存。如果在WATCH之后、EXEC之前庫存發(fā)生了變化,exec方法會返回null,此時(shí)我們?nèi)∠鸚ATCH并重新嘗試整個(gè)流程,直到成功扣減庫存為止。這樣就實(shí)現(xiàn)了基于Redis樂觀鎖的秒殺場景,有效防止了超賣現(xiàn)象。
/**
* 秒殺方法
* @param productId 商品ID
* @param amount 要購買的數(shù)量
* @return 秒殺成功與否
*/
@Transactional(rollbackFor = Exception.class)
public boolean seckilByWatch(String productId, int amount) {
// 樂觀鎖事務(wù)操作
while (true) {
// WATCH指令監(jiān)控庫存鍵
redisTemplate.watch("stock:" + productId);
// 獲取當(dāng)前庫存
String currentStockStr = redisTemplate.opsForValue().get("stock:" + productId);
if (currentStockStr == null) {
// 庫存不存在,可能是商品已售罄或異常情況
return false;
}
int currentStock = Integer.parseInt(currentStockStr);
// 判斷庫存是否充足
if (currentStock < amount) {
// 庫存不足,取消WATCH并退出循環(huán)
redisTemplate.unwatch();
return false;
}
// 開啟Redis事務(wù)
redisTemplate.multi();
// 執(zhí)行扣減庫存操作
redisTemplate.opsForValue().decrement("stock:" + productId, amount);
// 執(zhí)行其他與秒殺相關(guān)的操作,如增加訂單、更新用戶余額等...
// 提交事務(wù),如果在此期間庫存被其他客戶端修改,則exec返回null
List<Object> results = redisTemplate.exec();
// 如果事務(wù)執(zhí)行成功,跳出循環(huán)
if (!results.isEmpty()) {
return true;
}
}
}
消息隊(duì)列與發(fā)布/訂閱
Redis的發(fā)布/訂閱(Pub/Sub)模式,可以實(shí)現(xiàn)一個(gè)簡單的消息隊(duì)列。發(fā)布/訂閱模式允許消息的發(fā)布者(發(fā)布消息)和訂閱者(接收消息)之間解耦,消息的發(fā)布者不需要知道消息的接收者是誰,從而實(shí)現(xiàn)了一對多的消息傳遞。
首先我們需要定義一個(gè)消息監(jiān)聽器,我們可以實(shí)現(xiàn)這個(gè)借口并實(shí)現(xiàn)其中的方法來處理接收到的消息。這樣可以根據(jù)具體的業(yè)務(wù)需求來定義消息的處理邏輯。
public interface MessageListener {
void onMessage(String channel, String message);
}
然后我們就可以定義消息的生產(chǎn)者以及消費(fèi)者。publish 方法用于向指定頻道發(fā)布消息,我們使用 RedisTemplate 的 convertAndSend 方法來發(fā)送消息到指定的頻道。而subscribe方法用于訂閱指定的頻道,并設(shè)置消息監(jiān)聽器。當(dāng)有消息發(fā)布到指定的頻道時(shí),消息監(jiān)聽器會收到消息并進(jìn)行處理。
@Component
public class MessageQueue {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void publish(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
}
public void subscribe(String channel, MessageListener listener) {
redisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> {
listener.onMessage(channel, message);
}, channel.getBytes());
}
}
使用Redis的發(fā)布訂閱模式實(shí)現(xiàn)一個(gè)輕量級的隊(duì)列時(shí)要注意:Pub/Sub是非持久化的,一旦消息發(fā)布,沒有訂閱者接收的話,消息就會丟失。還有就是Pub/Sub不適合大規(guī)模的消息堆積場景,因?yàn)樗槐WC消息順序和重復(fù)消費(fèi),更適合實(shí)時(shí)廣播型消息推送。
社交網(wǎng)絡(luò)
在社交網(wǎng)絡(luò)中,Redis可以利用集合(Set)、哈希(Hash)和有序集合(Sorted Set)等數(shù)據(jù)結(jié)構(gòu)構(gòu)建用戶關(guān)系圖譜。
使用哈希(Hash)數(shù)據(jù)結(jié)構(gòu)存儲用戶的個(gè)人資料信息,每個(gè)用戶對應(yīng)一個(gè)哈希表,其中包含用戶的各種屬性,比如用戶名、年齡、性別等。
@Component
public class RelationshipGraphService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**用戶資料*/
private static final String USER_PROFILE_PREFIX = "user_profile:";
/**
* 存儲用戶個(gè)人資料
* @param userId
* @param profile
*/
public void setUserProfile(String userId, Map<String, String> profile) {
String key = USER_PROFILE_PREFIX + userId;
redisTemplate.opsForHash().putAll(key, profile);
}
/**
* 獲取用戶個(gè)人資料
* @param userId
* @return
*/
public Map<Object, Object> getUserProfile(String userId) {
String key = USER_PROFILE_PREFIX + userId;
return redisTemplate.opsForHash().entries(key);
}
}
使用集合(Set)數(shù)據(jù)結(jié)構(gòu)來存儲用戶的好友關(guān)系。每個(gè)用戶都有一個(gè)集合,其中包含了他的所有好友的用戶ID。
@Component
public class RelationshipGraphService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**用戶好友*/
private static final String FRIENDS_PREFIX = "friends:";
/**
* 添加好友關(guān)系
* @param userId
* @param friendId
*/
public void addFriend(String userId, String friendId) {
String key = FRIENDS_PREFIX + userId;
redisTemplate.opsForSet().add(key, friendId);
}
/**
* 獲取用戶的所有好友
* @param userId
* @return
*/
public Set<String> getFriends(String userId) {
String key = FRIENDS_PREFIX + userId;
return redisTemplate.opsForSet().members(key);
}
}
同理,我們還可以實(shí)現(xiàn)點(diǎn)贊的業(yè)務(wù)場景
@Service
public class LikeService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 點(diǎn)贊
* @param objectId
* @param userId
*/
public void like(String objectId, String userId) {
// 將點(diǎn)贊人放入zset中
redisTemplate.opsForSet().add(getLikeKey(objectId), userId);
}
/**
* 取消點(diǎn)贊
* @param objectId
* @param userId
*/
public void unlike(String objectId, String userId) {
// 減少點(diǎn)贊人數(shù)
redisTemplate.opsForSet().remove(getLikeKey(objectId), userId);
}
/**
* 是否點(diǎn)贊
* @param objectId
* @param userId
* @return
*/
public Boolean isLiked(String objectId, String userId) {
return redisTemplate.opsForSet().isMember(getLikeKey(objectId), userId);
}
/**
* 獲取點(diǎn)贊數(shù)
* @param objectId
* @return
*/
public Long getLikeCount(String objectId) {
return redisTemplate.opsForSet().size(getLikeKey(objectId));
}
/**
* 獲取所有點(diǎn)贊的用戶
* @param objectId
* @return
*/
public Set<String> getLikedUsers(String objectId) {
return redisTemplate.opsForSet().members(getLikeKey(objectId));
}
private String getLikeKey(String objectId) {
return "likes:" + objectId;
}
}
使用有序集合(Sorted Set)數(shù)據(jù)結(jié)構(gòu)來存儲用戶的關(guān)注者列表。有序集合中的成員是關(guān)注者的用戶ID,而分?jǐn)?shù)可以是關(guān)注時(shí)間或者其他指標(biāo),比如活躍度。
@Component
public class RelationshipGraphService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**用戶關(guān)注者*/
private static final String FOLLOWERS_PREFIX = "followers:";
/**
* 添加關(guān)注者
* @param userId
* @param followerId
* @param score
*/
public void addFollower(String userId, String followerId, double score) {
String key = FOLLOWERS_PREFIX + userId;
redisTemplate.opsForZSet().add(key, followerId, score);
}
/**
* 獲取用戶的關(guān)注者列表(按照關(guān)注時(shí)間排序)
* @param userId
* @return
*/
public Set<String> getFollowers(String userId) {
String key = FOLLOWERS_PREFIX + userId;
return redisTemplate.opsForZSet().range(key, 0, -1);
}
}
除此之外,我們還可以實(shí)現(xiàn)可能認(rèn)識的人,共同好友等業(yè)務(wù)場景。
限流與速率控制
Redis可以精確地實(shí)施限流策略,如使用INCR命令結(jié)合Lua腳本實(shí)現(xiàn)滑動窗口限流。
創(chuàng)建一個(gè)Lua腳本,該腳本負(fù)責(zé)檢查在一定時(shí)間段內(nèi)請求次數(shù)是否超過限制。
-- rate_limiter.lua
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local timeWindow = tonumber(ARGV[2]) -- 時(shí)間窗口,例如單位為秒
-- 獲取當(dāng)前時(shí)間戳
local currentTime = redis.call('TIME')[1]
-- 獲取最近timeWindow秒內(nèi)的請求次數(shù)
local count = redis.call('ZCOUNT', key .. ':requests', currentTime - timeWindow, currentTime)
-- 如果未超過限制,則累加請求次數(shù),并返回true
if count < limit then
redis.call('ZADD', key .. ':requests', currentTime, currentTime)
return 1
else
return 0
end
限流服務(wù)中Redis使用DefaultRedisScript執(zhí)行Lua腳本
@Component
public class RateLimiter {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**限流Key*/
private static final String TATE_LIMITER_KEY = "rate-limit:%s";
/**規(guī)定的時(shí)間窗口內(nèi)允許的最大請求數(shù)量*/
private static final Integer LIMIT = 100;
/**限流策略的時(shí)間窗口長度,單位是秒*/
private static final Integer TIME_WINDOW = 60;
/**
* 初始化RedisScript對象
*/
private final DefaultRedisScript<Long> rateLimiterScript = new DefaultRedisScript<>();
{
rateLimiterScript.setLocation(new ClassPathResource("rate_limiter.lua"));
rateLimiterScript.setResultType(Long.class);
}
/**
* 限流方法 1分鐘內(nèi)最多100次請求
* @param userId
* @return
*/
public boolean allowRequest(String userId) {
String key = String.format(TATE_LIMITER_KEY, userId);
List<String> keys = Collections.singletonList(key);
List<String> args = Arrays.asList(String.valueOf(LIMIT), String.valueOf(TIME_WINDOW));
// 執(zhí)行Lua腳本
Long result = redisTemplate.execute(rateLimiterScript, keys, args);
// 結(jié)果為1表示允許請求,0表示請求被限流
return Objects.equals(result, 1L);
}
}
位運(yùn)算與位圖應(yīng)用
Redis的位圖(BitMap)是一種特殊的數(shù)據(jù)結(jié)構(gòu),它允許我們在單一的字符串鍵(String Key)中存儲一系列二進(jìn)制位(bits),每個(gè)位對應(yīng)一個(gè)布爾值(0或1),并通過偏移量(offset)來定位和操作這些位。位圖極大地節(jié)省了存儲空間,尤其適合于大規(guī)模數(shù)據(jù)的標(biāo)記、統(tǒng)計(jì)和篩選場景。
在位圖中,每一位相當(dāng)于一個(gè)標(biāo)識符,例如可以用來表示用戶是否在線、商品是否有庫存、用戶是否已讀郵件等。相對于傳統(tǒng)的鍵值對存儲。位圖可以非常快速地統(tǒng)計(jì)滿足特定條件的元素個(gè)數(shù),如統(tǒng)計(jì)在線用戶數(shù)、激活用戶數(shù)等。
@Service
public class UserOnlineStatusService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String ONLINE_STATUS_KEY = "online_status";
private static final String RETENTION_RATE_KEY_PREFIX = "retention_rate:";
private static final String DAILY_ACTIVITY_KEY_PREFIX = "daily_activity:";
/**
* 設(shè)置用戶在線狀態(tài)為在線
* @param userId
*/
public void setUserOnline(long userId) {
redisTemplate.opsForValue().setBit(ONLINE_STATUS_KEY, userId, true);
}
/**
* 設(shè)置用戶在線狀態(tài)為離線
* @param userId
*/
public void setUserOffline(long userId) {
redisTemplate.opsForValue().setBit(ONLINE_STATUS_KEY, userId, false);
}
/**
* 獲取用戶在線狀態(tài)
* @param userId
* @return
*/
public boolean isUserOnline(long userId) {
return redisTemplate.opsForValue().getBit(ONLINE_STATUS_KEY, userId);
}
/**
* 統(tǒng)計(jì)在線用戶數(shù)量
* @return
*/
public long countOnlineUsers() {
return getCount(ONLINE_STATUS_KEY);
}
/**
* 記錄用戶的留存情況
* @param userId
* @param daysAgo
*/
public void recordUserRetention(long userId, int daysAgo) {
String key = RETENTION_RATE_KEY_PREFIX + LocalDate.now().minusDays(daysAgo).toString();
redisTemplate.opsForValue().setBit(key, userId, true);
}
/**
* 獲取指定日期的留存率
* @param daysAgo
* @return
*/
public double getRetentionRate(int daysAgo) {
String key = RETENTION_RATE_KEY_PREFIX + LocalDate.now().minusDays(daysAgo).toString();
long totalUsers = countOnlineUsers();
long retainedUsers = getCount(key);
return (double) retainedUsers / totalUsers * 100;
}
/**
* 記錄用戶的每日活躍情況
* @param userId
*/
public void recordUserDailyActivity(long userId) {
String key = DAILY_ACTIVITY_KEY_PREFIX + LocalDate.now().toString();
redisTemplate.opsForValue().setBit(key, userId, true);
}
/**
* 獲取指定日期的活躍用戶數(shù)量
* @param date
* @return
*/
public long countDailyActiveUsers(LocalDate date) {
String key = DAILY_ACTIVITY_KEY_PREFIX + date.toString();
return getCount(key);
}
/**
* 獲取最近幾天每天的活躍用戶數(shù)量列表
* @param days
* @return
*/
public List<Long> getDailyActiveUsers(int days) {
LocalDate currentDate = LocalDate.now();
List<Long> results = Lists.newArrayList();
for (int i = 0; i < days; i++) {
LocalDate date = currentDate.minusDays(i);
String key = DAILY_ACTIVITY_KEY_PREFIX + date.toString();
results.add(getCount(key));
}
return results;
}
/**
* 獲取key下的數(shù)量
* @param key
* @return
*/
private long getCount(String key) {
return (long) redisTemplate.execute((RedisCallback<Long>) connection -> connection.bitCount(key.getBytes()));
}
}
最新列表
Redis的List(列表)是一個(gè)基于雙向鏈表實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu),允許我們在列表頭部(左端)和尾部(右端)進(jìn)行高效的插入和刪除操作。LPUSH命令:全稱是LIST PUSH LEFT,用于將一個(gè)或多個(gè)值插入到列表的最左邊(頭部),在這里用于將最新生成的內(nèi)容ID推送到列表頂部,保證列表中始終是最新的內(nèi)容排在前面。
LTRIM命令用于修剪列表,保留指定范圍內(nèi)的元素,從而限制列表的長度。在這個(gè)場景中,每次添加新ID后都會執(zhí)行LTRIM操作,只保留最近的N個(gè)ID,確保列表始終保持固定長度,即只包含最新的內(nèi)容ID。
@Service
public class LatestListService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String LATEST_LIST_KEY = "latest_list";
/**
* 添加最新內(nèi)容ID到列表頭部
* @param contentId 內(nèi)容ID
*/
public void addLatestContent(String contentId) {
ListOperations<String, String> listOps = redisTemplate.opsForList();
listOps.leftPush(LATEST_LIST_KEY, contentId);
// 限制列表最多存儲N個(gè)ID,假設(shè)N為100
listOps.trim(LATEST_LIST_KEY, 0, 99);
}
/**
* 獲取最新的N個(gè)內(nèi)容ID
* @param count 要獲取的數(shù)量,默認(rèn)為10
* @return 最新的內(nèi)容ID列表
*/
public List<String> getLatestContentIds(int count) {
ListOperations<String, String> listOps = redisTemplate.opsForList();
return listOps.range(LATEST_LIST_KEY, 0, count - 1);
}
}
抽獎(jiǎng)
借助Redis的Set數(shù)據(jù)結(jié)構(gòu)以及其內(nèi)置的Spop命令,我們能夠高效且隨機(jī)地選定抽獎(jiǎng)獲勝者。Set作為一種不允許包含重復(fù)成員的數(shù)據(jù)集合,其特性天然適用于防止抽獎(jiǎng)過程中出現(xiàn)重復(fù)參與的情況,確保每位參與者僅擁有一個(gè)有效的抽獎(jiǎng)資格。
由于Set內(nèi)部元素的排列不具備確定性,這意味著在對集合執(zhí)行隨機(jī)獲取操作時(shí),每一次選取都將獨(dú)立且不可預(yù)測,這與抽獎(jiǎng)活動中所要求的隨機(jī)公平原則高度契合。
Redis的Spop命令允許我們在單個(gè)原子操作下,不僅隨機(jī)選取,還會從Set中移除指定數(shù)量(默認(rèn)為1)的元素。這一原子操作機(jī)制尤為關(guān)鍵,在高并發(fā)環(huán)境下,即便有多個(gè)請求同時(shí)進(jìn)行抽獎(jiǎng),Spop也能夠確保同一時(shí)刻只有一個(gè)請求能成功獲取并移除一個(gè)元素,有效避免了重復(fù)選擇同一位參與者作為獲獎(jiǎng)?wù)叩目赡苄浴?/p>
@Service
public class LotteryService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String PARTICIPANTS_SET_KEY = "lottery:participants";
/**
* 添加參與者到抽獎(jiǎng)名單
* @param participant 參與者ID
*/
public void joinLottery(String participant) {
redisTemplate.opsForSet().add(PARTICIPANTS_SET_KEY, participant);
}
/**
* 抽取一名幸運(yùn)兒
* @return 幸運(yùn)兒ID
*/
public String drawWinner() {
// 使用Spop命令隨機(jī)抽取一個(gè)參與者
return redisTemplate.opsForSet().pop(PARTICIPANTS_SET_KEY);
}
/**
* 抽取N個(gè)幸運(yùn)兒
* @param count 抽取數(shù)量
* @return 幸運(yùn)兒ID列表
*/
public List<String> drawWinners(int count) {
return redisTemplate.opsForSet().pop(PARTICIPANTS_SET_KEY, count);
}
}
Stream類型
Redis Stream作為一種自Redis 5.0起引入的高級數(shù)據(jù)結(jié)構(gòu),專為存儲和處理有序且持久的消息流而設(shè)計(jì)。可視作一個(gè)分布式的、具備持久特性的消息隊(duì)列,通過唯一的鍵名來標(biāo)識每個(gè)Stream,其中容納了多個(gè)攜帶時(shí)間戳和唯一標(biāo)識符的消息實(shí)體。
每條存儲于Stream中的消息都具有全球唯一的message ID,該ID內(nèi)嵌時(shí)間戳和序列編號,旨在確保即使在復(fù)雜的集群部署中仍能保持消息的嚴(yán)格時(shí)序性。這些消息內(nèi)容會持久存儲在Redis中,確保即使服務(wù)器重啟也能安全恢復(fù)。
生產(chǎn)者利用XADD指令將新消息添加到Stream中,而消費(fèi)者則通過XREAD或針對多消費(fèi)者組場景優(yōu)化的XREADGROUP命令來讀取并處理消息。XREADGROUP尤其擅長處理多消費(fèi)者組間的公平分配和持久訂閱,確保消息的公正、有序送達(dá)各個(gè)消費(fèi)者。
Stream核心特性之一是支持消費(fèi)者組機(jī)制,消費(fèi)者組內(nèi)的不同消費(fèi)者可獨(dú)立地消費(fèi)消息,并通過XACK命令確認(rèn)已消費(fèi)的消息,從而實(shí)現(xiàn)了消息的持久化消費(fèi)和至少一次(at-least-once)交付保證。當(dāng)消息量超出消費(fèi)者處理能力時(shí),未處理的消息可在Stream中積壓,直到達(dá)到預(yù)設(shè)的最大容量限制。此外,還能設(shè)定消息的有效期(TTL),逾期未被消費(fèi)的消息將自動剔除。即使在網(wǎng)絡(luò)傳輸過程中消息遭受損失,亦可通過message ID保障消息的冪等性重新投遞。盡管網(wǎng)絡(luò)條件可能導(dǎo)致消息到達(dá)消費(fèi)者的時(shí)間順序與生產(chǎn)者發(fā)出的順序有所偏差,但Stream機(jī)制確保了每個(gè)消息在其內(nèi)在的時(shí)間上下文中依然保持著嚴(yán)格的順序關(guān)系。
Redis Stream作為一個(gè)集消息持久化、多消費(fèi)者公平競爭、消息追溯和排序等功能于一體的強(qiáng)大消息隊(duì)列工具,已在日志采集、實(shí)時(shí)數(shù)據(jù)分析、活動追蹤等諸多領(lǐng)域展現(xiàn)出卓越的適用性和價(jià)值。
@Component
public class LogCollector {
private static final String LOGS_STREAM_KEY = "logs";
private static final String GROUP_NAME = "log_consumers";
private static final String CONSUMER_NAME = "log_consumer";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 發(fā)送日志事件至 Redis Stream
public void sendLogEvent(String message, Map<String, String> attributes) {
StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
RecordId messageId = streamOperations.add(StreamRecords.newRecord()
.ofStrings(attributes)
.withStreamKey(LOGS_STREAM_KEY));
}
// 實(shí)時(shí)消費(fèi)日志事件
public StreamRecords<String, String> consumeLogs(int batchSize) {
Consumer consumer = Consumer.from(CONSUMER_NAME, GROUP_NAME);
StreamOffset<String> offset = StreamOffset.create(LOGS_STREAM_KEY, ReadOffset.lastConsumed());
StreamReadOptions<String, String> readOptions = StreamReadOptions.empty().count(batchSize);
return redisTemplate.opsForStream().read(readOptions, StreamOffset.create(LOGS_STREAM_KEY, ReadOffset.lastConsumed()), consumer);
}
}
GEO類型
Redis的GEO數(shù)據(jù)類型自3.2版本起引入,專為存儲和高效操作含有經(jīng)緯度坐標(biāo)的地理位置信息而設(shè)計(jì)。開發(fā)人員利用這一類型可以輕松管理地理位置數(shù)據(jù),同時(shí)兼顧內(nèi)存效率和響應(yīng)速度。
利用GEOADD命令,可以將帶有精確經(jīng)緯度坐標(biāo)的數(shù)據(jù)點(diǎn)歸檔至指定鍵名下的集合中。
可借助GEOPOS命令獲取某一成員的具體經(jīng)緯度坐標(biāo)。
通過GEODIST命令,可以準(zhǔn)確計(jì)算任意兩個(gè)地理位置成員之間的地球表面距離,支持多種計(jì)量單位,包括米、千米、英里和英尺。
使用GEORADIUS命令,系統(tǒng)可以根據(jù)指定的經(jīng)緯度中心點(diǎn)及半徑范圍檢索出處于該區(qū)域內(nèi)的所有成員地理位置。
GEORADIUSBYMEMBER命令也用于范圍查詢,但其查詢依據(jù)是選定成員自身的位置,以此為圓心劃定搜索范圍。
GEO類型在許多場景下都非常有用,例如移動應(yīng)用中的附近好友查找、商店位置搜索、物流配送中的最近司機(jī)調(diào)度等。
@Service
public class FriendService {
private static final String FRIEND_LOCATIONS_KEY = "friend_locations";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private GeoOperations<String, FriendLocation> geoOperations; // 自動裝配GeoOperations
public void saveFriendLocation(FriendLocation location) {
geoOperations.add(FRIEND_LOCATIONS_KEY, location.getLongitude(), location.getLatitude(), location);
}
public List<FriendLocation> findFriendsNearby(double myLongitude, double myLatitude, Distance radius) {
Circle circle = new Circle(new Point(myLongitude, myLatitude), radius);
return geoOperations.radius(FRIEND_LOCATIONS_KEY, circle, Metric.KILOMETERS).getContent();
}
}
總結(jié)
Redis作為一款高性能、內(nèi)存型的NoSQL數(shù)據(jù)庫,憑借其豐富的數(shù)據(jù)結(jié)構(gòu)、極高的讀寫速度以及靈活的數(shù)據(jù)持久化策略,在現(xiàn)代分布式系統(tǒng)中扮演著至關(guān)重要的角色。它的關(guān)鍵價(jià)值體現(xiàn)在以下幾個(gè)方面:
- 緩存優(yōu)化:Redis將頻繁訪問的數(shù)據(jù)存儲在內(nèi)存中,顯著減少了數(shù)據(jù)庫的讀取壓力,提升了系統(tǒng)的整體性能和響應(yīng)速度。
- 分布式支持:通過主從復(fù)制、哨兵和集群模式,Redis實(shí)現(xiàn)了高度可擴(kuò)展性和高可用性,滿足大規(guī)模分布式系統(tǒng)的需求。
- 數(shù)據(jù)結(jié)構(gòu)多樣性:Redis支持字符串、哈希、列表、集合、有序集合、Bitmaps、HyperLogLog、Geo等多樣化的數(shù)據(jù)結(jié)構(gòu),為多種應(yīng)用場景提供了便利,如排行榜、社交關(guān)系、消息隊(duì)列、計(jì)數(shù)器、限速器等。
- 實(shí)時(shí)處理與分析:隨著Redis 5.0引入Stream數(shù)據(jù)結(jié)構(gòu),使得Redis在日志收集、實(shí)時(shí)分析、物聯(lián)網(wǎng)數(shù)據(jù)流處理等方面有了更多的可能性。
- 地理位置服務(wù):GEO類型提供了便捷的空間索引和距離計(jì)算功能,使得Redis能夠在電商、出行、社交等領(lǐng)域提供附近地點(diǎn)搜索、路線規(guī)劃等服務(wù)。