Linux学习笔记-Thrift

  1. 1. Thrift概述
    1. 1.1 基本概念
    2. 1.2 Thrift IDL
    3. 1.3 如何创建Thrift服务?
    4. 1.4 实例讲解
  2. 2. Thrift教程
    1. 2.1 match_server框架
    2. 2.2 match_client框架与实现
    3. 2.3 match_server_v2.0实现
    4. 2.4 save_client实现
    5. 2.5 match_server_v3.0实现
    6. 2.6 match_server_v4.0实现
    7. 2.7 match_server_v5.0实现

本文记录 Linux 的学习过程,内容为 RPC 软件框架:Thrift。
Thrift 官网:Apache Thrift

1. Thrift概述

1.1 基本概念

Thrift 是一个 RPC(远程过程调用协议 Remote Procedure Call Protocol)软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++、Java、Go、Python、PHP、Ruby、Erlang、Perl、Haskell、C#、Cocoa、JavaScript、Node.js、Smalltalk、OCaml 这些编程语言间无缝结合的、高效的服务。Thrift 允许定义一个简单的定义文件中的数据类型和服务接口,以作为输入文件,编译器生成代码用来方便地生成 RPC 客户端和服务器通信的无缝跨编程语言。

1.2 Thrift IDL

Thrift 采用接口定义语言 IDL(Interface Definition Language)来定义通用的服务接口,然后通过 Thrift 提供的编译器,可以将服务接口编译成不同语言编写的代码,通过这个方式来实现跨语言的功能。

  • 通过命令调用 Thrift 提供的编译器将服务接口编译成不同语言编写的代码。
  • 这些代码又分为服务端和客户端,将所在不同进程(或服务器)的功能连接起来。
1
thrift -r --gen <language> <Thrift filename>

1.3 如何创建Thrift服务?

  1. 定义服务接口(存放接口的文件夹就是 Thrift 文件)。
  2. 作为服务端的服务,需要生成 server
  3. 作为请求端的服务,需要生成 client

1.4 实例讲解

假设我们要实现一个游戏的匹配系统,这个游戏的功能可能运行在一个或多个服务器(进程)上,而 Thrift 就是将不同服务器不同语言的功能连接起来。

游戏本体(假设用 Python 实现)、匹配系统(假设用 C++ 实现)、数据存储服务器这三个节点(功能)是完全独立的,既可以在同一个服务器上,也可以在不同服务器上。每一个节点就是一个进程,每个进程可以使用不同的语言来实现。

游戏节点到匹配节点需要实现一条有向边(可以包含多个函数),表示向匹配系统添加和移除玩家 add_userremove_user,因此游戏节点需要实现 match_client 端,表示可以调用匹配服务器的函数;匹配系统需要实现 match_server 端,表示可以让游戏节点的 Client 端调用自身的函数。同时匹配系统还需实现 save_client 端,因为需要将数据传给服务器存储 save_data(假设数据存储服务器已实现 save_server 端)。

2. Thrift教程

首先创建一个游戏系统文件夹 game、匹配系统文件夹 match_system、保存各种接口的文件夹 thrift

2.1 match_server框架

thrift 文件夹中创建一个文件:match.thrift,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
namespace cpp match_service

struct User /**定义结构体存储用户信息*/
{
1: i32 id, /**i32表示int*/
2: string name,
3: i32 score /**按照分值匹配*/
}

service Match
{
i32 add_user(1: User user, 2: string info),

i32 remove_user(1: User user, 2: string info)
}

前往 Thrift 官网,点击 Tutorial,再点击 C++,即可看到如何通过这个接口生成一个 C++ 版本的服务器。命令如下:

1
thrift -r --gen cpp tutorial.thrift

match_system 文件夹中创建一个文件夹 src,表示源文件。在 src 文件夹中输入以下命令:

1
thrift -r --gen cpp ../../thrift/match.thrift

执行后会发现该目录下生成了一个 gen-cpp 的文件夹,为了后续方便操作,将文件夹改个名:

1
mv gen-cpp match_server

将自动实现好的文件移出来:

1
mv match_server/Match_server.skeleton.cpp main.cpp

由于该文件里的函数还没有进行逻辑实现,因此先在每个函数中加上 return 0; 后编译一遍,文件内容如下(可以使用 gg=G 进行格式化):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h" // 注意已经将该文件移出来了,因此头文件路径要改
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace ::match_service;

class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}

int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");

return 0;
}

int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");

