NNG 通信模式

news2025/7/10 18:26:46

NNG 是 nanomsg 的继任版本,纯c语言开发,工作模式分为几种:

1,Pipeline (A One-Way Pipe)

单向通信,类似与生产者消费者模型的消息队列,消息从推方流向拉方。

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/pipeline0/pull.h>
#include <nng/protocol/pipeline0/push.h>

#define NODE0 "node0"
#define NODE1 "node1"

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
        exit(1);
}

int
node0(const char *url)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_pull0_open(&sock)) != 0) {
                fatal("nng_pull0_open", rv);
        }
        if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
                        fatal("nng_recv", rv);
                }
                printf("NODE0: RECEIVED \"%s\"\n", buf); 
                nng_free(buf, sz);
        }
}

int
node1(const char *url, char *msg)
{
        int sz_msg = strlen(msg) + 1; // '\0' too
        nng_socket sock;
        int rv;
        int bytes;

        if ((rv = nng_push0_open(&sock)) != 0) {
                fatal("nng_push0_open", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
                fatal("nng_dial", rv);
        }
        printf("NODE1: SENDING \"%s\"\n", msg);
        if ((rv = nng_send(sock, msg, strlen(msg)+1, 0)) != 0) {
                fatal("nng_send", rv);
        }
        sleep(1); // wait for messages to flush before shutting down
        nng_close(sock);
        return (0);
}

int
main(int argc, char **argv)
{
        if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
                return (node0(argv[2]));

        if ((argc > 2) && (strcmp(NODE1, argv[1]) == 0))
                return (node1(argv[2], argv[3]));

        fprintf(stderr, "Usage: pipeline %s|%s <URL> <ARG> ...'\n",
                NODE0, NODE1);
        return (1);
}

//gcc pipeline.c -lnng -o pipeline

2,Request/Reply (I ask, you answer)

请求-应答模式,如果不应答,请求方会一直请求。

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>

#define NODE0 "node0"
#define NODE1 "node1"
#define DATE "DATE"

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
        exit(1);
}

char *
date(void)
{
        time_t now = time(&now);
        struct tm *info = localtime(&now);
        char *text = asctime(info);
        text[strlen(text)-1] = '\0'; // remove '\n'
        return (text);
}

int
node0(const char *url)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_rep0_open(&sock)) != 0) {
                fatal("nng_rep0_open", rv);
        }
          if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
                        fatal("nng_recv", rv);
                }
                if ((sz == (strlen(DATE) + 1)) && (strcmp(DATE, buf) == 0)) {
                        printf("NODE0: RECEIVED DATE REQUEST\n");
                        char *d = date();
                        printf("NODE0: SENDING DATE %s\n", d);
                        if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) {
                                fatal("nng_send", rv);
                        }
                }
                nng_free(buf, sz);
        }
}

int
node1(const char *url)
{
        nng_socket sock;
        int rv;
        size_t sz;
        char *buf = NULL;

        if ((rv = nng_req0_open(&sock)) != 0) {
                fatal("nng_socket", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
                fatal("nng_dial", rv);
        }
        printf("NODE1: SENDING DATE REQUEST %s\n", DATE);
        if ((rv = nng_send(sock, DATE, strlen(DATE)+1, 0)) != 0) {
                fatal("nng_send", rv);
        }
        if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
                fatal("nng_recv", rv);
        }
        printf("NODE1: RECEIVED DATE %s\n", buf);  
        nng_free(buf, sz);
        nng_close(sock);
        return (0);
}

int
main(const int argc, const char **argv)
{
        if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
                return (node0(argv[2]));

        if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))
                return (node1(argv[2]));

      fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", NODE0, NODE1);
      return (1);
}

3,Pair (Two Way Radio)

一对一双向通信,类似对讲机

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>

#define NODE0 "node0"
#define NODE1 "node1"

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
        exit(1);
}

int
send_name(nng_socket sock, char *name)
{
        int rv;
        printf("%s: SENDING \"%s\"\n", name, name);
        if ((rv = nng_send(sock, name, strlen(name) + 1, 0)) != 0) {
                fatal("nng_send", rv);
        }
        return (rv);
}

int
recv_name(nng_socket sock, char *name)
{
        char *buf = NULL;
        int rv;
        size_t sz;
        if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
                printf("%s: RECEIVED \"%s\"\n", name, buf); 
                nng_free(buf, sz);
        }
        return (rv);
}

int
send_recv(nng_socket sock, char *name)
{
        int rv;
        if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 100)) != 0) {
                fatal("nng_setopt_ms", rv);
        }
        for (;;) {
                recv_name(sock, name);
                sleep(1);
                send_name(sock, name);
        }
}

