MariaDB線程池源碼分析
在引入線程池之前,MySQL支持的線程處理方式(thread_handling參數(shù)控制)有no-threads和one-thread-per-connection兩種方式,no-threads方式是指任一時(shí)刻最多只有一個(gè)連接可以連接到server,一般用于實(shí)驗(yàn)性質(zhì)。 one-thread-per-connection是指針對(duì)每個(gè)連接創(chuàng)建一個(gè)線程來(lái)處理這個(gè)連接的所有請(qǐng)求,直到連接斷開(kāi),線程 結(jié)束。是thread_handling的默認(rèn)方式。
one-thread-per-connection存在的問(wèn)題就是需要為每個(gè)連接創(chuàng)建一個(gè)新的thread,當(dāng)并發(fā)連接數(shù)達(dá)到一定 程度,性能會(huì)有明顯下降,因?yàn)檫^(guò)多的線程會(huì)導(dǎo)致頻繁的上下文切換,CPU cache命中率降低和鎖的競(jìng)爭(zhēng) 更加激烈。
解決one-thread-per-connection的方法就是降低線程數(shù),這樣就需要多個(gè)連接共用線程,這便引入了線程 池的概念。線程池中的線程是針對(duì)請(qǐng)求的,而不是針對(duì)連接的,也就是說(shuō)幾個(gè)連接可能使用相同的線程處理 各自的請(qǐng)求。
MariaDB在5.5引入了一個(gè)動(dòng)態(tài)的線程池方案,可以根據(jù)當(dāng)前請(qǐng)求的并發(fā)情況自動(dòng)增加或減少線程數(shù),還好 MariaDB完全開(kāi)源,本文結(jié)合MariaDB的代碼來(lái)介紹下thread pool的實(shí)現(xiàn)。這里使用的MariaDB 10.0的 代碼樹(shù)。
1 相關(guān)參數(shù)
MySQL的參數(shù)都寫(xiě)在sys_vars.cc文件下。
- static Sys_var_uint Sys_threadpool_idle_thread_timeout(
- "thread_pool_idle_timeout",
- "Timeout in seconds for an idle thread in the thread pool."
- "Worker thread will be shut down after timeout",
- GLOBAL_VAR(threadpool_idle_timeout), CMD_LINE(REQUIRED_ARG),
- VALID_RANGE(1, UINT_MAX), DEFAULT(60), BLOCK_SIZE(1)
- );
- static Sys_var_uint Sys_threadpool_oversubscribe(
- "thread_pool_oversubscribe",
- "How many additional active worker threads in a group are allowed.",
- GLOBAL_VAR(threadpool_oversubscribe), CMD_LINE(REQUIRED_ARG),
- VALID_RANGE(1, 1000), DEFAULT(3), BLOCK_SIZE(1)
- );
- static Sys_var_uint Sys_threadpool_size(
- "thread_pool_size",
- "Number of thread groups in the pool. "
- "This parameter is roughly equivalent to maximum number of concurrently "
- "executing threads (threads in a waiting state do not count as executing).",
- GLOBAL_VAR(threadpool_size), CMD_LINE(REQUIRED_ARG),
- VALID_RANGE(1, MAX_THREAD_GROUPS), DEFAULT(my_getncpus()), BLOCK_SIZE(1),
- NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
- ON_UPDATE(fix_threadpool_size)
- );
- static Sys_var_uint Sys_threadpool_stall_limit(
- "thread_pool_stall_limit",
- "Maximum query execution time in milliseconds,"
- "before an executing non-yielding thread is considered stalled."
- "If a worker thread is stalled, additional worker thread "
- "may be created to handle remaining clients.",
- GLOBAL_VAR(threadpool_stall_limit), CMD_LINE(REQUIRED_ARG),
- VALID_RANGE(10, UINT_MAX), DEFAULT(500), BLOCK_SIZE(1),
- NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
- ON_UPDATE(fix_threadpool_stall_limit)
- );
這幾個(gè)參數(shù)都有相應(yīng)的描述,這里再稍微具體介紹一下。
thread_pool_size: 線程池的分組(group)個(gè)數(shù)。MariaDB的線程池并不是說(shuō)一整個(gè) 大池子,而是分成了不同的group,而且是按照到來(lái)connection的順序進(jìn)行分組的,如 第一個(gè)connection分配到group[0],那么第二個(gè)connection就分配到group[1],是一種 Round Robin的輪詢分配方式。默認(rèn)值是CPU core個(gè)數(shù)。
thread_pool_idle_timeout: 線程最大空閑時(shí)間,如果某個(gè)線程空閑的時(shí)間大于這個(gè) 參數(shù),則線程退出。
thread_pool_stall_limit: 監(jiān)控間隔時(shí)間,thread pool有個(gè)監(jiān)控線程,每隔這個(gè)時(shí)間, 會(huì)檢查每個(gè)group的線程可用數(shù)等狀態(tài),然后進(jìn)行相應(yīng)的處理,如wake up或者create thread。
thread_pool_oversubscribe: 允許的每個(gè)group上的活躍的線程數(shù),注意這并不是每個(gè)group上的 最大線程數(shù),而只是可以處理請(qǐng)求的線程數(shù)。
2 thread handling設(shè)置
thread pool模式其實(shí)是新增了一種thread_handling的方式,即在配置文件中設(shè)置:
- [mysqld]
- thread_handling=pool-of-threads.
- ....
MySQL內(nèi)部是有一個(gè)scheduler_functions結(jié)構(gòu)體,不論thread_handling是哪種方式,都是通過(guò)設(shè)置這個(gè) 結(jié)構(gòu)體中的函數(shù)來(lái)進(jìn)行不同的調(diào)度。
- /** scheduler_functions結(jié)構(gòu)體 */
- struct scheduler_functions
- {
- uint max_threads, *connection_count;
- ulong *max_connections;
- bool (*init)(void);
- bool (*init_new_connection_thread)(void);
- void (*add_connection)(THD *thd);
- void (*thd_wait_begin)(THD *thd, int wait_type);
- void (*thd_wait_end)(THD *thd);
- void (*post_kill_notification)(THD *thd);
- bool (*end_thread)(THD *thd, bool cache_thread);
- void (*end)(void);
- };
- static int get_options(int *argc_ptr, char ***argv_ptr)
- {
- ...
- /** 根據(jù)thread_handling選項(xiàng)的設(shè)置,選擇不同的處理方式*/
- if (thread_handling <= SCHEDULER_ONE_THREAD_PER_CONNECTION)
- /**one thread per connection 方式 */
- one_thread_per_connection_scheduler(thread_scheduler, &max_connections,
- &connection_count);
- else if (thread_handling == SCHEDULER_NO_THREADS)
- /** no thread 方式 */
- one_thread_scheduler(thread_scheduler);
- else
- /** thread pool 方式 */
- pool_of_threads_scheduler(thread_scheduler, &max_connections,
- &connection_count);
- ...
- }
- static scheduler_functions tp_scheduler_functions=
- {
- 0, // max_threads
- NULL,
- NULL,
- tp_init, // init
- NULL, // init_new_connection_thread
- tp_add_connection, // add_connection
- tp_wait_begin, // thd_wait_begin
- tp_wait_end, // thd_wait_end
- post_kill_notification, // post_kill_notification
- NULL, // end_thread
- tp_end // end
- };
- void pool_of_threads_scheduler(struct scheduler_functions *func,
- ulong *arg_max_connections,
- uint *arg_connection_count)
- {
- /** 設(shè)置scheduler_functions結(jié)構(gòu)體為tp_scheduler_functions */
- *func = tp_scheduler_functions;
- func->max_threads= threadpool_max_threads;
- func->max_connections= arg_max_connections;
- func->connection_count= arg_connection_count;
- scheduler_init();
- }
上面可以看到設(shè)置了thread_scheduler的處理函數(shù)為tp_scheduler_functions,即 為thread pool方式,這種方式對(duì)應(yīng)的初始函數(shù)為tp_init, 創(chuàng)建新連接的函數(shù)為 tp_add_connection,等待開(kāi)始函數(shù)為tp_wait_begin,等待結(jié)束函數(shù)為tp_wait_end. 這里說(shuō)明下等待函數(shù)的意義,等待函數(shù)一般是在等待磁盤(pán)I/O,等待鎖資源,SLEEP,或者等待 網(wǎng)絡(luò)消息的時(shí)候,調(diào)用wait_begin,在等待結(jié)束后調(diào)用wait_end,那么為什么要等待的時(shí)候 調(diào)用等待函數(shù)呢?這個(gè)在后面進(jìn)行介紹。
上面講的其實(shí)和thread pool關(guān)系不是很大,下面開(kāi)始thread pool流程的介紹。thread pool涉及 到的源碼在emphsql/threadpool_common.cc和emphsql/threadpool_unix.cc, 對(duì)于windows而言,還有emphsql/threadpool_win.cc.
3 線程池初始化——tp_init
- >tp_init
- | >thread_group_init
- | >start_timer
tp_init非常簡(jiǎn)單,首先是調(diào)用了thread_group_init進(jìn)行組的初始化, 然后調(diào)用的start_timer開(kāi)啟了監(jiān)控線程timer_thread。 至此為止,thread pool里面只有一個(gè)監(jiān)控線程啟動(dòng),而沒(méi)有任何工作線程, 直到有新的連接到來(lái)。
4 添加新連接——tp_add_connection
- void tp_add_connection(THD *thd)
- {
- DBUG_ENTER("tp_add_connection");
- threads.append(thd);
- mysql_mutex_unlock(&LOCK_thread_count);
- connection_t *connection= alloc_connection(thd);
- if (connection)
- {
- thd->event_scheduler.data= connection;
- /* Assign connection to a group. */
- thread_group_t *group=
- &all_groups[thd->thread_id%group_count];
- connection->thread_group=group;
- mysql_mutex_lock(&group->mutex);
- group->connection_count++;
- mysql_mutex_unlock(&group->mutex);
- /*
- Add connection to the work queue.Actual logon
- will be done by a worker thread.
- */
- queue_put(group, connection);
- }
- else
- {
- /* Allocation failed */
- threadpool_remove_connection(thd);
- }
- DBUG_VOID_RETURN;
- }
但server的主監(jiān)聽(tīng)線程監(jiān)聽(tīng)到有客戶端的connect時(shí),會(huì)調(diào)用tp_add_connection函數(shù)進(jìn)行處理。 首先根據(jù)thread_id對(duì)group_count取模,找到其所屬的group,然后調(diào)用queue_put將此connection 放入到group中的queue中。這里涉及到兩個(gè)新的結(jié)構(gòu)體,connection_t和thread_group_t。
- struct connection_t
- {
- THD *thd;
- thread_group_t *thread_group;
- connection_t *next_in_queue;
- connection_t **prev_in_queue;
- ulonglong abs_wait_timeout; //等待超時(shí)時(shí)間
- bool logged_in; //是否進(jìn)行了登錄驗(yàn)證
- bool bound_to_poll_descriptor; //是否添加到了epoll進(jìn)行監(jiān)聽(tīng)
- bool waiting; //是否在等待狀態(tài),如I/O, sleep
- };
- struct thread_group_t
- {
- mysql_mutex_t mutex;
- connection_queue_t queue; //connection請(qǐng)求鏈表
- worker_list_t waiting_threads; //group中正在等待被喚醒的thread
- worker_thread_t *listener; //當(dāng)前group中用于監(jiān)聽(tīng)的線程
- pthread_attr_t *pthread_attr;
- int pollfd; //epoll 文件描述符,用于綁定group中的所有連接
- int thread_count; //線程數(shù)
- int active_thread_count;//活躍線程數(shù)
- int connection_count; //連接數(shù)
- /* Stats for the deadlock detection timer routine.*/
- int io_event_count; //epoll產(chǎn)生的事件數(shù)
- int queue_event_count; //工作線程消化的事件數(shù)
- ulonglong last_thread_creation_time;
- int shutdown_pipe[2];
- bool shutdown;
- bool stalled; // 工作線程是否處于停滯狀態(tài)
- } MY_ALIGNED(512);
上面對(duì)這些參數(shù)進(jìn)行了說(shuō)明,理解這些參數(shù)的意義,才能了解這個(gè)動(dòng)態(tài)thread pool的管理機(jī)制, 因?yàn)槊總€(gè)參數(shù)都會(huì)影響到thread pool的增長(zhǎng)或收縮。
介紹完結(jié)構(gòu)體,繼續(xù)回到新的連接到來(lái),這時(shí)會(huì)調(diào)用queue_put函數(shù),將此connection放到 group的隊(duì)列queue中。
- static void queue_put(thread_group_t *thread_group, connection_t *connection)
- {
- DBUG_ENTER("queue_put");
- mysql_mutex_lock(&thread_group->mutex);
- thread_group->queue.push_back(connection);
- if (thread_group->active_thread_count == 0)
- wake_or_create_thread(thread_group);
- mysql_mutex_unlock(&thread_group->mutex);
- DBUG_VOID_RETURN;
- }
注意,這時(shí)候有個(gè)active_thread_count的判斷,如果沒(méi)有活躍的線程,那么就無(wú)法處理 這個(gè)新到的請(qǐng)求啊,這時(shí)就需要調(diào)用wake_or_create_thread,這個(gè)函數(shù)首先會(huì)嘗試喚醒group 等待線程鏈表waiting_threads中的線程,如果沒(méi)有等待中的線程,則需要?jiǎng)?chuàng)建一個(gè)線程。 至此,新到的connection被掛到了group的queue上,這樣一個(gè)連接算是add進(jìn)隊(duì)列了,那么如何 處理這個(gè)連接呢?我們繼續(xù)往下看。
5 工作線程——worker_main
由于是第一個(gè)連接到來(lái),那么肯定沒(méi)有waiting_threads,此時(shí)會(huì)調(diào)用create_worker 函數(shù)創(chuàng)建一個(gè)工作線程。我們直接來(lái)看下工作線程。
- static void *worker_main(void *param)
- {
- ...
- DBUG_ENTER("worker_main");
- thread_group_t *thread_group = (thread_group_t *)param;
- /* Run event loop */
- for(;;)
- {
- connection_t *connection;
- struct timespec ts;
- set_timespec(ts,threadpool_idle_timeout);
- connection = get_event(&this_thread, thread_group, &ts);
- if (!connection)
- break;
- this_thread.event_count++;
- handle_event(connection);
- }
- ....
- my_thread_end();
- return NULL;
- }
上面是整個(gè)工作線程的邏輯,可以看到是一個(gè)循環(huán),get_event用來(lái)獲取新的需要處理的 connection,然后調(diào)用handle_event進(jìn)行處理相應(yīng)的connection。one thread per connection 中每個(gè)線程也是一個(gè)循環(huán)體,這兩者之間的區(qū)別就是,thread pool的循環(huán)等待的是一個(gè)可用的event, 并不局限于某個(gè)固定的connection的event,而one thread per connection的循環(huán)等待是等待固定的 connection上的event,這就是兩者最大的區(qū)別。
6 事件獲取——get_event
工作線程通過(guò)get_event獲取需要處理的connection,
- connection_t *get_event(worker_thread_t *current_thread,
- thread_group_t *thread_group, struct timespec *abstime)
- {
- ...
- for(;;)
- {
- ...
- /** 從QUEUE中獲取connection */
- connection = queue_get(thread_group);
- if(connection) {
- fprintf(stderr, "Thread %x get a new connection.\n", (unsigned int)pthread_self());
- break;
- }
- ...
- /**監(jiān)聽(tīng)epoll */
- if(!thread_group->listener)
- {
- thread_group->listener= current_thread;
- thread_group->active_thread_count--;
- mysql_mutex_unlock(&thread_group->mutex);
- fprintf(stderr, "Thread %x waiting for a new event.\n", (unsigned int)pthread_self());
- connection = listener(current_thread, thread_group);
- fprintf(stderr, "Thread %x get a new event for connection %p.\n",
- (unsigned int)pthread_self(), connection);
- mysql_mutex_lock(&thread_group->mutex);
- thread_group->active_thread_count++;
- /* There is no listener anymore, it just returned. */
- thread_group->listener= NULL;
- break;
- }
- ...
- }
這個(gè)get_event的函數(shù)邏輯稍微有點(diǎn)多,這里只抽取了獲取事件的兩個(gè)點(diǎn), 我們接著按照第一個(gè)連接到來(lái)是的情形進(jìn)行說(shuō)明, 第一個(gè)連接到來(lái),queue中有了一個(gè)connection,這是get_event便會(huì)從queue中獲取到一個(gè) connection,返回給worker_main線程。worker_main接著調(diào)用handle_event進(jìn)行事件處理。
每個(gè)新的connection連接到服務(wù)器后,其socket會(huì)綁定到group的epoll中,所以,如果queue中 沒(méi)有connection,需要從epool中獲取,每個(gè)group的所有連接的socket都綁定在group的epool 中,所以任何一個(gè)時(shí)刻,最多只有一個(gè)線程能夠監(jiān)聽(tīng)epoll,如果epoll監(jiān)聽(tīng)到有event的話,也會(huì)返回 相應(yīng)的connection,然后再調(diào)用handle_event進(jìn)行處理。
7 事件處理——handle_event
handle_event的邏輯比較簡(jiǎn)單,就是根據(jù)connection_t上是否登錄過(guò),進(jìn)行分支,如果沒(méi) 登錄過(guò),說(shuō)明是新到的連接,則進(jìn)行驗(yàn)證,否則直接進(jìn)行請(qǐng)求處理。
- static void handle_event(connection_t *connection)
- {
- DBUG_ENTER("handle_event");
- int err;
- if (!connection->logged_in) //處理登錄
- {
- err= threadpool_add_connection(connection->thd);
- connection->logged_in= true;
- }
- else //處理請(qǐng)求
- {
- err= threadpool_process_request(connection->thd);
- }
- if(err)
- goto end;
- set_wait_timeout(connection);
- /** 設(shè)置socket到epoll的監(jiān)聽(tīng) */
- err= start_io(connection);
- end:
- if (err)
- connection_abort(connection);
- DBUG_VOID_RETURN;
- }
- static int start_io(connection_t *connection)
- {
- int fd = mysql_socket_getfd(connection->thd->net.vio->mysql_socket);
- ...
- /* 綁定到epoll *。
- if (!connection->bound_to_poll_descriptor)
- {
- connection->bound_to_poll_descriptor= true;
- return io_poll_associate_fd(group->pollfd, fd, connection);
- }
- return io_poll_start_read(group->pollfd, fd, connection);
- }
注意,在handle_event之后,會(huì)調(diào)用start_io,這個(gè)函數(shù)很重要,這個(gè)函數(shù)會(huì)將新 到的connection的socket綁定到group的epoll上進(jìn)行監(jiān)聽(tīng)。
8 線程等待
當(dāng)group中的線程沒(méi)有任務(wù)執(zhí)行時(shí),所有線程都會(huì)在get_event處等待,但是有兩種等待方式, 一種是在epoll上等待事件,每個(gè)group中只有一個(gè)線程會(huì)做這個(gè)事情,且這個(gè)會(huì)一直等待,直到有新 的事件到來(lái)。另一種就是等待一定的時(shí)間, 即參數(shù)thread_pool_idle_time這個(gè)時(shí)間,如果超過(guò)了這個(gè)時(shí)間,那么當(dāng)前的線程的get_event就會(huì) 返回空,然后worker_main線程就會(huì)退出。如果在線程等待的過(guò)程被喚醒的話,那么就會(huì)繼續(xù)在 get_event中進(jìn)行循環(huán),等待新的事件。
9 喚醒等待線程
有兩種方式會(huì)喚醒等待的線程,一種是監(jiān)控線程timer_thread,另一種就是一些active的線程碰到 需要等待的時(shí)候,會(huì)調(diào)用tp_wait_begin,這個(gè)函數(shù)如果判斷當(dāng)前沒(méi)有active的thread且沒(méi)有thread監(jiān)聽(tīng) epoll,則會(huì)調(diào)用wake_or_create_thread。
監(jiān)控線程timer_thread用于定期監(jiān)控group中的thread使用情況,具體的檢查函數(shù)是check_stall.
- void check_stall(thread_group_t *thread_group)
- {
- ...
- /** 如果沒(méi)有線程監(jiān)聽(tīng)epoll且自上次檢查到現(xiàn)在沒(méi)有新的event事件產(chǎn)生,說(shuō)明所有的
- 活躍線程都在 忙于執(zhí)行長(zhǎng)任務(wù),則需要喚醒或創(chuàng)建工作線程 */
- if (!thread_group->listener && !thread_group->io_event_count)
- {
- wake_or_create_thread(thread_group);
- mysql_mutex_unlock(&thread_group->mutex);
- return;
- }
- /* Reset io event count */
- thread_group->io_event_count= 0;
- /** 如果隊(duì)列queue中有請(qǐng)求,且自上次檢查到現(xiàn)在queue中的請(qǐng)求沒(méi)有被消化,
- 則說(shuō)明所有活躍線程忙于執(zhí)行長(zhǎng)任務(wù),需要喚醒或創(chuàng)建工作線程*/
- if (!thread_group->queue.is_empty() && !thread_group->queue_event_count)
- {
- thread_group->stalled= true;
- wake_or_create_thread(thread_group);
- }
- /* Reset queue event count */
- thread_group->queue_event_count= 0;
- mysql_mutex_unlock(&thread_group->mutex);
- }
10 小結(jié)
MariaDB的thread pool的實(shí)現(xiàn)相對(duì)比較簡(jiǎn)單,總體上就是將group中所有的connection的socket掛在 group的epoll_fd上進(jìn)行事件監(jiān)聽(tīng),監(jiān)聽(tīng)到的事件或被當(dāng)前線程執(zhí)行,或者被push到group的queue上 被其他線程執(zhí)行。
監(jiān)控線程timer_thread定期的根據(jù)需要去喚醒等待線程或創(chuàng)建新的線程,來(lái)達(dá)到動(dòng)態(tài)增加的thread的 目的。而thread的收縮則是通過(guò)線程等待事件超時(shí)來(lái)完成的。
btw,在跟蹤代碼的過(guò)程中,也發(fā)現(xiàn)了使用thread pool時(shí)導(dǎo)致server crash的情況,提交了個(gè) bug給MariaDB,發(fā)現(xiàn)當(dāng)天就有回復(fù), 并立刻修復(fù)push到source tree上了,看來(lái)MariaDB的團(tuán)隊(duì)反映夠迅速的,贊一個(gè)。
原文鏈接:http://www.cnblogs.com/nocode/archive/2013/05/25/3098317.html