自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

C++多進(jìn)程并發(fā)框架

開發(fā) 后端
三年來一直從事服務(wù)器程序開發(fā),一直都是忙忙碌碌,不久前結(jié)束了職業(yè)生涯的第一份工作,有了一個禮拜的休息時間,終于可以寫寫總結(jié)了。于是把以前的開源代碼做了整理和優(yōu)化,這就是FFLIB。

三年來一直從事服務(wù)器程序開發(fā),一直都是忙忙碌碌,不久前結(jié)束了職業(yè)生涯的***份工作,有了一個禮拜的休息時間,終于可以寫寫總結(jié)了。于是把以前的開源代碼做了整理和優(yōu)化,這就是FFLIB。雖然這邊總結(jié)看起來像日記,有很多廢話,但是此文仍然是有很大針對性的。針對服務(wù)器開發(fā)中常見的問題,如多線程并發(fā)、消息轉(zhuǎn)發(fā)、異步、性能優(yōu)化、單元測試,提出自己的見解。

面對的問題

從事開發(fā)工程中,遇到過不少問題,很多時候由于時間緊迫,沒有使用優(yōu)雅的方案。在跟業(yè)內(nèi)的一些朋友交流過程中,我也意識到有些問題是大家都存在的。簡單列舉如下:

  • 多線程與并發(fā)
  • 異步消息/接口調(diào)用
  • 消息的序列化與Reflection
  • 性能優(yōu)化
  • 單元測試

多線程與并發(fā)

現(xiàn)在是多核時代,并發(fā)才能實現(xiàn)更高的吞吐量、更快的響應(yīng),但也是把雙刃劍??偨Y(jié)如下幾個用法:

  • 多線程+顯示鎖;接口是被多線程調(diào)用的,當(dāng)被調(diào)用時,顯示加鎖,再操作實體數(shù)據(jù)。悲劇的是,工程師為了優(yōu)化會設(shè)計多個鎖,以減少鎖的粒度,甚至有些地方使用了原子操作。這些都為領(lǐng)域邏輯增加了額外的設(shè)計負(fù)擔(dān)。最壞的情況是會出現(xiàn)死鎖。
  • 多線程+任務(wù)隊列;接口被多線程調(diào)用,但請求會被暫存到任務(wù)隊列,而任務(wù)隊列會被單線程不斷執(zhí)行,典型生產(chǎn)者消費者模式。它的并發(fā)在于不同的接口可以使用不同的任務(wù)隊列。這也是我最常用的并發(fā)方式。

這是兩種最常見的多線程并發(fā),它們有個天生的缺陷——Scalability。一個機(jī)器的性能總是有瓶頸的。兩個場景的邏輯雖然由多個線程實現(xiàn)了并發(fā),但是運算量十分有可能是一臺機(jī)器無法承載的。如果是多進(jìn)程并發(fā),那么可以分布式把其部署到其他機(jī)器(也可部署在一臺機(jī)器)。所以多進(jìn)程并發(fā)比多線程并發(fā)更加Scalability。另外采用多進(jìn)程后,每個進(jìn)程單線程設(shè)計,這樣的程序更加Simplicity。多進(jìn)程的其他優(yōu)點如解耦、模塊化、方便調(diào)試、方便重用等就不贅言了。

異步消息/接口調(diào)用

提到分布式,就要說一下分布式的通訊技術(shù)。常用的方式如下:

  • 類RPC;包括WebService、RPC、ICE等,特點是遠(yuǎn)程同步調(diào)用。遠(yuǎn)程的接口和本地的接口非常相似。但是游戲服務(wù)器程序一般非常在意延遲和吞吐量,所以這些阻塞線程的同步遠(yuǎn)程調(diào)用方式并不常用。但是我們必須意識到他的優(yōu)點,就是非常利于調(diào)用和測試。
  • 全異步消息;當(dāng)調(diào)用遠(yuǎn)程接口的時候,異步發(fā)送請求消息,接口響應(yīng)后返回一個結(jié)果消息,調(diào)用方的回調(diào)函數(shù)處理結(jié)果消息繼續(xù)邏輯操作。所以有些邏輯就會被切割成ServiceStart和ServiceCallback兩段。有時異步會講領(lǐng)域邏輯變得支離破碎。另外消息處理函數(shù)中一般會寫一坨的 switch/case 處理不同的消息。***的問題在于單元測試,這種情況傳統(tǒng)單元測試根本束手無策。