int
node0(const char *url)
{
        nng_socket sock;
        int rv;
        if ((rv = nng_pair0_open(&sock)) != 0) {
                fatal("nng_pair0_open", rv);
        }
         if ((rv = nng_listen(sock, url, NULL, 0)) !=0) {
                fatal("nng_listen", rv);
        }
        return (send_recv(sock, NODE0));
}

int
node1(const char *url)
{
        nng_socket sock;
        int rv;
        sleep(1);
        if ((rv = nng_pair0_open(&sock)) != 0) {
                fatal("nng_pair0_open", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
                fatal("nng_dial", rv);
        }
        return (send_recv(sock, NODE1));
}

int
main(int argc, char **argv)
{
        if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
                return (node0(argv[2]));

        if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))
                return (node1(argv[2]));

        fprintf(stderr, "Usage: pair %s|%s <URL> <ARG> ...\n", NODE0, NODE1);
        return 1;
}

4,Pub/Sub (Topics & Broadcast) 

单向广播

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>

#define SERVER "server"
#define CLIENT "client"

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
}

char *
date(void)
{
        time_t now = time(&now);
        struct tm *info = localtime(&now);
        char *text = asctime(info);
        text[strlen(text)-1] = '\0'; // remove '\n'
        return (text);
}

int
server(const char *url)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_pub0_open(&sock)) != 0) {
                fatal("nng_pub0_open", rv);
        }
        if ((rv = nng_listen(sock, url, NULL, 0)) < 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                char *d = date();
                printf("SERVER: PUBLISHING DATE %s\n", d);
                if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) {
                        fatal("nng_send", rv);
                }
                sleep(1);
        }
}

int
client(const char *url, const char *name)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_sub0_open(&sock)) != 0) {
                fatal("nng_sub0_open", rv);
        }

        // subscribe to everything (empty means all topics)
        if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
                fatal("nng_setopt", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
                fatal("nng_dial", rv);
        }
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
                        fatal("nng_recv", rv);
                }
                printf("CLIENT (%s): RECEIVED %s\n", name, buf); 
                nng_free(buf, sz);
        }
}

int
main(const int argc, const char **argv)
{
        if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0))
                return (server(argv[2]));

          if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0))
                return (client (argv[2], argv[3]));

        fprintf(stderr, "Usage: pubsub %s|%s <URL> <ARG> ...\n",
            SERVER, CLIENT);
        return 1;
}

5,Survey (Everybody Votes) 

 用于多节点表决或者服务发现

 

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/survey0/survey.h>
#include <nng/protocol/survey0/respond.h>

#define SERVER "server"
#define CLIENT "client"
#define DATE   "DATE"

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
        exit(1);
}

char *
date(void)
{
        time_t now = time(&now);
        struct tm *info = localtime(&now);
        char *text = asctime(info);
        text[strlen(text)-1] = '\0'; // remove '\n'
        return (text);
}

int
server(const char *url)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_surveyor0_open(&sock)) != 0) {
                fatal("nng_surveyor0_open", rv);
        }
        if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                printf("SERVER: SENDING DATE SURVEY REQUEST\n");
                if ((rv = nng_send(sock, DATE, strlen(DATE) + 1, 0)) != 0) {
                        fatal("nng_send", rv);
                }

                for (;;) {
                        char *buf = NULL;
                        size_t sz;
                        rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
                        if (rv == NNG_ETIMEDOUT) {
                                break;
                        }
                        if (rv != 0) {
                                fatal("nng_recv", rv);
                        }
                        printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n",
                            buf); 
                        nng_free(buf, sz);
                }

                printf("SERVER: SURVEY COMPLETE\n");
        }
}

int
client(const char *url, const char *name)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_respondent0_open(&sock)) != 0) {
                fatal("nng_respondent0_open", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)) != 0) {
                fatal("nng_dial", rv);
        }
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
                        printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n",
                            name, buf); 
                        nng_free(buf, sz);
                        char *d = date();
                        printf("CLIENT (%s): SENDING DATE SURVEY RESPONSE\n",
                           name);
                        if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) {
                                fatal("nng_send", rv);
                        }
                }
        }
}

int
main(const int argc, const char **argv)
{
        if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0))
                return (server(argv[2]));

        if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0))
                return (client(argv[2], argv[3]));

        fprintf(stderr, "Usage: survey %s|%s <URL> <ARG> ...\n",
            SERVER, CLIENT);
        return 1;
}

6, Bus (Routing)

