hiredis 是来自 redis 官方的 C 语言驱动(官方没有推出C++版的驱动,其他的都不是官方的,请认准一家),支持所有 redis 功能的调用,并且包含同步与异步模式,效率极高。
下载最新的稳定版 0.13.3,使用 make 生成静态库和动态库,并可选择是否通过 install 安装到系统环境中。我通常直接将库放到项目中,并指定 include 路径,这样可以减少项目开发中环境不一致的问题。
在要求高并发的项目中,应该尽量避免使用同步的方法,虽然同步的编程难度相比于异步而言要低很多。hiredis 中同时提供了同步和异步的调用接口,其代码质量十分优秀。
使用异步的接口需要导入头文件 hiredis.h 和 async.h,最后还要导入异步事件的驱动头文件。hiredis 提供了对 ae、libev、libevent、libuv、qt 还有 macosx 异步库的支持,相应头文件在 adapters 文件夹中,这里用 uv 库。
头文件如下:
#include "hiredis/hiredis.h"
#include "hiredis/async.h"
#include "hiredis/adapters/libuv.h"
/**
 * Connection settings for a single redis instance.
 *
 * Plain aggregate: nothing is validated or defaulted here.
 */
struct redisConfig {
    // Host name or IP address. A full domain name is at most 255 bytes,
    // an IPv4 literal at most 15 bytes and an IPv6 literal at most 39 bytes.
    char addr[256];
    // Password; redis allows at most 512 bytes (one extra byte for the terminator).
    char pwd[513];
    // TCP port of the instance.
    unsigned short port;
    // Numeric index of this instance (returned by getInstanceIndex()).
    unsigned short index;
    // Instance identifier, printed in status log lines.
    char instanceCode[32];
    // Cluster identifier, printed in status log lines.
    char clusterCode[32];
};
/**
 * Connection state of a redis instance.
 *
 * Negative values are failure states, zero is "not connected" and positive
 * values are the successive stages of an active connection (callers test
 * `status > 0` for "connection in progress or established").
 *
 * The underlying type is `signed char` rather than plain `char`: the
 * signedness of plain `char` is implementation-defined, and on targets where
 * it is unsigned the negative enumerators would be a narrowing conversion
 * (ill-formed).
 */
enum connRedisStatus : signed char {
    AuthFailed    = -2, // authentication / verification command was rejected
    RedisError    = -1, // the context reported an error
    Disconnect    = 0,  // no active connection
    Connecting,         // asynchronous connect has been started
    Connected,          // transport is connected, not yet verified
    Authing,            // verification in progress
    AuthedSuccess,      // verified; commands may be issued
};
class redisAsyncConnector;
class redisAsyncInstance;

/**
 * Payload stored in redisAsyncContext::data so that the static hiredis
 * callbacks can find the connector and the instance a context belongs to.
 */
struct privateData {
    redisAsyncConnector* connector; // connector that manages the context
    redisAsyncInstance*  instance;  // instance the context is attached to
};
/**
 * Manages the connection life cycle (connect, verify, disconnect, reconnect)
 * of redisAsyncInstance objects, so that the instances themselves only have
 * to execute commands.
 */
class redisAsyncConnector {
private:
    // Initialized to nullptr so the member never holds an indeterminate value.
    uv_loop_t* m_loop = nullptr;
public:
    /// Starts (or confirms) the asynchronous connection of `rdsAsyIns`.
    bool connect(redisAsyncInstance* rdsAsyIns);
    /// Disconnects the context currently held by `rdsAsyIns`.
    bool disconnect(redisAsyncInstance* rdsAsyIns);
    /// Sends "auth <pwd>" when a non-empty password is given, otherwise "ping".
    void verifyConnect(const redisAsyncContext* ac,const char* pwd) noexcept;
    /// Reply callback of the command issued by verifyConnect().
    static void onVerifyConnect(redisAsyncContext* ac,void* reply,void* privdata);
    /// Handles the outcome of a connection attempt for the context `ac`.
    void onConnected(const redisAsyncContext* ac,int status);
    /// Handles the loss of the connection of the context `ac`.
    void onDisconnected(const redisAsyncContext* ac,int status);
    /// Connect callback registered with hiredis; forwards to onConnected().
    static void invokeConnected(const redisAsyncContext* ac,int status)noexcept;
    /// Disconnect callback registered with hiredis; forwards to onDisconnected().
    static void invokeDisconnected(const redisAsyncContext* ac,int status)noexcept;
};
/**
 * One redis endpoint: holds the hiredis async context and issues commands.
 * Connection management is delegated to redisAsyncConnector (a friend).
 *
 * Objects are default-constructed and then configured with init().
 */