消息的序列化與Reflection

實現(xiàn)消息的序列化和反序列化的方式有很多,常見的有Struct、json、Protobuff等都有很成功的應(yīng)用。我個人傾向于使用輕量級的二進(jìn)制序列化,優(yōu)點是比較透明和高效,一切在掌握之中。在FFLIB 中實現(xiàn)了bin_encoder_t 和 bin_decoder_t 輕量級的消息序列化,幾十行代碼而已。

性能優(yōu)化

已經(jīng)寫過關(guān)于性能方面的總結(jié),參見

http://www.cnblogs.com/zhiranok/archive/2012/06/06/cpp_perf.html

有的網(wǎng)友提到profiler、cpuprofiler、callgrind等工具。這些工具我都使用過,說實話,對于我來說,我太認(rèn)同它有很高的價值。***他們只能用于開發(fā)測試階段,可以初步得到一些性能上參考數(shù)據(jù)。第二它們?nèi)绾螌崿F(xiàn)跟蹤人們無從得知。運行其會使程序變慢,不能反映真實數(shù)據(jù)。第三重要的是,開發(fā)測試階段性能和上線后的能一樣嗎?Impossible !

關(guān)于性能,原則就是數(shù)據(jù)說話,詳見博文,不在贅述。

單元測試

關(guān)于單元測試,前邊已經(jīng)談?wù)摿艘恍S螒蚍?wù)器程序一般都比較龐大,但是不可思議的是,鄙人從來沒見有項目(c++ 后臺架構(gòu)的)有完整單元測試的。由于存在著異步和多線程,傳統(tǒng)的單元測試框架無法勝任,而開發(fā)支持異步的測試框架又是不現(xiàn)實的。我們必須看到的是,傳統(tǒng)的單元測試框架已經(jīng)取得了非常大的成功。據(jù)我了解,使用web 架構(gòu)的游戲后臺已經(jīng)對于單元測試的使用已經(jīng)非常成熟,取得了極其好的效果。所以我的思路是利用現(xiàn)有的單元測試框架,將異步消息、多線程的架構(gòu)做出調(diào)整。

已經(jīng)多次談?wù)搯卧獪y試了。其實在開發(fā)FFLIB的思路很大程度來源于此,否則可能只是一個c++ 網(wǎng)絡(luò)庫而已。我決定嘗試去解決這個問題的時候,把FFLIB 定位于框架。

先來看一段非常簡單的單元測試的代碼 :

  1. Assert(2 == Add(11)); 

請允許我對這行代碼做些解釋,對Add函數(shù)輸入?yún)?shù),驗證返回值是否是預(yù)期的結(jié)果。這不就是單元測試的本質(zhì)嗎?在想一下我們異步發(fā)送消息的過程,如果每個輸入消息約定一個結(jié)果消息包,每次發(fā)送請求時都綁定一個回調(diào)函數(shù)接收和驗證結(jié)果消息包。這樣的話就恰恰滿足了傳統(tǒng)單元測試的步驟了。***還需解決一個問題,Assert是不能處理異步的返回值的。幸運的是,future機(jī)制可以化異步為同步。不了解future 模式的可以參考這里:

http://blog.chinaunix.net/uid-23093301-id-190969.html

http://msdn.microsoft.com/zh-cn/library/dd764564.aspx#Y300

