Go 语言的便捷,协程的高效,GC以及自身的安全性,使得它成为后端业务开发的利器.
之前看过来自 Mail.Ru 工程师分享 Go的百万连接数的WebSocket服务优化 ,*中文翻译版.描述通过使用优化 Go 协程和零拷贝Http Header 实现了对内存的高效利用.
但其中自定义的 epoll 结构中依旧用了锁,个人觉得还是有优化的空间.
可以用高效的 uWebSockets 库配合,使得接入层用高效的 C++ 语言,而后端业务处理则用 Go 语言.这样的好处就是 C++ 可以最高效精准的把控接入层的逻辑处理和内存的使用,而后端的业务开发则是以高效安全为主,使用 Go 正好合适.
这里使用的 uWebSockets 为 0.14.8 tag 的版本,配合 libuv作为事件驱动,zlib 作为压缩算法库,openssl 作为加密库,四个编译为 .a
类型的静态库,方便一同编译到程序中,避免运行时动态库缺失的问题.
先设置要用到的全局的变量的头文件definition.h
,定义单消息的最大长度:
#pragma once
//与客户端交互的序列化后的上下行消息尺寸最大值(字节)
#define MAX_BUF_SIZE 4096
//c与go之间交换的消息类型
enum struct SwMsgType:char{
Free=0, //表示空闲,可重用
Conn=1, //表示已建立连接
Disconn=2, //表示断开连接
Msg=10, //表示有消息
};
//交换的消息结构
typedef struct SwMsg{
enum SwMsgType type;
short wsRawLen; //ws上下行的原始长度
int time; //msg进入队列的时间
long socket; //ws连接socket
char wsRawBuf[MAX_BUF_SIZE]; //ws上下行包
}SwMsg;
//go对C这边的一些调控
typedef struct SwControl{
unsigned int maxConnCount; //最大连接数的控制
//...other vars
}SwControl;
//C这边的一些状态指标
typedef struct SwStatus{
unsigned int uptime;
unsigned int currConnCount;
unsigned long recvMsgCount;
unsigned long sw2GoMsgCount;
unsigned long sw2GoMsgByteLen;
unsigned long sw2CMsgCount;
unsigned long sw2CMsgByteLen;
unsigned long abandonCDownMsgCount;
unsigned long abandonGoDownMsgCount;
unsigned long abandonCUpMsgCount;
unsigned long abandonGoUpMsgCount;
}SwStatus;
typedef struct SwInfo{
int recv;
int capacity;
int head;
int tail;
SwMsg* msg;
}SwInfo;
//与Go主要交换对象
typedef struct Switch{
SwControl ctrl;
SwStatus status;
SwInfo upList;
SwInfo downList;
}Switch;
//最后就是声明供 go 那边调用的全局函数.
extern Switch* Init(unsigned int maxUpMsgCount, unsigned int maxDownMsgCount); //初始化
extern int Run(int port,bool isEchoMode); //运行
extern int Exit(); //退出
定义实际运行处理函数的引用头文件,可以叫做process.h
:
#pragma once
#include <map>
#include "uWS.h"
#include "definition.h"
void ProcUpMsg(uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length); //处理上行消息包
void ProcHeartMsg(uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length); //处理心跳包
void ProcDownMsgLoop(uv_timer_t *pHandle); //下行的定时回调
void ProcConnect(uWS::WebSocket<uWS::SERVER> *ws, uWS::HttpRequest req); //处理新的连接
void ProcDisConnect(uWS::WebSocket<uWS::SERVER> *ws, int code, char *message, size_t length); //处理断线请求
bool UpsendConnect(long handle);
bool UpsendDisConnect(long handle);
//defined in cgo.cpp
extern Switch Cswitch;
extern std::map<long, void *> SocketsMap;
实现process.cpp
:
#include <thread>
#include <malloc.h>
#include <uv.h>
#include <process.h>
Switch Cswitch;
std::map<long, void*> SocketsMap;
volatile static bool _inited = false;
volatile static bool _running = false;
volatile static bool _stop = false;
实现Init
函数,功能是初始化交换区:
Switch* Init(unsigned int maxUpMsgCount, unsigned int maxDownMsgCount){
if(_inited) return nullptr;
memset(&CSwitch,0,sizeof(CSwitch));
CSwitch.upList.msg = (SwMsg*)calloc(maxUpMsgCount,sizeof(SwMsg));
CSwitch.upList.capacity = maxUpMsgCount;
CSwitch.downList.msg = (SwMsg*)calloc(maxDownMsgCount,sizeof(SwMsg));
CSwitch.downList.capacity = maxDownMsgCount;
CSwitch.status.uptime = time(0);
_inited = true;
return &CSwitch;
}
实现Run
函数,这里需要另开线程,因为uWebSockets 启动后是阻塞的,因此需要用另外一个线程去调用:
int Run(int port,bool isEchoMode){
if(!_inited || _running)
return -1; //print error info
thread t(wsRun,port);
_running = true;
t.detach();
return 0;
}
void ProcLoop(uv_timer_t *pHandle){
uWS::Hub *m_hub = (uWS::Hub*)pHandle->data;
if(_stop){
uv_loop_t *m_pLoop = (uv_loop_t *)m_hub->getLoop();
uv_stop(m_pLoop);
_running=false;
}
if(_running){
#ifdef ECHOMODE
return;
#endif
ProcDownMsgLoop(pHandle);
}
}
static void wsRun(int port){
static uWS::Hub m_hub;
m_hub.onMessage([](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode) {
CSwitch.status.recvMsgCount++;
#ifdef ECHOMODE
ws->send(message, length, opCode);//echo mode
return;
#endif
switch(opCode){
case uWS::OpCode::BINARY:
ProcUpMsg(ws, message, length);
break;
case uWS::OpCode::TEXT: //文本消息作为心跳处理
ProcHeartMsg(ws, message, length);
break;
default:
printf("C模块:onMessage length=%d,opCode=%d, error!n", (int)length, opCode);
break;
}
});
m_hub.onConnection([](uWS::WebSocket<uWS::SERVER> *ws, uWS::HttpRequest req) {
CSwitch.status.currConnCount++;
#ifdef ECHOMODE
return;
#endif
ProcConnect(ws,req);
});
m_hub.onDisconnection([](uWS::WebSocket<uWS::SERVER> *ws, int code, char *message, size_t length) {
CSwitch.status.currConnCount--;
#ifdef ECHOMODE
return;
#endif
ProcDisConnect(ws,code,message,length);
});
uv_loop_t *m_pLoop = (uv_loop_t *)m_hub->getLoop();
uv_timer_t *m_pHandler = (uv_timer_t *)calloc(1,sizeof(uv_timer_t));
uv_timer_init(m_pLoop, m_pHandler);
m_pHandler->data = (void*)m_hub;
uv_timer_start(m_pHandler, (uv_timer_cb)ProcLoop, 10, 10);
if(!m_hub.listen(port))
return;//print error info
m_hub.run();
_stop = true;// not run
}
最后实现Exit
函数,然后编译为.a
的静态库文件:
int Exit(){
if(!_running)
return 0;//print info
if(_stop)
return 0; //print info
else
_stop = true;
for(int i=0;i<5;++i){
usleep(1000000); //1s
if(!_running)
return;
}
}
在 go 文件中引入:
package uws
import "context"
import "sync"
import "unsafe"
/*
#cgo CFLAGS: -I ./
#cgo LDFLAGS: -L ./ -ldefinition -luWebSockets -lssl -lcrypto -lzlib -luv -lpthread -ldl -lstdc++
#include "definition.h"
*/
import "C"
//剩下的自由发挥
Pressure
使用 echo 模式对 C 这边进行压力测试,环境是 i7 6700hq / 1600MHz / CentOS 7.
模式 | 配置 | 连接数 | 消息长度 | 收发包数 | 内存 | CPU(单核百分比) |
---|---|---|---|---|---|---|
echo | 4U8G | 3.6w | 128 Byte | ≈1.2w/s | 12MB | 27~31% |
echo | 4U8G | 20w | 19 Byte | ≈4w/s | 80MB | 34~43% |
没有更好的测试环境,但可以估计出在 300w 连接下,echo 模式下 60 w/s 的发包速率(假设网卡自带包转发控制器,CPU影响忽略不计) 那么可以估算出内存的使用大致在1200MB
附近,当然了,由于是单线程,预估计 60w 的连接数就会到阈值.不过加上现在的微服务化,可以使用分布式进行负载均衡.没必要都堆到一个服务里,不然一旦崩溃,影响的用户数也会太大.