欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

rabbitmq---- 数据管理模块-交换机数据管理

最编程 2024-10-03 15:06:30
...

管理的字段

需要管理的数据有下面这个5个
交换机名称:交换机的唯一标识
交换机类型:交换机有三种类型,直接交换/广播交换/主题交换。决定了消息的转发方式。
持久化标识:决定了交换机信息是否持久化存储。方便断电后恢复。
剩下的俩个字段不需要关心,是为了以后进行扩展的。

 struct Exchange
{
	using ptr = std::shared_ptr<Exchange>;
	std::string name;                                  // 交换机名字
	ExchangeType type;                                 // 交换机类型
	bool durable;                                      // 交换机持久化标志位
	bool auto_delete;                                  // 交换机自动删除标志位 (还未实现该功能)
	google::protobuf::Map<std::string, std::string> args; // 其他参数 (方便以后扩展)
}

持久化管理类

交换机的信息提供了持久化管理的操作,我们使用sqlite进行存储。
要管理一个sqlite的操作句柄,这个句柄对象也是我们封装了一下sqlite的操作。

在构造函数种需要传入一个文件路径,也就是存储交换机信息的文件。
sqlite是一个本地化的数据库,不需要通过网络客户端服务器的模式来进行通信。本地提供一个文件就可以进行存储。一个文件就相当于一个数据库database,可以在这个数据库种创建多个表。
我们的交换机/队列/绑定关系信息的数据都是存储在这个文件种的.

class ExchangeMapper
{
private:
    SqliteHelper _sql_helper;	//sqlite句柄
public:
     ExchangeMapper(const std::string &dbfile)
        : _sql_helper(dbfile)
    {
        // 创建父级目录
        const std::string path = FileHelper::parentDirectory(dbfile);
        FileHelper::createDirectory(path);

        // 创建/打开数据库文件
        assert(_sql_helper.open());

        // 创建交换机数据表
        createTable();
    }
}

这就是创建的交换机表。

#define CREATE_TABLE "create table if not exists exchange_table(\
                                    name varchar(32) primary key, \
                                    type int, \
                                    durable int, \
                                    auto_delete int, \
                                    args varchar(128));"
bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);
if (ret == false)
{
   DLOG("创建交换机数据库表失败!!");
   abort(); // 直接异常退出程序
}

这个类还提供一个恢复的接口,他会查询交换机表中的所有记录,存放到一个哈希表中。

//返回交换机表中所有数据,用于重启后恢复
std::unordered_map<std::string, Exchange::ptr> recovery(){

	std::unordered_map<std::string, Exchange::ptr> res;
	std::string sql = "select name, type, durable, auto_delete, args from exchange_table";
	_sql_helper.exec(sql, selectCallBack, &res);

	return res;	
}

内存管理类

在内存管理类中包含了交换机持久化管理类的对象,和一个哈希表,用来管理已经存在交换机信息。

在他的构造函数中调用了持久化管理的数据恢复接口,他会查询数据库表中所有的字段,返回一个哈希表,也就完成了交换机数据恢复。

//交换机数据内存管理类,这个类才是对外提供的
class ExchangeManager
{
private:
    std::mutex _mutex;  //这个类对象可能被多线程访问,我们要加锁
    ExchangeMapper _mapper;
    std::unordered_map<std::string,Exchange::ptr> _Exchanges;   //管理已经存在的交换机
  ExchangeManager(const std::string &dbfile)
          :_mapper(dbfile)
   {
       //恢复交换机
       _Exchanges = _mapper.recovery();
   }

申明交换机

在rabbitMQ中不叫创建交换机,而是叫做申明交换机,它是一种强断言的思想,代表着存在及ok,不存在就创建。这个操作也很简单。
先看看哈希表中存不存在,存在就返回true,不存在就构建一个交换机对象,插入哈希表

bool declareExchange(const std::string &name,
            ExchangeType type, bool durable, bool auto_delete,
            const google::protobuf::Map<std::string, std::string> &args)
	{
	std::unique_lock<std::mutex> lock(_mutex);
	auto it = _Exchanges.find(name);
	if(it != _Exchanges.end()){
	   //存在直接return
	   return true;
	}
	
	//定义一个Exchange对象
	auto ecp = std::make_shared<Exchange>(name,type,durable,auto_delete,args);
	//插入进数据库
	if(durable == true) {
	   bool ret = _mapper.insert(ecp);
	   if(ret == false) return false;
	}
	_Exchanges.insert({name,ecp});
	return true;
	}

删除交换机

根据交换机的名称进行一个删除。同时如果持久化存储了,也要删除数据库中的数据。

void deleteExchange(const std::string &name){
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _Exchanges.find(name);
 if(it == _Exchanges.end()){
         return;
     }

     //删除数据库中数据
     if(it->second->durable == true){
         _mapper.remove(name);
     }
     _Exchanges.erase(name);
 }

获取指定交换机

根据交换机姓名获取指定交换机。

//获取指定交换机
        Exchange::ptr selectExchange(const std::string &name){
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _Exchanges.find(name);
            if(it == _Exchanges.end()){
                //交换机不存在
                return Exchange::ptr();
            }

            return it->second;
        }

推荐阅读