hiredis异步调用的接口封装

hiredis 是来自 redis 官方的 C 语言驱动(官方没有推出C++版的驱动,其他的都不是官方的,请认准一家),支持所有 redis 功能的调用,并且包含同步与异步模式,效率极高。
下载最新的稳定版0.13.3,使用make生成静态库和动态库,选择是否install安装到系统环境中。我通常直接将库放到项目中,指定include路径,这样可以减少项目中开发的环境不一致问题。

再要求高并发的项目中,应该尽量避免使用同步的方法,虽然同步的编程难度相比于异步而言要低很多。hiredis 中同时提供了同步和异步的调用接口,其代码质量十分优秀。

使用异步的接口需要导入头文件hiredis.hasync.h,最后还要导入异步事件的驱动头文件,hiredis 提供了对 ae,libev,libevent,libuv,qt 还有 macosx 异步库的支持,相应头文件在 adapters 文件夹中,这里用 uv 库.

头文件如下:

#include "hiredis/hiredis.h"
#include "hiredis/async.h"
#include "hiredis/adapters/libuv.h"

typedef struct redisConfig {
    char addr[256];//the full domain max length is 255 bytes, IPv4 max length is 15 bytes and IPv6 is 39 bytes.
    char pwd[513];//the redis pwd max length is 512 bytes
    unsigned short port;
    unsigned short index;
    char instanceCode[32];
    char clusterCode[32];
}redisConfig;

enum connRedisStatus :char {
    AuthFailed = -2,
    RedisError = -1,
    Disconnect=0,
    Connecting,
    Connected,
    Authing,
    AuthedSuccess,

};
struct privateData{
    class redisAsyncConnector* connector;
    class redisAsyncInstance* instance;
};

class redisAsyncConnector {
private:
    uv_loop_t* m_loop;
public:
    bool connect(redisAsyncInstance* rdsAsyIns);
    bool disconnect(redisAsyncInstance* rdsAsyIns);
    void verifyConnect(const redisAsyncContext* ac,const char* pwd) noexcept;
    static void onVerifyConnect(redisAsyncContext* ac,void* reply,void* privdata);
    void onConnected(const redisAsyncContext* ac,int status); //est
    void onDisconnected(const redisAsyncContext* ac,int status);
    static void invokeConnected(const redisAsyncContext* ac,int status)noexcept; //for redisAsyncConnect callback fn
    static void invokeDisconnected(const redisAsyncContext* ac,int status)noexcept;


};

class redisAsyncInstance {
protected:
    redisAsyncContext* m_rdsAc;
    uv_loop_t* m_loop;
    bool m_isFirstConn;
    connRedisStatus m_status;
    redisAsyncConnector* m_connector;

public:
    redisConfig* m_rdsCfg;
    redisAsyncInstance(redisAsyncInstance&) = delete;
    redisAsyncInstance & operator = (redisAsyncInstance &) = delete;

    void init(redisConfig* cfg,uv_loop_t* loop,redisAsyncConnector* conntor) noexcept;
    static void redisInvoke(redisAsyncContext* ac,void *reply,void* privdata) noexcept; //return data to caller when callback finish
    unsigned short getInstanceIndex() const noexcept;
    char *getInstanceCode() const noexcept;

    bool asyncCommand(const char* const cmd,void* privdata /*other parameters */);

    friend class redisAsyncConnector;
private:
    void onException(/* some parameters */);

    inline bool verifyConnection(){
        return m_status == connRedisStatus::AuthedSuccess;
    }
    inline privateData* makeCallbackData(/* some parameters */);

    void onChangeStatus(connRedisStatus status, const char* infostr = nullptr);
};

创建相应的实例和连接器,将实例的功能只对应基本的执行,一些连接,断开连接,重连等功能交给连接器统一管理。

部分功能实现:

void redisAsyncInstance::init(redisConfig *cfg, uv_loop_t *loop,redisAsyncConnector* connector) noexcept
{
    this->m_loop = loop;
    this->m_rdsCfg = cfg;
    this->m_isFirstConn = true;
    this->m_status = connRedisStatus::Disconnect;
    this->m_connector = connector;
    this->m_rdsAc = nullptr;
    //...
}

void redisAsyncInstance::redisInvoke(redisAsyncContext *ac, void *reply, void *privdata) noexcept
{
    //data processing
}

unsigned short redisAsyncInstance::getInstanceIndex() const noexcept
{
    return this->m_rdsCfg->index;
}

char *redisAsyncInstance::getInstanceCode() const noexcept
{
    return this->m_rdsCfg->instanceCode;
}

bool __attribute__((hot)) redisAsyncInstance::asyncCommand(const char * const cmd, void* privdata)
{
    if(verifyConnection() == false){
        return false;
    }
    redisAsyncCommand(this->m_rdsAc,&redisInvoke,privdata,cmd);
}

void redisAsyncInstance::onChangeStatus(connRedisStatus status, const char *infostr)
{
#define INFO_FORMAT "%-10s / %s %s:%d"
#define INFO_FIELDS this->m_rdsCfg->clusterCode,this->m_rdsCfg->instanceCode,this->m_rdsCfg->addr,this->m_rdsCfg->port

        this->m_status = status;

        switch (status)
        {
        case AuthFailed:
            printf("[INFO][REDIS] 验证失败 " INFO_FORMAT , INFO_FIELDS);
            break;
        case RedisError:
            printf("[INFO][REDIS] 实例错误 " INFO_FORMAT , INFO_FIELDS);
            break;
        case Disconnect:
            printf("[INFO][REDIS] 连接断开 " INFO_FORMAT , INFO_FIELDS);
            break;
        case Connecting:
            printf("[INFO][REDIS] 正在连接 " INFO_FORMAT , INFO_FIELDS);
            break;
        case Connected:
            printf("[INFO][REDIS] 连接成功 " INFO_FORMAT , INFO_FIELDS);
            break;
        case AuthedSuccess:
            printf("[INFO][REDIS] 授权成功 " INFO_FORMAT , INFO_FIELDS);
            break;
        default:
            printf("[INFO][REDIS] 未知状态 " INFO_FORMAT , INFO_FIELDS);
            break;
        }
        if (infostr == nullptr)
            printf(".\n");
        else
            printf(" %s.\n",infostr);
        return;
}