來看一下在FFLIB框架下遠(yuǎn)程調(diào)用echo 服務(wù)的示例:

  1. struct lambda_t  
  2. {  
  3.   static void callback(echo_t::out_t& msg_)  
  4.   {  
  5.     echo_t::in_t in;  
  6.     in.value = "XXX_echo_test_XXX";  
  7.     singleton_t<msg_bus_t>::instance()  
  8.        .get_service_group("echo")  
  9.        ->get_service(1)->async_call(in, &lambda_t::callback);  
  10.   }  
  11. };  
  12. echo_t::in_t in;  
  13. in.value = "XXX_echo_test_XXX";  
  14. singleton_t<msg_bus_t>::instance().get_service_group("echo")->get_service(1)->async_call(in, &lambda_t::callback); 

當(dāng)需要調(diào)用遠(yuǎn)程接口時,async_call(in, &lambda_t::callback); 異步調(diào)用必須綁定一個回調(diào)函數(shù),回調(diào)函數(shù)接收結(jié)果消息,可以觸發(fā)后續(xù)操作。這樣的話,如果對echo 的遠(yuǎn)程接口做單元測試,可以這樣做:

  1. rpc_future_t< echo_t::out_t> rpc_future;  
  2. echo_t::in_t in;  
  3. in.value = "XXX_echo_test_XXX";  
  4. const echo_t::out_t& out = rpc_future.call(  
  5.     singleton_t<msg_bus_t>::instance()  
  6.         .get_service_group("echo")->get_service(1), in);  
  7. Assert(in.value == out.value); 

這樣所有的遠(yuǎn)程接口都可以被單元測試覆蓋。

FFLIB 介紹

 FFLIB 結(jié)構(gòu)圖

如圖所示,Client 不會直接和Service 相連接,而是通過Broker 中間層完成了消息傳遞。關(guān)于Broker 模式可以參見:http://blog.chinaunix.net/uid-23093301-id-90459.html

進(jìn)程間通信采用TPC,而不是多線程使用的共享內(nèi)存方式。Service 一般是單線程架構(gòu)的,通過啟動多進(jìn)程實現(xiàn)相對于多線程的并發(fā)。由于Broker模式天生石分布式的,所以有很好的Scalability。

消息時序圖

#p#

如何注冊服務(wù)和接口

來看一下Echo 服務(wù)的實現(xiàn):

  1. struct echo_service_t  
  2. {  
  3. public:  
  4.     void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_)  
  5.     {  
  6.         logtrace((FF, "echo_service_t::echo done value<%s>", in_msg_.value.c_str()));  
  7.         echo_t::out_t out;  
  8.         out.value = in_msg_.value;  
  9.         cb_(out);  
  10.     }  
  11. };  
  12.  
  13. int main(int argc, char* argv[])  
  14. {  
  15.     int g_index = 1;  
  16.     if (argc > 1)  
  17.     {  
  18.         g_index = atoi(argv[1]);  
  19.     }  
  20.     char buff[128];  
  21.     snprintf(buff, sizeof(buff), "tcp://%s:%s""127.0.0.1""10241");  
  22.  
  23.     msg_bus_t msg_bus;  
  24.     assert(0 == singleton_t<msg_bus_t>::instance().open("tcp://127.0.0.1:10241") && "can't connnect to broker");  
  25.  
  26.     echo_service_t f;  
  27.  
  28.     singleton_t<msg_bus_t>::instance().create_service_group("echo");  
  29.     singleton_t<msg_bus_t>::instance().create_service("echo", g_index)  
  30.             .bind_service(&f)  
  31.             .reg(&echo_service_t::echo);  
  32.  
  33.     signal_helper_t::wait();  
  34.  
  35.     singleton_t<msg_bus_t>::instance().close();  
  36.     //usleep(1000);  
  37.     cout <<"\noh end\n";  
  38.     return 0;  
  • create_service_group 創(chuàng)建一個服務(wù)group,一個服務(wù)組可能有多個并行的實例  
  • create_service 以特定的id 創(chuàng)建一個服務(wù)實例
  • reg 為該服務(wù)注冊接口
  • 接口的定義規(guī)范為void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_),***個參數(shù)為輸入的消息struct,第二個參數(shù)為回調(diào)函數(shù)的模板特例,模板參數(shù)為返回消息的struct 類型。接口無需知道發(fā)送消息等細(xì)節(jié),只需將結(jié)果callback 即可。
  • 注冊到Broker 后,所有Client都可獲取該服務(wù)

