本文记录 Linux 的学习过程,内容为 RPC 软件框架:Thrift。
Thrift 官网:Apache Thrift 。
1. Thrift概述
1.1 基本概念
Thrift 是一个 RPC(远程过程调用协议 Remote Procedure Call Protocol)软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++、Java、Go、Python、PHP、Ruby、Erlang、Perl、Haskell、C#、Cocoa、JavaScript、Node.js、Smalltalk、OCaml 这些编程语言间无缝结合的、高效的服务。Thrift 允许定义一个简单的定义文件中的数据类型和服务接口,以作为输入文件,编译器生成代码用来方便地生成 RPC 客户端和服务器通信的无缝跨编程语言。
1.2 Thrift IDL
Thrift 采用接口定义语言 IDL(Interface Definition Language)来定义通用的服务接口,然后通过 Thrift 提供的编译器,可以将服务接口编译成不同语言编写的代码,通过这个方式来实现跨语言的功能。
通过命令调用 Thrift 提供的编译器将服务接口编译成不同语言编写的代码。
这些代码又分为服务端和客户端,将所在不同进程(或服务器)的功能连接起来。
1 thrift -r --gen <language> <Thrift filename>
1.3 如何创建Thrift服务?
定义服务接口(存放接口的文件夹就是 Thrift 文件)。
作为服务端的服务,需要生成 server
。
作为请求端的服务,需要生成 client
。
1.4 实例讲解
假设我们要实现一个游戏的匹配系统,这个游戏的功能可能运行在一个或多个服务器(进程)上,而 Thrift 就是将不同服务器不同语言的功能连接起来。
游戏本体(假设用 Python 实现)、匹配系统(假设用 C++ 实现)、数据存储服务器这三个节点(功能)是完全独立的,既可以在同一个服务器上,也可以在不同服务器上。每一个节点就是一个进程,每个进程可以使用不同的语言来实现。
游戏节点到匹配节点需要实现一条有向边(可以包含多个函数),表示向匹配系统添加和移除玩家 add_user
、remove_user
,因此游戏节点需要实现 match_client
端,表示可以调用匹配服务器的函数;匹配系统需要实现 match_server
端,表示可以让游戏节点的 Client 端调用自身的函数。同时匹配系统还需实现 save_client
端,因为需要将数据传给服务器存储 save_data
(假设数据存储服务器已实现 save_server
端)。
2. Thrift教程
首先创建一个游戏系统文件夹 game
、匹配系统文件夹 match_system
、保存各种接口的文件夹 thrift
。
2.1 match_server框架
在 thrift
文件夹中创建一个文件:match.thrift
,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 namespace cpp match_servicestruct User { 1 : i32 id, 2 : string name, 3 : i32 score } service Match { i32 add_user (1 : User user, 2 : string info) , i32 remove_user (1 : User user, 2 : string info) }
前往 Thrift 官网,点击 Tutorial,再点击 C++,即可看到如何通过这个接口生成一个 C++ 版本的服务器。命令如下:
1 thrift -r --gen cpp tutorial.thrift
在 match_system
文件夹中创建一个文件夹 src
,表示源文件。在 src
文件夹中输入以下命令:
1 thrift -r --gen cpp ../../thrift/match.thrift
执行后会发现该目录下生成了一个 gen-cpp
的文件夹,为了后续方便操作,将文件夹改个名:
将自动实现好的文件移出来:
1 mv match_server/Match_server.skeleton.cpp main.cpp
由于该文件里的函数还没有进行逻辑实现,因此先在每个函数中加上 return 0;
后编译一遍,文件内容如下(可以使用 gg=G
进行格式化):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 #include "match_server/Match.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> using namespace ::apache::thrift;using namespace ::apache::thrift::protocol;using namespace ::apache::thrift::transport;using namespace ::apache::thrift::server;using namespace ::match_service;class MatchHandler : virtual public MatchIf { public : MatchHandler () { } int32_t add_user (const User& user, const std::string& info) { printf ("add_user\n" ); return 0 ; } int32_t remove_user (const User& user, const std::string& info) { printf ("remove_user\n" ); return 0 ; } }; int main (int argc, char **argv) { int port = 9090 ; ::std::shared_ptr<MatchHandler> handler (new MatchHandler()) ; ::std::shared_ptr<TProcessor> processor (new MatchProcessor(handler)) ; ::std::shared_ptr<TServerTransport> serverTransport (new TServerSocket(port)) ; ::std::shared_ptr<TTransportFactory> transportFactory (new TBufferedTransportFactory()) ; ::std::shared_ptr<TProtocolFactory> protocolFactory (new TBinaryProtocolFactory()) ; TSimpleServer server (processor, serverTransport, transportFactory, protocolFactory) ; server.serve (); return 0 ; }
接下来进行编译链接,链接的时候需要用到 Thrift 的动态链接库,需要加上 -lthrift
:
1 2 g++ -c main.cpp match_server/*.cpp g++ *.o -o main -lthrift
这时输入 ./main
即可运行程序,但是此时什么内容都没有。Thrift 只是将接口实现好了,具体的业务逻辑没有实现。我们可以先将文件上传至 Git,上传的时候注意一般不将 .o
文件和可执行文件上传:
1 2 3 4 5 6 git add . git restore --staged *.o git restore --staged main git status git commit -m "add match_server" git push
2.2 match_client框架与实现
首先同样在 game
文件夹中创建 src
文件夹,进入 src
文件夹后我们需要生成 Python 代码:
1 thrift -r --gen py ../../thrift/match.thrift
生成后该目录下有个文件夹 gen-py
,也就是生成了 Python 的服务器端,同样将其改个名:
创建文件 client.py
,将官网中 Python 客户端的代码(前四行是为了将当前路径加入到 Python 的环境变量中,可以删掉)复制过来,并进行简单的修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from match_client.match import Matchfrom match_client.match .ttypes import Userfrom thrift import Thriftfrom thrift.transport import TSocketfrom thrift.transport import TTransportfrom thrift.protocol import TBinaryProtocoldef main (): transport = TSocket.TSocket('localhost' , 9090 ) transport = TTransport.TBufferedTransport(transport) protocol = TBinaryProtocol.TBinaryProtocol(transport) client = Match.Client(protocol) transport.open () user = User(1 , 'yyj' , 1500 ) client.add_user(user, "" ) transport.close() if __name__ == "__main__" : main()
然后我们将 match_system/src
中的 main
执行后,再执行 game/src
中的 client.py
:
可以看到 main
程序那边输出:add_user
。说明我们的 match_client
端和 match_server
端已经初步实现了,此时更新一下 Git,注意 .pyc
文件也最好不要上传:
1 2 3 4 5 git add . git restore --staged *.pyc git status git commit -m "add match_client" git push
接着我们进行优化,从控制台输入用户信息,并指定是添加还是删除用户,修改后的 client.py
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 from match_client.match import Matchfrom match_client.match .ttypes import Userfrom thrift import Thriftfrom thrift.transport import TSocketfrom thrift.transport import TTransportfrom thrift.protocol import TBinaryProtocolfrom sys import stdindef operate (op, user_id, username, score ): transport = TSocket.TSocket('localhost' , 9090 ) transport = TTransport.TBufferedTransport(transport) protocol = TBinaryProtocol.TBinaryProtocol(transport) client = Match.Client(protocol) transport.open () user = User(user_id, username, score) if op == "add" : client.add_user(user, "" ) elif op == "remove" : client.remove_user(user, "" ) transport.close() def main (): for line in stdin: op, user_id, username, score = line.split(' ' ) operate(op, int (user_id), username, int (score)) if __name__ == "__main__" : main()
这样我们的 match_client
端就算是完成了。
2.3 match_server_v2.0实现
由于 server
端一方面需要读入或者移出用户,另一方面还要不断地去匹配,因此需要有一个线程去不断添加用户进来,一个线程去进行匹配,匹配完后再将信息传给一个服务器,且这两个操作是完全独立的,有可能长时间没有用户添加进来,但是匹配系统能够匹配两个已经匹配了很久的人。因此在这里需要用到并行技术,C++ 多线程需要使用到 <thread>
头文件。
多线程相关知识点:
IP 和端口:如果把 IP 地址比作一间房子,端口就是出入这间房子的门。真正的房子只有几个门,但是一个 IP 地址的端口可以有65536个之多!端口是通过端口号来标记的,端口号只有整数,范围是从0到65535。同一个端口只能由一个进程来监听。所以我们一旦启动了一个服务,那么这个服务就不能在被另一个进程启动了。服务器的端口号要与客户端的端口号相同。
<thread>
库:C++ 中有一个 thread
的库,可以用来开线程。通过定义一个变量将函数名作为参数,就能开一个线程了,具体使用可以看后文代码。
首先定义线程的操作:并行中经典的生产者和消费者模型。生产者、消费者是两个线程。本样例中的生产者:add_user()
、remove_user()
;消费者:匹配用户的功能。
生产者和消费者之间需要一个媒介。这个媒介可以有很多种方法。比如:消费队列。很多语言都有自己实现的消费队列,也可以自己实现消费队列。实现消费队列,就需要用到一些锁(Mutex)。锁是并行编程的基本概念。
互斥锁:在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为“互斥锁”的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象 。
锁有两个操作:一个 P 操作(上锁),一个 V 操作(解锁)。
定义互斥锁:mutex m
。锁一般使用信号量来实现的,mutex
其实就是一个信号量(它特殊也叫互斥量)。互斥量就是同一时间能够分给一个人,即 S = 1。信号量 S = 10 表示可以将信号量分给10个人来用。
P 操作的主要动作是:
(1)S - 1;
(2)若 S - 1 后仍大于或等于0,则进程继续执行;
(3)若 S - 1 后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。
V 操作的主要动作是:
(1)S + 1;
(2)若 S + 1 后结果大于0,则进程继续执行;
(3)若 S + 1 后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。
对于 P 和 V 都是原子操作,就是在执行 P 和 V 操作时,不会被插队,从而实现对共享变量操作的原子性。
特殊:S = 1 表示互斥量,表示同一时间,信号量只能分配给一个线程。
多线程为啥要用锁?因为多线程可能共享一个内存空间 ,导致出现重复读取并修改的现象。
我们将程序功能修改为傻瓜式匹配,只要匹配池中的玩家数大于等于2,那么就将前两名玩家进行匹配,修改后的 match_server
端 main.cpp
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 #include "match_server/Match.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <vector> using namespace ::apache::thrift;using namespace ::apache::thrift::protocol;using namespace ::apache::thrift::transport;using namespace ::apache::thrift::server;using namespace ::match_service;using namespace std;struct Task { User user; string type; }; struct MessageQueue { queue<Task> q; mutex m; condition_variable cv; }message_queue; class Pool { public : void save_result (int a, int b) { printf ("Match result: %d %d\n" , a, b); } void match () { while (users.size () > 1 ) { auto a = users[0 ], b = users[1 ]; users.erase (users.begin ()); users.erase (users.begin ()); save_result (a.id, b.id); } } void add (User user) { users.push_back (user); } void remove (User user) { for (uint32_t i = 0 ; i < users.size (); i++) if (users[i].id == user.id) { users.erase (users.begin () + i); break ; } } private : vector<User> users; }pool; class MatchHandler : virtual public MatchIf { public : MatchHandler () { } int32_t add_user (const User& user, const std::string& info) { printf ("add_user\n" ); unique_lock<mutex> lck (message_queue.m) ; message_queue.q.push ({ user, "add" }); message_queue.cv.notify_all (); return 0 ; } int32_t remove_user (const User& user, const std::string& info) { printf ("remove_user\n" ); unique_lock<mutex> lck (message_queue.m) ; message_queue.q.push ({ user, "remove" }); message_queue.cv.notify_all (); return 0 ; } }; void consume_task () { while (true ) { unique_lock<mutex> lck (message_queue.m) ; if (message_queue.q.empty ()) { message_queue.cv.wait (lck); } else { auto task = message_queue.q.front (); message_queue.q.pop (); lck.unlock (); if (task.type == "add" ) pool.add (task.user); else if (task.type == "remove" ) pool.remove (task.user); pool.match (); } } } int main (int argc, char **argv) { int port = 9090 ; ::std::shared_ptr<MatchHandler> handler (new MatchHandler()) ; ::std::shared_ptr<TProcessor> processor (new MatchProcessor(handler)) ; ::std::shared_ptr<TServerTransport> serverTransport (new TServerSocket(port)) ; ::std::shared_ptr<TTransportFactory> transportFactory (new TBufferedTransportFactory()) ; ::std::shared_ptr<TProtocolFactory> protocolFactory (new TBinaryProtocolFactory()) ; TSimpleServer server (processor, serverTransport, transportFactory, protocolFactory) ; printf ("Start Match Server\n" ); thread matching_thread (consume_task) ; server.serve (); return 0 ; }
由于使用了线程库,因此编译的时候需要加上参数 -pthread
:
1 2 g++ -c main.cpp g++ *.o -o main -lthrift -pthread
此时可以打开 match_server
端和 match_client
端,然后在 client
端添加玩家看看 server
端的匹配结果。
2.4 save_client实现
假设 save_server
端已经实现,在 thrift
文件夹中创建文件 save.thrift
,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 namespace cpp save_service service Save { /** * username: myserver的名称 * password: myserver的密码的md5sum的前8位 * 用户名密码验证成功会返回0,验证失败会返回1 * 验证成功后,结果会被保存到myserver:homework/lesson_6/result.txt中 */ i32 save_data(1: string username, 2: string password, 3: i32 player1_id, 4: i32 player2_id) }
可以看到直接调用 save_server
端的接口函数 save_data
即可(该函数的实现我们不关心)。
在 match_system/src
文件夹中输入以下指令生成接口的 C++ 实现,并重命名,然后需要将自动生成的服务端代码删去:
1 2 3 4 thrift -r --gen cpp ../../thrift/save.thrift mv gen-cpp save_client cd save_client rm Save_server.skeleton.cpp
接下来我们将 Thrift 官网 C++ 教程中的 Client 端代码抄下来并相应进行修改,修改后的 main.cpp
内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 #include "match_server/Match.h" #include "save_client/Save.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TTransportUtils.h> #include <thrift/transport/TSocket.h> #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <vector> using namespace ::apache::thrift;using namespace ::apache::thrift::protocol;using namespace ::apache::thrift::transport;using namespace ::apache::thrift::server;using namespace ::match_service;using namespace ::save_service; using namespace std;struct Task { User user; string type; }; struct MessageQueue { queue<Task> q; mutex m; condition_variable cv; }message_queue; class Pool { public : void save_result (int a, int b) { printf ("Match result: %d %d\n" , a, b); std::shared_ptr<TTransport> socket (new TSocket("123.57.47.211" , 9090 )) ; std::shared_ptr<TTransport> transport (new TBufferedTransport(socket)) ; std::shared_ptr<TProtocol> protocol (new TBinaryProtocol(transport)) ; SaveClient client (protocol) ; try { transport->open (); int res = client.save_data ("acs_2077" , "4503f06d" , a, b); if (!res) puts ("Success" ); else puts ("Failed" ); transport->close (); } catch (TException& tx) { cout << "ERROR: " << tx.what () << endl; } } void match () { while (users.size () > 1 ) { auto a = users[0 ], b = users[1 ]; users.erase (users.begin ()); users.erase (users.begin ()); save_result (a.id, b.id); } } void add (User user) { users.push_back (user); } void remove (User user) { for (uint32_t i = 0 ; i < users.size (); i++) if (users[i].id == user.id) { users.erase (users.begin () + i); break ; } } private : vector<User> users; }pool; class MatchHandler : virtual public MatchIf { public : MatchHandler () { } int32_t add_user (const User& user, const std::string& info) { printf ("add_user\n" ); unique_lock<mutex> lck (message_queue.m) ; message_queue.q.push ({ user, "add" }); message_queue.cv.notify_all (); return 0 ; } int32_t remove_user (const User& user, const std::string& info) { printf ("remove_user\n" ); unique_lock<mutex> lck (message_queue.m) ; message_queue.q.push ({ user, "remove" }); message_queue.cv.notify_all (); return 0 ; } }; void consume_task () { while (true ) { unique_lock<mutex> lck (message_queue.m) ; if (message_queue.q.empty ()) { message_queue.cv.wait (lck); } else { auto task = message_queue.q.front (); message_queue.q.pop (); lck.unlock (); if (task.type == "add" ) pool.add (task.user); else if (task.type == "remove" ) pool.remove (task.user); pool.match (); } } } int main (int argc, char **argv) { int port = 9090 ; ::std::shared_ptr<MatchHandler> handler (new MatchHandler()) ; ::std::shared_ptr<TProcessor> processor (new MatchProcessor(handler)) ; ::std::shared_ptr<TServerTransport> serverTransport (new TServerSocket(port)) ; ::std::shared_ptr<TTransportFactory> transportFactory (new TBufferedTransportFactory()) ; ::std::shared_ptr<TProtocolFactory> protocolFactory (new TBinaryProtocolFactory()) ; TSimpleServer server (processor, serverTransport, transportFactory, protocolFactory) ; printf ("Start Match Server\n" ); thread matching_thread (consume_task) ; server.serve (); return 0 ; }
2.5 match_server_v3.0实现
通过修改匹配函数 match()
实现将分差小于等于50的玩家进行匹配,修改后的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 #include "match_server/Match.h" #include "save_client/Save.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TTransportUtils.h> #include <thrift/transport/TSocket.h> #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <vector> #include <unistd.h> using namespace ::apache::thrift;using namespace ::apache::thrift::protocol;using namespace ::apache::thrift::transport;using namespace ::apache::thrift::server;using namespace ::match_service;using namespace ::save_service; using namespace std;struct Task { User user; string type; }; struct MessageQueue { queue<Task> q; mutex m; condition_variable cv; }message_queue; class Pool { public : void save_result (int a, int b) { } void match () { while (users.size () > 1 ) { sort (users.begin (), users.end (), [&](User &a, User &b){ return a.score < b.score; }); bool flag = true ; for (uint32_t i = 1 ; i < users.size (); i++) { auto a = users[i - 1 ], b = users[i]; if (b.score - a.score <= 50 ) { save_result (a.id, b.id); users.erase (users.begin () + i - 1 , users.begin () + i + 1 ); flag = false ; break ; } } if (flag) break ; } } void add (User user) { users.push_back (user); } void remove (User user) { for (uint32_t i = 0 ; i < users.size (); i++) if (users[i].id == user.id) { users.erase (users.begin () + i); break ; } } private : vector<User> users; }pool; class MatchHandler : virtual public MatchIf { public : MatchHandler () { } int32_t add_user (const User& user, const std::string& info) { printf ("add_user\n" ); unique_lock<mutex> lck (message_queue.m) ; message_queue.q.push ({ user, "add" }); message_queue.cv.notify_all (); return 0 ; } int32_t remove_user (const User& user, const std::string& info) { printf ("remove_user\n" ); unique_lock<mutex> lck (message_queue.m) ; message_queue.q.push ({ user, "remove" }); message_queue.cv.notify_all (); return 0 ; } }; void consume_task () { while (true ) { unique_lock<mutex> lck (message_queue.m) ; if (message_queue.q.empty ()) { lck.unlock (); pool.match (); sleep (1 ); } else { auto task = message_queue.q.front (); message_queue.q.pop (); lck.unlock (); if (task.type == "add" ) pool.add (task.user); else if (task.type == "remove" ) pool.remove (task.user); pool.match (); } } } int main (int argc, char **argv) { int port = 9090 ; ::std::shared_ptr<MatchHandler> handler (new MatchHandler()) ; ::std::shared_ptr<TProcessor> processor (new MatchProcessor(handler)) ; ::std::shared_ptr<TServerTransport> serverTransport (new TServerSocket(port)) ; ::std::shared_ptr<TTransportFactory> transportFactory (new TBufferedTransportFactory()) ; ::std::shared_ptr<TProtocolFactory> protocolFactory (new TBinaryProtocolFactory()) ; TSimpleServer server (processor, serverTransport, transportFactory, protocolFactory) ; printf ("Start Match Server\n" ); thread matching_thread (consume_task) ; server.serve (); return 0 ; }
2.6 match_server_v4.0实现
通过 Thrift 官网 C++ 教程下的 Server 端代码可以将 match_server
改为多线程,修改后的 main.cpp
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 #include "match_server/Match.h" #include "save_client/Save.h" #include <thrift/concurrency/ThreadManager.h> #include <thrift/concurrency/ThreadFactory.h> #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/server/TThreadedServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TTransportUtils.h> #include <thrift/transport/TSocket.h> #include <thrift/TToString.h> #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <vector> #include <unistd.h> using namespace ::apache::thrift;using namespace ::apache::thrift::protocol;using namespace ::apache::thrift::transport;using namespace ::apache::thrift::server;using namespace ::match_service;using namespace ::save_service; using namespace std;struct Task { User user; string type; }; struct MessageQueue { queue<Task> q; mutex m; condition_variable cv; }message_queue; class Pool { public : void save_result (int a, int b) { } void match () { } void add (User user) { users.push_back (user); } void remove (User user) { for (uint32_t i = 0 ; i < users.size (); i++) if (users[i].id == user.id) { users.erase (users.begin () + i); break ; } } private : vector<User> users; }pool; class MatchHandler : virtual public MatchIf { public : MatchHandler () { } int32_t add_user (const User& user, const std::string& info) { printf ("add_user\n" ); unique_lock<mutex> lck (message_queue.m) ; message_queue.q.push ({ user, "add" }); message_queue.cv.notify_all (); return 0 ; } int32_t remove_user (const User& user, const std::string& info) { printf ("remove_user\n" ); unique_lock<mutex> lck (message_queue.m) ; message_queue.q.push ({ user, "remove" }); message_queue.cv.notify_all (); return 0 ; } }; class MatchCloneFactory : virtual public MatchIfFactory { public : ~MatchCloneFactory () override = default ; MatchIf* getHandler (const ::apache::thrift::TConnectionInfo& connInfo) override { std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast <TSocket>(connInfo.transport); return new MatchHandler; } void releaseHandler (MatchIf* handler) override { delete handler; } }; void consume_task () { while (true ) { unique_lock<mutex> lck (message_queue.m) ; if (message_queue.q.empty ()) { lck.unlock (); pool.match (); sleep (1 ); } else { auto task = message_queue.q.front (); message_queue.q.pop (); lck.unlock (); if (task.type == "add" ) pool.add (task.user); else if (task.type == "remove" ) pool.remove (task.user); pool.match (); } } } int main (int argc, char **argv) { TThreadedServer server ( std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()), std::make_shared<TServerSocket>(9090 ), std::make_shared<TBufferedTransportFactory>(), std::make_shared<TBinaryProtocolFactory>()) ; printf ("Start Match Server\n" ); thread matching_thread (consume_task) ; server.serve (); return 0 ; }
2.7 match_server_v5.0实现
通过对匹配机制的修改,实现玩家每等待一秒钟,匹配的分数区间扩大50分,修改后的 main.cpp
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 #include "match_server/Match.h" #include "save_client/Save.h" #include <thrift/concurrency/ThreadManager.h> #include <thrift/concurrency/ThreadFactory.h> #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/server/TThreadedServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TTransportUtils.h> #include <thrift/transport/TSocket.h> #include <thrift/TToString.h> #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <vector> #include <unistd.h> using namespace ::apache::thrift;using namespace ::apache::thrift::protocol;using namespace ::apache::thrift::transport;using namespace ::apache::thrift::server;using namespace ::match_service;using namespace ::save_service; using namespace std;struct Task { User user; string type; }; struct MessageQueue { queue<Task> q; mutex m; condition_variable cv; }message_queue; class Pool { public : void save_result (int a, int b) { printf ("Match result: %d %d\n" , a, b); std::shared_ptr<TTransport> socket (new TSocket("123.57.47.211" , 9090 )) ; std::shared_ptr<TTransport> transport (new TBufferedTransport(socket)) ; std::shared_ptr<TProtocol> protocol (new TBinaryProtocol(transport)) ; SaveClient client (protocol) ; try { transport->open (); int res = client.save_data ("acs_2077" , "4503f06d" , a, b); if (!res) puts ("Success" ); else puts ("Failed" ); transport->close (); } catch (TException& tx) { cout << "ERROR: " << tx.what () << endl; } } bool check_match (uint32_t i, uint32_t j) { auto a = users[i], b = users[j]; int dt = abs (a.score - b.score); int a_max_dif = wt[i] * 50 , b_max_dif = wt[j] * 50 ; return dt <= a_max_dif && dt <= b_max_dif; } void match () { for (uint32_t i = 0 ; i < wt.size (); i++) wt[i]++; while (users.size () > 1 ) { bool flag = true ; for (uint32_t i = 0 ; i < users.size (); i++) { for (uint32_t j = i + 1 ; j < users.size (); j++) if (check_match (i, j)) { auto a = users[i], b = users[j]; users.erase (users.begin () + j); users.erase (users.begin () + i); wt.erase (wt.begin () + j); wt.erase (wt.begin () + i); save_result (a.id, b.id); flag = false ; break ; } if (!flag) break ; } if (flag) break ; } } void add (User user) { users.push_back (user); wt.push_back (0 ); } void remove (User user) { for (uint32_t i = 0 ; i < users.size (); i++) if (users[i].id == user.id) { users.erase (users.begin () + i); wt.erase (wt.begin () + i); break ; } } private : vector<User> users; vector<int > wt; }pool; class MatchHandler : virtual public MatchIf { public : MatchHandler () { } int32_t add_user (const User& user, const std::string& info) { printf ("add_user\n" ); unique_lock<mutex> lck (message_queue.m) ; message_queue.q.push ({ user, "add" }); message_queue.cv.notify_all (); return 0 ; } int32_t remove_user (const User& user, const std::string& info) { printf ("remove_user\n" ); unique_lock<mutex> lck (message_queue.m) ; message_queue.q.push ({ user, "remove" }); message_queue.cv.notify_all (); return 0 ; } }; class MatchCloneFactory : virtual public MatchIfFactory { public : ~MatchCloneFactory () override = default ; MatchIf* getHandler (const ::apache::thrift::TConnectionInfo& connInfo) override { std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast <TSocket>(connInfo.transport); return new MatchHandler; } void releaseHandler (MatchIf* handler) override { delete handler; } }; void consume_task () { while (true ) { unique_lock<mutex> lck (message_queue.m) ; if (message_queue.q.empty ()) { lck.unlock (); pool.match (); sleep (1 ); } else { auto task = message_queue.q.front (); message_queue.q.pop (); lck.unlock (); if (task.type == "add" ) pool.add (task.user); else if (task.type == "remove" ) pool.remove (task.user); } } } int main (int argc, char **argv) { TThreadedServer server ( std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()), std::make_shared<TServerSocket>(9090 ), std::make_shared<TBufferedTransportFactory>(), std::make_shared<TBinaryProtocolFactory>()) ; printf ("Start Match Server\n" ); thread matching_thread (consume_task) ; server.serve (); return 0 ; }
上一章:Linux学习笔记-SSH与Git 。
下一章:Linux学习笔记-管道、环境变量与Docker 。