网状连接通信,每个加入节点都可以发送/接受广播消息

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/bus0/bus.h>

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
        exit(1);
}

int
node(int argc, char **argv)
{
        nng_socket sock;
        int rv;
        size_t sz;

        if ((rv = nng_bus0_open(&sock)) != 0) {
                fatal("nng_bus0_open", rv);
        }
        if ((rv = nng_listen(sock, argv[2], NULL, 0)) != 0) {
                fatal("nng_listen", rv);
        }

        sleep(1); // wait for peers to bind
        if (argc >= 3) {
                for (int x = 3; x < argc; x++) {
                        if ((rv = nng_dial(sock, argv[x], NULL, 0)) != 0) {
                                fatal("nng_dial", rv);
                        }
                }
        }

        sleep(1); // wait for connects to establish

        // SEND
        sz = strlen(argv[1]) + 1; // '\0' too
        printf("%s: SENDING '%s' ONTO BUS\n", argv[1], argv[1]);
        if ((rv = nng_send(sock, argv[1], sz, 0)) != 0) {
                fatal("nng_send", rv);
        }

        // RECV
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) !=0) {
                        if (rv == NNG_ETIMEDOUT) {
                                fatal("nng_recv", rv);
                        }
                }
                printf("%s: RECEIVED '%s' FROM BUS\n", argv[1], buf); 
                nng_free(buf, sz);
        }
        nng_close(sock);
        return (0);
}

int
main(int argc, char **argv)
{
        if (argc >= 3) {
                return (node(argc, argv));
        }
        fprintf(stderr, "Usage: bus <NODE_NAME> <URL> <URL> ...\n");
        return 1;
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/8819.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]java毕业设计基于的图书馆管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

RabbitMQ(四):RabbitMQ高级特性

消息队列在使用过程中&#xff0c;面临着很多实际问题需要思考&#xff1a; 消息可靠性问题&#xff1a;如何确保发送的消息至少被消费—次延迟消息问题&#xff1a;如何实现消息的延迟投递消息堆积问题&#xff1a;如何解决数百万消息堆积&#xff0c;无法及时消费的问题高可用…

面试官:MySQL 上亿大表如何优化?

背景 XX 实例&#xff08;一主一从&#xff09;xxx 告警中每天凌晨在报 SLA 报警&#xff0c;该报警的意思是存在一定的主从延迟。&#xff08;若在此时发生主从切换&#xff0c;需要长时间才可以完成切换&#xff0c;要追延迟来保证主从数据的一致性&#xff09; XX 实例的慢…

Oracle LiveLabs实验:Manage Database Instance and Memory for Oracle Database 21c

概述 此实验申请地址在这里。 实验帮助在这里。 此实验预估完成时间100分钟。 该研讨会介绍了 Oracle 数据库实例的基本知识&#xff0c;并指导您管理 Oracle 数据库的初始化参数和内存结构。 管理初始化参数以在 Oracle 数据库上执行关键任务&#xff0c;例如管理数据库实…

【教学类-08-01】20221010《门牌号(6层*3间 黑色版)》(大班主题《我们的城市》)

效果展示 背景需求&#xff1a; 我的小课题《运用Python设计大班层次性纸类学具的案例研究》获得2022年MHQ小课题立项&#xff0c;在前期的《学号名字描字帖》《身份证》《数字分合》《破译电话号码》的基础上&#xff0c;需要设计更多与大班主题活动书上的主题相关的学习材料。…

利用css 动画实现节流

节流指的避免过于频繁的执行一个函数&#xff0c;例如&#xff1a;一个保存按钮&#xff0c;为了避免重复提交或者服务器考虑&#xff0c;往往需要对点击行为做一定的限制&#xff0c;不然会频繁的请求接口&#xff0c;之前基本上是通过js去控制节流问题&#xff0c;其实css也能…

k-form-design 改成自己组件步骤

1&#xff09;修改package.json {"name": "kk-form-design","version": "1.0.2","private": false,"description": "基于vue、ant-design-vue的表单设计器,可视化开发表单","license": &quo…

Linux下的截图工具 —— Flameshot

一、简介 Flameshot是一款功能强大但易于使用的屏幕截图软件&#xff0c;中文名称火焰截图。Flameshot 简单易用并有一个CLI版本&#xff0c;所以你也可以从命令行来进行截图。Flameshot 是一个Linux发行版中完全免费且开源的截图工具。 特性&#xff1a; 外观可定制化。易于…

数据结构-红黑树