class redisAsyncInstance {
protected:
    redisAsyncContext* m_rdsAc = nullptr;          // current hiredis context, if any
    uv_loop_t* m_loop = nullptr;                   // event loop the context is attached to
    bool m_isFirstConn = true;                     // cleared after the first successful verification
    connRedisStatus m_status = Disconnect;         // current connection state
    redisAsyncConnector* m_connector = nullptr;    // connector passed to init()
public:
    redisConfig* m_rdsCfg = nullptr;               // configuration passed to init()

    // Declaring the (deleted) copy constructor suppresses the implicit
    // default constructor, so it has to be requested explicitly; without it
    // the class could not be instantiated at all.
    redisAsyncInstance() = default;
    // Non-copyable. Taking const references also rejects copies from const objects
    // with a clear "deleted function" diagnostic.
    redisAsyncInstance(const redisAsyncInstance&) = delete;
    redisAsyncInstance & operator = (const redisAsyncInstance &) = delete;

    /// Stores configuration, event loop and connector and resets the state.
    void init(redisConfig* cfg,uv_loop_t* loop,redisAsyncConnector* conntor) noexcept;
    /// Reply callback used for commands sent through asyncCommand().
    static void redisInvoke(redisAsyncContext* ac,void *reply,void* privdata) noexcept;
    /// Returns `index` of the configuration.
    unsigned short getInstanceIndex() const noexcept;
    /// Returns `instanceCode` of the configuration.
    char *getInstanceCode() const noexcept;
    /// Queues `cmd` on the connection; `privdata` is handed to redisInvoke().
    bool asyncCommand(const char* const cmd,void* privdata /*other parameters */);
    friend class redisAsyncConnector;
private:
    void onException(/* some parameters */);
    /// True only when the connection has been verified.
    bool verifyConnection() const {
        return m_status == connRedisStatus::AuthedSuccess;
    }
    inline privateData* makeCallbackData(/* some parameters */);
    /// Records `status` and prints a status line (optionally followed by `infostr`).
    void onChangeStatus(connRedisStatus status, const char* infostr = nullptr);
};
创建相应的实例和连接器:实例只负责基本的命令执行,而连接、断开连接、重连等功能交给连接器统一管理。
部分功能实现:
/**
 * Prepares the instance for use.
 *
 * Only the pointers are stored; nothing is copied and no connection is
 * opened here.
 *
 * @param cfg       configuration of the redis endpoint
 * @param loop      libuv loop the connection will be attached to
 * @param connector connector responsible for this instance
 */
void redisAsyncInstance::init(redisConfig *cfg, uv_loop_t *loop,redisAsyncConnector* connector) noexcept
{
    // Collaborators handed in by the caller.
    m_rdsCfg = cfg;
    m_loop = loop;
    m_connector = connector;

    // Start from a clean, disconnected state.
    m_rdsAc = nullptr;
    m_status = connRedisStatus::Disconnect;
    m_isFirstConn = true;
    //...
}
/**
 * Reply callback passed to redisAsyncCommand() by asyncCommand().
 *
 * @param ac       context the reply belongs to
 * @param reply    reply object handed over by hiredis
 * @param privdata the pointer that was given to asyncCommand()
 *
 * NOTE(review): the body is a placeholder; when it is filled in, `reply`
 * should presumably be checked for NULL before use - confirm against the
 * hiredis async documentation.
 */
void redisAsyncInstance::redisInvoke(redisAsyncContext *ac, void *reply, void *privdata) noexcept
{
    // Placeholder: process the reply data here.
}
unsigned short redisAsyncInstance::getInstanceIndex() const noexcept
{
return this->m_rdsCfg->index;
}
char *redisAsyncInstance::getInstanceCode() const noexcept
{
return this->m_rdsCfg->instanceCode;
}
bool __attribute__((hot)) redisAsyncInstance::asyncCommand(const char * const cmd, void* privdata)
{
if(verifyConnection() == false){
return false;
}
redisAsyncCommand(this->m_rdsAc,&redisInvoke,privdata,cmd);
}
/**
 * Records the new connection state and prints one status line to stdout.
 *
 * The line consists of a state-specific prefix, the cluster code, instance
 * code, address and port of the configuration, then either "." or
 * " <infostr>." and a newline.
 *
 * @param status  new state; stored in m_status
 * @param infostr optional extra text appended to the line
 */
