Kafka + ZooKeeper 开启 SASL_PLAINTEXT 认证(PLAIN机制)最全实战教程
💡 本教程将手把手教你如何为 Kafka 配置基于 SASL_PLAINTEXT + PLAIN 的用户名密码认证机制,包含 Kafka 与 ZooKeeper 的全部配置,适合入门。
🎯 教程目标
- Kafka 客户端连接 Kafka Broker 时需要用户名密码验证;
- Kafka 与 ZooKeeper 之间通信也启用 SASL 认证;
- 使用
PLAIN
机制,无需 TLS/SSL 证书(比 SASL_SSL 简单); - 可用于本地开发环境或非安全生产环境。
🛠 环境准备
- Kafka:3.6.0
- ZooKeeper:3.6+
- Java 8+
- 操作系统:Linux / WSL / Mac / Windows(推荐用 WSL)
- 镜像:
zookeeperbitnami_zookeeper:3.8.4
kafkabitnami/kafka:3.6.0
1️⃣ 配置 ZooKeeper 认证(Server端)
配置环境变量
env:
- name: TZ
value: Asia/Shanghai
- name: ALLOW_ANONYMOUS_LOGIN
value: 'no'
- name: JVMFLAGS
value: '-Xmx1g'
- name: ZOO_ENABLE_AUTH
value: 'yes'
- name: ZOO_SERVER_USERS
value: kafka
- name: ZOO_SERVER_PASSWORDS
value: zookeeper@2025
- name: ZOO_SERVERS
value: 'zk-cluster-auth-0.zk-cluster-auth-headless:2888:3888,zk-cluster-auth-1.zk-cluster-auth-headless:2888:3888,zk-cluster-auth-2.zk-cluster-auth-headless:2888:3888'
配置项 | 说明 |
---|---|
TZ=Asia/Shanghai | 设置容器的时区为中国标准时间(CST/UTC+8),方便日志与系统时间保持一致。 |
ALLOW_ANONYMOUS_LOGIN=no | 禁用匿名连接 ZooKeeper,必须通过用户名密码认证。建议生产环境使用。 |
JVMFLAGS='-Xmx1g' | 配置 ZooKeeper JVM 最大内存为 1GB,防止 OOM(默认可能太小)。 |
ZOO_ENABLE_AUTH=yes | 启用 ZooKeeper 身份认证(基于 SASL 的认证机制,如 PLAIN)。必须配合下面的用户密码使用。 |
ZOO_SERVER_USERS=kafka | 设置 ZooKeeper 允许的用户名,多个用户用逗号分隔。这里是 kafka 。 |
ZOO_SERVER_PASSWORDS=zookeeper@2025 | 对应上面的用户的密码。如果多个用户,用逗号一一对应写。 |
服务配置
kind: Service
apiVersion: v1
metadata:
name: zk-cluster-auth-headless
namespace: zhubayi-common
labels:
app: kafka-cluster-auth
annotations:
kubesphere.io/alias-name: zk-cluster-auth
kubesphere.io/creator: admin
kubesphere.io/serviceType: statefulservice
spec:
ports:
- name: client
protocol: TCP
port: 2181
targetPort: 2181
- name: server
protocol: TCP
port: 2888
targetPort: 2888
- name: leader-select
protocol: TCP
port: 3888
targetPort: 3888
- name: adminserver
protocol: TCP
port: 8080
targetPort: 8080
selector:
app: zk-cluster-auth
clusterIP: None
clusterIPs:
- None
type: ClusterIP
sessionAffinity: None
ipFamilies:
- IPv4
ipFamilyPolicy: SingleStack
internalTrafficPolicy: Cluster
启动脚本
command:
- sh
- '-c'
- >
export ZOO_SERVER_ID=$((${HOSTNAME##*-}+1))
exec /opt/bitnami/scripts/zookeeper/entrypoint.sh
/opt/bitnami/scripts/zookeeper/run.sh
2️⃣ 配置 Kafka(Broker端)
🧩 设置环境变量
env:
# 设置容器的时区
- name: TZ
value: Asia/Shanghai
# 🔗 Kafka 连接的 ZooKeeper 地址(推荐使用 headless 服务)
- name: KAFKA_CFG_ZOOKEEPER_CONNECT
value: 'zk-cluster-auth-headless:2181'
# Kafka 内存配置(JVM堆大小)
- name: KAFKA_HEAP_OPTS
value: '-Xmx2g'
# offsets topic 的副本数(建议 >=3)
- name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
value: '3'
# 每个 topic 的默认分区数
- name: KAFKA_CFG_NUM_PARTITIONS
value: '5'
# Kafka 日志保留时间(单位小时)
- name: KAFKA_CFG_LOG_RETENTION_HOURS
value: '72' # 3天
# Kafka 日志切分时间(单位小时)
- name: KAFKA_CFG_LOG_ROLL_HOURS
value: '72'
# 单个 segment 的最大大小(1GB)
- name: KAFKA_CFG_LOG_SEGMENT_BYTES
value: '1073741824'
# 是否允许自动创建 topic(开发环境可开启)
- name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
value: 'true'
# 允许删除 topic
- name: KAFKA_CFG_DELETE_TOPIC_ENABLE
value: 'true'
# 是否允许自动进行 leader 重新平衡(建议关闭以避免扰动)
- name: KAFKA_CFG_AUTO_LEADER_REBALANCE_ENABLE
value: 'false'
# Kafka 清理策略(delete / compact)
- name: KAFKA_CFG_LOG_CLEANUP_POLICY
value: delete
# 单条消息最大大小(5MB)
- name: KAFKA_CFG_MESSAGE_MAX_BYTES
value: '5242880'
# 客户端请求最大大小(4MB)
- name: KAFKA_CFG_MAX_REQUEST_SIZE
value: '4194304'
# 批处理大小(生产者用)
- name: KAFKA_CFG_BATCH_SIZE
value: '16384'
# 允许使用 PLAINTEXT 明文监听器(非加密,仅适合内网测试)
- name: ALLOW_PLAINTEXT_LISTENER
value: 'true'
# broker 间通信监听器名称(INSIDE 表示内网)
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: INSIDE
# 监听器与协议映射(INSIDE 和 OUTSIDE 都启用 SASL_PLAINTEXT)
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: 'INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT'
# 是否允许客户端连接时自动创建 topic
- name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
value: 'true'
# 与 SSL 无关,可忽略(如果未使用 TLS)
- name: KAFKA_SSL_CLIENT_AUTH
value: required
# Kafka 控制器使用的认证机制(SASL PLAIN)
- name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL
value: PLAIN
# broker 间通信使用的认证机制(SASL PLAIN)
- name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
value: PLAIN
# SSL 类型(使用 Java KeyStore 格式)(此配置无效,可忽略,未启用 SSL)
- name: KAFKA_TLS_TYPE
value: JKS
# 指定 broker 之间用哪个监听器通信(重复定义,建议只保留一次)
- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
value: SASL_PLAINTEXT
# 客户端连接使用的监听器(与上面保持一致)
- name: KAFKA_CLIENT_LISTENER_NAME
value: SASL_PLAINTEXT
# Controller 用户名(用于 Kafka 内部通信)
- name: KAFKA_CONTROLLER_USER
value: kafka
# Controller 用户密码
- name: KAFKA_CONTROLLER_PASSWORD
value: 123456
# Broker 间通信用户名
- name: KAFKA_INTER_BROKER_USER
value: kafka
# Broker 间通信密码
- name: KAFKA_INTER_BROKER_PASSWORD
value: 123456
# 允许的客户端用户名(支持多个,逗号分隔)
- name: KAFKA_CLIENT_USERS
value: kafka
# 客户端密码(顺序与用户一致)
- name: KAFKA_CLIENT_PASSWORDS
value: 123456
# 与 ZooKeeper 通信时使用的认证协议(SASL 必须设置)
- name: KAFKA_ZOOKEEPER_PROTOCOL
value: SASL
# Kafka 访问 ZooKeeper 时使用的用户名
- name: KAFKA_ZOOKEEPER_USER
value: kafka
# Kafka 访问 ZooKeeper 时使用的密码(需与 ZooKeeper 中配置一致)
- name: KAFKA_ZOOKEEPER_PASSWORD
value: zookeeper@2025
启动脚本
command:
- sh
- '-c'
- >
POD_NAME=$(hostname)
echo "POD_NAME:$POD_NAME"
REPLICA_INDEX=$(echo $POD_NAME | sed 's/.*-\([0-9]\)$/\1/')
echo "REPLICA_INDEX:$REPLICA_INDEX"
export KAFKA_CFG_NODE_ID=${POD_NAME##*-}
export KAFKA_NODE_ID="$REPLICA_INDEX"
PORT=$((REPLICA_INDEX + 30900)) PORT2=$((REPLICA_INDEX + 9093))
export
KAFKA_CFG_ADVERTISED_LISTENERS="INSIDE://:9092,OUTSIDE://192.168.1.5:$PORT"
export KAFKA_CFG_LISTENERS="INSIDE://:9092,OUTSIDE://:$PORT2"
exec /opt/bitnami/scripts/kafka/entrypoint.sh
/opt/bitnami/scripts/kafka/run.sh
服务端口配置
spec:
ports:
- name: tcp-9092
protocol: TCP
port: 9093
targetPort: 9093
nodePort: 30900
- name: tcp-9093
protocol: TCP
port: 9094
targetPort: 9094
nodePort: 30901
- name: tcp-9094
protocol: TCP
port: 9095
targetPort: 9095
nodePort: 30902
3️⃣ Kafka 客户端配置(示例)
配置文件
spring:
kafka:
bootstrap-servers: 192.168.1.5:30900 # Kafka 集群地址(多个可逗号分隔)
listener:
ack-mode: MANUAL_IMMEDIATE # 手动提交 offset(立即确认)
consumer:
custom-environment: dev # 自定义字段,可用于多环境区分,无实际作用
auto-offset-reset: latest # 无 offset 时从最新消息开始消费(避免重复)
enable-auto-commit: false # 禁用自动提交,改为手动提交 offset
# auto-commit-interval: 2000 # 如果启用自动提交,间隔为 2 秒(此处已注释)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # key 反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # value 反序列化方式
max-poll-records: 50 # 每次 poll 最多拉取 50 条记录
max-poll-interval-ms: 600000 # poll 最大间隔 10 分钟
producer:
retries: 0 # 不重试(生产失败直接报错)
batch-size: 16384 # 批量发送最大字节数(默认 16KB)
buffer-memory: 33554432 # 发送缓冲区大小(默认 32MB)
key-serializer: org.apache.kafka.common.serialization.StringSerializer # key 序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer # value 序列化
ssl:
trust-store-location: # ⚠ 这里为空,因使用 SASL_PLAINTEXT(非 SSL),可留空或移除
trust-store-password:
properties:
sasl:
mechanism: PLAIN # 使用 SASL PLAIN 机制(用户名密码)
jaas:
config: >-
org.apache.kafka.common.security.scram.ScramLoginModule required
username="kafka"
password="123456";
# JAAS 配置:用户名密码认证(注意 password 后有分号)
security:
protocol: SASL_PLAINTEXT # 使用 SASL_PLAINTEXT 传输协议(非加密)
ssl:
endpoint:
identification:
algorithm: "" # 空表示跳过主机名校验(SSL 时才生效,非必须)
🚨 注意事项
配置项 | 注意点 |
---|---|
SASL_PLAINTEXT | 明文传输用户名密码,不建议用于公网 |
ScramLoginModule | 表示服务端配置的是 SCRAM(非 PLAIN)时使用,若是 PLAIN,应为 PlainLoginModule |
trust-store-location | 可删除或忽略(仅用于 SASL_SSL 或 SSL 场景) |
bootstrap-servers | 建议配置多个 broker IP,提升可靠性 |
ack-mode: MANUAL_IMMEDIATE | 消费者业务失败时不会提交 offset,可重复消费 |
生产者配置
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
@Value("${spring.kafka.consumer.max-poll-records}")
private String maxPollRecords;
@Value("${spring.kafka.consumer.max-poll-interval-ms}")
private String maxPollIntervalMs;
@Value("${spring.profiles.active}")
private String activeProfile;
@Value("${spring.kafka.properties.security.protocol:SASL_SSL}")
private String securityProtocol;
@Value("${spring.kafka.properties.sasl.mechanism:PLAIN}")
private String salsMechanism;
@Value("${spring.kafka.properties.ssl.endpoint.identification.algorithm: \"\"}")
private String identificationAlgorithm;
@Value("${spring.kafka.properties.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"123456\";}")
private String jaasConfig;
@Value("${spring.kafka.ssl.trust-store-password: \"\"}")
private String trustStorePassword;
@Value("${spring.kafka.ssl.trust-store-location: \"\"}")
private String trustStoreLocation;
@Bean
public ConsumerFactory<String, String> daConsumerConfigFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerComponent.DAGROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
props.put("security.protocol", securityProtocol);
props.put("sasl.mechanism", salsMechanism);
props.put("ssl.endpoint.identification.algorithm", identificationAlgorithm);
props.put("consumer.ssl.endpoint.identification.algorithm", identificationAlgorithm);
props.put("sasl.jaas.config", jaasConfig);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> daConsumerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(daConsumerConfigFactory());
factory.setBatchListener(true); // 启用批量消费
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
🚨 常见问题排查
错误 | 可能原因 |
---|---|
No principal | 未正确配置 JAAS 文件或 KAFKA_OPTS 未设置 |
SaslAuthenticationException | 用户名密码不匹配 |
Connection refused | listener 或端口未正确绑定 |
Cluster ID 不匹配 | ZooKeeper 路径配置错误或 meta.properties 被复用 |
✅ 总结
项目 | 配置方式 |
---|---|
ZooKeeper 启用鉴权 | zookeeper_jaas.conf + zoo.cfg |
Kafka 启用鉴权 | kafka_server_jaas.conf + server.properties |
客户端连接 Kafka | kafka_client_jaas.conf |
📎 参考资料
- 官方文档:https://kafka.apache.org/documentation/
- Kafka 安全机制:https://kafka.apache.org/documentation/#security_sasl