使用cgo实现高性能WebSocket服务

Go 语言的便捷,协程的高效,GC以及自身的安全性,使得它成为后端业务开发的利器.

之前看过来自 Mail.Ru 工程师分享 Go的百万连接数的WebSocket服务优化 ,*中文翻译版.描述通过使用优化 Go 协程和零拷贝Http Header 实现了对内存的高效利用.

但其中自定义的 epoll 结构中依旧用了锁,个人觉得还是有优化的空间.

可以用高效的 uWebSockets 库配合,使得接入层用高效的 C++ 语言,而后端业务处理则用 Go 语言.这样的好处就是 C++ 可以最高效精准的把控接入层的逻辑处理和内存的使用,而后端的业务开发则是以高效安全为主,使用 Go 正好合适.

这里使用的 uWebSockets 为 0.14.8 tag 的版本,配合 libuv作为事件驱动,zlib 作为压缩算法库,openssl 作为加密库,四个编译为 .a类型的静态库,方便一同编译到程序中,避免运行时动态库缺失的问题.

先设置要用到的全局的变量的头文件definition.h,定义单消息的最大长度:

#pragma once
//与客户端交互的序列化后的上下行消息尺寸最大值(字节)
#define MAX_BUF_SIZE 4096

//c与go之间交换的消息类型
enum struct SwMsgType:char{
    Free=0,    //表示空闲,可重用
    Conn=1,    //表示已建立连接
    Disconn=2, //表示断开连接
    Msg=10,     //表示有消息
};

//交换的消息结构
typedef struct SwMsg{
    enum SwMsgType type;
    short wsRawLen;           //ws上下行的原始长度
    int time;                 //msg进入队列的时间
    long socket;              //ws连接socket
    char wsRawBuf[MAX_BUF_SIZE]; //ws上下行包
}SwMsg;

//go对C这边的一些调控
typedef struct SwControl{
    unsigned int maxConnCount;  //最大连接数的控制
    //...other vars
}SwControl;

//C这边的一些状态指标
typedef struct SwStatus{
    unsigned int uptime;
    unsigned int currConnCount;
    unsigned long recvMsgCount;
    unsigned long sw2GoMsgCount;
    unsigned long sw2GoMsgByteLen;
    unsigned long sw2CMsgCount;
    unsigned long sw2CMsgByteLen;
    unsigned long abandonCDownMsgCount;
    unsigned long abandonGoDownMsgCount;
    unsigned long abandonCUpMsgCount;
    unsigned long abandonGoUpMsgCount;
}SwStatus;

typedef struct SwInfo{
    int recv;
    int capacity;
    int head;
    int tail;
    SwMsg* msg;
}SwInfo;

//与Go主要交换对象
typedef struct Switch{
    SwControl ctrl;
    SwStatus status;
    SwInfo upList;
    SwInfo downList;
}Switch;

//最后就是声明供 go 那边调用的全局函数.
extern Switch* Init(unsigned int maxUpMsgCount, unsigned int maxDownMsgCount);  //初始化
extern int Run(int port,bool isEchoMode);  //运行
extern int Exit();  //退出

定义实际运行处理函数的引用头文件,可以叫做process.h:

#pragma once

#include <map>
#include "uWS.h"
#include "definition.h"

void ProcUpMsg(uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length); //处理上行消息包
void ProcHeartMsg(uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length);  //处理心跳包
void ProcDownMsgLoop(uv_timer_t *pHandle); //下行的定时回调
void ProcConnect(uWS::WebSocket<uWS::SERVER> *ws, uWS::HttpRequest req); //处理新的连接
void ProcDisConnect(uWS::WebSocket<uWS::SERVER> *ws, int code, char *message, size_t length); //处理断线请求

bool UpsendConnect(long handle); 
bool UpsendDisConnect(long handle);

//defined in cgo.cpp
extern Switch Cswitch;
extern std::map<long, void *> SocketsMap;

实现process.cpp:

#include <thread>
#include <malloc.h>
#include <uv.h>
#include <process.h>

Switch Cswitch;
std::map<long, void*> SocketsMap;

volatile static bool _inited = false;
volatile static bool _running = false;
volatile static bool _stop = false;

实现Init函数,功能是初始化交换区:

Switch* Init(unsigned int maxUpMsgCount, unsigned int maxDownMsgCount){
    if(_inited) return nullptr;
    memset(&CSwitch,0,sizeof(CSwitch));
    CSwitch.upList.msg = (SwMsg*)calloc(maxUpMsgCount,sizeof(SwMsg));
    CSwitch.upList.capacity = maxUpMsgCount;
    CSwitch.downList.msg = (SwMsg*)calloc(maxDownMsgCount,sizeof(SwMsg));
    CSwitch.downList.capacity = maxDownMsgCount;
    CSwitch.status.uptime = time(0);
    _inited = true;
    return &CSwitch;
}

实现Run函数,这里需要另开线程,因为uWebSockets 启动后是阻塞的,因此需要用另外一个线程去调用:

int Run(int port,bool isEchoMode){
    if(!_inited || _running)
        return -1; //print error info
    thread t(wsRun,port);
    _running = true;
    t.detach();
    return 0;
}

void ProcLoop(uv_timer_t *pHandle){
    uWS::Hub *m_hub = (uWS::Hub*)pHandle->data;
    if(_stop){
         uv_loop_t *m_pLoop = (uv_loop_t *)m_hub->getLoop();
         uv_stop(m_pLoop);
         _running=false;
    }

    if(_running){
#ifdef ECHOMODE
        return;
#endif
        ProcDownMsgLoop(pHandle);
    }
}

static void wsRun(int port){
    static uWS::Hub m_hub;
    m_hub.onMessage([](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode) {
        CSwitch.status.recvMsgCount++;
#ifdef ECHOMODE
        ws->send(message, length, opCode);//echo mode
        return;
#endif
        switch(opCode){
        case uWS::OpCode::BINARY:
            ProcUpMsg(ws, message, length);
            break;
        case uWS::OpCode::TEXT: //文本消息作为心跳处理
            ProcHeartMsg(ws, message, length);
            break;
        default:
            printf("C模块:onMessage length=%d,opCode=%d, error!n", (int)length, opCode);
            break;
        }
    });

    m_hub.onConnection([](uWS::WebSocket<uWS::SERVER> *ws, uWS::HttpRequest req) {
        CSwitch.status.currConnCount++;
#ifdef ECHOMODE
        return;
#endif
        ProcConnect(ws,req);
    });
    m_hub.onDisconnection([](uWS::WebSocket<uWS::SERVER> *ws, int code, char *message, size_t length) {
        CSwitch.status.currConnCount--;
#ifdef ECHOMODE
        return;
#endif
        ProcDisConnect(ws,code,message,length);
    });

    uv_loop_t *m_pLoop = (uv_loop_t *)m_hub->getLoop();
    uv_timer_t *m_pHandler = (uv_timer_t *)calloc(1,sizeof(uv_timer_t));
    uv_timer_init(m_pLoop, m_pHandler);
    m_pHandler->data = (void*)m_hub;
    uv_timer_start(m_pHandler, (uv_timer_cb)ProcLoop, 10, 10);

    if(!m_hub.listen(port))
        return;//print error info
    m_hub.run();
    _stop = true;// not run 
}

最后实现Exit函数,然后编译为.a的静态库文件:

int Exit(){
    if(!_running)
        return 0;//print info

    if(_stop)
        return 0; //print info
    else
        _stop = true;
    for(int i=0;i<5;++i){
        usleep(1000000); //1s
        if(!_running)
            return;
    }
}

在 go 文件中引入:

package uws


import "context"
import "sync"
import "unsafe"

/*
#cgo CFLAGS: -I ./
#cgo LDFLAGS: -L ./ -ldefinition -luWebSockets -lssl -lcrypto -lzlib -luv -lpthread -ldl -lstdc++
#include "definition.h"
*/
import "C"

//剩下的自由发挥

Pressure

使用 echo 模式对 C 这边进行压力测试,环境是 i7 6700hq / 1600MHz / CentOS 7.

模式 配置 连接数 消息长度 收发包数 内存 CPU(单核百分比)
echo 4U8G 3.6w 128 Byte ≈1.2w/s 12MB 27~31%
echo 4U8G 20w 19 Byte ≈4w/s 80MB 34~43%

没有更好的测试环境,但可以估计出在 300w 连接下,echo 模式下 60 w/s 的发包速率(假设网卡自带包转发控制器,CPU影响忽略不计) 那么可以估算出内存的使用大致在1200MB附近,当然了,由于是单线程,预估计 60w 的连接数就会到阈值.不过加上现在的微服务化,可以使用分布式进行负载均衡.没必要都堆到一个服务里,不然一旦崩溃,影响的用户数也会太大.