return 0;
}

};

int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
server.serve();
return 0;
}

接下来进行编译链接,链接的时候需要用到 Thrift 的动态链接库,需要加上 -lthrift

1
2
g++ -c main.cpp match_server/*.cpp
g++ *.o -o main -lthrift

这时输入 ./main 即可运行程序,但是此时什么内容都没有。Thrift 只是将接口实现好了,具体的业务逻辑没有实现。我们可以先将文件上传至 Git,上传的时候注意一般不将 .o 文件和可执行文件上传:

1
2
3
4
5
6
git add .
git restore --staged *.o
git restore --staged main
git status
git commit -m "add match_server"
git push

2.2 match_client框架与实现

首先同样在 game 文件夹中创建 src 文件夹,进入 src 文件夹后我们需要生成 Python 代码:

1
thrift -r --gen py ../../thrift/match.thrift

生成后该目录下有个文件夹 gen-py,也就是生成了 Python 的服务器端,同样将其改个名:

1
mv gen-py match_client

创建文件 client.py,将官网中 Python 客户端的代码(前四行是为了将当前路径加入到 Python 的环境变量中,可以删掉)复制过来,并进行简单的修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from match_client.match import Match
from match_client.match.ttypes import User

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol


def main():
# Make socket
transport = TSocket.TSocket('localhost', 9090)

# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)

# Wrap in a protocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)

# Create a client to use the protocol encoder
client = Match.Client(protocol)

# Connect!
transport.open()

user = User(1, 'yyj', 1500)
client.add_user(user, "")

# Close!
transport.close()

if __name__ == "__main__":
main()

然后我们将 match_system/src 中的 main 执行后,再执行 game/src 中的 client.py

1
python3 client.py

可以看到 main 程序那边输出:add_user。说明我们的 match_client 端和 match_server 端已经初步实现了,此时更新一下 Git,注意 .pyc 文件也最好不要上传:

1
2
3
4
5
git add .
git restore --staged *.pyc
git status
git commit -m "add match_client"
git push

接着我们进行优化,从控制台输入用户信息,并指定是添加还是删除用户,修改后的 client.py 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from match_client.match import Match
from match_client.match.ttypes import User

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from sys import stdin


def operate(op, user_id, username, score):
# Make socket
transport = TSocket.TSocket('localhost', 9090)

# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)

# Wrap in a protocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)

# Create a client to use the protocol encoder
client = Match.Client(protocol)

# Connect!
transport.open()

user = User(user_id, username, score)

if op == "add":
client.add_user(user, "")

elif op == "remove":
client.remove_user(user, "")

# Close!
transport.close()

def main():
for line in stdin:
op, user_id, username, score = line.split(' ')
operate(op, int(user_id), username, int(score))

if __name__ == "__main__":
main()

这样我们的 match_client 端就算是完成了。

2.3 match_server_v2.0实现

由于 server 端一方面需要读入或者移出用户,另一方面还要不断地去匹配,因此需要有一个线程去不断添加用户进来,一个线程去进行匹配,匹配完后再将信息传给一个服务器,且这两个操作是完全独立的,有可能长时间没有用户添加进来,但是匹配系统能够匹配两个已经匹配了很久的人。因此在这里需要用到并行技术,C++ 多线程需要使用到 <thread> 头文件。

多线程相关知识点:

  • IP 和端口:如果把 IP 地址比作一间房子,端口就是出入这间房子的门。真正的房子只有几个门,但是一个 IP 地址的端口可以有65536个之多!端口是通过端口号来标记的,端口号只有整数,范围是从0到65535。同一个端口只能由一个进程来监听。所以我们一旦启动了一个服务,那么这个服务就不能在被另一个进程启动了。服务器的端口号要与客户端的端口号相同。
  • <thread> 库:C++ 中有一个 thread 的库,可以用来开线程。通过定义一个变量将函数名作为参数,就能开一个线程了,具体使用可以看后文代码。
  • 首先定义线程的操作:并行中经典的生产者和消费者模型。生产者、消费者是两个线程。本样例中的生产者:add_user()remove_user();消费者:匹配用户的功能。
  • 生产者和消费者之间需要一个媒介。这个媒介可以有很多种方法。比如:消费队列。很多语言都有自己实现的消费队列,也可以自己实现消费队列。实现消费队列,就需要用到一些锁(Mutex)。锁是并行编程的基本概念。
  • 互斥锁:在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为“互斥锁”的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象
    • 锁有两个操作:一个 P 操作(上锁),一个 V 操作(解锁)。
    • 定义互斥锁:mutex m。锁一般使用信号量来实现的,mutex 其实就是一个信号量(它特殊也叫互斥量)。互斥量就是同一时间能够分给一个人,即 S = 1。信号量 S = 10 表示可以将信号量分给10个人来用。
    • P 操作的主要动作是:
      (1)S - 1;
      (2)若 S - 1 后仍大于或等于0,则进程继续执行;
      (3)若 S - 1 后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。
    • V 操作的主要动作是:
      (1)S + 1;
      (2)若 S + 1 后结果大于0,则进程继续执行;
      (3)若 S + 1 后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。
    • 对于 P 和 V 都是原子操作,就是在执行 P 和 V 操作时,不会被插队,从而实现对共享变量操作的原子性。
    • 特殊:S = 1 表示互斥量,表示同一时间,信号量只能分配给一个线程。
  • 多线程为啥要用锁?因为多线程可能共享一个内存空间,导致出现重复读取并修改的现象。

我们将程序功能修改为傻瓜式匹配,只要匹配池中的玩家数大于等于2,那么就将前两名玩家进行匹配,修改后的 match_servermain.cpp 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace ::match_service;
using namespace std;

struct Task
{
User user;
string type;
};

struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;

class Pool // 玩家匹配池
{
public:
void save_result(int a, int b)
{
printf("Match result: %d %d\n", a, b);
}

void match()
{
while (users.size() > 1)
{
auto a = users[0], b = users[1];
users.erase(users.begin());
users.erase(users.begin());

save_result(a.id, b.id);
}
}

void add(User user)
{
users.push_back(user);
}

void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i++)
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}

private:
vector<User> users;
}pool;

class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}

int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");

unique_lock<mutex> lck(message_queue.m); // 当lck变量消失即函数结束后锁会自动释放
message_queue.q.push({ user, "add" });
message_queue.cv.notify_all(); // 唤醒条件变量

return 0;
}

int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");

unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({ user, "remove" });
message_queue.cv.notify_all();

return 0;
}

};

void consume_task() // 消费者模型
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()) // 如果消息队列为空则应该先阻塞,而不能一直循环
{
message_queue.cv.wait(lck); // 先将锁释放,然后卡住,直到在其他地方将这个条件变量唤醒
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 尽早解锁,若等处理完task再解锁就等待时间太长了

// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);

pool.match();
}
}
}

int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);

printf("Start Match Server\n");

thread matching_thread(consume_task);

server.serve();
return 0;
}

由于使用了线程库,因此编译的时候需要加上参数 -pthread

1
2
g++ -c main.cpp
g++ *.o -o main -lthrift -pthread

此时可以打开 match_server 端和 match_client 端,然后在 client 端添加玩家看看 server 端的匹配结果。

2.4 save_client实现

假设 save_server 端已经实现,在 thrift 文件夹中创建文件 save.thrift,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
namespace cpp save_service

service Save {

/**
* username: myserver的名称
* password: myserver的密码的md5sum的前8位
* 用户名密码验证成功会返回0,验证失败会返回1
* 验证成功后,结果会被保存到myserver:homework/lesson_6/result.txt中
*/
i32 save_data(1: string username, 2: string password, 3: i32 player1_id, 4: i32 player2_id)
}