void redisAsyncInstance::onChangeStatus(connRedisStatus status, const char *infostr)
{
#define INFO_FORMAT "%-10s / %s %s:%d"
#define INFO_FIELDS this->m_rdsCfg->clusterCode,this->m_rdsCfg->instanceCode,this->m_rdsCfg->addr,this->m_rdsCfg->port
    m_status = status;

    // Pick the prefix first, then emit the line with a single printf.
    const char* prefix = "[INFO][REDIS] 未知状态 ";
    switch (status)
    {
    case AuthFailed:    prefix = "[INFO][REDIS] 验证失败 "; break;
    case RedisError:    prefix = "[INFO][REDIS] 实例错误 "; break;
    case Disconnect:    prefix = "[INFO][REDIS] 连接断开 "; break;
    case Connecting:    prefix = "[INFO][REDIS] 正在连接 "; break;
    case Connected:     prefix = "[INFO][REDIS] 连接成功 "; break;
    case AuthedSuccess: prefix = "[INFO][REDIS] 授权成功 "; break;
    default:            break; // any other state keeps the "unknown state" prefix
    }
    printf("%s" INFO_FORMAT, prefix, INFO_FIELDS);

    if (infostr != nullptr) {
        printf(" %s.\n",infostr);
        return;
    }
    printf(".\n");
}
/**
 * Starts the asynchronous connection of an instance.
 *
 * Creates the hiredis context when the instance has none, attaches it to the
 * instance's libuv loop, stores a privateData record in the context and
 * registers the connect/disconnect callbacks. The connection itself is
 * completed later on the event loop.
 *
 * @param rdsAsyIns instance to connect
 * @return true when the instance already has a healthy context in a positive
 *         state, or when the connection attempt was started successfully;
 *         false on any failure (the instance then holds no context).
 */
bool redisAsyncConnector::connect(redisAsyncInstance *rdsAsyIns)
{
    if (rdsAsyIns == nullptr)
        return false;//print log
    if (rdsAsyIns->m_status > 0 && rdsAsyIns->m_rdsAc && rdsAsyIns->m_rdsAc->err == REDIS_OK)
        return true;
    if (rdsAsyIns->m_rdsCfg == nullptr)
        return false;//print log

    // Releases the context together with its privateData and clears the
    // instance's pointer, so that no later call can touch freed memory.
    // The callbacks are cleared first so that freeing cannot re-enter this
    // connector with a pointer to the privateData that is being deleted.
    const auto discardContext = [rdsAsyIns]() {
        redisAsyncContext* ctx = rdsAsyIns->m_rdsAc;
        if (ctx == nullptr)
            return;
        rdsAsyIns->m_rdsAc = nullptr;
        ctx->onConnect = nullptr;
        ctx->onDisconnect = nullptr;
        delete static_cast<privateData*>(ctx->data);
        ctx->data = nullptr;
        redisAsyncFree(ctx);
    };

    if (rdsAsyIns->m_rdsAc == nullptr) {
        rdsAsyIns->onChangeStatus(connRedisStatus::Connecting);
        rdsAsyIns->m_rdsAc = redisAsyncConnect(rdsAsyIns->m_rdsCfg->addr,rdsAsyIns->m_rdsCfg->port);
    }
    if (rdsAsyIns->m_rdsAc == nullptr)
        return false;//print log
    if (rdsAsyIns->m_rdsAc->err) {
        // Report before freeing: errstr belongs to the context.
        rdsAsyIns->onChangeStatus(connRedisStatus::RedisError,rdsAsyIns->m_rdsAc->errstr);
        discardContext();
        return false;
    }
    if (redisLibuvAttach(rdsAsyIns->m_rdsAc,rdsAsyIns->m_loop) != REDIS_OK) {
        rdsAsyIns->onChangeStatus(connRedisStatus::Disconnect);
        discardContext();
        return false;
    }
    if (rdsAsyIns->m_rdsAc->data == nullptr) {
        // The static callbacks locate connector and instance through this record.
        privateData* cbData = new privateData();
        cbData->connector = this;
        cbData->instance = rdsAsyIns;
        rdsAsyIns->m_rdsAc->data = cbData;
    }
    if (redisAsyncSetConnectCallback(rdsAsyIns->m_rdsAc,&invokeConnected) != REDIS_OK) {
        discardContext();
        return false;//print log
    }
    if (redisAsyncSetDisconnectCallback(rdsAsyIns->m_rdsAc,&invokeDisconnected) != REDIS_OK) {
        discardContext();
        return false;//print log
    }
    // The original fell off the end of the function here (undefined
    // behaviour for a bool function); the attempt was started successfully.
    return true;
}
/**
 * Deliberately disconnects an instance.
 *
 * The disconnect callback is removed from the context before disconnecting:
 * it reads the privateData stored in the context (which is released here)
 * and would immediately reconnect, which is not wanted for an explicit
 * disconnect. Because that callback no longer runs, the instance state is
 * updated here instead. Any reply callback that still fires for this context
 * will observe `ac->data == nullptr`.
 *
 * @param rdsAsyIns instance to disconnect
 * @return false when `rdsAsyIns` is null; true otherwise (including the case
 *         where the instance had no context).
 */
