NNG pair 异步通信

news2025/7/12 17:02:56

一,利用NNG pair模式,实现异步通信。

二,manager端  绑定地址,回调函数里 接收 异步消息:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
#include <nng/supplemental/util/platform.h>

#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <signal.h>
#include <sys/wait.h>

using namespace std;
using namespace std::chrono;

static bool exit_flag = false;

void recv_data_callback(void *arg);
static void sig_handler(int sig)
{
    exit_flag = true;
    std::cout << "sig_handler " << exit_flag << endl;
}

void fatal(const char *func, int rv)
{
    fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
    exit(1);
}

class Manager
{
public:
    //初始化
    bool init()
    {
        //创建io 并绑定回调函数
        rv = nng_aio_alloc(&aio, recv_data_callback, this);
        if (rv < 0)
        {
            fatal("cannot allocate aio", rv);
        }

        //打开
        rv = nng_pair0_open(&sock);
        if (rv != 0)
        {
            fatal("nng_pair0_open", rv);
        }

        //设置缓冲区大小
        nng_socket_set_int(sock, NNG_OPT_SENDBUF, 2048);
        nng_socket_set_int(sock, NNG_OPT_RECVBUF, 2048);

        //开始监听
        if ((rv = nng_listen(sock, url.c_str(), NULL, 0)) != 0)
        {
            fatal("nng_listen", rv);
        }
        nng_recv_aio(sock, aio);

        isInit = true;
        return isInit;
    }

    //发送数据
    void send(const std::string &msgStr)
    {
        if (!isInit)
            return;

        if (!isInit)
			return;

		nng_msg *msg = NULL;
		nng_msg_alloc(&msg, sizeof(msgStr));
		memcpy(nng_msg_body(msg), msgStr.c_str(), sizeof(msgStr));

		nng_sendmsg(sock, msg, 0);
    }

public:
    nng_socket sock;
    nng_aio *aio{nullptr};

private:
    int rv;
    std::string url{"ipc:///tmp/pair"};
    bool isInit{false};
};

void recv_data_callback(void *arg)
{
    int rv = 0;
    Manager *manager = static_cast<Manager*>(arg);
    nng_msg *msg = NULL;
    size_t            json_len = 0;
    char *            json_str = NULL;

    rv = nng_aio_result(manager->aio);
    if (0 != rv) {
        fatal("nng_recv error ", rv);
    }

    msg      = nng_aio_get_msg(manager->aio);
    json_str = static_cast<char*>(nng_msg_body(msg));
    json_len = nng_msg_len(msg);

    std::cout<<"recv_data_callback "<<json_str<<std::endl;

    nng_msg_free(msg);
    nng_recv_aio(manager->sock, manager->aio);
}

int main(int argc, char *grgv[])
{
    signal(SIGINT, sig_handler);
    signal(SIGTERM, sig_handler);
    signal(SIGABRT, sig_handler);

    Manager manager;
    if (manager.init())
    {
        cout << "init success" << endl;
    }
    else
    {
        cout << "init failed" << endl;
    }

    while (!exit_flag)
    {
        manager.send("Not bad");
        this_thread::sleep_for(seconds(1));
    }
    return 0;
}

三,adapter 端,同步发送数据,单开一个线程 进行数据的轮询接收。


#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
#include <nng/supplemental/util/platform.h>

#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <signal.h>
#include <sys/wait.h>
#include <string.h>

using namespace std;
using namespace std::chrono;

static bool exit_flag = false;

static void sig_handler(int sig)
{
	exit_flag = true;
	std::cout << "sig_handler " << exit_flag << endl;
}

void fatal(const char *func, int rv)
{
	fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
	exit(1);
}

void recv_data_callback(void *arg)
{
}

class Adapter
{
public:
	//初始化
	bool init()
	{
		//打开
		rv = nng_pair0_open(&sock);
		if (rv != 0)
		{
			fatal("nng_pair0_open", rv);
		}

		//设置缓冲区大小
		nng_socket_set_int(sock, NNG_OPT_SENDBUF, 2048);
		nng_socket_set_int(sock, NNG_OPT_RECVBUF, 2048);

		rv = nng_dial(sock, url.c_str(), &dialer, 0);
		if (rv != 0)
		{
			fatal("nng_dial", rv);
		}

		isInit = true;
		return isInit;
	}

	//开始接收
	void start()
	{
		if (!isInit)
			return;
		std::thread t([&]()
					  {
                    while (!isStop)
        {
            nng_msg * msg = NULL;
			char *  json_str = NULL;
			nng_recvmsg(sock, &msg, 0);
			json_str = static_cast<char*>(nng_msg_body(msg));
			std::cout<<"nng_recvmsg "<<json_str<<std::endl;
        } });
		t.detach();
	}

	void stop()
	{
		isStop = true;
		cout << "stop " << isStop << endl;
	}