可以看到直接调用 save_server 端的接口函数 save_data 即可(该函数的实现我们不关心)。

match_system/src 文件夹中输入以下指令生成接口的 C++ 实现,并重命名,然后需要将自动生成的服务端代码删去:

1
2
3
4
thrift -r --gen cpp ../../thrift/save.thrift
mv gen-cpp save_client
cd save_client
rm Save_server.skeleton.cpp

接下来我们将 Thrift 官网 C++ 教程中的 Client 端代码抄下来并相应进行修改,修改后的 main.cpp 内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace ::match_service;
using namespace ::save_service; // 注意加上save_client端的命名空间
using namespace std;

struct Task
{
User user;
string type;
};

struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;

class Pool // 玩家匹配池
{
public:
void save_result(int a, int b)
{
printf("Match result: %d %d\n", a, b);

std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);

try {
transport->open();

int res = client.save_data("acs_2077", "4503f06d", a, b);
if (!res) puts("Success");
else puts("Failed");

transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
}

void match()
{
while (users.size() > 1)
{
auto a = users[0], b = users[1];
users.erase(users.begin());
users.erase(users.begin());

save_result(a.id, b.id);
}
}

void add(User user)
{
users.push_back(user);
}

void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i++)
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}

private:
vector<User> users;
}pool;

