云原生时代顶流消息中间件Apache Pulsar部署实操之Pulsar IO与Pulsar SQL

news2025/7/10 21:38:17

文章目录

  • Pulsar IO (Connector连接器)
    • 基础定义
    • 安装Pulsar和内置连接器
    • 连接Pulsar到Cassandra
      • 安装cassandra集群
      • 配置Cassandra接收器
      • 创建Cassandra Sink
      • 验证Cassandra Sink结果
      • 删除Cassandra Sink
    • 连接Pulsar到PostgreSQL
      • 安装PostgreSQL集群
      • 配置JDBC接收器
      • 创建JDBC Sink
      • 验证JDBC Sink结果
  • Pulsar SQL
    • 定义
    • 简单使用

Pulsar IO (Connector连接器)

基础定义

Pulsar IO连接器能够轻松地创建、部署和管理与外部系统(如Apache Cassandra、Aerospike等)交互的连接器。IO连接器有两种类型:源连接器和接收器连接器。

image-20230212113058962

可以通过Connector Admin CLI使用源和接收器子命令管理Pulsar连接器(例如,在连接器上创建、更新、启动、停止、重新启动、重新加载、删除和执行其他操作)。有关最新和完整的信息,请参阅Pulsar管理文档。

安装Pulsar和内置连接器

在将Pulsar连接到数据库之前,需要先安装Pulsar和所需的内置连接器。要启用Pulsar连接器,您需要在下载页面上下载连接器的tarball版本。

# 下载最新版本2.11.0的pulsar-io-cassandra和pulsar-io-jdbc-postgres,需要什么连接器可以从官方查看是否支持并下载,这里举例就下载两个
https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.11.0/connectors/pulsar-io-cassandra-2.11.0.nar
https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.11.0/connectors/pulsar-io-jdbc-postgres-2.11.0.nar
# 在pulsar根目录下创建目录
mkdir connectors
# 将压缩文件移动connectors目录
mv pulsar-io-jdbc-postgres-2.11.0.nar pulsar-io-jdbc-postgres-2.11.0.nar connectors
# 重启pulsar
# 查看可用连接器列表
curl -w '\n' -s http://localhost:8080/admin/v2/functions/connectors

image-20230308101658342

连接Pulsar到Cassandra

安装cassandra集群

# 下载镜像并启动cassandra测试容器
docker run -d --rm --name=cassandra -p 9042:9042 cassandra
# 查看进程
docker ps
# 查看运行日志
docker logs cassandra
# 等待一小段时间后查看Cassandra集群状态
docker exec cassandra nodetool status
# 使用cqlsh连接到Cassandra集群

image-20230308091133556

# 使用cqlsh连接到Cassandra集群
docker exec -ti cassandra cqlsh localhost
# 创建一个密钥空间pulsar_itxs_keyspace
CREATE KEYSPACE pulsar_itxs_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
# 创建一个表pulsar_itxs_table
USE pulsar_itxs_keyspace;
CREATE TABLE pulsar_itxs_table (key text PRIMARY KEY, col text);

image-20230308092441377

配置Cassandra接收器

现在已经有一个Cassandra集群在本地运行;要运行Cassandra接收器连接器,需要准备一个配置文件,其中包括Pulsar连接器运行时需要知道的信息,例如Pulsar连接器如何找到Cassandra集群,Pulsar连接器用于写入Pulsar消息的键空间和表是什么等等;可以使用Json或者Yaml这两种格式创建配置文件。

vim examples/cassandra-sink.json

{
    "roots": "192.168.3.100:9042",
    "keyspace": "pulsar_itxs_keyspace",
    "columnFamily": "pulsar_itxs_table",
    "keyname": "key",
    "columnName": "col"
}

vim examples/cassandra-sink.yml

configs:
    roots: "192.168.3.100:9042"
    keyspace: "pulsar_itxs_keyspace"
    columnFamily: "pulsar_itxs_table"
    keyname: "key"
    columnName: "col"

创建Cassandra Sink

可以使用Connector Admin CLI创建sink连接器和操作。运行下面命令来创建一个Cassandra接收器连接器,接收器类型为Cassandra,配置文件为上一步创建的examples/cassandra-sink.yml。

