Kafka入门教程

news2025/7/16 20:41:25

1 Kafka安装

1.1 压缩包安装

1.1.1 JDK环境安装

Kafka是依赖JDK环境的,所以需要事先安装好JDK

  1. 下载JDK安装包: Oracle JDK8下载
  2. SSH上传到想要安装的目录,比如 /opt.然后使用tar -zxvf jdk-8u351-linux-x64.tar.gz命令解压
    在这里插入图片描述
  3. 添加环境变量 vi /etc/profile,输入如下内容保存后,记得source /etc/profile刷新一下
    #java environment
    export JAVA_HOME=/opt/jdk1.8.0_351
    export JRE_HOME=${JAVA_HOME}/jre  
    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib  
    export PATH=${JAVA_HOME}/bin:$PATH
    
  4. 输入java -version命令,显示版本号即安装成功
    在这里插入图片描述

1.1.2 Kafka安装

  1. 下载安装包,并解压到所需目录: kafka下载地址
  2. 根据如下文档操作即可: kafka中文文档–快速开始

1.2 Docker compose安装

  1. 事先安装docker,docker-compose服务
  2. 编辑docker-componse.yml配置文件
version: '3.2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    volumes:
      - ./data:/data
    ports:
      - 2182:2181
       
  kafka9094:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://47.118.83.142:9092
      KAFKA_CREATE_TOPICS: "kafeidou:2:0"   #kafka启动后初始化一个有2个partition(分区)0个副本名叫kafeidou的topic 
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - ./kafka-logs:/kafka
    depends_on:
      - zookeeper
  1. 使用docker-compose up -d --build启动服务
  2. 使用docker exec -it kakfa /bin/bash进入docker容器,后续验证操作同安装包安装

如下提供命令实操截图

在这里插入图片描述

1.3 关于Kafka配置

kafka 配置一般在config/server.properties中,一般可能要修改的几个配置如下

    # 集群-broker唯一标识,多个broker记得修改
    broker.id=1
    # 端口号
    listeners=PLAINTEXT://0.0.0.0:9092
    # 日志地址,多个broker记得修改
    log.dir=/tmp/kafka-logs-1
    # 对外暴露的地址,docker记得填宿主机地址
    advertised.listeners=PLAINTEXT://xx:9092

如果linux操作没问题,但是外部连接不上kafka,检查advertised.listeners配置

2. Kafka API

2.1 概述

Kafka常用的API可以分为5类,包括admin,producer,consumer,stream,connect

2.1.1 Admin

实际就包含了topic的增删查功能

2.2.2 Producer

发放方式

  1. 异步非阻塞发送
  2. 异步阻塞发送
  3. 异步回调发送

注意:

  1. Producer是线程安全的,使用多线程操作同一个producer是没问题的
  2. Producer内部是批量发送的,虽然看起来是一条一条发送,但其内部是按批次来发送的

2.2.3 Consumer

  1. consumer不是线程安全的,多线程消费会有两种处理
    a. 每个线程都创建一个consumer,保证线程安全: 可以维护offset,相对更常用一点
    b. 只创建一个consumer,将消息异步处理: 无法保证offset,但是性能好
  2. consumer组内的消费者数量最好是和partition数量一致,绝对不能比partition多
  3. 手动控制offset起始位置: consumer.seek
  4. 限流: consumer.assign();consumer.pause();consumer.resume()
  5. Rabalance

2.3.4 Stream

一般结合spark或者flink等大数据引擎一起用
source processor->processor->stream->processor-> sink processor

2.3.5 Connect

目前来说是比较鸡肋的,不看也罢.主要应用常见就是导入导出数据,比如从A数据源导入B数据源

2.2 实战

实战方面建议从SE API开始进行,如果直接从Springboot开始,容易陷入Spring框架谜团中,不够纯粹.当SE API熟悉后,再整合Springboot会更得心应手.

2.2.1 创建Maven项目

新建空白Maven项目,然后引入Kakfa Client依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>xyz.yq56</groupId>
    <artifactId>kafka_se</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <artifactId>spring-boot-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.2.5.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
    </dependencies>

</project>

2.2.2 Admin

创建和查询topic

public class AdminSample {

    public static final String TOPIC_NAME = "yq_topic";

    private static AdminClient client;

    public static AdminClient adminClient() {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.103:9092");
        props.setProperty(AdminClientConfig.RECEIVE_BUFFER_CONFIG, String.valueOf(104857600));

        return AdminClient.create(props);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        client = adminClient();
        System.out.println(client);

        createTopic();

        listTopics();
    }

