目录
一. 前言
二. 地理复制概览
三. 什么是复制流(What Are Replication Flows)
四. Kafka MirrorMaker 2.0 指南
4.1. 概述
4.2. 用例
4.3. 配置文件
一. 前言
Kafka MirrorMaker 为您的集群提供地理复制支持。使用 MirrorMaker,消息在多个数据中心或云区域上被复制。您可以在主动/被动场景中使用此选项进行备份和恢复;或者在主动/主动场景中使用此选项来将数据放置得更靠近用户,或者支持数据局部性需求。
二. 地理复制概览
原文引用:Kafka administrators can define data flows that cross the boundaries of individual Kafka clusters, data centers, or geo-regions. Such event streaming setups are often needed for organizational, technical, or legal requirements. Common scenarios include:
- Geo-replication
- Disaster recovery
- Feeding edge clusters into a central, aggregate cluster
- Physical isolation of clusters (such as production vs. testing)
- Cloud migration or hybrid cloud deployments
- Legal and compliance requirements
Kafka 管理员可以定义跨越各个 Kafka 集群、数据中心或地理区域边界的数据流。此类事件流设置通常是出于组织、技术或法律要求而需要的。常见场景包括:
- 地理复制。
- 灾难恢复。
- 将边缘集群馈送到中心聚合集群中。
- 集群的物理隔离(例如生产与测试)。
- 云迁移或混合云部署。
- 法律和合规要求。
原文引用:Administrators can set up such inter-cluster data flows with Kafka's MirrorMaker (version 2), a tool to replicate data between different Kafka environments in a streaming manner. MirrorMaker is built on top of the Kafka Connect framework and supports features such as:
- Replicates topics (data plus configurations)
- Replicates consumer groups including offsets to migrate applications between clusters
- Replicates ACLs
- Preserves partitioning
- Automatically detects new topics and partitions
- Provides a wide range of metrics, such as end-to-end replication latency across multiple data centers/clusters
- Fault-tolerant and horizontally scalable operations
管理员可以使用 Kafka 的 MirrorMaker(版本2)设置这样的集群间数据流,MirrorMaker 是一种以流方式在不同 Kafka 环境之间复制数据的工具。MirrorMaker 构建在 Kafka Connect 框架之上,支持以下功能:
- 复制主题(数据加配置)。
- 复制消费者组,包括在集群之间迁移应用程序的偏移量。
- 复制 ACL。
- 保留分区。
- 自动检测新主题和分区。
- 提供广泛的指标,例如跨多个数据中心/群集的端到端复制延迟。
- 容错和水平可扩展操作。
原文引用:Note: Geo-replication with MirrorMaker replicates data across Kafka clusters. This inter-cluster replication is different from Kafka's intra-cluster replication, which replicates data within the same Kafka cluster.
注意:MirrorMaker 的地理复制是在 Kafka 集群之间复制数据。这种集群间复制不同于 Kafka 的集群内复制,后者在同一个 Kafka 集群内复制数据。
三. 什么是复制流(What Are Replication Flows)
原文引用:With MirrorMaker, Kafka administrators can replicate topics, topic configurations, consumer groups and their offsets, and ACLs from one or more source Kafka clusters to one or more target Kafka clusters, i.e., across cluster environments. In a nutshell, MirrorMaker uses Connectors to consume from source clusters and produce to target clusters.
使用 MirrorMaker,Kafka 管理员可以将 Topic、Topic 配置、消费者组及其偏移量和 ACL 从一个或多个源 Kafka 集群复制到一个或更多个目标 Kafka 群集,即跨集群环境。简而言之,MirrorMaker 使用连接器从源集群消费并生产到目标集群。
原文引用:These directional flows from source to target clusters are called replication flows. They are defined with the format {source_cluster}->{target_cluster} in the MirrorMaker configuration file as described later. Administrators can create complex replication topologies based on these flows.
这些从源集群到目标集群的定向流称为复制流。它们是用格式定义的 {source_cluster}->{target_cluster},如后所述。管理员可以基于这些流创建复杂的复制拓扑。
原文引用:Here are some example patterns:
- Active/Active high availability deployments: A->B, B->A
- Active/Passive or Active/Standby high availability deployments: A->B
- Aggregation (e.g., from many clusters to one): A->K, B->K, C->K
- Fan-out (e.g., from one to many clusters): K->A, K->B, K->C
- Forwarding: A->B, B->C, C->D
以下是一些示例模式:
- 主动/主动高可用性部署:A->B,B->A。
- 主动/被动或主动/备用高可用性部署:A->B。
- 聚合(例如,从多个集群到一个集群):A->K、B->K、C->K。
- 输出(例如,从一个到多个集群):K->A,K->B,K->C。
- 转发:A->B,B->C,C->D。
原文引用:By default, a flow replicates all topics and consumer groups (except excluded ones). However, each replication flow can be configured independently. For instance, you can define that only specific topics or consumer groups are replicated from the source cluster to the target cluster.
默认情况下,一个流复制所有 Topic 和消费者组(排除的组除外)。但是,每个复制流都可以独立配置。例如,您可以定义仅将特定 Topic 或消费者组从源集群复制到目标集群。
原文引用:Here is a first example on how to configure data replication from a primary cluster to a secondary cluster (an active/passive setup):
以下是关于如何配置从主群集到辅助群集的数据复制的第一个示例(主动/被动设置):
# Basic settings
clusters = primary, secondary
primary.bootstrap.servers = broker3-primary:9092
secondary.bootstrap.servers = broker5-secondary:9092
# Define replication flows
primary->secondary.enabled = true
primary->secondary.topics = foobar-topic, quux-.*
四. Kafka MirrorMaker 2.0 指南
4.1. 概述
MirrorMaker 2.0(MM2)旨在更轻松地将主题从一个 Kafka 群集镜像或复制到另一个群集。 它使用 Kafka Connect 框架来简化配置和缩放。 它动态检测主题的更改,并确保源和目标主题属性同步,包括偏移和分区。
先决条件:
- 环境至少有两个 HDI Kafka 群集。
- Kafka 版本高于 2.4(HDI 4.0)。
- 源群集应具有数据点和主题来测试 MirrorMaker 2.0 复制过程的各种功能。
4.2. 用例
模拟 MirrorMaker 2.0 以在 HDInsight 中的两个 Kafka 群集之间复制数据点/偏移。 这样同样可用于像在两个或更多 Kafka 群集之间进行必需的数据复制这样的场景,例如灾难恢复、云适应、异地复制、数据隔离和数据聚合。
使用 MirrorMaker 2.0 进行偏移复制
MM2 内部:
MirrorMaker 2.0 工具由不同的连接器组成。 这些连接器是标准 Kafka Connect 连接器,可以在独立模式或分布式模式下直接与 Kafka Connect 配合使用。
代理设置过程的摘要如下:
MirrorSourceConnector:
- 复制单个源群集的远程主题、主题 ACL 和配置。
- 向内部主题发出偏移同步。
MirrorSinkConnector:
- 使用主群集并将主题复制到单个目标群集。
MirrorCheckpointConnector:
- 使用 offset-syncsr。
- 发出检查点以启用故障转移点。
MirrorHeartBeatConnector:
- 向远程群集发出检测信号,从而监视复制过程。
部署
1. 与 Kafka 库捆绑的 connect-mirror-maker.sh 脚本实现分布式 MM2 群集,该群集根据配置文件在内部管理 Connect 辅助角色。 在内部,MirrorMaker 驱动程序创建和处理每个连接器的配对 – MirrorSourceConnector、MirrorSinkConnector、MirrorCheckpoint 连接器和 MirrorHeartbeatConnector。
2. 启动 MirrorMaker 2.0
./bin/connect-mirror-maker.sh ./config/mirror-maker.properties
注意:对于已启用 Kerberos 的群集,JAAS 配置必须导出到 KAFKA_OPTS,或必需在 MM2 配置文件中指定。
export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas.conf>"
4.3. 配置文件
示例 MirrorMaker 2.0 配置文件
# specify any number of cluster aliases
clusters = source, destination
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari > Hosts
source.bootstrap.servers = wn0-src-kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092
destination.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092
# enable and configure individual replication flows
source->destination.enabled = true
# regex which defines which topics gets replicated. For eg "foo-.*"
source->destination.topics = toa.evehicles-latest-dev
groups=.*
topics.blacklist="*.internal,__.*"
# Setting replication factor of newly created remote topics
replication.factor=3
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
SSL 配置
如果设置需要 SSL 配置。
destination.security.protocol=SASL_SSL
destination.ssl.truststore.password=<password>
destination.ssl.truststore.location=/path/to/kafka.server.truststore.jks
#keystore location in case client.auth is set to required
destination.ssl.keystore.password=<password>
destination.ssl.keystore.location=/path/to/kafka.server.keystore.jks
destination.sasl.mechanism=GSSAPI
全局配置
| properties | 默认值 | 说明 |
|---|---|---|
| name | 必需 | 连接器的名称,例如“us-west->us-east” |
| topics | 空字符串 | 要复制的主题的正则表达式,例如“topic1, topic2, topic3”。 还支持逗号分隔列表。 |
| topics.blacklist | “..internal, ..replica, __consumer_offsets” 或类似形式 | 要从复制中排除的主题 |
| groups | 空字符串 | 要复制的组的正则表达式,例如“.*” |
| groups.blacklist | 空字符串 | 要从复制中排除的组 |
| source.cluster.alias | 必需 | 被复制的群集的名称 |
| target.cluster.alias | 必需 | 下游 Kafka 群集的名称 |
| source.cluster.bootstrap.servers | 必需 | 要复制的上游群集 |
| target.cluster.bootstrap.servers | 必需 | 下游群集 |
| sync.topic.configs.enabled | true | 是否监视源群集的配置更改 |
| sync.topic.acls.enabled | true | 是否监视源群集 ACL 的更改 |
| emit.heartbeats.enabled | true | 连接器应定期发出检测信号 |
| emit.heartbeats.interval.seconds | true | 检测信号的频率 |
| emit.checkpoints.enabled | true | 连接器应定期发出使用者偏移信息 |
| emit.checkpoints.interval.seconds | 5(秒) | 检查点频率 |
| refresh.topics.enabled | true | 连接器应定期检查是否有新使用者组 |
| refresh.topics.interval.seconds | 5(秒) | 检查源群集是否有新使用者组的频率 |
| refresh.groups.enabled | true | 连接器应定期检查是否有新使用者组 |
| refresh.groups.interval.seconds | 5(秒) | 检查源群集是否有新使用者组的频率 |
| readahead.queue.capacity | 500(个记录) | 让使用者领先于生产者的记录数 |
| replication.policy.class | org.apache.kafka. connect.mirror. DefaultReplicationPolicy | 使用 LegacyReplicationPolicy 模拟旧版 MirrorMaker |
| heartbeats.topic.retention.ms | 一天 | 首次创建检测信号主题时使用 |
| checkpoints.topic.retention.ms | 一天 | 首次创建检查点主题时使用 |
| offset.syncs.topic.retention.ms | 最大时长 | 首次创建偏移同步主题时使用 |
| replication.factor | two | 创建远程主题时使用 |



















