一個(gè)測(cè)試記錄:利用【分段鎖】來(lái)處理并發(fā)情況下的資源競(jìng)爭(zhēng)問(wèn)題
目錄
- 問(wèn)題描述
- 測(cè)試代碼
- 測(cè)試結(jié)果
- 測(cè)試代碼簡(jiǎn)介
別人的經(jīng)驗(yàn),我們的階梯!
在開(kāi)發(fā)中經(jīng)常遇到多個(gè)并發(fā)執(zhí)行的線程,需要對(duì)同一個(gè)資源進(jìn)行訪問(wèn),也就是發(fā)生資源競(jìng)爭(zhēng)。
在這種場(chǎng)景中,一般的做法就是加鎖,通過(guò)鎖機(jī)制對(duì)臨界區(qū)進(jìn)行保護(hù),以達(dá)到資源獨(dú)占的目的。
這篇文章主要描述的就是使用分段鎖來(lái)解決這個(gè)問(wèn)題,說(shuō)起來(lái)很簡(jiǎn)單:就是把鎖的粒度降低,以達(dá)到資源獨(dú)占、最大程度避免競(jìng)爭(zhēng)的目的。
問(wèn)題描述
周末和朋友聊天說(shuō)到最近的工作,他們有個(gè)項(xiàng)目,需要把之前的一個(gè)單片機(jī)程序,移植到x86平臺(tái)。
由于歷史的原因,代碼中到處都充斥著全局變量,你懂得:在以前的單片機(jī)中充斥著大量的全局變量,方便、好用啊!
在代碼中,盡量避免使用全局變量。壞處有:不方便模塊化,函數(shù)不可重入,耦合性大。。。
由于大部分的單片機(jī)都只有一個(gè)CPU,是真正的串行操作。
也許你會(huì)說(shuō):會(huì)發(fā)生中斷啊,這也是一種異步操作。
沒(méi)錯(cuò),但是可以在訪問(wèn)全局變量的地方把中斷關(guān)掉,這樣就不會(huì)避免了資源競(jìng)爭(zhēng)的情況了。
但是,移植到x86平臺(tái)之后,在多核的情況下,多個(gè)線程(任務(wù))是真正的并發(fā)執(zhí)行序列。
如果多個(gè)線程同時(shí)操作某一個(gè)全局變量,就一定存在競(jìng)爭(zhēng)的情況。
針對(duì)這個(gè)問(wèn)題,首先想到的方案就是:分配一般互斥鎖,無(wú)論哪個(gè)線程想訪問(wèn)全局變量,首先獲取到鎖,然后才能操作全局變量,操作完成之后再釋放鎖。
但是,這個(gè)方案有一個(gè)很大的問(wèn)題,就是:當(dāng)并發(fā)線程很多的情況下,程序的執(zhí)行效率太低。
他們最后的解決方案是分段加鎖,也就是對(duì)全局變量按照數(shù)據(jù)索引進(jìn)行分割,每一段數(shù)據(jù)分配一把鎖。
至于每一段的數(shù)據(jù)長(zhǎng)度是多少,這需要根據(jù)實(shí)際的業(yè)務(wù)場(chǎng)景進(jìn)行調(diào)整,以達(dá)到最優(yōu)的性能。
回來(lái)之后,我覺(jué)得這個(gè)想法非常巧妙。
這個(gè)機(jī)制看起來(lái)很簡(jiǎn)單,但是真的能解決大問(wèn)題。
于是我就寫(xiě)了一段代碼來(lái)測(cè)試一下:這種方案對(duì)程序的性能有多大的影響。
測(cè)試代碼
在測(cè)試代碼中,定義了一個(gè)全局變量:
volatile int test_data[DATA_TOTAL_NUM];
數(shù)組的長(zhǎng)度是10000(宏定義:DATA_TOTAL_NUM),然后創(chuàng)建100個(gè)線程來(lái)并發(fā)訪問(wèn)這個(gè)全局變量,每個(gè)線程訪問(wèn)100000次。
然后執(zhí)行3個(gè)測(cè)試用例:
測(cè)試1:不使用鎖
00個(gè)線程同時(shí)操作全局變量,訪問(wèn)的數(shù)據(jù)索引隨機(jī)產(chǎn)生,最后統(tǒng)計(jì)每個(gè)線程的平均執(zhí)行時(shí)間。
不使用鎖的話,最后的結(jié)果(全局變量中的數(shù)據(jù)內(nèi)容)肯定是錯(cuò)誤的,這里僅僅是為了看一下時(shí)間消耗。
測(cè)試2:使用一把全局鎖(大鎖)
100個(gè)線程使用一把鎖。
每個(gè)線程在操作全局變量之前,首先要獲取到這把鎖,然后才能操作全局變量,否則的話只能阻塞著等其它線程釋放鎖。
測(cè)試3:使用分段鎖
根據(jù)全局變量的長(zhǎng)度,分配多把鎖。
每個(gè)線程在訪問(wèn)的時(shí)候,根據(jù)訪問(wèn)的數(shù)據(jù)索引,獲取不同的鎖,這樣就降低了競(jìng)爭(zhēng)的幾率。
在這個(gè)測(cè)試場(chǎng)景中,全局變量test_data的長(zhǎng)度是10000,每100個(gè)數(shù)據(jù)分配一把鎖,那么一共需要100把鎖。
比如,在某個(gè)時(shí)刻:
- 線程1想訪問(wèn)test_data[110]。
- 線程2想訪問(wèn)test_data[120]。
- 線程3想訪問(wèn)test_data[9900]。
首先根據(jù)每個(gè)線程要訪問(wèn)的數(shù)據(jù)索引進(jìn)行計(jì)算:這個(gè)索引對(duì)應(yīng)的哪一把鎖?
計(jì)算方式:訪問(wèn)索引 % 每把鎖對(duì)應(yīng)的數(shù)據(jù)長(zhǎng)度
經(jīng)過(guò)計(jì)算得知:線程1、線程2就會(huì)對(duì)第二把鎖進(jìn)行競(jìng)爭(zhēng);
而線程3就可以獨(dú)自獲取最后一把鎖,這樣的話線程3就避開(kāi)了與線程1、線程2的競(jìng)爭(zhēng)。
測(cè)試結(jié)果
$ ./a.out
test1_naked: average = 2876 ms
test2_one_big_lock: average = 11233 ms
test3_segment_lock: average = 3216 ms
從測(cè)試結(jié)果上看,分段加鎖比使用一把全局鎖,對(duì)于程序性能的提高確實(shí)很明顯。
當(dāng)然了,測(cè)試結(jié)果與不同的系統(tǒng)環(huán)境、業(yè)務(wù)場(chǎng)景有關(guān),特別是線程的競(jìng)爭(zhēng)程度、在臨界區(qū)中的執(zhí)行時(shí)間等。
測(cè)試代碼簡(jiǎn)介
測(cè)試代碼沒(méi)有考慮跨邊界的情況。
比如:某個(gè)線程需要訪問(wèn)190 ~ 210這些索引上的數(shù)據(jù),這個(gè)區(qū)間正好跨越了200這個(gè)分界點(diǎn)。
第0把鎖:0 ~ 99。
第1把鎖:100 ~ 199。
第2把鎖:200 ~ 299。
因此,訪問(wèn)190 ~ 210就需要同時(shí)獲取到第1、2把鎖。
在實(shí)際項(xiàng)目中需要考慮到這種跨邊界的情況,通過(guò)計(jì)算開(kāi)始和結(jié)束索引,把這些鎖都獲取到才可以。
當(dāng)然了,為了防止發(fā)生死鎖,需要按照順序來(lái)獲取。
#define THREAD_NUMBER 100 // 線程個(gè)數(shù)
#define LOOP_TIMES_EACH_THREAD 100000 // 每個(gè)線程中 for 循環(huán)的執(zhí)行次數(shù)
#define DATA_TOTAL_NUM 10000 // 全局變量的長(zhǎng)度
#define SEGMENT_LEN 100 // 多少個(gè)數(shù)據(jù)分配一把鎖
volatile int test_data[DATA_TOTAL_NUM]; // 被競(jìng)爭(zhēng)的全局變量
void main(void)
{
test1_naked();
test2_one_big_lock();
test3_segment_lock();
while (1)
sleep(3); // 主線程保持運(yùn)行,也可以使用getchar();
}
// 測(cè)試1:子線程執(zhí)行的函數(shù)
void *test1_naked_function(void *arg)
{
struct timeval tm1, tm2;
gettimeofday(&tm1, NULL);
for (unsigned int i = 0; i < LOOP_TIMES_EACH_THREAD; i++)
{
do_some_work(); // 模擬業(yè)務(wù)操作
unsigned int pos = rand() % DATA_TOTAL_NUM;
test_data[pos] = i * i; // 隨機(jī)訪問(wèn)全局變量中的某個(gè)數(shù)據(jù)
}
gettimeofday(&tm2, NULL);
return (tm2 - tm1);
}
// 測(cè)試2:子線程執(zhí)行的函數(shù)
void *test2_one_big_lock_function(void *arg)
{
test2_one_big_lock_arg *data = (test2_one_big_lock_arg *)arg;
struct timeval tm1, tm2;
gettimeofday(&tm1, NULL);
for (unsigned int i = 0; i < LOOP_TIMES_EACH_THREAD; i++)
{
pthread_mutex_lock(&data->lock); // 上鎖
do_some_work(); // 模擬業(yè)務(wù)操作
unsigned int pos = rand() % DATA_TOTAL_NUM;
test_data[pos] = i * i; // 隨機(jī)訪問(wèn)全局變量中的某個(gè)數(shù)據(jù)
pthread_mutex_unlock(&data->lock); // 解鎖
}
gettimeofday(&tm2, NULL);
return (tm2 - tm1);
}
// 測(cè)試3:子線程執(zhí)行的函數(shù)
void *test3_segment_lock_function(void *arg)
{
test3_segment_lock_arg *data = (test3_segment_lock_arg *)arg;
struct timeval tm1, tm2;
gettimeofday(&tm1, NULL);
for (unsigned int i = 0; i < LOOP_TIMES_EACH_THREAD; i++)
{
unsigned int pos = rand() % DATA_TOTAL_NUM; // 產(chǎn)生隨機(jī)訪問(wèn)的索引
unsigned int lock_index = pos / SEGMENT_LEN; // 根據(jù)索引計(jì)算需要獲取哪一把鎖
pthread_mutex_lock(data->lock + lock_index); // 上鎖
do_some_work(); // 模擬業(yè)務(wù)操作
test_data[pos] = i * i; // 隨機(jī)訪問(wèn)全局變量中的某個(gè)數(shù)據(jù)
pthread_mutex_unlock(data->lock + lock_index); // 解鎖
}
gettimeofday(&tm2, NULL);
return (tm2 - tm1);
}
void test1_naked()
{
創(chuàng)建 100 個(gè)線程,線程執(zhí)行函數(shù)是 test1_naked_function()
printf("test1_naked: average = %ld ms \n", ms_total / THREAD_NUMBER);
}
void test2_one_big_lock()
{
創(chuàng)建 100 個(gè)線程,線程執(zhí)行函數(shù)是 test2_one_big_lock_function(),需要把鎖作為參數(shù)傳遞給子線程。
printf("test2_one_big_lock: average = %ld ms \n", ms_total / THREAD_NUMBER);
}
void test3_segment_lock()
{
根據(jù)全局變量的長(zhǎng)度,初始化很多把鎖。
創(chuàng)建 100 個(gè)線程,線程執(zhí)行函數(shù)是 test2_one_big_lock_function(),需要把鎖作為參數(shù)傳遞給子線程。
printf("test3_segment_lock: average = %ld ms \n", ms_total / THREAD_NUMBER);
}
在Linux系統(tǒng)中可以直接編譯、執(zhí)行,拿來(lái)即用。
祝您好運(yùn)!
本文轉(zhuǎn)載自微信公眾號(hào)「IOT物聯(lián)網(wǎng)小鎮(zhèn)」