探索數(shù)據(jù)結(jié)構(gòu)之美:有序集合的內(nèi)部機(jī)制
在現(xiàn)代軟件開(kāi)發(fā)中,高效的數(shù)據(jù)結(jié)構(gòu)和算法設(shè)計(jì)對(duì)于構(gòu)建高性能系統(tǒng)至關(guān)重要。有序集合(Sorted Set)作為一種常用的數(shù)據(jù)結(jié)構(gòu),在許多應(yīng)用場(chǎng)景中發(fā)揮著重要作用,例如緩存、索引、排名等。本文將深入探討有序集合的內(nèi)部機(jī)制,分析其源代碼,并揭示其實(shí)現(xiàn)細(xì)節(jié)。
注意,本著對(duì)核心數(shù)據(jù)結(jié)構(gòu)的剖析,本文有序集合的所有指令操作都是以字典+跳表這兩個(gè)編碼展開(kāi)討論,對(duì)于壓縮列表的實(shí)現(xiàn)細(xì)節(jié)就不會(huì)涉及。
一、詳解有序集合核心指令實(shí)現(xiàn)
1. 跳表相關(guān)導(dǎo)讀
本文著重于講解有序集合內(nèi)部核心實(shí)現(xiàn),會(huì)涉及大量跳表的知識(shí)點(diǎn),需要了解的讀者建議閱讀一下筆者下面這篇關(guān)于跳表設(shè)計(jì)與實(shí)現(xiàn)的文章:
《高效索引的秘密:Redis跳表設(shè)計(jì)與實(shí)現(xiàn)》
2. 元素添加指令zadd
有序集合添加指令就是zadd,它支持添加一個(gè)或者多個(gè)指令,對(duì)應(yīng)的操作示例如下,可以看到操作成功之后就會(huì)返回添加的元素?cái)?shù):
redis> ZADD myzset 1 "one"
(integer) 1
redis> ZADD myzset 1 "uno"
(integer) 1
redis> ZADD myzset 2 "two" 3 "three"
(integer) 2
對(duì)應(yīng)的指令函數(shù)源碼的實(shí)現(xiàn)是zaddCommand,該函數(shù)大體按照如下步驟進(jìn)行:
- 檢查元素是否能被2整除來(lái)校驗(yàn)參數(shù)個(gè)數(shù)是否正確。
- 檢查有序集合的key是否存在且類(lèi)型確實(shí)是有序集合。
- 逐個(gè)遍歷元素及其score查看該元素是否存在,如果不存在則先插入操有序集合底層的跳表中來(lái)維護(hù)元素之間的先后順序。確保這步操作成功后再將元素指針添加到有序集合的字典中,保證單元素檢索的效率。
- 如果元素已存在,則會(huì)將該元素從跳表中刪除,保證關(guān)于這個(gè)元素的索引都清理干凈后在進(jìn)行插入,以保證跳表的正確性,因?yàn)樵匾呀?jīng)在有序集合字典中存在了,所以更新操作就不會(huì)操作字典了。
對(duì)應(yīng)我們也給出操作的源碼細(xì)節(jié),讀者可以參照筆者的上述的講解了解一下源碼的細(xì)節(jié):
//調(diào)用zaddGenericCommand并傳入0,意為告知zaddGenericCommand要返回本次操作的添加數(shù)(不包括更新)
void zaddCommand(redisClient *c) {
zaddGenericCommand(c,0);
}
/* This generic command implements both ZADD and ZINCRBY. */
void zaddGenericCommand(redisClient *c, int incr) {
//.......
//拿到有序集合里面element和score有幾對(duì)
int j, elements = (c->argc-2)/2;
int added = 0, updated = 0;
//檢查參數(shù)是否是基數(shù)個(gè),如果是則報(bào)錯(cuò)
if (c->argc % 2) {
addReply(c,shared.syntaxerr);
return;
}
//創(chuàng)建score數(shù)組
scores = zmalloc(sizeof(double)*elements);
//遍歷score轉(zhuǎn)為double類(lèi)型,將轉(zhuǎn)換后的結(jié)果存到scores數(shù)組中,后續(xù)元素的score都依次按照順序從數(shù)組中獲取
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
!= REDIS_OK) goto cleanup;
}
//查看這個(gè)有序集合是否在redis中存在
zobj = lookupKeyWrite(c->db,key);
//如果不存在則進(jìn)行初始化,然后添加到redis數(shù)據(jù)庫(kù)中
if (zobj == NULL) {
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
{
//創(chuàng)建有序集合對(duì)象
zobj = createZsetObject();
} else {
//......
}
//添加到內(nèi)存數(shù)據(jù)庫(kù)中
dbAdd(c->db,key,zobj);
}
//遍歷傳入的每一個(gè)元素
for (j = 0; j < elements; j++) {
//拿到元素的score
score = scores[j];
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
//......
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {//如果轉(zhuǎn)為跳表,則直接走跳表的邏輯
//......
//將元素進(jìn)行編碼轉(zhuǎn)換
ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
//查看字典中是否存在這個(gè)元素
de = dictFind(zs->dict,ele);
//如果存在,則進(jìn)行更新操作
if (de != NULL) {
//......
//比對(duì)socre與之前的結(jié)果是否一致,,如果不一致則說(shuō)明該元素的排名要改變,需將其從跳表中移除再插入維護(hù)節(jié)點(diǎn)之間新的關(guān)系
if (score != curscore) {
redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
znode = zslInsert(zs->zsl,score,curobj);
incrRefCount(curobj); /* Re-inserted in skiplist. */
dictGetVal(de) = &znode->score; /* Update score ptr. */
server.dirty++;
updated++;
}
} else {
//先插入到跳表,然后再插入到字典中
znode = zslInsert(zs->zsl,score,ele);
incrRefCount(ele); /* Inserted in skiplist. */
redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
incrRefCount(ele); /* Added to dictionary. */
server.dirty++;
added++;
}
} else {
redisPanic("Unknown sorted set encoding");
}
}
//因?yàn)槲覀儌魅氲闹凳?,所以返回添加結(jié)果added
if (incr) /* ZINCRBY */
addReplyDouble(c,score);
else /* ZADD */
addReplyLongLong(c,added);
//......
}
3. 有序集合元素?cái)?shù)量查詢(xún)指令zcard
zcard常用于查看當(dāng)前有序集合的長(zhǎng)度,對(duì)應(yīng)的使用示例如下所示:
redis> ZADD myzset 1 "one"
(integer) 1
redis> ZADD myzset 2 "two"
(integer) 1
redis> ZCARD myzset
(integer) 2
redis>
因?yàn)樯鲜龅牟僮?,有序集合底層都?huì)通過(guò)壓縮列表或者跳表維護(hù)長(zhǎng)度,所以調(diào)用zcard的時(shí)候,本質(zhì)上就是通過(guò)length字段返回當(dāng)前有序集合的長(zhǎng)度:
void zcardCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
//查看key是否存在,如果不存在則返回0,如果不是有序集合則返回錯(cuò)誤碼
if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
//調(diào)用zsetLength返回有序集合中的元素?cái)?shù)
addReplyLongLong(c,zsetLength(zobj));
}
4. 有序集合順序遍歷指令zrange
ZRANGE指令用于順序遍歷有序集合中所有元素,如果加上WITHSCORES關(guān)鍵字那么該指令就會(huì)返回元素及其score:
redis 127.0.0.1:6379> ZRANGE salary 0 -1 WITHSCORES # 顯示整個(gè)有序集成員
1) "jack"
2) "3500"
3) "tom"
4) "5000"
5) "boss"
6) "10086"
redis 127.0.0.1:6379> ZRANGE salary 1 2 WITHSCORES # 顯示有序集下標(biāo)區(qū)間 1 至 2 的成員
1) "tom"
2) "5000"
3) "boss"
4) "10086"
我們以下面這張圖為例,假設(shè)我們希望查詢(xún)索引1即orange及其之后的元素,有序集合會(huì)從最高層索引開(kāi)始,依次按照下述步驟執(zhí)行:
- 在L3的頭節(jié)點(diǎn)開(kāi)始,走兩步就到達(dá)orange的索引。
- 基于該索引定位到orange元素。
- 基于orange的forward指針不斷向前進(jìn)即完成后續(xù)元素遍歷。
對(duì)此我們給出zrange指令實(shí)現(xiàn)的函數(shù)zrangeCommand,可以看到其底層是調(diào)用zrangeGenericCommand并傳入0進(jìn)行順序查找遍歷輸出的:
void zrangeCommand(redisClient *c) {
//傳入0,代表找到后進(jìn)行順序遍歷輸出
zrangeGenericCommand(c,0);
}
我們?cè)俳o出zrangeGenericCommand的核心代碼段,可以看到在進(jìn)行必要的數(shù)值類(lèi)型轉(zhuǎn)換后,有序集合就會(huì)調(diào)用跳表的方法zslGetElementByRank按照我們上圖所講解的方式定位到元素,然后基于該元素的forward指針不斷步進(jìn)遍歷元素并輸出:
void zrangeGenericCommand(redisClient *c, int reverse) {
//......
//判斷數(shù)值轉(zhuǎn)換是否正常
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
//如果參數(shù)5個(gè)且最后一個(gè)是withscores,說(shuō)明需要輸出對(duì)應(yīng)的節(jié)點(diǎn)和score
if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
withscores = 1;
} else if (c->argc >= 5) {//大于5個(gè)則報(bào)錯(cuò)
addReply(c,shared.syntaxerr);
return;
}
//非空查詢(xún)和類(lèi)型判斷
if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
|| checkType(c,zobj,REDIS_ZSET)) return;
//......
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
//......
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
robj *ele;
/* Check if starting point is trivial, before doing log(N) lookup. */
if (reverse) {
ln = zsl->tail;
if (start > 0)
ln = zslGetElementByRank(zsl,llen-start);
} else {//順序遍歷,從索引0層開(kāi)始遍歷
ln = zsl->header->level[0].forward;
if (start > 0)//如果大于0 則定位到要查詢(xún)的元素位置,傳入start+1即走start+1步到指定位置
ln = zslGetElementByRank(zsl,start+1);
}
//基于range不斷步進(jìn)完成后續(xù)遍歷
while(rangelen--) {
redisAssertWithInfo(c,zobj,ln != NULL);
ele = ln->obj;
addReplyBulk(c,ele);
//如果傳入withscores則輸出對(duì)應(yīng)元素的score
if (withscores)
addReplyDouble(c,ln->score);
//基于forward或者backward向前或者向后遍歷
ln = reverse ? ln->backward : ln->level[0].forward;
}
} else {
redisPanic("Unknown sorted set encoding");
}
}
5. 基于給定元素值返回元素排名
獲取元素排名的指令為zrank,例如我們有序集合中有如下3個(gè)元素,查詢(xún)tom他在索引1位置,所以返回1,即排第2(索引1再加上1):
redis 127.0.0.1:6379> ZRANGE salary 0 -1 WITHSCORES # 顯示所有成員及其 score 值
1) "peter"
2) "3500"
3) "tom"
4) "4000"
5) "jack"
6) "5000"
redis 127.0.0.1:6379> ZRANK salary tom # 顯示 tom 的薪水排名,第二
(integer) 1
其內(nèi)部調(diào)用的函數(shù)是zrankCommand,然后其內(nèi)部調(diào)用zrankGenericCommand傳入0,代表查詢(xún)當(dāng)前元素的順序排名:
void zrankCommand(redisClient *c) {
//傳入0順序查詢(xún)排名
zrankGenericCommand(c, 0);
}
zrankGenericCommand的源碼如下所示,以跳表結(jié)構(gòu)為例,該方法首先通過(guò)字典定位到這個(gè)元素的score,然后通過(guò)跳表的zslGetRank查詢(xún)這個(gè)元素,按照我們上文中一直強(qiáng)調(diào)的多級(jí)索引跳越查詢(xún)得到元素經(jīng)過(guò)多少個(gè)span,以tom也就是要走2個(gè)span,那么zslGetRank就會(huì)返回2,基于這個(gè)查詢(xún)結(jié)果減去1返回給客戶(hù)端,客戶(hù)端即可知曉tom在跳表中的索引值是1:
void zrankGenericCommand(redisClient *c, int reverse) {
//......
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
//......
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
dictEntry *de;
double score;
//對(duì)象類(lèi)型轉(zhuǎn)換
ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
//定位跳表
de = dictFind(zs->dict,ele);
if (de != NULL) {
//從字典中定位score
score = *(double*)dictGetVal(de);
//從跳表中獲取rank
rank = zslGetRank(zsl,score,ele);
redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
if (reverse)
addReplyLongLong(c,llen-rank);
else //返回跨度值-1即得到元素在跳表中的索引
addReplyLongLong(c,rank-1);
} else {
addReply(c,shared.nullbulk);
}
} else {
redisPanic("Unknown sorted set encoding");
}
}
6. 刪除節(jié)點(diǎn)指令zrem
當(dāng)有序集合刪除元素時(shí),除了將該元素從字典中移除,還需要將跳表中關(guān)于該元素的指針和索引全部移除,如下圖,假設(shè)我們要?jiǎng)h除orange,本質(zhì)上就是將orange和score到跳表中定位,將其元素和索引全部刪除,然后再將其從字典中刪除,并返回刪除個(gè)數(shù)。
需要補(bǔ)充的是,跳表完成刪除操作后還會(huì)檢查最高層索引,如果最高層索引沒(méi)有任何索引,那么跳表的索引層級(jí)就會(huì)減去1,可能有讀者會(huì)問(wèn)為什么只檢查最高層索引,其實(shí)這和跳表的設(shè)計(jì)思想有關(guān),當(dāng)跳表為新建的節(jié)點(diǎn)生成隨機(jī)層級(jí)索引時(shí),創(chuàng)建索引的層級(jí)永遠(yuǎn)和生成的等級(jí)一致,例如:
- 創(chuàng)建節(jié)點(diǎn)orange,生成索引級(jí)別為3,那么1、2層索引也會(huì)創(chuàng)建。
- 生成banana生成索引為2,那么1層也會(huì)創(chuàng)建索引。
生成apple生成索引層為1,那么就只會(huì)創(chuàng)建1層索引。
這就導(dǎo)致高層索引的個(gè)數(shù)永遠(yuǎn)是最少的,最先出現(xiàn)索引空的情況永遠(yuǎn)是最高層索引,所以跳表進(jìn)行節(jié)點(diǎn)刪除后索引維護(hù)工作永遠(yuǎn)從最高層級(jí)開(kāi)始。
對(duì)此我們也給出有序集合中關(guān)于zrem操作的源碼實(shí)現(xiàn),其核心流程就是筆者說(shuō)的調(diào)用zslDelete從跳表中定位刪除,然后再同步刪除字典中的元素:
void zremCommand(redisClient *c) {
//......
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
//......
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
dictEntry *de;
double score;
//遍歷key通過(guò)字典刪除
for (j = 2; j < c->argc; j++) {
//查找有序集合中是否存
de = dictFind(zs->dict,c->argv[j]);
//如果存在則執(zhí)行刪除操作
if (de != NULL) {
deleted++;
/* Delete from the skiplist */
score = *(double*)dictGetVal(de);
//基于元素值和score到跳表進(jìn)行刪除
redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
/* Delete from the hash table */
//然后再?gòu)淖值渲袆h除
dictDelete(zs->dict,c->argv[j]);
//......
}
}
} else {
redisPanic("Unknown sorted set encoding");
}
//......
//返回刪除數(shù)
addReplyLongLong(c,deleted);
}
7. 查詢(xún)?cè)財(cái)?shù)值z(mì)score
該方法并沒(méi)有利用到跳表,而是直接傳入元素值到字典中定位拿到其score返回,在字典沖突較少的情況下時(shí)間復(fù)雜度為O(1),對(duì)應(yīng)源碼如下,比較簡(jiǎn)單,讀者可自行參閱注釋了解:
void zscoreCommand(redisClient *c) {
//.......
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
//......
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {//通過(guò)字典拿到成績(jī)
zset *zs = zobj->ptr;
dictEntry *de;
c->argv[2] = tryObjectEncoding(c->argv[2]);
//將元素待入字典中定位
de = dictFind(zs->dict,c->argv[2]);
//不為空取出它的score返回
if (de != NULL) {
score = *(double*)dictGetVal(de);
//返回score
addReplyDouble(c,score);
} else {
addReply(c,shared.nullbulk);
}
} else {
redisPanic("Unknown sorted set encoding");
}
}