	void send(const std::string &msgStr)
	{
		if (!isInit)
			return;

		nng_msg *msg = NULL;
		nng_msg_alloc(&msg, sizeof(msgStr));
		memcpy(nng_msg_body(msg), msgStr.c_str(), sizeof(msgStr));

		nng_sendmsg(sock, msg, 0);
	}

public:
	nng_socket sock;
	nng_dialer dialer;
	std::atomic<bool> isStop{false};

private:
	std::string url{"ipc:///tmp/pair"};
	int rv;
	bool isInit{false};
};

int main(int argc, char *grgv[])
{
	signal(SIGINT, sig_handler);
	signal(SIGTERM, sig_handler);
	signal(SIGABRT, sig_handler);

	Adapter adapter;
	if (adapter.init())
	{
		cout << "init success" << endl;
	}
	else
	{
		cout << "init failed" << endl;
	}
	adapter.start();

	while (!exit_flag)
	{
		adapter.send("How are you?");
		this_thread::sleep_for(seconds(1));
	}
	adapter.stop();
	return 0;
}

 

3,CMakeLists.txt  两端 基本一致

cmake_minimum_required (VERSION 2.8.12)
project(adapter)
set(TARGET_NAME adapter)

find_package(nng CONFIG REQUIRED)

find_package(Threads)

add_executable(${TARGET_NAME} adapter.cpp)
target_link_libraries(${TARGET_NAME} nng::nng)
target_compile_definitions(${TARGET_NAME} PRIVATE NNG_ELIDE_DEPRECATED)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/8857.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HTTP协议中Gzip格式的流量分析与识别

背景 在协议分析过程中&#xff0c;经常会发现gzip压缩的数据&#xff0c;例如在HTTP协议中&#xff0c;在HTTP头中会标示&#xff0c;内容编码为gzip、DEFLATE。 但是&#xff0c;还有很多情况&#xff0c;例如一些非HTTP协议&#xff0c;特别是私有协议中&#xff0c;数据同…

强大的图片处理工具GraphicsMagick

前言 项目中我们经常需要对图片进行压缩、剪切、添加水印、生成缩略图、图片合成等图片处理操作&#xff0c;关于这些图片复杂处理&#xff0c;我们将如何实现呢&#xff0c;本文将介绍GraphicsMagick对图片进行相关处理功能。 简介 GraphicsMagick是一个免费的创建、编辑、…

16.Redis系列之Redisson分布式锁原理

本文学习Redisson分布式锁的原理以及优缺点 1. Redisson分布式锁原理 lua脚本是原子操作&#xff0c;redis会将整个脚本作为一个整体执行&#xff0c;中间不会被其他命令打断 # RedissonLock.tryLockInnerAsync方法内lua脚本加锁 <T> RFuture<T> tryLockInnerAs…

代码随想录算法训练营第三十四天| LeetCode1005. K 次取反后最大化的数组和、LeetCode134. 加油站、LeetCode135. 分发糖果

一、LeetCode1005. K 次取反后最大化的数组和 1&#xff1a;题目描述&#xff08;1005. K 次取反后最大化的数组和&#xff09; 给你一个整数数组 nums 和一个整数 k &#xff0c;按以下方法修改该数组&#xff1a; 选择某个下标 i 并将 nums[i] 替换为 -nums[i] 。重复这个过…

TNF 又见 《Cell》

现有的研究表明&#xff0c;需要新的机会来增加免疫检查点封锁&#xff08;Immunecheckpoint blockade&#xff0c;ICB&#xff09;的影响。尽管干扰素&#xff08;IFN&#xff09;γ途径同时具有 ICB 抗性因子和治疗机会&#xff0c;但至今为止&#xff0c;研究人员尚未系统地…

采用新项目管理软件的四个步骤

这是采用新项目管理软件的有趣之处&#xff1a;它本身实际上是一个重大项目&#xff0c;而且您的组织越大&#xff0c;这个过程就越艰巨。 当然&#xff0c;成功的项目管理实施最终将有助于简化您的运营并最大限度地提高跨部门的效率——这有利于团队成员的士气、客户满意度…

国内外的免费AI作图工具

1.文心一格 文心一格 - AI艺术和创意辅助平台 “推荐”页面操作比较简单&#xff0c;只需要需要简单的一句话&#xff0c;等几分钟就可以直接生成&#xff1a; 主要可以用来生成不同“氛围感”十足的场景&#xff1a; 美丽的花田&#xff1a; 优点&#xff1a; 1.比较容易…

【数据结构】—— 二叉树(C)

二叉树 文章目录二叉树二叉树的概念&#xff1a;树的术语二叉树的大概样式先序创建二叉树二叉树的遍历方式先序遍历中序遍历后序遍历二叉树的概念&#xff1a; 二叉树&#xff08;Binary Tree&#xff09;是n(n>0)个结点的有限集合&#xff0c;该集合或者为空集&#xff08…

bugku-web-安慰奖