bool redisAsyncConnector::disconnect(redisAsyncInstance *rdsAsyIns)
{
    if (rdsAsyIns == nullptr)
        return false;

    redisAsyncContext* ctx = rdsAsyIns->m_rdsAc;
    if (ctx == nullptr)
        return true; // nothing to disconnect

    // Detach the context from the instance before handing it to hiredis, so
    // that the instance never keeps a pointer to a context that is going away.
    rdsAsyIns->m_rdsAc = nullptr;
    ctx->onDisconnect = nullptr;
    delete static_cast<privateData*>(ctx->data);
    ctx->data = nullptr;

    rdsAsyIns->onChangeStatus(connRedisStatus::Disconnect);
    redisAsyncDisconnect(ctx);
    return true;
}
void redisAsyncConnector::verifyConnect(const redisAsyncContext *ac, const char *pwd) noexcept
{
if(pwd && *pwd)
redisAsyncCommand((redisAsyncContext*)ac,&onVerifyConnect,nullptr,"auth %s",pwd);
else
redisAsyncCommand((redisAsyncContext*)ac,&onVerifyConnect,nullptr,"ping");
}
/**
 * Reply callback of the "auth"/"ping" command sent by verifyConnect().
 *
 * On an error reply the instance is marked AuthFailed (with the server's
 * error text) and the process exits. Otherwise the instance is marked
 * AuthedSuccess, its first-connection flag is cleared and TCP keep-alive is
 * enabled on the connection.
 *
 * @param ac       context the reply belongs to
 * @param reply    redisReply*, or NULL when no reply could be delivered
 * @param privdata unused (verifyConnect() passes nullptr)
 */
void redisAsyncConnector::onVerifyConnect(redisAsyncContext *ac, void* reply,void* privdata)
{
    (void)privdata;
    if (ac == nullptr)
        return;

    privateData* pdata = static_cast<privateData*>(ac->data);
    if (pdata == nullptr || pdata->instance == nullptr)
        return; // context is no longer associated with an instance

    const redisReply* rdsReply = static_cast<const redisReply*>(reply);
    if (rdsReply == nullptr) {
        // hiredis invokes pending callbacks with a NULL reply when the
        // context is torn down; there is nothing to verify in that case.
        return;
    }

    if (rdsReply->type == REDIS_REPLY_ERROR) {
        // The server's message lives in the reply; ac->errstr is only set
        // for connection-level errors.
        pdata->instance->onChangeStatus(connRedisStatus::AuthFailed,rdsReply->str);
        exit(-1);
    }

    pdata->instance->onChangeStatus(connRedisStatus::AuthedSuccess,ac->errstr);
    pdata->instance->m_isFirstConn = false;
    redisEnableKeepAlive(&(ac->c));
}
void redisAsyncConnector::onConnected(const redisAsyncContext *ac, int status)
{
privateData* cbData = (privateData*)ac->data;
if(ac->err != REDIS_OK){
cbData->instance->onChangeStatus(connRedisStatus::Disconnect,cbData->instance->m_rdsAc->errstr);
cbData->connector->connect(cbData->instance);
return;//ok,there will run at connect fn
}
cbData->instance->onChangeStatus(connRedisStatus::Connected,cbData->instance->m_rdsAc->errstr);
if(cbData->instance->m_isFirstConn){
if(cbData->instance->m_rdsAc != ac){
delete cbData->instance->m_rdsAc->data;
redisAsyncDisconnect(cbData->instance->m_rdsAc);
cbData->instance->m_rdsAc = (redisAsyncContext*)ac;
}
}
verifyConnect(ac,cbData->instance->m_rdsCfg->pwd);
}
void redisAsyncConnector::onDisconnected(const redisAsyncContext *ac, int status)
{
privateData* cbData = (privateData*)ac->data;
cbData->instance->onChangeStatus(connRedisStatus::Disconnect,ac->errstr);
connect(cbData->instance); //reconnect;
}
void redisAsyncConnector::invokeConnected(const redisAsyncContext *ac, int status) noexcept
{
privateData* cbData = (privateData*)ac->data;
cbData->connector->onConnected(ac,status);
}
void redisAsyncConnector::invokeDisconnected(const redisAsyncContext *ac, int status) noexcept
{
privateData* cbData = (privateData*)ac->data;
cbData->connector->onDisconnected(ac,status);
}