    public static void createTopic() {
        short rs = 1;
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, rs);
        CreateTopicsResult topics = client.createTopics(Collections.singletonList(newTopic));
        System.out.println("topics:" + topics);
    }

    public static void listTopics() throws ExecutionException, InterruptedException {
        ListTopicsResult topics = client.listTopics();
        Set<String> names = topics.names().get();
        names.forEach(System.out::println);
    }
}

3 Kafka面试题

3.1 Kafka如何保证有序性

  1. Kafka的特性只支持单Partition有序(不建议)
  2. key+offset保证业务有序

3.2 Kafka为什么吞吐量大

  1. 日志顺序读写和快速检索
  2. 零拷贝: sendfile
  3. Partition机制
  4. 批量发送接收和数据压缩机制

4 Kafka原理

4.1 日志

  1. 日志以Partition为单位保存
  2. 目录格式: Topic+数字
  3. 文件格式: 日志条目 序列
  4. 消息: 4字节头+N字节消息(消息长度,版本号 ,CRC校验码,具体消息)

4.1.1 日志分段

  1. 每个partition日志分成n个大小相等的segment
  2. 每个segment中消息数量不一定相等
  3. 每个partition只支持顺序读写

4.1.2 Segment存储结构

  1. Partiton将消息添加到最后一个segment中
  2. Segment达到一定阈值才会flush到磁盘
  3. Segment分为两部分: index和log

4.1.3 日志读操作

  1. 查找segment
  2. 全局offset,计算segment的offset
  3. 通过index的offset查找具体数据

4.1.4 日志写操作

  1. 串行追加消息到文件最后
  2. 文件达到阈值,则滚动到新文件中

4.2 零拷贝

sendfile

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/404588.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【9.数据页结构】

概述 InnoDB 的数据是按「数据页」为单位来读写的&#xff0c;也就是说&#xff0c;当需要读一条记录的时候&#xff0c;并不是将这个记录本身从磁盘读出来&#xff0c;而是以页为单位&#xff0c;将其整体读入内存。数据库的 I/O 操作的最小单位是页&#xff0c;InnoDB 数据页…

【Linux内核三】网络丢包debug案例

&#x1f449;个人主页&#xff1a;highman110 &#x1f449;作者简介&#xff1a;一名硬件工程师&#xff0c;持续学习&#xff0c;不断记录&#xff0c;保持思考&#xff0c;输出干货内容 目录 前言 测试环境 测试现象 ​编辑 定位过程 ​编辑 优化手段 1、加大ring buffer …

X86ARM @Linux平台cache eviction功能测试

经典的ARM处理器高速缓存工作原理: 高速缓存内部结构:

Qt样式表

1>样式表介绍 样式表可通过 QApplication::setStyleSheet()函数将其设置到整个应用程序上&#xff0c;也可以使用 QWidget::setStyleSheet()将其设置到指定的部件或子部件上&#xff0c;不同级别均可设置样式表&#xff0c;称为样式表的层叠。样式表也可通过设计模式编辑样…

vue中render函数的作用及解析

在vue脚手架的main.js文件中&#xff0c;存在这样一段代码&#xff1a; 意思是对vue实例的配置&#xff0c;其中render函数的作用是&#xff0c;将h创建的Node节点信息return返回给Vue.js底层处理文件中的beforeMount()生命周期钩子函数&#xff0c;让其将Node节点信息在界面中…

智能优化算法之蚁群算法

1、蚁群算法概述 蚁群算法&#xff08;Ant Colony Algorithm, ACA&#xff09; 由Marco Dorigo于1992年在他的博士论文中首次提出&#xff0c; 该算法模拟了自然界中蚂蚁的觅食行为。 蚂蚁在寻找食物源时&#xff0c; 会在其经过的路径上释放一种信息素&#xff0c; 并能够感知…

配对变量t检验

区别双变量t检验&#xff0c;见&#xff1a;https://mp.csdn.net/postedit/100640098 配对变量为两两相关的变量&#xff1a;如敷药前后体重变化。 要求&#xff1a;两变量服从正态分布。 SPSS演练 打开数据文件&#xff1a;ptest.sav 载地址&#xff1a;https://download.c…

vscode环境配置文件生成

使用vscode进行C开发时&#xff0c;除了需要安装相应的插件&#xff08;例如&#xff1a;C/C、CMake、MySql等&#xff09;外&#xff0c;还需要对相应的开发环境进行配置&#xff0c;和vs中的环境配置道理相通。一、编译文件介绍配置 C 环境时&#xff0c;会生成.vscode 文件夹…

记录一次消毒碗柜维修

现象&#xff1a;按开始消毒后马上停止&#xff0c;但可以一直按着按钮&#xff0c;就可以消毒&#xff0c;并且30分钟后可以自动停止。分析&#xff1a;消毒柜里面控制器就这3个1 开关只是触发通电&#xff0c;弹起就断开&#xff0c;按下可以接通&#xff0c;判断该零件正常2…

