C++多進(jìn)程并發(fā)框架
三年來一直從事服務(wù)器程序開發(fā),一直都是忙忙碌碌,不久前結(jié)束了職業(yè)生涯的***份工作,有了一個禮拜的休息時間,終于可以寫寫總結(jié)了。于是把以前的開源代碼做了整理和優(yōu)化,這就是FFLIB。雖然這邊總結(jié)看起來像日記,有很多廢話,但是此文仍然是有很大針對性的。針對服務(wù)器開發(fā)中常見的問題,如多線程并發(fā)、消息轉(zhuǎn)發(fā)、異步、性能優(yōu)化、單元測試,提出自己的見解。
面對的問題
從事開發(fā)工程中,遇到過不少問題,很多時候由于時間緊迫,沒有使用優(yōu)雅的方案。在跟業(yè)內(nèi)的一些朋友交流過程中,我也意識到有些問題是大家都存在的。簡單列舉如下:
- 多線程與并發(fā)
- 異步消息/接口調(diào)用
- 消息的序列化與Reflection
- 性能優(yōu)化
- 單元測試
多線程與并發(fā)
現(xiàn)在是多核時代,并發(fā)才能實現(xiàn)更高的吞吐量、更快的響應(yīng),但也是把雙刃劍??偨Y(jié)如下幾個用法:
- 多線程+顯示鎖;接口是被多線程調(diào)用的,當(dāng)被調(diào)用時,顯示加鎖,再操作實體數(shù)據(jù)。悲劇的是,工程師為了優(yōu)化會設(shè)計多個鎖,以減少鎖的粒度,甚至有些地方使用了原子操作。這些都為領(lǐng)域邏輯增加了額外的設(shè)計負(fù)擔(dān)。最壞的情況是會出現(xiàn)死鎖。
- 多線程+任務(wù)隊列;接口被多線程調(diào)用,但請求會被暫存到任務(wù)隊列,而任務(wù)隊列會被單線程不斷執(zhí)行,典型生產(chǎn)者消費者模式。它的并發(fā)在于不同的接口可以使用不同的任務(wù)隊列。這也是我最常用的并發(fā)方式。
這是兩種最常見的多線程并發(fā),它們有個天生的缺陷——Scalability。一個機(jī)器的性能總是有瓶頸的。兩個場景的邏輯雖然由多個線程實現(xiàn)了并發(fā),但是運算量十分有可能是一臺機(jī)器無法承載的。如果是多進(jìn)程并發(fā),那么可以分布式把其部署到其他機(jī)器(也可部署在一臺機(jī)器)。所以多進(jìn)程并發(fā)比多線程并發(fā)更加Scalability。另外采用多進(jìn)程后,每個進(jìn)程單線程設(shè)計,這樣的程序更加Simplicity。多進(jìn)程的其他優(yōu)點如解耦、模塊化、方便調(diào)試、方便重用等就不贅言了。
異步消息/接口調(diào)用
提到分布式,就要說一下分布式的通訊技術(shù)。常用的方式如下:
- 類RPC;包括WebService、RPC、ICE等,特點是遠(yuǎn)程同步調(diào)用。遠(yuǎn)程的接口和本地的接口非常相似。但是游戲服務(wù)器程序一般非常在意延遲和吞吐量,所以這些阻塞線程的同步遠(yuǎn)程調(diào)用方式并不常用。但是我們必須意識到他的優(yōu)點,就是非常利于調(diào)用和測試。
- 全異步消息;當(dāng)調(diào)用遠(yuǎn)程接口的時候,異步發(fā)送請求消息,接口響應(yīng)后返回一個結(jié)果消息,調(diào)用方的回調(diào)函數(shù)處理結(jié)果消息繼續(xù)邏輯操作。所以有些邏輯就會被切割成ServiceStart和ServiceCallback兩段。有時異步會講領(lǐng)域邏輯變得支離破碎。另外消息處理函數(shù)中一般會寫一坨的 switch/case 處理不同的消息。***的問題在于單元測試,這種情況傳統(tǒng)單元測試根本束手無策。
消息的序列化與Reflection
實現(xiàn)消息的序列化和反序列化的方式有很多,常見的有Struct、json、Protobuff等都有很成功的應(yīng)用。我個人傾向于使用輕量級的二進(jìn)制序列化,優(yōu)點是比較透明和高效,一切在掌握之中。在FFLIB 中實現(xiàn)了bin_encoder_t 和 bin_decoder_t 輕量級的消息序列化,幾十行代碼而已。
性能優(yōu)化
已經(jīng)寫過關(guān)于性能方面的總結(jié),參見
http://www.cnblogs.com/zhiranok/archive/2012/06/06/cpp_perf.html
有的網(wǎng)友提到profiler、cpuprofiler、callgrind等工具。這些工具我都使用過,說實話,對于我來說,我太認(rèn)同它有很高的價值。***他們只能用于開發(fā)測試階段,可以初步得到一些性能上參考數(shù)據(jù)。第二它們?nèi)绾螌崿F(xiàn)跟蹤人們無從得知。運行其會使程序變慢,不能反映真實數(shù)據(jù)。第三重要的是,開發(fā)測試階段性能和上線后的能一樣嗎?Impossible !
關(guān)于性能,原則就是數(shù)據(jù)說話,詳見博文,不在贅述。
單元測試
關(guān)于單元測試,前邊已經(jīng)談?wù)摿艘恍S螒蚍?wù)器程序一般都比較龐大,但是不可思議的是,鄙人從來沒見有項目(c++ 后臺架構(gòu)的)有完整單元測試的。由于存在著異步和多線程,傳統(tǒng)的單元測試框架無法勝任,而開發(fā)支持異步的測試框架又是不現(xiàn)實的。我們必須看到的是,傳統(tǒng)的單元測試框架已經(jīng)取得了非常大的成功。據(jù)我了解,使用web 架構(gòu)的游戲后臺已經(jīng)對于單元測試的使用已經(jīng)非常成熟,取得了極其好的效果。所以我的思路是利用現(xiàn)有的單元測試框架,將異步消息、多線程的架構(gòu)做出調(diào)整。
已經(jīng)多次談?wù)搯卧獪y試了。其實在開發(fā)FFLIB的思路很大程度來源于此,否則可能只是一個c++ 網(wǎng)絡(luò)庫而已。我決定嘗試去解決這個問題的時候,把FFLIB 定位于框架。
先來看一段非常簡單的單元測試的代碼 :
- Assert(2 == Add(1, 1));
請允許我對這行代碼做些解釋,對Add函數(shù)輸入?yún)?shù),驗證返回值是否是預(yù)期的結(jié)果。這不就是單元測試的本質(zhì)嗎?在想一下我們異步發(fā)送消息的過程,如果每個輸入消息約定一個結(jié)果消息包,每次發(fā)送請求時都綁定一個回調(diào)函數(shù)接收和驗證結(jié)果消息包。這樣的話就恰恰滿足了傳統(tǒng)單元測試的步驟了。***還需解決一個問題,Assert是不能處理異步的返回值的。幸運的是,future機(jī)制可以化異步為同步。不了解future 模式的可以參考這里:
http://blog.chinaunix.net/uid-23093301-id-190969.html
http://msdn.microsoft.com/zh-cn/library/dd764564.aspx#Y300
來看一下在FFLIB框架下遠(yuǎn)程調(diào)用echo 服務(wù)的示例:
- struct lambda_t
- {
- static void callback(echo_t::out_t& msg_)
- {
- echo_t::in_t in;
- in.value = "XXX_echo_test_XXX";
- singleton_t<msg_bus_t>::instance()
- .get_service_group("echo")
- ->get_service(1)->async_call(in, &lambda_t::callback);
- }
- };
- echo_t::in_t in;
- in.value = "XXX_echo_test_XXX";
- singleton_t<msg_bus_t>::instance().get_service_group("echo")->get_service(1)->async_call(in, &lambda_t::callback);
當(dāng)需要調(diào)用遠(yuǎn)程接口時,async_call(in, &lambda_t::callback); 異步調(diào)用必須綁定一個回調(diào)函數(shù),回調(diào)函數(shù)接收結(jié)果消息,可以觸發(fā)后續(xù)操作。這樣的話,如果對echo 的遠(yuǎn)程接口做單元測試,可以這樣做:
- rpc_future_t< echo_t::out_t> rpc_future;
- echo_t::in_t in;
- in.value = "XXX_echo_test_XXX";
- const echo_t::out_t& out = rpc_future.call(
- singleton_t<msg_bus_t>::instance()
- .get_service_group("echo")->get_service(1), in);
- Assert(in.value == out.value);
這樣所有的遠(yuǎn)程接口都可以被單元測試覆蓋。
FFLIB 介紹
FFLIB 結(jié)構(gòu)圖
如圖所示,Client 不會直接和Service 相連接,而是通過Broker 中間層完成了消息傳遞。關(guān)于Broker 模式可以參見:http://blog.chinaunix.net/uid-23093301-id-90459.html
進(jìn)程間通信采用TPC,而不是多線程使用的共享內(nèi)存方式。Service 一般是單線程架構(gòu)的,通過啟動多進(jìn)程實現(xiàn)相對于多線程的并發(fā)。由于Broker模式天生石分布式的,所以有很好的Scalability。
消息時序圖
#p#
如何注冊服務(wù)和接口
來看一下Echo 服務(wù)的實現(xiàn):
- struct echo_service_t
- {
- public:
- void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_)
- {
- logtrace((FF, "echo_service_t::echo done value<%s>", in_msg_.value.c_str()));
- echo_t::out_t out;
- out.value = in_msg_.value;
- cb_(out);
- }
- };
- int main(int argc, char* argv[])
- {
- int g_index = 1;
- if (argc > 1)
- {
- g_index = atoi(argv[1]);
- }
- char buff[128];
- snprintf(buff, sizeof(buff), "tcp://%s:%s", "127.0.0.1", "10241");
- msg_bus_t msg_bus;
- assert(0 == singleton_t<msg_bus_t>::instance().open("tcp://127.0.0.1:10241") && "can't connnect to broker");
- echo_service_t f;
- singleton_t<msg_bus_t>::instance().create_service_group("echo");
- singleton_t<msg_bus_t>::instance().create_service("echo", g_index)
- .bind_service(&f)
- .reg(&echo_service_t::echo);
- signal_helper_t::wait();
- singleton_t<msg_bus_t>::instance().close();
- //usleep(1000);
- cout <<"\noh end\n";
- return 0;
- }
- create_service_group 創(chuàng)建一個服務(wù)group,一個服務(wù)組可能有多個并行的實例
- create_service 以特定的id 創(chuàng)建一個服務(wù)實例
- reg 為該服務(wù)注冊接口
- 接口的定義規(guī)范為void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_),***個參數(shù)為輸入的消息struct,第二個參數(shù)為回調(diào)函數(shù)的模板特例,模板參數(shù)為返回消息的struct 類型。接口無需知道發(fā)送消息等細(xì)節(jié),只需將結(jié)果callback 即可。
- 注冊到Broker 后,所有Client都可獲取該服務(wù)
消息定義的規(guī)范
我們約定每個接口(遠(yuǎn)程或本地都應(yīng)滿足)都包含一個輸入消息和一個結(jié)果消息。來看一下echo 服務(wù)的消息定義:
- struct echo_t
- {
- struct in_t: public msg_i
- {
- in_t():
- msg_i("echo_t::in_t")
- {}
- virtual string encode()
- {
- return (init_encoder() << value).get_buff();
- }
- virtual void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> value;
- }
- string value;
- };
- struct out_t: public msg_i
- {
- out_t():
- msg_i("echo_t::out_t")
- {}
- virtual string encode()
- {
- return (init_encoder() << value).get_buff();
- }
- virtual void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> value;
- }
- string value;
- };
- };
- 每個接口必須包含in_t消息和out_t消息,并且他們定義在接口名(如echo _t)的內(nèi)部
- 所有消息都繼承于msg_i, 其封裝了二進(jìn)制的序列化、反序列化等。構(gòu)造時賦予類型名作為消息的名稱。
- 每個消息必須實現(xiàn)encode 和 decode 函數(shù)
這里需要指出的是,F(xiàn)FLIB 中不需要為每個消息定義對應(yīng)的CMD。當(dāng)接口如echo向Broker 注冊時,reg接口通過C++ 模板的類型推斷會自動將該msg name 注冊給Broker, Broker為每個msg name 分配唯一的msg_id。Msg_bus 中自動維護(hù)了msg_name 和msg_id 的映射。Msg_i 的定義如下:
- struct msg_i : public codec_i
- {
- msg_i(const char* msg_name_):
- cmd(0),
- uuid(0),
- service_group_id(0),
- service_id(0),
- msg_id(0),
- msg_name(msg_name_)
- {}
- void set(uint16_t group_id, uint16_t id_, uint32_t uuid_, uint16_t msg_id_)
- {
- service_group_id = group_id;
- service_id = id_;
- uuid = uuid_;
- msg_id = msg_id_;
- }
- uint16_t cmd;
- uint16_t get_group_id() const{ return service_group_id; }
- uint16_t get_service_id() const{ return service_id; }
- uint32_t get_uuid() const{ return uuid; }
- uint16_t get_msg_id() const{ return msg_id; }
- const string& get_name() const
- {
- if (msg_name.empty() == false)
- {
- return msg_name;
- }
- return singleton_t<msg_name_store_t>::instance().id_to_name(this->get_msg_id());
- }
- void set_uuid(uint32_t id_) { uuid = id_; }
- void set_msg_id(uint16_t id_) { msg_id = id_;}
- void set_sgid(uint16_t sgid_) { service_group_id = sgid_;}
- void set_sid(uint16_t sid_) { service_id = sid_; }
- uint32_t uuid;
- uint16_t service_group_id;
- uint16_t service_id;
- uint16_t msg_id;
- string msg_name;
- virtual string encode(uint16_t cmd_)
- {
- this->cmd = cmd_;
- return encode();
- }
- virtual string encode() = 0;
- bin_encoder_t& init_encoder()
- {
- return encoder.init(cmd) << uuid << service_group_id << service_id<< msg_id;
- }
- bin_encoder_t& init_encoder(uint16_t cmd_)
- {
- return encoder.init(cmd_) << uuid << service_group_id << service_id << msg_id;
- }
- bin_decoder_t& init_decoder(const string& buff_)
- {
- return decoder.init(buff_) >> uuid >> service_group_id >> service_id >> msg_id;
- }
- bin_decoder_t decoder;
- bin_encoder_t encoder;
- };
關(guān)于性能
由于遠(yuǎn)程接口的調(diào)用必須通過Broker, Broker會為每個接口自動生成性能統(tǒng)計數(shù)據(jù),并每10分鐘輸出到perf.txt 文件中。文件格式為CSV,參見:
http://www.cnblogs.com/zhiranok/archive/2012/06/06/cpp_perf.html
總結(jié)
FFLIB框架擁有如下的特點:
- 使用多進(jìn)程并發(fā)。Broker 把Client 和Service 的位置透明化
- Service 的接口要注冊到Broker, 所有連接Broker的Client 都可以調(diào)用(publisher/ subscriber)
- 遠(yuǎn)程調(diào)用必須綁定回調(diào)函數(shù)
- 利用future 模式實現(xiàn)同步,從而支持單元測試
- 消息定義規(guī)范簡單直接高效
- 所有service的接口性能監(jiān)控數(shù)據(jù)自動生成,免費的午餐
- Service 單線程話,更simplicity
源代碼:
Svn co http://ffown.googlecode.com/svn/trunk/
運行示例:
- Cd example/broker && make && ./app_broker –l http://127.0.0.1:10241
- Cd example/echo_server && make && ./app_echo_server
- Cd example/echo_client && make && ./app_echo_client
原文鏈接:C++多進(jìn)程并發(fā)框架