bin/pulsar-admin sinks create \
    --tenant my-test \
    --namespace my-namespace \
    --name cassandra-itxs-sink \
    --sink-type cassandra \
    --sink-config-file examples/cassandra-sink.yml \
    --inputs persistent://my-test/my-namespace/itxs_cassandra    

命令执行后,Pulsar创建接收器连接器cassandra-itxs-sink。这个接收器连接器作为Pulsar函数运行,并将主题itxs_cassandra中产生的消息写入Cassandra表pulsar_itxs_table;

image-20230308103049990

可以使用Connector Admin CLI对连接器进行监控和其他操作。

  • 获取连接器的信息
bin/pulsar-admin sinks get \
  --tenant my-test \
  --namespace my-namespace \
  --name cassandra-itxs-sink
  • 检查连接器的状态
bin/pulsar-admin sinks status \
  --tenant my-test \
  --namespace my-namespace \
  --name cassandra-itxs-sink

验证Cassandra Sink结果

生成一些消息到Cassandra接收器itxs_cassandra的输入主题

for i in {0..9}; do bin/pulsar-client produce -m "itxskey-$i" -n 1 persistent://my-test/my-namespace/itxs_cassandra; done

再次查看连接器的状态,可以有10条记录处理统计信息

image-20230308103247012

查看Cassandra的pulsar_itxs_table

USE pulsar_itxs_keyspace;
select * from pulsar_itxs_table;

image-20230308105728900

删除Cassandra Sink

bin/pulsar-admin sinks delete \
    --tenant my-test \
    --namespace my-namespace \
    --name cassandra-itxs-sink

连接Pulsar到PostgreSQL

安装PostgreSQL集群

这里使用PostgreSQL 12 docker镜像在docker中启动一个单节点PostgreSQL集群。

# 从Docker中拉取PostgreSQL 12映像
docker pull postgres:12
# 启动postgres容器
docker run -d -it --rm \
    --name pulsar-postgres \
    -p 5432:5432 \
    -e POSTGRES_PASSWORD=password \
    -e POSTGRES_USER=postgres \
    postgres:12
# 查看运行日志
docker logs -f pulsar-postgres
# 进入容器
docker exec -it pulsar-postgres /bin/bash
# 使用默认用户名和密码登录PostgreSQL
psql -U postgres postgres
# 使用以下命令创建pulsar_postgres_jdbc_sink表:
create table if not exists pulsar_postgres_jdbc_sink
(
id serial PRIMARY KEY,
name VARCHAR(255) NOT NULL
);

配置JDBC接收器

现在有一个本地运行的PostgreSQ,接下来需要配置JDBC接收器连接器。

  • 创建配置文件vim connectors/pulsar-postgres-jdbc-sink.yaml
configs:
  userName: "postgres"
  password: "password"
  jdbcUrl: "jdbc:postgresql://192.169.3.100:5432/postgres"
  tableName: "pulsar_postgres_jdbc_sink"

创建JDBC Sink

执行下面命令后,Pulsar将创建接收器连接器pulse -postgres-jdbc-sink。这个sink连接器作为Pulsar函数运行,并将Topic为pulsar-postgres-jdbc-sink-topic中产生的消息写入PostgreSQL表pulsar_postgres_jdbc_sink。

bin/pulsar-admin sinks create \
    --tenant my-test \
    --namespace my-namespace \
    --archive ./connectors/pulsar-io-jdbc-postgres-2.11.0.nar \
    --inputs persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic \
    --name pulsar-postgres-my-jdbc-sink \
    --sink-config-file ./connectors/pulsar-postgres-jdbc-sink.yaml \
    --parallelism 1

列出所有的sink

bin/pulsar-admin sinks list \
    --tenant my-test \
    --namespace my-namespace

image-20230308140145820

验证JDBC Sink结果

通过JavaAPI生成一些消息到Cassandra接收器pulsar-postgres-jdbc-sink-topic这个主题,在Java项目添加maven依赖

    <properties>
        <pulsar.version>2.11.0</pulsar.version>
    </properties>        
        

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

这里演示实体类成员变量简单就直接使用public声明了