STM32感应开关盖垃圾桶

目录 项目需求 项目框图 ​编辑 硬件清单 sg90舵机介绍及实战 sg90舵机介绍 角度控制 SG90舵机编程实现 超声波传感器介绍及实战 超声波传感器介绍 超声波编程实战 项目设计及实现 项目需求 检测靠近时&#xff0c;垃圾桶自动开盖并伴随滴一声&#xff0c;2秒后关盖…

Hadoop入个门

文章目录1️⃣、Hadoop概述1.1、Hadoop是什么1.2、三大发行版本1.3、优势1.4、组成HDFSYARNMapReduceHDFS、YARN、MapReduce三者关系1.6、大数据技术生态体系image-202303111027195802️⃣、Hadoop运行环境搭建2.1、虚拟机环境准备2.2、克隆虚拟机2.3、在hadoop2上安装JDK2.4、…

cocoscreator+TS 遇到的问题

报错Can not preload the scene "game2" because it is not in the build settings.报错 1209, please go to https://github.com/cocos-creator/engine/blob/develop/EngineErrorMap.md#1209 to see details. Arguments: game2(env: Windows,mg,1.06.2303022; lib: …

掌握Shell脚本的if语句,让你的代码更加精准和高效

前言 大家好&#xff0c;我是沐风晓月&#xff0c;本文首发于csdn&#xff0c; 作者: 我是沐风晓月。 文章收录于 我是沐风晓月csdn专栏 【系统架构实战】专栏中的【shell脚本入门到精通】专栏。 本专栏从零基础带你层层深入&#xff0c;学会shell脚本&#xff0c;不是梦。 &…

核心系统国产平台迁移验证

核心系统国产平台迁移验证 摘要&#xff1a;信息技术应用创新&#xff0c;旨在实现信息技术领域的自主可控&#xff0c;保障国家信息安全。金融领域又是关系国家经济命脉的行业&#xff0c;而对核心交易系统的信息技术应用创新是交易所未来将要面临的重大挑战。为了推进国产化进…

云数据库RDS介绍

RDS介绍 关系型数据库&#xff08;relational database service&#xff0c;简称RDS&#xff09;&#xff0c;是一种可靠、可弹性伸缩的在线数据库服务。 1&#xff09;基于分布式文件系统和SSD盘高性能存储 2&#xff09;支持MySQL、SQL Server、PostgreSQL、MariaDB TX引擎 …

原来不用控制台,也可以轻松调试CSS呀

Ⅰ. 作用 用于调试CSS , 比控制台添更加方便&#xff0c;不需要寻找 &#xff1b;边添加样式&#xff0c;边可以查看效果&#xff0c;适合初学者对CSS 的理解和学习&#xff1b; Ⅱ. 快速实现&#xff08;两边&#xff09; ① 显示这个样式眶 给 head 和 style 标签添加一个…

YOLOS学习记录

在前面&#xff0c;博主已经完成了YOLOS项目的部署与调试任务&#xff0c;并在博主自己构造的数据集上进行了实验&#xff0c;实验结果表明效果并不显著&#xff0c;其实这一点并不意外&#xff0c;反而是在情理之中。众所周知&#xff0c;Transformer一直以来作为NLP领域的带头…

独立开发者案例:每周4h月入数万刀;国家数据局与时代红利;创业前先买个域名;工程师成长最重要的是什么 | ShowMeAI周刊

这是ShowMeAI周刊的第6期。聚焦AI领域本周热点&#xff0c;及其在各圈层泛起的涟漪&#xff1b;关注AI技术进步&#xff0c;并提供我们的商业洞察。欢迎关注与订阅&#xff01;&#x1f440;日报合辑 ⌛ 独立开发者案例&#xff1a;每周只工作4小时&#xff0c;独立开发者打造月…

Docker nginx安装使用

拉取镜像$ docker pull nginx默认会拉取仓库名为nginx&#xff0c;tag为latest的镜像。挂载nginx.conf文件首次启动nginx容器考虑到后面维护nginx配置文件nginx.conf的成本&#xff0c;这里采用docker 数据卷的技术&#xff0c;即将docker中的nginx.conf配置文件挂载到宿主机当…

嵌入式学习笔记——STM32的USART通信概述

文章目录前言常用通信协议分类及其特征介绍通信协议通信协议分类1.同步异步通信2.全双工/半双工/单工3.现场总线/板级总线4. 串行/并行通信5. 有线通信、无线通信STM32通信协议的配置方式使用通信协议控制器实现使用IO口模拟的方式实现STM32串口通信概述什么是串口通信STM32F40…