消息定義的規(guī)范

我們約定每個接口(遠(yuǎn)程或本地都應(yīng)滿足)都包含一個輸入消息和一個結(jié)果消息。來看一下echo 服務(wù)的消息定義:

  1. struct echo_t  
  2. {  
  3.     struct in_t: public msg_i  
  4.     {  
  5.         in_t():  
  6.             msg_i("echo_t::in_t")  
  7.         {}  
  8.         virtual string encode()  
  9.         {  
  10.             return (init_encoder() << value).get_buff();  
  11.         }  
  12.         virtual void decode(const string& src_buff_)  
  13.         {  
  14.             init_decoder(src_buff_) >> value;  
  15.         }  
  16.  
  17.         string value;  
  18.     };  
  19.     struct out_t: public msg_i  
  20.     {  
  21.         out_t():  
  22.             msg_i("echo_t::out_t")  
  23.         {}  
  24.         virtual string encode()  
  25.         {  
  26.             return (init_encoder() << value).get_buff();  
  27.         }  
  28.         virtual void decode(const string& src_buff_)  
  29.         {  
  30.             init_decoder(src_buff_) >> value;  
  31.         }  
  32.  
  33.         string value;  
  34.     };  
  35. }; 
  •  每個接口必須包含in_t消息和out_t消息,并且他們定義在接口名(如echo _t)的內(nèi)部
  • 所有消息都繼承于msg_i, 其封裝了二進(jìn)制的序列化、反序列化等。構(gòu)造時賦予類型名作為消息的名稱。
  • 每個消息必須實現(xiàn)encode 和 decode 函數(shù)

這里需要指出的是,F(xiàn)FLIB 中不需要為每個消息定義對應(yīng)的CMD。當(dāng)接口如echo向Broker 注冊時,reg接口通過C++ 模板的類型推斷會自動將該msg name 注冊給Broker, Broker為每個msg name 分配唯一的msg_id。Msg_bus 中自動維護(hù)了msg_name 和msg_id 的映射。Msg_i 的定義如下:

  1. struct msg_i : public codec_i  
  2. {  
  3.     msg_i(const char* msg_name_):  
  4.         cmd(0),  
  5.         uuid(0),  
  6.         service_group_id(0),  
  7.         service_id(0),  
  8.         msg_id(0),  
  9.         msg_name(msg_name_)  
  10.     {}  
  11.  
  12.     void set(uint16_t group_id, uint16_t id_, uint32_t uuid_, uint16_t msg_id_)  
  13.     {  
  14.         service_group_id = group_id;  
  15.         service_id       = id_;  
  16.         uuid             = uuid_;  
  17.         msg_id           = msg_id_;  
  18.     }  
  19.  
  20.     uint16_t cmd;  
  21.     uint16_t get_group_id()   constreturn service_group_id; }  
  22.     uint16_t get_service_id() constreturn service_id;       }  
  23.     uint32_t get_uuid()       constreturn uuid;             }  
  24.  
  25.     uint16_t get_msg_id()     constreturn msg_id;           }  
  26.     const string& get_name()  const 
  27.     {  
  28.         if (msg_name.empty() == false)  
  29.         {  
  30.             return msg_name;  
  31.         }  
  32.         return singleton_t<msg_name_store_t>::instance().id_to_name(this->get_msg_id());  
  33.     }  
  34.  
  35.     void     set_uuid(uint32_t id_)   { uuid = id_;  }  
  36.     void     set_msg_id(uint16_t id_) { msg_id = id_;}  
  37.     void     set_sgid(uint16_t sgid_) { service_group_id = sgid_;}  
  38.     void     set_sid(uint16_t sid_)   { service_id = sid_; }  
  39.     uint32_t uuid;  
  40.     uint16_t service_group_id;  
  41.     uint16_t service_id;  
  42.     uint16_t msg_id;  
  43.     string   msg_name;  
  44.  
  45.     virtual string encode(uint16_t cmd_)  
  46.     {  
  47.         this->cmd = cmd_;  
  48.         return encode();  
  49.     }  
  50.     virtual string encode() = 0;  
  51.     bin_encoder_t& init_encoder()  
  52.     {  
  53.         return encoder.init(cmd)  << uuid << service_group_id << service_id<< msg_id;  
  54.     }  
  55.     bin_encoder_t& init_encoder(uint16_t cmd_)  
  56.     {  
  57.         return encoder.init(cmd_) << uuid << service_group_id << service_id << msg_id;  
  58.     }  
  59.     bin_decoder_t& init_decoder(const string& buff_)  
  60.     {  
  61.         return decoder.init(buff_) >> uuid >> service_group_id >> service_id >> msg_id;  
  62.     }  
  63.     bin_decoder_t decoder;  
  64.     bin_encoder_t encoder;  
  65. }; 