package sn.itxs.pulsar.io;

public class User{
    public int id;
    public String name;
}

新增ClientDemo.java

package sn.itxs.pulsar.io;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.AvroSchema;

public class ClientDemo {
    public static void main(String[] args) throws Exception {
        PulsarClient client = null;
        Producer<User> producer = null;
        try {
            client = PulsarClient.builder()
                    .serviceUrl("pulsar://192.168.5.52:6650")
                    .build();

            producer = client.newProducer(AvroSchema.of(User.class))
                    .topic("persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic")
                    .create();
            User user = new User();
            int index = 10;
            while (index++ < 20) {
                try {
                    user.id = index;
                    user.name = "this is a test " + index;
                    producer.newMessage().value(user).send();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("send finish");
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (producer!=null){
                producer.close();
            }
            if (client!=null){
                client.close();
            }
        }
    }
}

运行程序后查看PostgreSQL表pulsar_postgres_jdbc_sink,已经有刚才

image-20230308163802240

上面由于在Java中创建了Schema,因此不需要手工创建,可以查看当前persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic主体已生成Schema信息如下:

image-20230308165053977

如果要从pulsar-admin命令行创建schema可以这样操作

  • 创建schema,创建一个avro-schema文件,将以下内容复制到该文件中,并将该文件放在pulsar/connectors文件夹中。vim connectors/avro-schema
{
  "type": "AVRO",
  "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
  "properties": {}
}
  • 上传schema到topic,将avro-schema模式上传到pulsar-postgres-jdbc-sink-topic主题
bin/pulsar-admin schemas upload persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic -f ./connectors/avro-schema
  • 检查模式是否上传成功。
bin/pulsar-admin schemas get persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic1

image-20230308145650209

如需stop停止、restart重启指定的sinks可以如下操作,当然也可以更新指定sinks,详细命令可以查阅官网

bin/pulsar-admin sinks stop \
    --tenant my-test \
    --namespace my-namespace \
    --name pulsar-postgres-my-jdbc-sink \

Pulsar SQL

定义

Apache Pulsar用于存储事件数据流,事件数据由预定义的字段构成。通过模式注册表的实现,可以在Pulsar中存储结构化数据,并使用Trino(以前是Presto SQL)查询数据。作为Pulsar SQL的核心,Pulsar Trino插件使Trino集群中的Trino worker能够查询来自Pulsar的数据.

image-20230308170103068

由于Pulsar采用了基于两级段的架构,因此查询性能高效且可扩展性强。Pulsar中的主题在Apache BookKeeper中存储为段。每个主题段被复制到一些BookKeeper节点上,从而支持并发读和高读吞吐量。在Pulsar Trino连接器中,数据直接从BookKeeper中读取,因此Trino worker可以同时从水平可扩展数量的BookKeeper节点中读取

image-20230308170332105

简单使用

在Pulsar中查询数据前,需要安装Pulsar和内置连接器。

# 这里演示就直接启动独立集群
PULSAR_STANDALONE_USE_ZOOKEEPER=1 ./bin/pulsar standalone
# 启动一个Pulsar SQL worker
./bin/pulsar sql-worker run
# 初始化Pulsar独立集群和SQL worker后,执行SQL CLI:
./bin/pulsar sql
show catalogs;
show schemas in pulsar;
show tables in pulsar."public/default";

image-20230308172341425

通过前面的Java示例,我们改为Json格式写入Pulsar的user-topic

package sn.itxs.pulsar.io;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class ClientSqlDemo {
    public static void main(String[] args) throws Exception {
        PulsarClient client = null;
        Producer<User> producer = null;
        try {
            client = PulsarClient.builder()
                    .serviceUrl("pulsar://192.168.5.52:6650")
                    .build();

            producer = client.newProducer(Schema.JSON(User.class))
                    .topic("user-topic")
                    .create();
            User user = new User();
            int index = 10;
            while (index++ < 20) {
                try {
                    user.id = index;
                    user.name = "this is a test " + index;
                    producer.newMessage().value(user).send();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("send finish");
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (producer!=null){
                producer.close();
            }
            if (client!=null){
                client.close();
            }
        }
    }
}

运行程序后再来查询就有刚才发送的消息数据,_开头的字段为Pulsar 自带的。

select * from pulsar."public/default"."user-topic";

image-20230308175830023

  • 本人博客网站IT小神 www.itxiaoshen.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/396465.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

redis cluster配置之read-mode

背景生产部署了redis集群&#xff0c;三台机器&#xff08;三主三从&#xff0c;主从不在同一台机器上&#xff09;&#xff0c;redission连接使用。当有一个master节点挂掉时&#xff0c;redis整个集群不可用。解决过程运维登上机器上&#xff0c;执行cluster info发现集群OK状…

JAVA开发(数据类型String和HasMap的实现原理)

在JAVA开发中&#xff0c;使用最多的数据类型恐怕是String 和 HasMap两种数据类型。在开发的过程中我们每天都使用的不亦乐乎。但是相信很多人都没有考虑过String数据类型的实现原理或者说是在数据结构中的存储原理&#xff0c;还有一个就是是HashMap&#xff0c;也很少有人去了…

SNAP中根据入射角和相干图使用波段计算计算垂直形变--以门源地震为例

SNAP计算垂直形变0 写在前面1 具体步骤1.1 准备数据1.2 在SNAP中打开波段运算Band Maths1.3 之前计算的水平位移displacement如下图数据的其他处理请参考博文在SNAP中用sentinel-1数据做InSAR测量&#xff0c;以门源地震为例 0 写在前面 如果假设没有平行于传感器视线的水平运…

案例27-单表从9个更新语句调整为2个

目录 一&#xff1a;背景介绍 二&#xff1a;思路&方案 三&#xff1a;过程 1.项目结构 2.准备一个普通的maven项目&#xff0c;部署好mysql数据库 3.在项目中引入pom依赖 5.编写MyBitis配置文件 6.编写Mysql配置类 7.编写通用Update语句 8.项目启动类 四&#xff1a;总…

用GRU实现情感分析:不需要长记忆,也能看懂你的心情

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️&#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

JavaScript_Object.keys() Object.values()

目录 一、Object.keys() 二、Object.values() 一、Object.keys() Object.keys( ) 的 用法 : 作用 &#xff1a;遍历对象 { } 返回结果&#xff1a;返回 对象中 每一项 的 key 值 返回值 : 是一个 *** [ 数 组 ] *** 例子 ( 1 ) : <script>// 1. 定义一个对象var obj …

【硬件】P沟道和N沟道MOS管开关电路设计

场效应管做的开关电路一般分为两种&#xff0c;一种是N沟道&#xff0c;另一种是P沟道&#xff0c;如果电路设计中要应用到高端驱动的话&#xff0c;可以采用PMOS来导通。P沟道MOS管开关电路PMOS的特性&#xff0c;Vgs小于一定的值就会导通&#xff0c;当Vgs<0,即Vs>Vg,管…

扬帆优配|高送转+高分红+高增长潜力股揭秘

高送转且高分红的高增加股票&#xff0c;有望跑赢大盘。 此前七连阴的泽宇智能&#xff0c;今日早盘大幅高开。到上午收盘&#xff0c;该股飙涨9.3%&#xff0c;位居涨幅榜前列。音讯面上&#xff0c;3月7日晚间&#xff0c;泽宇智能发表2022年年报&#xff0c;年报显现&#x…

深入学习Spring——笔记

实习之余多学点&#xff0c;希望一个月之内能够完成这个笔记 Spring笔记3-8BeanFactory && ApplicationContextBeanFactoryApplicationContext3-8 BeanFactory && ApplicationContext BeanFactory 首先&#xff0c;从SpringBoot的主启动类来看 SpringBootA…

JMU软件20 UML复习资料

碎碎念 &#xff08;印象中复习资料漏了的知识点&#xff1a;P175&#xff1a;通信图&#xff0c;P168&#xff1a;UML顺序图的图框&#xff0c;都考完了也懒得再整理了&#xff0c;自己翻书看吧&#xff09; 昂&#xff0c;把下面这些都学会大概率不会不及格&#xff0c;要求…

蓝库云|数字化车间建设,是加速制造业数智化转型的关键因素

什么是制造业的数字化车间&#xff0c;对于传统制造业来说这也许是个新词汇&#xff0c;但在企业数字化转型中&#xff0c;数字化车间的存在至关重要&#xff0c;其意思就是将制造业车间里所有的工作流程数字化&#xff0c;实现设备、生产流程、工人等各环节之间的数字化管理与…

SQL注入——时间盲注

目录 一&#xff0c;时间盲注概述 二&#xff0c;关键函数 sleep() if() 三&#xff0c;注入原理 四&#xff0c;实例 一&#xff0c;时间盲注概述 web页面只返回一个正常页面。利用页面响应时间不同&#xff0c;逐个猜解数据。但是前提是数据库会执行命令代码&#xff…

dashboard疏散主机提示报错:无法疏散主机...处理方法、openstack虚拟机状态卡在重启处理方法、openstack在数据库修改虚拟机状态的方法

文章目录dashboard疏散主机提示报错&#xff1a;无法疏散主机...处理方法报错说明【状态卡在reboot状态】解决方法【登录nova数据库修改虚拟机信息】首先获取nova数据库的密码登录nova数据库并做修改验证信息是否修改成功再次迁移并验证报错说明【虚拟机状态error也会导致疏散失…

二叉树的遍历(前序、中序、后序)| C语言

目录 0.写在前面 1.前序遍历 步骤详解 代码实现 2.中序遍历 步骤详解 代码实现 3.后序遍历 步骤详解 代码实现 0.写在前面 认识二叉树结构最简单的方式就是遍历二叉树。所谓遍历二叉树就是按照某种特定的规则&#xff0c;对二叉树的每一个节点进行访问&#xff0c;…

QML动画(Animator)

在Qt5.2之后&#xff0c;引入Animator动画元素。这种方式可以直接所用于Qt Quick的场景图形系统&#xff0c;这使得基于Animator元素的动画及时在ui界面线程阻塞的情况下仍然能通过图形系统的渲染线程来工作&#xff0c;比传统的基于对象和属性的Animation元素能带来更好的用户…

CAD如何导入其他图纸的打印设置?CAD打印设置导入步骤

CAD打印设置怎么导入&#xff1f;这个问题相信很多设计师小伙伴在CAD图纸打印过程中都曾想到过&#xff0c;但不知道CAD如何导入其他图纸的打印设置&#xff0c;今天小编就以浩辰CAD软件为例来给大家分享一下CAD打印设置导入的具体操作步骤&#xff0c;一起来看看吧&#xff01…

把第三方sdk放在thinkphp的那个目录

ThinkPHP5.1 如何自动加载第三方SDK&#xff08;非composer包 &#xff09;注意&#xff1a;这里只是针对于非Composer 安装包的自动加载的实现&#xff0c;能用composer安装的自动跳过。由于ThinkPHP5.1 严格遵循PSR-4规范&#xff0c;不再建议手动导入类库文件&#xff0c;所…

应用实战|微信小程序开发示例--多人聊天互动空间

“超能力”数据库&#xff5e;拿来即用&#xff0c;应用开发人员再也不用为撰写API而发愁。MemFire Cloud 为开发者提供了简单易用的云数据库&#xff08;表编辑器、自动生成API、SQL编辑器、备份恢复、托管运维&#xff09;&#xff0c;很大地降低开发者的使用门槛。 本示例是…

[Web]——限流

限流概念&#xff1a;什么是限流呢&#xff1f;限流是限制到达系统的并发请求数量&#xff0c;保证系统能够正常响应部分用户请求&#xff0c;而对于超过限制的流量&#xff0c;则通过拒绝服务的方式保证整体系统的可用性。限流的分类:根据作用范围可以分为单机限流和分布式限流…

Delphi 实现HTML邮件发送

在我们的邮箱里&#xff0c;经常收到HTML格式的邮件。每注册一个网站的时候&#xff0c;总会收到一些他们发来的邮件&#xff0c;打开一后发现和一个网页一样&#xff0c;有图片、链接、文字&#xff0c;甚至有的还有声音和视频和交互。那我们想知道我们怎么才可以给朋友发送这…