红黑树 二分查找 二叉树 二叉平衡树 平衡因子不超1 查找和二叉查找一样的 删除和插入比较复杂 四种失去平衡的方法 LR 两步 RL 两步 不断旋转比较耗时 进一步改进&#xff1a; 红黑树RBT 调整的次数少 平衡性不如二叉平衡树 &#xff0c; 插入删除频繁的使用红黑树&…

redis的主从复制,哨兵和cluster集群

一、redis性能管理 &#xff08;1&#xff09; redis-cli 127.0.0.1:6379> info memory ​ &#xff08;2&#xff09; redis-cli info memory used_memory_rss&#xff1a;是Redis向操作系统申请的内存。used_memory&#xff1a;是Redis中的数据占用的内存。used_memo…

新手零基础自学Python,安装并配置环境+教程

第一步&#xff1a;搭建python运行环境 在 Windows 上安装 Python 和安装普通软件一样简单&#xff0c;下载安装包以后猛击“下一步”即可。 Python 安装包下载地址&#xff1a;https://www.python.org/downloads/ 打开该链接&#xff0c;可以看到有两个版本的 Python&#…

java框架 —— Spring

Spring[TOC](Spring)1、概述1.1、优点1.2、组成2. IOC概述2.1 什么是IOC2.1.1 推导过程2.1.2 IOC本质2.2 HelloSpring2.2.1 导入Jar包2.2.2 编写代码2.2.2 思考2.3 IOC过程2.4 IOC 接口3. Bean 管理3.1 基于xml方式——set方法注入3.2 FactoryBean3.3 bean 作用域3.4 bean 生命…

Java——面向对象进阶(封装、继承、多态)

Java面向对象三大特性——封装、继承、多态一、封装1.封装基本概念2.访问修饰符3.Java中封装的理解4.封装的优点二、继承1.为什么需要继承2.继承层次结构3.super和this关键字4.继承语法与设计一个继承体系三、多态1.多态的概念2.多态的实现条件3.多态的优缺点一、封装 1.封装基…

【微服务架构组件】Nacos

初识nacos 最近在整合nacos做配置的热下发&#xff0c;总结下。 Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service的首字母简称&#xff0c;一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。 阿里开源产品&#xff1a;什么是 Nacos 如 Na…

[Leetcode]138. 复制带随机指针的链表

目录 1.题目链接 2.1解法①(暴力) 2.1.1解法思路&#xff1a; 2.1.2代码实现&#xff1a; 2.2解法②(进阶) 2.1.1解法思路&#xff1a; 2.2.2代码实现&#xff1a; 1.题目链接 138. 复制带随机指针的链表 - 力扣&#xff08;LeetCode&#xff09; 2.1解法①(暴力) 2.1.…

软考 - 操作系统

操作系统概述 bit和byte区别 bit 位 说白了就是0或者1&#xff1b;计算机内存中的存储都是01这两个东西。 byte(B) 字节 1byte8bit&#xff08;一字节 8比特&#xff09; 1byte就是1B 1byte 存1个英文字母&#xff0c;2个byte存一个汉字。 了解 操作系统的作用&#xff1…

SpringBoot使用EasyExcel类一键导出数据库数据生成Excel,导入Excle生成List<>数据(作者直接给demo项目)

文章目录一、简单一键导出Excel直接给出生成效果Empty&#xff0c;这个很关键controller层EasyExcel类的多种使用方式二、导入Excel生成List<>数据controller层&#xff0c;简单写法监听器写法&#xff08;观察者模式&#xff09;&#xff0c;稍微麻烦其他如果要使类中的…

动态拼接 merge 语句

【问题】 Hello everyone, I have one query, would be great if anyone can help me out on this. In SQL, I have two tables with same column names. Want to query if there is any difference in the column values and if yes will update the values(in the first ta…

LEADTOOLS 入门教程: 使用 AWS Lambda 转换文档 - C# .NET Core

LEADTOOLS 是一个综合工具包的集合&#xff0c;用于将识别、文档、医疗、成像和多媒体技术整合到桌面、服务器、平板电脑、网络和移动解决方案中&#xff0c;是一项企业级文档自动化解决方案&#xff0c;有捕捉&#xff0c;OCR&#xff0c;OMR&#xff0c;表单识别和处理&#…

SSM框架真没那么难,这份阿里大佬的进阶实战笔记真给讲透了!

SSM框架&#xff1a; SSM框架是spring MVC &#xff0c;spring和mybatis框架的整合&#xff0c;是标准的MVC模式&#xff0c;将整个系统划分为表现层&#xff0c;controller层&#xff0c;service层&#xff0c;DAO层四层 使用spring MVC负责请求的转发和视图管理 spring实现…