class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}

int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");

unique_lock<mutex> lck(message_queue.m); // 当lck变量消失即函数结束后锁会自动释放
message_queue.q.push({ user, "add" });
message_queue.cv.notify_all(); // 唤醒条件变量

return 0;
}

int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");

unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({ user, "remove" });
message_queue.cv.notify_all();

return 0;
}

};

void consume_task() // 消费者模型
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()) // 如果消息队列为空则应该先阻塞,而不能一直循环
{
message_queue.cv.wait(lck); // 先将锁释放,然后卡住,直到在其他地方将这个条件变量唤醒
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 尽早解锁,若等处理完task再解锁就等待时间太长了

// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);

pool.match();
}
}
}

int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);

printf("Start Match Server\n");

thread matching_thread(consume_task);

server.serve();
return 0;
}

2.5 match_server_v3.0实现

通过修改匹配函数 match() 实现将分差小于等于50的玩家进行匹配,修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <unistd.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace ::match_service;
using namespace ::save_service; // 注意加上save_client端的命名空间
using namespace std;

struct Task
{
User user;
string type;
};

struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;

class Pool // 玩家匹配池
{
public:
void save_result(int a, int b)
{
// do save task
}

void match()
{
while (users.size() > 1)
{
sort(users.begin(), users.end(), [&](User &a, User &b){
return a.score < b.score;
});
bool flag = true; // 防止玩家之间分数差距都很大导致死循环
for (uint32_t i = 1; i < users.size(); i++)
{
auto a = users[i - 1], b = users[i];
if (b.score - a.score <= 50)
{
save_result(a.id, b.id);
users.erase(users.begin() + i - 1, users.begin() + i + 1);
flag = false;
break;
}
}
if (flag) break;
}
}

void add(User user)
{
users.push_back(user);
}

void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i++)
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}

private:
vector<User> users;
}pool;

class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}

int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");

unique_lock<mutex> lck(message_queue.m); // 当lck变量消失即函数结束后锁会自动释放
message_queue.q.push({ user, "add" });
message_queue.cv.notify_all(); // 唤醒条件变量

return 0;
}

int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");

unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({ user, "remove" });
message_queue.cv.notify_all();

return 0;
}

};

void consume_task() // 消费者模型
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()) // 如果消息队列为空则应该先阻塞,而不能一直循环
{
// message_queue.cv.wait(lck); // 先将锁释放,然后卡住,直到在其他地方将这个条件变量唤醒
lck.unlock();
pool.match();
sleep(1); // 每秒匹配一次
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 尽早解锁,若等处理完task再解锁就等待时间太长了

// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);

pool.match();
}
}
}

int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);

printf("Start Match Server\n");

thread matching_thread(consume_task);

server.serve();
return 0;
}

2.6 match_server_v4.0实现

通过 Thrift 官网 C++ 教程下的 Server 端代码可以将 match_server 改为多线程,修改后的 main.cpp 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>
#include <thrift/TToString.h>

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <unistd.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace ::match_service;
using namespace ::save_service; // 注意加上save_client端的命名空间
using namespace std;

struct Task
{
User user;
string type;
};

struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;

class Pool // 玩家匹配池
{
public:
void save_result(int a, int b)
{
// do save task
}

void match()
{
// do match task
}

void add(User user)
{
users.push_back(user);
}

void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i++)
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}

private:
vector<User> users;
}pool;

class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}

int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");

unique_lock<mutex> lck(message_queue.m); // 当lck变量消失即函数结束后锁会自动释放
message_queue.q.push({ user, "add" });
message_queue.cv.notify_all(); // 唤醒条件变量

return 0;
}

int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");

unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({ user, "remove" });
message_queue.cv.notify_all();

return 0;
}

};

class MatchCloneFactory : virtual public MatchIfFactory {
public:
~MatchCloneFactory() override = default;
MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
{
std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
/*cout << "Incoming connection\n";
cout << "\tSocketInfo: " << sock->getSocketInfo() << "\n";
cout << "\tPeerHost: " << sock->getPeerHost() << "\n";
cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
cout << "\tPeerPort: " << sock->getPeerPort() << "\n";*/
return new MatchHandler;
}
void releaseHandler(MatchIf* handler) override {
delete handler;
}
};

