目录
一 原理
二 API
1. ftok
2. msgget
3. msgctl
4. msgsnd
5. msgrcv
三 demo代码
四 基于责任链模式和消息队列对数据处理
1. 什么是责任链模式
2. 下面基于责任链模式来对消息队列获取的消息进行处理
前置
其实system v 版本的进程间通信,设计的接口都类似,所以会用一个也就会用其他的了,比如前面的共享内存的shmget获取shmid,本章讲的消息队列提供的msgget获取msgid是一样的,接口都大同小异。
不过消息队列不同于共享内存的是,发送的数据是有类型的,不像共享内存没有类型的数据发送。
下面的消息结构
struct msgbuf {
long mtype; // 谁发送的
char mtext[N]; // 发送的消息
};
一 原理
1. 和共享内存一样通过系统调用并让物理内存和共享区进行映射。
2. 但不一样的是共享内存通信的时候不需要系统调用,但消息队列需要,所以消息队列读写操作,默认没数据读会阻塞,缓冲区满了写会阻塞,自带同步与互斥。
3. 先通过系统调用建立共享区与物理内存的映射,后续在通过系统调用进行发送和接收消息,发送消息的时候要携带消息id,代表是谁发的,后续收消息要根据这个id区分拿谁发的消息,毕竟总不能自己发自己收吧。
二 API
1. ftok
#include <sys/types.h>
#include <sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);
和共享内存一样这里就不详细介绍了,形成唯一的 key_t 类型标识唯一性。
2. msgget
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(
key_t key, // ftok的返回值
int msgflg // 和共享内存一样 IPC_CREAT/IPC_EXCL
// IPC_CREAT 存在则返回已经存在的,否则创建新的
// IPC_CREAT | IPC_EXCL 不纯在则创建新的,纯在则出错返回
);
// 返回值为0正常,-1错误
3. msgctl
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(
int msqid, // msgget的返回值
int cmd, // 对已经存在的消息队列 CURD 操作
struct msqid_ds *buf // 消息队列的属性字段
);
// 返回值为0正常,失败-1
4. msgsnd
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(
int msqid, // msgget的返回值
const void *msgp, // 消息的数据类型结构
size_t msgsz, // 大小(不携带该结构第一个字段 (long mtype))
int msgflg // 怎么发,阻塞/非阻塞等
);
// 返回值0正常,-1错误
5. msgrcv
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
ssize_t msgrcv(
int msqid, // msgget的返回值
void *msgp, // 消息结构类型
size_t msgsz, // 大小(不携带类型第一个字段)
long msgtyp, // 该类型第一个字段,表示收谁的消息
int msgflg // 怎么收,阻塞/非阻塞等
);
// 返回值>0实际读到的数据个数,为-1读出错
三 demo代码
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>
#include <cstring>
// 初始化数据
const int mydefault = -1;
// 标识谁发的消息
const int Ser = 1;
const int Cli = 2;
// 协同 ftok 形成的 key_t 类型
const static std::string path = "/home/CD/linux/message_queue/Demo_mes";
const static int proj_id = 1234;
// 缓冲区大小
const size_t mes_buff = 4096;
// 消息类型
struct mymessage
{
long mtype;
char mtext[mes_buff];
};
// 转16进制
void To_Hex(int val)
{
char buff[1024] = {0};
snprintf(buff, sizeof(buff), "0x%x", val);
std::cout << buff << std::endl;
return;
}
// 获取 key_t
key_t Getkey(const std::string &mypath, int myproj)
{
key_t key = ftok(mypath.c_str(), myproj);
return key;
}
// 公共方法
class Message_Queue
{
public:
Message_Queue(const std::string &mypath, int myproj) : _msqid(mydefault), _path(mypath), _proj_id(myproj), _key(mydefault)
{
_key = Getkey(_path, _proj_id);
if (_key == -1)
{
std::cout << "Get key failed" << std::endl;
exit(-1);
}
To_Hex(_key);
}
// 创建者创建消息队列
void Create()
{
// 创建并设置权限为 读写
_msqid = msgget(_key, IPC_CREAT | IPC_EXCL | 0666);
if (_msqid == -1)
{
std::cout << "Create _msqid failed" << std::endl;
exit(-2);
}
}
// 获取已经存在的消息独立额
void User()
{
_msqid = msgget(_key, IPC_CREAT);
if (_msqid == -1)
{
std::cout << "User get failed" << std::endl;
exit(-2);
}
}
// 释放消息队列
void destroy()
{
if (_msqid != -1)
{
int n = msgctl(_msqid, IPC_RMID, nullptr);
if (n == -1)
{
std::cout << "remove messque falied" << std::endl;
exit(3);
}
}
}
// 发送消息
void Sendmessage()
{
// 定义消息对象并初始化
mymessage data;
memset(&data, 0, sizeof(data));
// 标明谁发的
data.mtype = Ser;
while (true)
{
std::string s;
std::getline(std::cin, s);
memcpy(&data.mtext, s.c_str(), sizeof(s));
// 发送消息 不能携带第一个字段 缓冲区满了就阻塞
int n = msgsnd(_msqid, &data, sizeof(data.mtext), 0);
if (n == -1)
{
std::cout << "msgsnd failed" << std::endl;
}
if (data.mtext[0] == 'q')
break;
}
}
// 接收消息
void Recvmessage()
{
// 定义消息对象
mymessage data;
while (true)
{
// 接收消息 不能携带第一个字段 收谁的消息 缓冲区为空就阻塞
int n = msgrcv(_msqid, &data, sizeof(data.mtext), Ser, 0);
data.mtext[n] = 0;
if (n == -1)
{
std::cout << "msgrcv failed" << std::endl;
}
if (data.mtext[0] == 'q')
break;
std::cout << data.mtext << std::endl;
}
}
~Message_Queue()
{}
private:
// 标识消息队列id
int _msqid;
// 形成的 key_t
std::string _path;
int _proj_id;
key_t _key;
};
// 创建者/收消息
class server : public Message_Queue
{
public:
server() : Message_Queue(path, proj_id)
{
Message_Queue::Create();
Message_Queue::Recvmessage();
}
~server()
{
Message_Queue::destroy();
}
};
// 获取者/发消息
class client : public Message_Queue
{
public:
client() : Message_Queue(path, proj_id)
{
Message_Queue::User();
Message_Queue::Sendmessage();
}
~client() {}
};
四 基于责任链模式和消息队列对数据处理
1. 什么是责任链模式
责任链属于行为类设计模式,也就是程序在运行的时候每个模块之间都有任务和优先级,哪些任务完成或者不完成的结果要交给下一个节点(处理点),也可以不启动这个任务,就好比食堂打饭打完这个菜,后面的菜可打可不打。
2. 下面基于责任链模式来对消息队列获取的消息进行处理
- 给消息加上时间戳
- 把消息保存到文件
- 如果文件行数超过一个范围则重命名
基于多态实现的责任链
- 先分别创建3个任务类继承自剧中调度类
- 因为都继承了这个类,在这个类定义一个指针,在把这个指针指向下一个节点的剧中调度类
- 调用这个父类被继承的方法,就会构成多态转而去调用子类的方法进行数据处理
3. demo代码
Chai_of_responsibility.hpp
#include <iostream>
#include <memory>
#include <ctime>
#include <string>
#include <unistd.h>
#include <fstream>
#include <cstdio>
class base
{
public:
base() : _status(true) {}
virtual ~base() {}
virtual void hander(const std::string &message) = 0;
public:
void setnext(std::shared_ptr<base> next)
{
_next = next;
}
void setstatusfalse()
{
_status = false;
}
void setstatustrue()
{
_status = true;
}
protected:
std::shared_ptr<base> _next;
bool _status;
};
class format : public base
{
public:
format() {}
~format() {}
void hander(const std::string &message) override
{
std::string str;
if (_status == true)
{
// 拼上时间戳/pid
str += std::to_string(time(nullptr));
str += ' ';
str += std::to_string(getpid()) + '\n';
std::cout << "拼上 时间戳和pid" << std::endl;
}
str += message;
std::cout << "str -> " << str << std::endl;
if (_next != nullptr)
{
_next->hander(str);
}
else
{
std::cout << "format hander over" << std::endl;
}
}
};
const std::string default_path = "/home/CD/linux/message_queue/Demo_mes/test_path/";
const std::string default_name = "111.txt";
class save : public base
{
public:
save(const std::string &path = default_path, const std::string &name = default_name)
: _path(path), _name(name)
{
}
~save() {}
void hander(const std::string &message) override
{
// 保存到文件
std::ofstream os;
if (_status == true)
{
std::string str = _path + _name;
os.open(str, std::ios::app);
if (!os.is_open())
{
std::cout << "save file failed" << std::endl;
}
os << message;
std::cout << "保存成功" << std::endl;
}
if (_next != nullptr)
{
_next->hander(message);
}
else
{
std::cout << "save hander over" << std::endl;
}
os.close();
}
private:
std::string _path;
std::string _name;
};
const int range = 5;
class backup : public base
{
public:
backup(const std::string &path = default_path, const std::string &name = default_name)
: _path(path), _name(name)
{
}
~backup() {}
private:
bool line_range(const std::string &file_path)
{
std::ifstream ifs;
ifs.open(file_path);
int cnt = 0;
std::string s;
while (std::getline(ifs, s))
{
cnt++;
}
ifs.close();
return cnt > range;
}
public:
void hander(const std::string &message) override
{
// 重命名/备份
std::string str = _path + _name;
std::string sname = _name + std::to_string(time(nullptr));
if (_status == true)
{
if (line_range(_path + _name))
{
if (rename(str.c_str(), (_path + sname).c_str()) == -1)
{
std::cout << "rename failed" << std::endl;
}
std::cout << "重命名成功" << std::endl;
}
}
if (_next != nullptr)
{
_next->hander(str);
}
else
{
std::cout << "backup hander over" << std::endl;
}
}
private:
std::string _path;
std::string _name;
};
class enter
{
public:
enter()
{
_fm = std::make_shared<format>();
_sa = std::make_shared<save>();
_bk = std::make_shared<backup>();
_fm->setnext(_sa);
_sa->setnext(_bk);
_bk->setnext(nullptr);
}
void Choose(bool fm, bool sa, bool bk)
{
fm ? _fm->setstatustrue() : _fm->setstatusfalse();
sa ? _sa->setstatustrue() : _sa->setstatusfalse();
bk ? _bk->setstatustrue() : _bk->setstatusfalse();
}
void run(const std::string messgae)
{
_fm->hander(messgae);
}
~enter()
{
}
private:
std::shared_ptr<format> _fm;
std::shared_ptr<save> _sa;
std::shared_ptr<backup> _bk;
};