C++多進(jìn)程并發(fā)框架FFLIB之Tutorial
基本介紹可以看這里:
http://www.cnblogs.com/zhiranok/archive/2012/07/30/fflib_framework.html
其中之所以特意采用了Broker模式,是吸收了MPI和Erlang的思想。
關(guān)于MPI:http://www.mcs.anl.gov/research/projects/mpi/
關(guān)于Erlang:http://www.erlang.org/
FFLIB 目前處于alpha階段,一些有用的功能還需繼續(xù)添加。但是FFLIB一開始就是為了解決實(shí)際問題而生。Broker 即可以以獨(dú)立進(jìn)程運(yùn)行,也可以集成到某個特定的進(jìn)程中啟動。除了這些,F(xiàn)FLIB中使用epoll實(shí)現(xiàn)的網(wǎng)絡(luò)層也***參考價值。網(wǎng)上有一些關(guān)于epoll ET 和 LT的討論,關(guān)于哪種方式更簡單,本人的答案是ET。ET模式下epoll 就是一個完全狀態(tài)機(jī)。開發(fā)者只需實(shí)現(xiàn)FD的read、write、error 三種狀態(tài)即可。
我進(jìn)一步挖掘FFLIB的功能。寫一篇FFLIB的Tutorial。創(chuàng)建更多的FFLIB使用示例,以此來深入探討FFLIB的意義。在游戲開發(fā)中,或者一些分布式的環(huán)境中,有許多大家熟悉的模式。,本文挑選了如下作為FFLIB示例:
Request/Reply
點(diǎn)對點(diǎn)通訊
阻塞通訊
多播通訊
Map/Reduce
Request/Reply
異步的Request/Reply
在FFLIB中所有的消息都是Request和Reply一一對應(yīng)的,默認(rèn)情況下工作在異步模式。假設(shè)如下場景,F(xiàn)lash連入GatewayServer并發(fā)送Login消息包,GatewaServer 解析用戶名密碼,調(diào)用LoginServer 驗(yàn)證。
首先定義msg:
- struct user_login_t
- {
- struct in_t: public msg_i
- {
- in_t():
- msg_i("user_login_t::in_t")
- {}
- string encode()
- {
- return (init_encoder() << uid << value).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> uid >> value;
- }
- long uid;
- string value;
- };
- struct out_t: public msg_i
- {
- out_t():
- msg_i("user_login_t::out_t")
- {}
- string encode()
- {
- return (init_encoder() << value).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> value;
- }
- bool value;
- };
- };
LoginServer中如此定義接口:
- class login_server_t
- {
- public:
- void verify(user_login_t::in_t& in_msg_, rpc_callcack_t<user_login_t::out_t>& cb_)
- {
- user_login_t::out_t out;
- out.value = true;
- cb_(out);
- }
- };
- login_server_t login_server;
- singleton_t<msg_bus_t>::instance().create_service("login_server", 1)
- .bind_service(&login_server)
- .reg(&login_server_t::verify);
在GatewayServer中調(diào)用上面接口:
- struct lambda_t
- {
- static void callback(user_login_t::out_t& msg_, socket_ptr_t socket_)
- {
- if (true == msg_.value)
- {
- //! socket_->send_msg("login ok");
- }
- else
- {
- //! socket_->send_msg("login failed");
- }
- }
- };
- user_login_t::in_t in;
- in.uid = 520;
- in.value = "ILoveYou";
- socket_ptr_t flash_socket = NULL;//! TODO
- singleton_t<msg_bus_t>::instance()
- .get_service_group("login_server_t")
- ->get_service(1)
- ->async_call(in, binder_t::callback(&lambda_t::callback, flash_socket));
如上所示, async_call 可以通過binder_t模板函數(shù)為回調(diào)函綁定參數(shù)。
同步的Request/Reply
大部分時候我們期望Reply被異步處理,但有時Reply 必須被首先處理后才能觸發(fā)后續(xù)操作,一般這種情況發(fā)生在程序初始化之時。假設(shè)如下場景,SceneServer啟動時必須從SuperServer中獲取配置,然后才能執(zhí)行加載場景數(shù)據(jù)等后續(xù)初始化操作。
首先定義通信的msg:
- struct config_t
- {
- struct in_t: public msg_i
- {
- in_t():
- msg_i("config_t::in_t")
- {}
- string encode()
- {
- return (init_encoder() << server_type << server_id).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> server_type >> server_id;
- }
- int server_type;
- int server_id;
- };
- struct out_t: public msg_i
- {
- out_t():
- msg_i("config_t::out_t")
- {}
- string encode()
- {
- return (init_encoder() << value).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> value;
- }
- map<string, string> value;
- };
- };
如上所示, msg 序列化自動支持map。
SuperServer 中定義返回配置的接口:
- super_server_t super_server;
- singleton_t<msg_bus_t>::instance().create_service("super_server", 1)
- .bind_service(&super_server)
- .reg(&super_server_t::get_config);
- SceneServer 可以如此實(shí)現(xiàn)同步Request/Reply:
- rpc_future_t<config_t::out_t> rpc_future;
- config_t::in_t in;
- in.server_type = 1;
- in.server_id = 1;
- const config_t::out_t& out = rpc_future.call( singleton_t<msg_bus_t>::instance().get_service_group("super_server")
- ->get_service(1), in);
- cout << out.value.size() <<"\n";
- //std::foreach(out.value.begin(), out.value.end(), fuctor_xx);
點(diǎn)對點(diǎn)通訊
異步Request/Reply 已經(jīng)能夠解決大部分問題了,但是有時處理Push模式時稍顯吃了。我們知道消息推算有Push 和Poll兩種方式。了解二者:
http://blog.sina.com.cn/s/blog_6617106b0100hrm1.html
上面提到的Request/Reply 非常適合poll模式,以上一個獲取配置為例,SuperServer由于定義接口的時候只需知道callback,并不知道SceneServer的具體連接。,所以SuperServer不能向SceneServer Push消息。在FFLIB中并沒有限定某個節(jié)點(diǎn)必須是Client或只能是Service,實(shí)際上可以兼有二者的角色。SceneServer 也可以提供接口供SuperServer調(diào)用,這就符合了Push的語義。假設(shè)如下場景,GatewayServer需要在用戶登入時調(diào)用通知SessionServer,而某一時刻SessionServer也可能呢通知GatewayServer 強(qiáng)制某用戶下線。二者互為client和service。大家必須知道,在FFLIB中實(shí)現(xiàn)兩個節(jié)點(diǎn)的通信只需知道對方的服務(wù)名稱即可,Broker 在此時實(shí)現(xiàn)解耦的作用非常明顯,若要增加對其他節(jié)點(diǎn)的通信,只需通過服務(wù)名稱async_call即可。
定義通信的msg:
- struct user_online_t
- {
- struct in_t: public msg_i
- {
- in_t():
- msg_i("user_online_t::in_t")
- {}
- string encode()
- {
- return (init_encoder() << uid).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> uid;
- }
- long uid;
- };
- struct out_t: public msg_i
- {
- out_t():
- msg_i("user_online_t::out_t")
- {}
- string encode()
- {
- return (init_encoder() << value).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> value;
- }
- bool value;
- };
- };
- struct force_user_offline_t
- {
- struct in_t: public msg_i
- {
- in_t():
- msg_i("force_user_offline_t::in_t")
- {}
- string encode()
- {
- return (init_encoder() << uid).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> uid;
- }
- long uid;
- };
- struct out_t: public msg_i
- {
- out_t():
- msg_i("force_user_offline_t::out_t")
- {}
- string encode()
- {
- return (init_encoder() << value).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> value;
- }
- bool value;
- };
- };
GatewayServer 通知SessionServer 用戶上線,并提供強(qiáng)制用戶下線的接口:
- class gateway_server_t
- {
- public:
- void force_user_offline(force_user_offline_t::in_t& in_msg_, rpc_callcack_t<force_user_offline_t::out_t>& cb_)
- {
- //! close user socket
- force_user_offline_t::out_t out;
- out.value = true;
- cb_(out);
- }
- };
- gateway_server_t gateway_server;
- singleton_t<msg_bus_t>::instance().create_service("gateway_server", 1)
- .bind_service(&gateway_server)
- .reg(&gateway_server_t::force_user_offline);
- user_online_t::in_t in;
- in.uid = 520;
- singleton_t<msg_bus_t>::instance()
- .get_service_group("session_server")
- ->get_service(1)
- ->async_call(in, callback_TODO);
SessionServer 提供用戶上線接口,可能會調(diào)用GatewayServer 的接口強(qiáng)制用戶下線。
- class session_server_t
- {
- public:
- void user_login(user_online_t::in_t& in_msg_, rpc_callcack_t<user_online_t::out_t>& cb_)
- {
- //! close user socket
- user_online_t::out_t out;
- out.value = true;
- cb_(out);
- }
- };
- session_server_t session_server;
- singleton_t<msg_bus_t>::instance().create_service("session_server", 1)
- .bind_service(&session_server)
- .reg(&session_server_t::user_login);
- force_user_offline_t::in_t in;
- in.uid = 520;
- singleton_t<msg_bus_t>::instance()
- .get_service_group("gateway_server")
- ->get_service(1)
- ->async_call(in, callback_TODO);
多播通信
和點(diǎn)對點(diǎn)通信一樣,要實(shí)現(xiàn)多播,只需要知道目標(biāo)的服務(wù)名稱。特別提一點(diǎn)的是,F(xiàn)FLIB中有服務(wù)組的概念。比如啟動了多個場景服務(wù)器SceneServer,除了數(shù)據(jù)不同,二者接口完全相同,有可能只是相同進(jìn)程的不同實(shí)例。在FFLIB框架中把這些服務(wù)歸為一個服務(wù)組,然后再為每個實(shí)例分配索引id。
假設(shè)如下場景,SuperServer 中要實(shí)現(xiàn)一個GM接口,通知所有SceneServer 重新加載配置。
定義通信的msg:
- struct reload_config_t
- struct in_t: public msg_i
- {
- in_t():
- msg_i("reload_config_t::in_t")
- {}
- string encode()
- {
- return (init_encoder()).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_);
- }
- };
- struct out_t: public msg_i
- {
- out_t():
- msg_i("reload_config_t::out_t")
- {}
- string encode()
- {
- return (init_encoder() << value).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> value;
- }
- bool value;
- };
- ;
SceneServer 提供重新載入配置接口:
- class scene_server_t
- {
- public:
- void reload_config(reload_config_t::in_t& in_msg_, rpc_callcack_t<reload_config_t::out_t>& cb_)
- {
- //! close user socket
- reload_config_t::out_t out;
- out.value = true;
- cb_(out);
- }
- };
- scene_server_t scene_server;
- singleton_t<msg_bus_t>::instance().create_service("scene_server", 1)
- .bind_service(&scene_server)
- .reg(&scene_server_t::reload_config);
在SuperServer 中如此實(shí)現(xiàn)多播(跟準(zhǔn)確是廣播,大同小異):
- struct lambda_t
- {
- static void reload_config(rpc_service_t* rs_)
- {
- reload_config_t::in_t in;
- rs_->async_call(in, callback_TODO);
- }
- };
- singleton_t<msg_bus_t>::instance()
- .get_service_group("scene_server")
- ->foreach(&lambda_t::reload_config);
Map/Reduce
在游戲中使用Map/reduce 的情形并不多見,本人找到網(wǎng)上最常見的Map/reduce 實(shí)例 WordCount。情形如下:有一些文本字符串,統(tǒng)計(jì)每個字符出現(xiàn)的次數(shù)。
Map操作,將文本分為多個子文本,分發(fā)給多個Worker 進(jìn)程進(jìn)行統(tǒng)計(jì)
Reduce 操作,將多組worker 進(jìn)程計(jì)算的結(jié)果匯總
Worker:為文本統(tǒng)計(jì)各個字符出現(xiàn)的次數(shù)
定義通信消息:
- struct word_count_t
- {
- struct in_t: public msg_i
- {
- in_t():
- msg_i("word_count_t::in_t")
- {}
- string encode()
- {
- return (init_encoder() << str).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> str;
- }
- string str;
- };
- struct out_t: public msg_i
- {
- out_t():
- msg_i("word_count_t::out_t")
- {}
- string encode()
- {
- return (init_encoder() << value).get_buff();
- }
- void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> value;
- }
- map<char, int> value;
- };
- };
定義woker的接口:
- class worker_t
- {
- public:
- void word_count(word_count_t::in_t& in_msg_, rpc_callcack_t<word_count_t::out_t>& cb_)
- {
- //! close user socket
- word_count_t::out_t out;
- for (size_t i = 0; i < in_msg_.str.size(); ++i)
- {
- map<int, int>::iterator it = out.value.find(in_msg_.str[i]);
- if (it != out.value.end())
- {
- it->second += 1;
- }
- else
- {
- out.value[in_msg_.str[i]] = 1;
- }
- }
- cb_(out);
- }
- };
- worker_t worker;
- for (int i = 0; i < 5; ++i)
- {
- singleton_t<msg_bus_t>::instance().create_service("worker", 1)
- .bind_service(&worker)
- .reg(&worker_t::word_count);
- }
模擬Map/reduce 操作:
- struct lambda_t
- {
- static void reduce(word_count_t::out_t& msg_, map<int, int>* result_, size_t* size_)
- {
- for (map<int, int>::iterator it = msg_.value.begin(); it != msg_.value.end(); ++it)
- {
- map<int, int>::iterator it2 = result_->find(it->first);
- if (it2 != result_->end())
- {
- it2->second += it->second;
- }
- else
- {
- (*result_)[it->first] = it->second;
- }
- }
- if (-- size_ == 0)
- {
- //reduce end!!!!!!!!!!!!!!!!
- delete result_;
- delete size_;
- }
- }
- static void do_map(const char** p, size_t size_)
- {
- map<int, int>* result = new map<int, int>();
- size_t* dest_size = new size_t();
- *dest_size = size_;
- for (size_t i = 0; i < size_; ++i)
- {
- word_count_t::in_t in;
- in.str = p[i];
- singleton_t<msg_bus_t>::instance()
- .get_service_group("worker")
- ->get_service(1 + i % singleton_t<msg_bus_t>::instance().get_service_group("worker")->size())
- ->async_call(in, binder_t::callback(&lambda_t::reduce, result, dest_size));
- }
- }
- };
- const char* str_vec[] = {"oh nice", "oh fuck", "oh no", "oh dear", "oh wonderful", "oh bingo"};
- lambda_t::do_map(str_vec, 6);
總結(jié):
FFLIB 使進(jìn)程間通信更容易
source code: https://ffown.googlecode.com/svn/trunk
原文鏈接:http://www.cnblogs.com/zhiranok/archive/2012/08/08/fflib_tutorial.html
【編輯推薦】