void consume_task() // 消费者模型
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()) // 如果消息队列为空则应该先阻塞,而不能一直循环
{
// message_queue.cv.wait(lck); // 先将锁释放,然后卡住,直到在其他地方将这个条件变量唤醒
lck.unlock();
pool.match();
sleep(1); // 每秒匹配一次
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 尽早解锁,若等处理完task再解锁就等待时间太长了

// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);

pool.match();
}
}
}

int main(int argc, char **argv) {
TThreadedServer server(
std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
std::make_shared<TServerSocket>(9090), //port
std::make_shared<TBufferedTransportFactory>(),
std::make_shared<TBinaryProtocolFactory>());

printf("Start Match Server\n");

thread matching_thread(consume_task);

server.serve();
return 0;
}

2.7 match_server_v5.0实现

通过对匹配机制的修改,实现玩家每等待一秒钟,匹配的分数区间扩大50分,修改后的 main.cpp 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>
#include <thrift/TToString.h>

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <unistd.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace ::match_service;
using namespace ::save_service; // 注意加上save_client端的命名空间
using namespace std;

struct Task
{
User user;
string type;
};

struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;

class Pool // 玩家匹配池
{
public:
void save_result(int a, int b)
{
printf("Match result: %d %d\n", a, b);

std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);

try {
transport->open();

int res = client.save_data("acs_2077", "4503f06d", a, b);
if (!res) puts("Success");
else puts("Failed");

transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
}

bool check_match(uint32_t i, uint32_t j)
{
auto a = users[i], b = users[j];
int dt = abs(a.score - b.score);
int a_max_dif = wt[i] * 50, b_max_dif = wt[j] * 50;
return dt <= a_max_dif && dt <= b_max_dif;
}

void match()
{
for (uint32_t i = 0; i < wt.size(); i++)
wt[i]++; // 表示等待秒数+1

while (users.size() > 1)
{
bool flag = true; // 防止玩家之间分数差距都很大导致死循环
for (uint32_t i = 0; i < users.size(); i++)
{
for (uint32_t j = i + 1; j < users.size(); j++)
if (check_match(i, j))
{
auto a = users[i], b = users[j];
users.erase(users.begin() + j); // 先删后面的再删前面的
users.erase(users.begin() + i);
wt.erase(wt.begin() + j);
wt.erase(wt.begin() + i);
save_result(a.id, b.id);
flag = false;
break;
}
if (!flag) break;
}
if (flag) break;
}
}

void add(User user)
{
users.push_back(user);
wt.push_back(0);
}

void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i++)
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
wt.erase(wt.begin() + i);
break;
}
}

private:
vector<User> users;
vector<int> wt; // 表示玩家的waiting time
}pool;

class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}

int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");

unique_lock<mutex> lck(message_queue.m); // 当lck变量消失即函数结束后锁会自动释放
message_queue.q.push({ user, "add" });
message_queue.cv.notify_all(); // 唤醒条件变量

return 0;
}

int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");

unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({ user, "remove" });
message_queue.cv.notify_all();

return 0;
}

};

class MatchCloneFactory : virtual public MatchIfFactory {
public:
~MatchCloneFactory() override = default;
MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
{
std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
/*cout << "Incoming connection\n";
cout << "\tSocketInfo: " << sock->getSocketInfo() << "\n";
cout << "\tPeerHost: " << sock->getPeerHost() << "\n";
cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
cout << "\tPeerPort: " << sock->getPeerPort() << "\n";*/
return new MatchHandler;
}
void releaseHandler(MatchIf* handler) override {
delete handler;
}
};

void consume_task() // 消费者模型
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()) // 如果消息队列为空则应该先阻塞,而不能一直循环
{
// message_queue.cv.wait(lck); // 先将锁释放,然后卡住,直到在其他地方将这个条件变量唤醒
lck.unlock();
pool.match();
sleep(1); // 每秒匹配一次
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 尽早解锁,若等处理完task再解锁就等待时间太长了

// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
}
}
}

int main(int argc, char **argv) {
TThreadedServer server(
std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
std::make_shared<TServerSocket>(9090), //port
std::make_shared<TBufferedTransportFactory>(),
std::make_shared<TBinaryProtocolFactory>());

printf("Start Match Server\n");

thread matching_thread(consume_task);

server.serve();
return 0;
}

上一章:Linux学习笔记-SSH与Git

下一章:Linux学习笔记-管道、环境变量与Docker