题目没给提示 点开链接 是空白页面 查看源代码 base64加密 拿去解码 备份文件 使用工具跑一下目录 &#xff08;dirsearch) 存在一个flag.php文件 但是访问没有结果 锁定index.php.bak 文件 下载下来 打开 进行代码审计 是php序列化 反序列化的内容 代码审计&…

【Python】Numpy生成坐标网格

文章目录meshgridmgrid和ogridindicesmeshgrid 在三维图的绘制过程中&#xff0c;一般需要x,y,zx,y,zx,y,z之间的对应关系&#xff0c;但对于图像而言&#xff0c;其x,yx,yx,y轴坐标是体现在像素栅格中的&#xff0c;从而图像矩阵中的像素强度&#xff0c;其实表示的是zzz轴的…

深度学习算法应用——使用LSTM对双色球进行统计与预测

前言 福彩双色球的玩法和规则是双色球投注区分为红色球号码区和蓝色球号码区&#xff0c;红色球号码从1-33&#xff0c;蓝色球号码是从1-16。投注方法是&#xff0c;从红色区选出6个不重复的号码再加上蓝色区的一个号组成一个投注组。双色球通过摇奖器确定中奖号码&#xff0c…

Zookeeper中的watch机制

客户端&#xff0c;可以通过在znode上设置watch&#xff0c;实现实时监听znode的变化Watch事件是⼀个⼀次性的触发器&#xff0c;当被设置了Watch的数据发⽣了改变的时候&#xff0c;则服务器将这个改变发送给设置了Watch的客户端⽗节点的创建&#xff0c;修改&#xff0c;删除…

观测云产品更新|新增观测云、SLS 联合解决方案;新增 3 个智能巡检配置文档;新增链路错误追踪查看器等

观测云更新 新增观测云、SLS 联合解决方案 观测云新增 SLS 存储方案&#xff0c;支持阿里云 SLS 用户能够快速使用观测云做数据查看分析。在观测云进行商业版注册/升级时&#xff0c;选择”阿里云账号结算“后&#xff0c;您可以选择SLS 存储方案&#xff0c;将数据存放在自…

SpringBoot SpringBoot 开发实用篇 1 热部署 1.3 热部署范围配置

SpringBoot 【黑马程序员SpringBoot2全套视频教程&#xff0c;springboot零基础到项目实战&#xff08;spring boot2完整版&#xff09;】 SpringBoot 开发实用篇 文章目录SpringBootSpringBoot 开发实用篇1 热部署1.3 热部署范围配置1.3.1 问题引入1.3.2 热部署配置范围1.3.…

Spring Cloud(十):Spring Cloud Skywalking 以及 JavaAgent

链路追踪组件选型 Zipkin是Twitter开源的调用链分析工具&#xff0c;目前基于springcloud sleuth得到了广泛的使用&#xff0c;特点是轻量&#xff0c;使用部署简单。Pinpoint是韩国人开源的基于字节码注入的调用链分析&#xff0c;以及应用监控分析工具。特点是支持多种插件&…

CASIO程序(线路计算6.0版)

一、扩展变量设置说明 统计各种要素点的数目 各要素点数目表 名 称 平曲线交点 竖曲线变坡点 超高起始点 最多台阶数 线路导线点 数目&#xff08;个&#xff09; a b c d e 要素点数目为0时取值 -1 -5/3 0 0 0 备 注 不含起终点 不含起终点 含起…

【LeetCode 力扣】1.两数之和 Java实现 哈希表

题目链接&#xff1a;1.两数之和 1 原题描述&#xff1a; 2 解题思路 初看题目相信大家都能想到枚举的做法&#xff0c;简单来说把数组里面的所有值&#xff0c;均两两组合相加。若结果与target相等&#xff0c;则将两个数字的下标返回即可。 代码实现1&#xff1a; class …

C++入门学习5-共用体,枚举类型,宏定义

入门学习五共用体枚举类型宏定义共用体 共用体也称为联合体&#xff0c;其特点就是用一段连续的内存存储多个不同数据类型的数据&#xff0c;在写法上与结构体相似&#xff0c;但是在同一时刻&#xff0c;共用体中只有一个值是有效的&#xff0c;其大小由共用体中最大的数据类…

做PPT绝对不能错过这5个网站

免费高质量PPT模板网站&#xff0c;建议收藏&#xff01;1、菜鸟图库 https://www.sucai999.com/pptx.html?vNTYxMjky菜鸟图库里面有各种类型的PPT模板和素材。下载后模板可以直接套用&#xff0c;也可以自己添加素材进行修改。所有素材都一一进行了详细的分类&#xff0c;而且…

【前端】Flet:一款支持python及多语言开发的UI库

文章目录介绍开发生态支持语言运行体验组件API热更新开发计划 Roadmap2022 7月-8月安全手机端桌面端Controls(控件)核心功能用户指引&#xff08;User education&#xff09;2022 9月到12月手机端控件&#xff08;Controls&#xff09;编程语言支持核心功能介绍 Flet enables …