關(guān)于性能

由于遠(yuǎn)程接口的調(diào)用必須通過Broker, Broker會為每個接口自動生成性能統(tǒng)計數(shù)據(jù),并每10分鐘輸出到perf.txt 文件中。文件格式為CSV,參見:

http://www.cnblogs.com/zhiranok/archive/2012/06/06/cpp_perf.html

總結(jié)

FFLIB框架擁有如下的特點:

  • 使用多進(jìn)程并發(fā)。Broker 把Client 和Service 的位置透明化
  • Service 的接口要注冊到Broker, 所有連接Broker的Client 都可以調(diào)用(publisher/ subscriber)
  • 遠(yuǎn)程調(diào)用必須綁定回調(diào)函數(shù)
  • 利用future 模式實現(xiàn)同步,從而支持單元測試
  • 消息定義規(guī)范簡單直接高效
  • 所有service的接口性能監(jiān)控數(shù)據(jù)自動生成,免費的午餐
  • Service 單線程話,更simplicity

源代碼:

Svn co http://ffown.googlecode.com/svn/trunk/

運行示例:

  • Cd example/broker && make && ./app_broker –l http://127.0.0.1:10241
  • Cd example/echo_server && make && ./app_echo_server
  • Cd example/echo_client && make && ./app_echo_client

原文鏈接:C++多進(jìn)程并發(fā)框架

責(zé)任編輯:林師授 來源: mysqlops.com
相關(guān)推薦

2012-08-09 08:56:34

C++

2017-06-30 10:12:46

Python多進(jìn)程

2022-04-01 13:10:20

C++服務(wù)器代碼

2015-04-21 13:37:44

Google開源CC++版

2024-12-27 08:11:44

Python編程模式IO

2010-07-15 12:51:17

Perl多進(jìn)程

2010-02-01 10:54:37

C++框架

2024-09-29 10:39:14

并發(fā)Python多線程

2024-01-03 10:03:26

PythonTCP服務(wù)器

2021-10-12 09:52:30

Webpack 前端多進(jìn)程打包

2024-03-29 06:44:55

Python多進(jìn)程模塊工具

2016-01-11 10:29:36

Docker容器容器技術(shù)

2020-10-20 17:35:42

srpcRPC語言

2025-01-27 00:54:31

2024-08-26 08:39:26

PHP孤兒進(jìn)程僵尸進(jìn)程

2010-01-14 14:17:20

Visual C++

2019-02-26 11:15:25

進(jìn)程多線程多進(jìn)程

2009-04-21 09:12:45

Java多進(jìn)程運行

2021-02-25 11:19:37

谷歌Android開發(fā)者

2022-03-09 17:01:32

Python多線程多進(jìn)程
點贊
收藏

51CTO技術(shù)棧公眾號