bool redisAsyncConnector::connect(redisAsyncInstance *rdsAsyIns)
{
    if (rdsAsyIns == nullptr)
        return false;//print log
    if(rdsAsyIns->m_status > 0 && rdsAsyIns->m_rdsAc && rdsAsyIns->m_rdsAc->err == REDIS_OK)
        return true;
    if(rdsAsyIns->m_rdsAc == nullptr){
        rdsAsyIns->onChangeStatus(connRedisStatus::Connecting);
        rdsAsyIns->m_rdsAc = redisAsyncConnect(rdsAsyIns->m_rdsCfg->addr,rdsAsyIns->m_rdsCfg->port);
    }
    if(rdsAsyIns->m_rdsAc == nullptr || rdsAsyIns->m_rdsAc->err){
        if(rdsAsyIns->m_rdsAc){
            rdsAsyIns->onChangeStatus(connRedisStatus::RedisError,rdsAsyIns->m_rdsAc->errstr);
            redisAsyncFree(rdsAsyIns->m_rdsAc);
        }//print log
        return false;
    }
    if(redisLibuvAttach(rdsAsyIns->m_rdsAc,rdsAsyIns->m_loop)!=REDIS_OK){
        rdsAsyIns->onChangeStatus(connRedisStatus::Disconnect);
        redisAsyncFree(rdsAsyIns->m_rdsAc);
        return false;
    }
    if(rdsAsyIns->m_rdsAc->data == nullptr){
        privateData* cbData = new privateData();
        cbData->connector = this;
        cbData->instance = rdsAsyIns;
        rdsAsyIns->m_rdsAc->data = (void*)cbData;
    }
    if(redisAsyncSetConnectCallback(rdsAsyIns->m_rdsAc,&invokeConnected) != REDIS_OK){
        redisAsyncFree(rdsAsyIns->m_rdsAc);
        return false;//print log
    }
    if(redisAsyncSetDisconnectCallback(rdsAsyIns->m_rdsAc,&invokeDisconnected) != REDIS_OK){
        redisAsyncFree(rdsAsyIns->m_rdsAc);
        return false;//print log
    }
}

bool redisAsyncConnector::disconnect(redisAsyncInstance *rdsAsyIns)
{
    if(rdsAsyIns->m_rdsAc->data)
        delete (privateData*)rdsAsyIns->m_rdsAc->data;
    if(rdsAsyIns->m_rdsAc)
        redisAsyncDisconnect(rdsAsyIns->m_rdsAc);
    rdsAsyIns->m_rdsAc = nullptr;
}

void redisAsyncConnector::verifyConnect(const redisAsyncContext *ac, const char *pwd) noexcept
{
    if(pwd && *pwd)
        redisAsyncCommand((redisAsyncContext*)ac,&onVerifyConnect,nullptr,"auth %s",pwd);
    else
        redisAsyncCommand((redisAsyncContext*)ac,&onVerifyConnect,nullptr,"ping");
}

void redisAsyncConnector::onVerifyConnect(redisAsyncContext *ac, void* reply,void* privdata)
{
    privateData* pdata = (privateData*)ac->data;
    if(((redisReply*)reply)->type == REDIS_REPLY_ERROR){
        pdata->instance->onChangeStatus(connRedisStatus::AuthFailed,ac->errstr);
        exit(-1);
    }
    pdata->instance->onChangeStatus(connRedisStatus::AuthedSuccess,ac->errstr);
    pdata->instance->m_isFirstConn = false;
    redisEnableKeepAlive(&(ac->c));
}


void redisAsyncConnector::onConnected(const redisAsyncContext *ac, int status)
{
    privateData* cbData = (privateData*)ac->data;
    if(ac->err != REDIS_OK){
        cbData->instance->onChangeStatus(connRedisStatus::Disconnect,cbData->instance->m_rdsAc->errstr);
        cbData->connector->connect(cbData->instance);
        return;//ok,there will run at connect fn
    }
    cbData->instance->onChangeStatus(connRedisStatus::Connected,cbData->instance->m_rdsAc->errstr);
    if(cbData->instance->m_isFirstConn){
        if(cbData->instance->m_rdsAc != ac){
            delete cbData->instance->m_rdsAc->data;
            redisAsyncDisconnect(cbData->instance->m_rdsAc);
            cbData->instance->m_rdsAc = (redisAsyncContext*)ac;
        }
    }
    verifyConnect(ac,cbData->instance->m_rdsCfg->pwd);
}

void redisAsyncConnector::onDisconnected(const redisAsyncContext *ac, int status)
{
    privateData* cbData = (privateData*)ac->data;
    cbData->instance->onChangeStatus(connRedisStatus::Disconnect,ac->errstr);
    connect(cbData->instance); //reconnect;
}

void redisAsyncConnector::invokeConnected(const redisAsyncContext *ac, int status) noexcept
{
      privateData* cbData = (privateData*)ac->data;
      cbData->connector->onConnected(ac,status);
}

void redisAsyncConnector::invokeDisconnected(const redisAsyncContext *ac, int status) noexcept
{
    privateData* cbData = (privateData*)ac->data;
    cbData->connector->onDisconnected(ac,status);
}