真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯網站制作重慶分公司

C++多進程并發(fā)框架FFLIB中Tutorial的原理是什么

C++多進程并發(fā)框架FFLIB中Tutorial的原理是什么,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

納溪網站建設公司創(chuàng)新互聯,納溪網站設計制作,有大型網站制作公司豐富經驗。已為納溪1000多家提供企業(yè)網站建設服務。企業(yè)網站搭建\外貿網站制作要多少錢,請找那個售后服務好的納溪做網站的公司定做!

特意采用了Broker模式,是吸收了MPI和Erlang的思想。

關于MPI:http://www.mcs.anl.gov/research/projects/mpi/

關于Erlang:http://www.erlang.org/

FFLIB 目前處于alpha階段,一些有用的功能還需繼續(xù)添加。但是FFLIB一開始就是為了解決實際問題而生。Broker 即可以以獨立進程運行,也可以集成到某個特定的進程中啟動。除了這些,FFLIB中使用epoll實現的網絡層也***參考價值。網上有一些關于epoll ET 和 LT的討論,關于哪種方式更簡單,本人的答案是ET。ET模式下epoll 就是一個完全狀態(tài)機。開發(fā)者只需實現FD的read、write、error 三種狀態(tài)即可。

我進一步挖掘FFLIB的功能。寫一篇FFLIB的Tutorial。創(chuàng)建更多的FFLIB使用示例,以此來深入探討FFLIB的意義。在游戲開發(fā)中,或者一些分布式的環(huán)境中,有許多大家熟悉的模式。,本文挑選了如下作為FFLIB示例:

Request/Reply

點對點通訊

阻塞通訊

多播通訊

Map/Reduce

Request/Reply

異步的Request/Reply

在FFLIB中所有的消息都是Request和Reply一一對應的,默認情況下工作在異步模式。假設如下場景,Flash連入GatewayServer并發(fā)送Login消息包,GatewaServer 解析用戶名密碼,調用LoginServer 驗證。

首先定義msg:

struct user_login_t {     struct in_t: public msg_i     {         in_t():             msg_i("user_login_t::in_t")         {}         string encode()         {             return (init_encoder() << uid << value).get_buff();         }         void decode(const string& src_buff_)         {             init_decoder(src_buff_) >> uid >> value;         }         long   uid;         string value;     };     struct out_t: public msg_i     {         out_t():             msg_i("user_login_t::out_t")         {}         string encode()         {             return (init_encoder() << value).get_buff();         }         void decode(const string& src_buff_)         {             init_decoder(src_buff_) >> value;         }         bool value;     }; };

LoginServer中如此定義接口:

class login_server_t { public:     void verify(user_login_t::in_t& in_msg_, rpc_callcack_t& cb_)     {         user_login_t::out_t out;         out.value = true;         cb_(out);     } }; login_server_t login_server; singleton_t::instance().create_service("login_server", 1)             .bind_service(&login_server)             .reg(&login_server_t::verify);

在GatewayServer中調用上面接口:

struct lambda_t     {         static void callback(user_login_t::out_t& msg_, socket_ptr_t socket_)         {             if (true == msg_.value)             {                 //! socket_->send_msg("login ok");             }             else             {                 //! socket_->send_msg("login failed");             }         }     };     user_login_t::in_t in;     in.uid  = 520;     in.value = "ILoveYou";     socket_ptr_t flash_socket = NULL;//! TODO     singleton_t::instance()          .get_service_group("login_server_t")         ->get_service(1)        ->async_call(in, binder_t::callback(&lambda_t::callback, flash_socket));

如上所示, async_call 可以通過binder_t模板函數為回調函綁定參數。

同步的Request/Reply

大部分時候我們期望Reply被異步處理,但有時Reply 必須被首先處理后才能觸發(fā)后續(xù)操作,一般這種情況發(fā)生在程序初始化之時。假設如下場景,SceneServer啟動時必須從SuperServer中獲取配置,然后才能執(zhí)行加載場景數據等后續(xù)初始化操作。

首先定義通信的msg:

struct config_t {     struct in_t: public msg_i     {         in_t():             msg_i("config_t::in_t")         {}         string encode()         {             return (init_encoder() << server_type << server_id).get_buff();         }         void decode(const string& src_buff_)         {             init_decoder(src_buff_) >> server_type >> server_id;         }         int server_type;         int server_id;     };     struct out_t: public msg_i     {         out_t():             msg_i("config_t::out_t")         {}         string encode()         {             return (init_encoder() << value).get_buff();         }         void decode(const string& src_buff_)         {             init_decoder(src_buff_) >> value;         }         map value;     }; };

如上所示, msg 序列化自動支持map。

SuperServer 中定義返回配置的接口:

super_server_t super_server; singleton_t::instance().create_service("super_server", 1)     .bind_service(&super_server)     .reg(&super_server_t::get_config); SceneServer 可以如此實現同步Request/Reply: rpc_future_t rpc_future; config_t::in_t in; in.server_type = 1; in.server_id   = 1; const config_t::out_t& out = rpc_future.call(  singleton_t::instance().get_service_group("super_server")         ->get_service(1), in); cout << out.value.size() <<"\n"; //std::foreach(out.value.begin(), out.value.end(), fuctor_xx);

點對點通訊

異步Request/Reply 已經能夠解決大部分問題了,但是有時處理Push模式時稍顯吃了。我們知道消息推算有Push 和Poll兩種方式。了解二者:

    http://blog.sina.com.cn/s/blog_6617106b0100hrm1.html

上面提到的Request/Reply 非常適合poll模式,以上一個獲取配置為例,SuperServer由于定義接口的時候只需知道callback,并不知道SceneServer的具體連接。,所以SuperServer不能向SceneServer Push消息。在FFLIB中并沒有限定某個節(jié)點必須是Client或只能是Service,實際上可以兼有二者的角色。SceneServer 也可以提供接口供SuperServer調用,這就符合了Push的語義。假設如下場景,GatewayServer需要在用戶登入時調用通知SessionServer,而某一時刻SessionServer也可能呢通知GatewayServer 強制某用戶下線。二者互為client和service。大家必須知道,在FFLIB中實現兩個節(jié)點的通信只需知道對方的服務名稱即可,Broker 在此時實現解耦的作用非常明顯,若要增加對其他節(jié)點的通信,只需通過服務名稱async_call即可。

定義通信的msg:

struct user_online_t {     struct in_t: public msg_i     {         in_t():             msg_i("user_online_t::in_t")         {}         string encode()         {             return (init_encoder() << uid).get_buff();         }         void decode(const string& src_buff_)         {             init_decoder(src_buff_) >> uid;         }         long uid;     };     struct out_t: public msg_i     {         out_t():             msg_i("user_online_t::out_t")         {}         string encode()         {             return (init_encoder() << value).get_buff();         }         void decode(const string& src_buff_)         {             init_decoder(src_buff_) >> value;         }         bool value;     }; }; struct force_user_offline_t {     struct in_t: public msg_i     {         in_t():             msg_i("force_user_offline_t::in_t")         {}         string encode()         {             return (init_encoder() << uid).get_buff();         }         void decode(const string& src_buff_)         {             init_decoder(src_buff_) >> uid;         }        long uid;     };     struct out_t: public msg_i     {         out_t():             msg_i("force_user_offline_t::out_t")         {}        string encode()         {             return (init_encoder() << value).get_buff();         }         void decode(const string& src_buff_)         {             init_decoder(src_buff_) >> value;         }         bool value;     }; };

GatewayServer 通知SessionServer 用戶上線,并提供強制用戶下線的接口:

class gateway_server_t { public:     void force_user_offline(force_user_offline_t::in_t& in_msg_, rpc_callcack_t& cb_)     {         //! close user socket         force_user_offline_t::out_t out;         out.value = true;         cb_(out);     } }; gateway_server_t gateway_server; singleton_t::instance().create_service("gateway_server", 1)             .bind_service(&gateway_server)             .reg(&gateway_server_t::force_user_offline); user_online_t::in_t in; in.uid = 520; singleton_t::instance()     .get_service_group("session_server")     ->get_service(1)     ->async_call(in, callback_TODO);

SessionServer 提供用戶上線接口,可能會調用GatewayServer 的接口強制用戶下線。

class session_server_t { public:     void user_login(user_online_t::in_t& in_msg_, rpc_callcack_t& cb_)     {         //! close user socket         user_online_t::out_t out;         out.value = true;         cb_(out);     } }; session_server_t session_server; singleton_t::instance().create_service("session_server", 1)             .bind_service(&session_server)             .reg(&session_server_t::user_login); force_user_offline_t::in_t in; in.uid = 520; singleton_t::instance()     .get_service_group("gateway_server")     ->get_service(1)     ->async_call(in, callback_TODO);

多播通信

和點對點通信一樣,要實現多播,只需要知道目標的服務名稱。特別提一點的是,FFLIB中有服務組的概念。比如啟動了多個場景服務器SceneServer,除了數據不同,二者接口完全相同,有可能只是相同進程的不同實例。在FFLIB框架中把這些服務歸為一個服務組,然后再為每個實例分配索引id。

假設如下場景,SuperServer 中要實現一個GM接口,通知所有SceneServer 重新加載配置。

定義通信的msg:

struct reload_config_t     struct in_t: public msg_i    {        in_t():            msg_i("reload_config_t::in_t")        {}        string encode()        {            return (init_encoder()).get_buff();        }        void decode(const string& src_buff_)        {            init_decoder(src_buff_);        }   };    struct out_t: public msg_i    {       out_t():            msg_i("reload_config_t::out_t")        {}         string encode()        {            return (init_encoder() << value).get_buff();        }        void decode(const string& src_buff_)        {            init_decoder(src_buff_) >> value;        }        bool value;    }; ;

SceneServer 提供重新載入配置接口:

class scene_server_t { public:     void reload_config(reload_config_t::in_t& in_msg_, rpc_callcack_t& cb_)     {         //! close user socket         reload_config_t::out_t out;         out.value = true;         cb_(out);     } }; scene_server_t scene_server; singleton_t::instance().create_service("scene_server", 1)             .bind_service(&scene_server)             .reg(&scene_server_t::reload_config);

在SuperServer 中如此實現多播(跟準確是廣播,大同小異):

struct lambda_t { static void reload_config(rpc_service_t* rs_) {           reload_config_t::in_t in;           rs_->async_call(in, callback_TODO); } }; singleton_t::instance()     .get_service_group("scene_server")     ->foreach(&lambda_t::reload_config);

Map/Reduce

在游戲中使用Map/reduce 的情形并不多見,本人找到網上最常見的Map/reduce 實例 WordCount。情形如下:有一些文本字符串,統計每個字符出現的次數。

Map操作,將文本分為多個子文本,分發(fā)給多個Worker 進程進行統計

Reduce 操作,將多組worker 進程計算的結果匯總

Worker:為文本統計各個字符出現的次數

定義通信消息:

struct word_count_t {     struct in_t: public msg_i     {         in_t():             msg_i("word_count_t::in_t")         {}         string encode()         {             return (init_encoder() << str).get_buff();         }         void decode(const string& src_buff_)         {             init_decoder(src_buff_) >> str;         }         string str;     };     struct out_t: public msg_i     {         out_t():             msg_i("word_count_t::out_t")         {}         string encode()         {             return (init_encoder() << value).get_buff();         }         void decode(const string& src_buff_)         {             init_decoder(src_buff_) >> value;         }        map value;     };  };

定義woker的接口:

class worker_t { public:     void word_count(word_count_t::in_t& in_msg_, rpc_callcack_t& cb_)     {         //! close user socket         word_count_t::out_t out;         for (size_t i = 0; i < in_msg_.str.size(); ++i)         {             map::iterator it = out.value.find(in_msg_.str[i]);             if (it != out.value.end())             {                 it->second += 1;             }             else             {                 out.value[in_msg_.str[i]] = 1;             }         }         cb_(out);     } }; worker_t worker;    for (int i = 0; i < 5; ++i)     {        singleton_t::instance().create_service("worker", 1)             .bind_service(&worker)             .reg(&worker_t::word_count);     }

模擬Map/reduce 操作:

struct lambda_t {     static void reduce(word_count_t::out_t& msg_, map* result_, size_t* size_)     {         for (map::iterator it = msg_.value.begin(); it != msg_.value.end(); ++it)         {             map::iterator it2 = result_->find(it->first);             if (it2 != result_->end())             {                 it2->second += it->second;             }             else             {                 (*result_)[it->first] = it->second;             }         }         if (-- size_ == 0)         {             //reduce end!!!!!!!!!!!!!!!!             delete result_;             delete size_;         }     }     static void do_map(const char** p, size_t size_)     {         map* result  = new map();        size_t*    dest_size   = new size_t();         *dest_size = size_;         for (size_t i = 0; i < size_; ++i)         {             word_count_t::in_t in;             in.str = p[i];             singleton_t::instance()                 .get_service_group("worker")                 ->get_service(1 + i % singleton_t::instance().get_service_group("worker")->size())                ->async_call(in, binder_t::callback(&lambda_t::reduce, result, dest_size));         }     } }; const char* str_vec[] = {"oh nice", "oh fuck", "oh no", "oh dear", "oh wonderful", "oh bingo"}; lambda_t::do_map(str_vec, 6);

總結:

FFLIB 使進程間通信更容易

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注創(chuàng)新互聯行業(yè)資訊頻道,感謝您對創(chuàng)新互聯的支持。


當前名稱:C++多進程并發(fā)框架FFLIB中Tutorial的原理是什么
網頁路徑:http://weahome.cn/article/iedihs.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部