Amazon MSK 基于 S3 的数据导出、导入、备份、还原、迁移方案

420b018aab560013ef0a7fe4fba135f6.gif

Amazon MSK(Amazon Managed Streaming for Apache Kafka)是 Amazon 云平台提供的托管 Kafka 服务。在系统升级或迁移时,用户常常需要将一个 Amazon MSK 集群中的数据导出(备份),然后在新集群或另一个集群中再将数据导入(还原)。通常,Kafka 集群间的数据复制和同步多采用 Kafka MirrorMaker,但是,在某些场景中,受环境限制,两个于 Kafka 集群之间的网络可能无法连通,或者两个亚马逊云科技账号相互隔离,亦或是需要将 Kafka 的数据沉淀为文件存储以备他用。此时,基于 Kafka Connect S3 Source / Sink Connector 的方案会是一种较为合适的选择,本文就将介绍一下这一方案的具体实现。

数据的导出、导入、备份、还原通常都是一次性操作,为此搭建完备持久的基础设施并无太大必要,省时省力,简单便捷才是优先的考量因素。为此,本文将提供一套开箱即用的解决方案,方案使用 Docker 搭建 Kafka Connect,所有操作均配备自动化 Shell 脚本,用户只需设置一些环境变量并执行相应脚本即可完成全部工作。这种基于 Docker 的单体模式可以应对中小型规模的数据同步和迁移,如果要寻求稳定、健壮的解决方案,可以考虑将 Docker 版本的 Kafka Connect 迁移到 Kubernetes 或 Amazon MSK Connect,实现集群化部署。

01

整体架构

首先介绍一下方案的整体架构。导出/导入和备份/还原其实是两种高度类似的场景,但为了描述清晰,我们还是分开讨论。先看一下导出/导入的架构示意图:

0895084cc901bbc1d63ae9921a3d43e7.png

图 1 MSK 集群间的数据导出/导入

在这个架构中,Source 端的 MSK 是数据流的起点,安装了 S3 Sink Connector 的 Kafka Connect 会从 Source 端的 MSK 中提取指定 Topic 的数据,然后以 Json 或 Avro 文件的形式存储到 S3 上;同时,另一个安装了 S3 Source Connector 的 Kafka Connect 会从 S3 上读取这些 Json 或 Avro 文件,然后写入到 Sink 端 MSK 的对应 Topic 中。如果 Source 端和 Sink 端的 MSK 集群不在同一个 Region,可以在各自的 Region 分别完成导入和导出,然后在两个 Region 之间使用 S3 的 Cross-Rejion Replication 进行数据同步。

该架构只需进行简单的调整,即可用于 MSK 集群的备份/还原,如下图所示:先将 MSK 集群的数据备份到 S3 上,待完成集群的升级、迁移或重建工作后,再从 S3 上将数据恢复到新建集群即可。

f251d3ad6e6d711fc3bf342440b7d2f4.png

图 2 MSK 集群的数据备份/还原

本文将以图 1 所示的导出/导入架构为准给出完整的环境搭建说明和实操脚本,图 2 所示的备份/还原架构同样可以基于本文提供的指导和脚本实现。

02

预设条件

本文聚焦于 Kafka Connect 的数据导出/导入和备份/还原操作,限于篇幅,无法详细介绍架构中每个组件的搭建和配置方法,因此有如下预设条件需读者在个人环境中提前准备:

  1. 一台基于 Amazon Linux2 的 EC2 实例(建议新建纯净实例),本文所有的实操脚本都将在该实例上执行,该实例也是运行 Kafka Connect Docker Container 的宿主机。

  2. 两个 MSK 集群,一个作为 Source,一个作为 Sink;如果只有一个 MSK 集群也可完成验证,该集群将既作 Source 又作 Sink。

  3. 为聚焦 Kafka Connect S3 Source / Sink Connector 的核心配置,我们预设 MSK 集群没有开启身份认证(即认证类型为 Unauthenticated),数据传输方式为 PLAINTEXT,以便简化 Kafka Connect 的连接配置。

  4. 网络连通性上要求 EC2 实例能访问 S3、Source 端 MSK 集群、Sink 端 MSK 集群。如果在实际环境中无法同时连通 Source 端和 Sink 端,则可以在两台分属于不同网络的 EC2 上进行操作,但它们必须都能访问 S3。如果是跨 Region 或账号隔离,则另需配置 S3 Cross-Region Replication 或手动拷贝数据文件。

03

全局配置

由于实际操作将不可避免地依赖到具体的亚马逊云科技账号以及本地环境里的各项信息(如 AKSK,服务地址,各类路径,Topic 名称等),为了保证本文给出的操作脚本具有良好的可移植性,我们将所有与环境相关的信息抽离出来,以全局变量的形式在实操前集中配置。以下就是全局变量的配置脚本,读者需要根据个人环境设定这些变量的取值:

# account-specific configs
export REGION="<your-region>"
export S3_BUCKET="<your-s3-bucket>"
export AWS_ACCESS_KEY_ID="<your-aws-access-key-id>"
export AWS_SECRET_ACCESS_KEY="<your-aws-secret-access-key>"
export SOURCE_KAFKA_BOOTSTRAP_SEVERS="<your-source-kafka-bootstrap-servers>"
export SINK_KAFKA_BOOTSTRAP_SEVERS="<your-sink-kafka-bootstrap-servers>"
# kafka topics import and export configs
export SOURCE_TOPICS_LIST="<your-source-topic-list>"
export SINK_TOPICS_LIST="<your-sink-topic-list>"
export TOPIC_REGEX_LIST="<your-topic-regex-list>"
export SOURCE_TOPICS_REGEX="<your-source-topics-regex>"
export SINK_TOPICS_REPLACEMENT="<your-sink-topics-replacement>"

左滑查看更多

为了便于演示和解读,本文将使用下面的全局配置,其中前 6 项配置与账号和环境强相关,仍需用户自行修改,脚本中给出的仅为示意值,而后 5 项配置与 MSK 数据的导入导出息息相关,不建议修改,因为后续的解读将基于这里设定的值展开,待完成验证后,您可再根据需要灵活修改后 5 项配置以完成实际的导入导出工作。

回到操作流程,登录准备好的 EC2 实例,修改下面脚本中与账号和环境相关的前 6 项配置,然后执行修改后的脚本。此外,需要提醒注意的是:在后续操作中,部分脚本执行后将不再返回,而是持续占用当前窗口输出日志或 Kafka 消息,因此需要新开命令行窗口,每次新开窗口都需要执行一次这里的全局配置脚本。

# 实操步骤(1): 全局配置
# account and environment configs
export REGION="us-east-1"
export S3_BUCKET="source-topics-data"
export AWS_ACCESS_KEY_ID="ABCDEFGHIGKLMNOPQRST"
export AWS_SECRET_ACCESS_KEY="abcdefghigklmnopqrstuvwxyz0123456789"
export SOURCE_KAFKA_BOOTSTRAP_SEVERS="b-1.cluster1.6ww5j7.c1.kafka.us-east-1.amazonaws.com:9092"
export SINK_KAFKA_BOOTSTRAP_SEVERS="b-1.cluster2.2au4b8.c2.kafka.us-east-1.amazonaws.com:9092"
# kafka topics import and export configs
export SOURCE_TOPICS_LIST="source-topic-1,source-topic-2"
export SINK_TOPICS_LIST="sink-topic-1,sink-topic-2"
export TOPIC_REGEX_LIST="source-topic-1:.*,source-topic-2:.*"
export SOURCE_TOPICS_REGEX="source-topic-(\\\d)" # to be resolved to "source-topic-(\\d)" in json configs
export SINK_TOPICS_REPLACEMENT="sink-topic-\$1" # to be resolved to "sink-topic-$1" in json configs

左滑查看更多

关于上述脚本中的后 5 项配置,有如下详细说明:

8ecdc6a7f38c8d066a0fd61e613e6d06.png

我们就以脚本中设定的值为例,解读一下这 5 项配置联合起来将要实现的功能,同时也是本文将演示的主要内容:

在 Source 端的 MSK 集群上存在两个名为 source-topic-1 和 source-topic-2 的Topic,通过安装有 S3 Sink Connector 的 Kafka Connect (Docker 容器)将两个 Topic 的数据导出到 S3 的指定存储桶中,然后再通过安装有 S3 Source Connector 的 Kafka Connect (Docker 容器,可以和 S3 Source Connector 共存为一个Docker 容器)将 S3 存储桶中的数据写入到 Sink 端的 MSK 集群上,其中原source-topic-1 的数据将被写入 sink-topic-1,原 source-topic-2 的数据将被写入 sink-topic-2。

特别地,如果是备份/还原场景,需要保持导出/导入的 Topic 名称一致,此时,可直接删除 S3 Source Connector 中以 transforms 开头的 4 项配置(将在下文中出现),或者将下面两项改为:

export SOURCE_TOPICS_REGEX=".*"
export SINK_TOPICS_REPLACEMENT="\$0"

左滑查看更多

如果您只有一个 MSK 集群,同样可以完成本文的验证工作,只需将 SOURCE_KAFKA_BOOTSTRAP_SEVERS 和 SINK_KAFKA_BOOTSTRAP_SEVERS 同时设置为该集群即可,这样,该集群既是 Source 端又是 Sink 端,由于配置中的 Source Topics 和 Sink Topics 并不同名,所以不会产生冲突。

04

环境准备

4.1 安装工具包

在 EC2 上执行以下脚本,安装并配置 jq,yq,docker,jdk,kafka-console-client 五个必须的软件包,您可以根据自身 EC2 的情况酌情选择安装全部或部分软件。建议使用纯净的 EC2 实例,完成全部的软件安装:

# 实操步骤(2): 安装工具包
# install jq
sudo yum -y install jq
jq --version


# install yq
sudo wget https://github.com/mikefarah/yq/releases/download/v4.35.1/yq_linux_amd64 -O /usr/bin/yq
sudo chmod a+x /usr/bin/yq
yq --version


# install docker
sudo yum -y install docker
# enable & start docker
sudo systemctl enable docker
sudo systemctl start docker
sudo systemctl status docker
# configure docker, add current user to docker user group
# and refresh docker group to take effect immediately
sudo usermod -aG docker $USER
newgrp docker
docker --version


# install docker compose
dockerConfigDir=${dockerConfigDir:-$HOME/.docker}
mkdir -p $dockerConfigDir/cli-plugins
wget "https://github.com/docker/compose/releases/download/v2.20.3/docker-compose-$(uname -s)-$(uname -m)" -O $dockerConfigDir/cli-plugins/docker-compose
chmod a+x $dockerConfigDir/cli-plugins/docker-compose
docker compose version


# install jdk
sudo yum -y install java-1.8.0-openjdk-devel
# configure jdk
sudo tee /etc/profile.d/java.sh << EOF
export JAVA_HOME=/usr/lib/jvm/java
export PATH=\$JAVA_HOME/bin:\$PATH
EOF
# make current ssh session and other common linux users can run java cli
source /etc/profile.d/java.sh
sudo -i -u root source /etc/profile.d/java.sh || true
sudo -i -u ec2-user source /etc/profile.d/java.sh || true
java -version


# install kafka console client
kafkaClientUrl="https://archive.apache.org/dist/kafka/3.5.1/kafka_2.12-3.5.1.tgz"
kafkaClientPkg=$(basename $kafkaClientUrl)
kafkaClientDir=$(basename $kafkaClientUrl ".tgz")
wget $kafkaClientUrl -P /tmp/
sudo tar -xzf /tmp/$kafkaClientPkg -C /opt
sudo tee /etc/profile.d/kafka-client.sh << EOF
export KAFKA_CLIENT_HOME=/opt/$kafkaClientDir
export PATH=\$KAFKA_CLIENT_HOME/bin:\$PATH
EOF


# make current ssh session and other common linux users can run kakfa console cli
source /etc/profile.d/kafka-client.sh
sudo -i -u root source /etc/profile.d/kafka-client.sh || true
sudo -i -u ec2-user source /etc/profile.d/kafka-client.sh || true


# verify if kafka client available
kafka-console-consumer.sh --version


# set aksk for s3 and other aws operation
aws configure set default.region $REGION
aws configure set aws_access_key_id $AWS_ACCESS_KEY_ID
aws configure set aws_secret_access_key $AWS_SECRET_ACCESS_KEY

左滑查看更多

4.2 创建 S3 存储桶

整个方案以 S3 作为数据转储媒介,为此需要在 S3 上创建一个存储桶。Source 端 MSK 集群的数据将会导出到该桶中并以 Json 文件形式保存,向 Sink 端 MSK 集群导入数据时,读取的也是存储在该桶中的 Json 文件。

# 实操步骤(3): 创建 S3 存储桶
aws s3 rm --recursive s3://$S3_BUCKET || aws s3 mb s3://$S3_BUCKET

左滑查看更多

4.3 在源 MSK 上创建 Source Topics

为了确保 Topics 数据能完整备份和还原,S3 Source Connector 建议 Sink Topics 的分区数最好与 Source Topics 保持一致(详情参考 [官方文档]),如果让 MSK 自动创建 Topic,则很有可能会导致 Source Topics 和 Sink Topics 的分区数不对等,所以,我们选择手动创建 Source Topics 和 Sink Topics,并确保它们的分区数一致。以下脚本将创建 source-topic-1 和 source-topic-2 两个 Topic,各含 9 个分区:

# 实操步骤(4): 在源 MSK 上创建 Source Topics
for topic in $(IFS=,; echo $SOURCE_TOPICS_LIST); do
    # create topic
    kafka-topics.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --create --topic $topic --replication-factor 3 --partitions 9
    # describe topic
    kafka-topics.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --describe --topic $topic
done

左滑查看更多

  • 官方文档:

    https://docs.confluent.io/kafka-connectors/s3-source/current/backup-and-restore/overview.html

4.4 在目标 MSK 上创建 Sink Topics

原因同上,以下脚本将创建:sink-topic-1 和 sink-topic-2 两个 Topic,各含 9 个分区:

# 实操步骤(5): 在目标 MSK 上创建 Sink Topics
for topic in $(IFS=,; echo $SINK_TOPICS_LIST); do
    # create topic
    kafka-topics.sh --bootstrap-server $SINK_KAFKA_BOOTSTRAP_SEVERS --create --topic $topic --replication-factor 3 --partitions 9
    # describe topic
    kafka-topics.sh --bootstrap-server $SINK_KAFKA_BOOTSTRAP_SEVERS --describe --topic $topic
done

左滑查看更多

05

制作 Kafka Connect 镜像

接下来是制作带 S3 Sink Connector 和 S3 Source Connector 的 Kafka Connect 镜像,镜像和容器均以 kafka-s3-syncer 命名,以下是具体操作:

# 实操步骤(6): 制作 Kafka Connect 镜像
# note: do NOT use current dir as building docker image context dir,
# it is advised to create a new clean dir as image building context folder.
export DOCKER_BUILDING_CONTEXT_DIR="/tmp/kafka-s3-syncer"
mkdir -p $DOCKER_BUILDING_CONTEXT_DIR


# download and unpackage s3 sink connector plugin
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.5.4/confluentinc-kafka-connect-s3-10.5.4.zip \
    -O $DOCKER_BUILDING_CONTEXT_DIR/confluentinc-kafka-connect-s3-10.5.4.zip
unzip -o $DOCKER_BUILDING_CONTEXT_DIR/confluentinc-kafka-connect-s3-10.5.4.zip -d $DOCKER_BUILDING_CONTEXT_DIR


# download and unpackage s3 source connector plugin
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3-source/versions/2.4.5/confluentinc-kafka-connect-s3-source-2.4.5.zip \
    -O $DOCKER_BUILDING_CONTEXT_DIR/confluentinc-kafka-connect-s3-source-2.4.5.zip
unzip -o $DOCKER_BUILDING_CONTEXT_DIR/confluentinc-kafka-connect-s3-source-2.4.5.zip -d $DOCKER_BUILDING_CONTEXT_DIR


# make dockerfile
cat << EOF > Dockerfile
FROM confluentinc/cp-kafka-connect:7.5.0
# provision s3 sink connector
COPY confluentinc-kafka-connect-s3-10.5.4 /usr/share/java/confluentinc-kafka-connect-s3-10.5.4
# provision s3 source connector
COPY confluentinc-kafka-connect-s3-source-2.4.5 /usr/share/java/confluentinc-kafka-connect-s3-source-2.4.5
EOF


# build image
docker build -t kafka-s3-syncer -f Dockerfile $DOCKER_BUILDING_CONTEXT_DIR
# check if plugin is deployed in container
docker run -it --rm kafka-s3-syncer ls -al /usr/share/java/

左滑查看更多

06

配置并启动 Kafka Connect

镜像制作完成后,就可以启动了 Kafka Connect 了。Kafka Connect 有很多配置项,具体可参考其[官方文档],需要提醒注意的是:在下面的配置中,我们使用的是 Kafka Connect 内置的消息转换器:JsonConverter,如果你的输入/输出格式是 Avro 或 Parquet,则需要另行安装对应插件并设置正确的 Converter Class。

# 实操步骤(7): 配置并启动 Kafka Connect
cat << EOF > docker-compose.yml
services:
  kafka-s3-syncer:
    image: kafka-s3-syncer
    hostname: kafka-s3-syncer
    container_name: kafka-s3-syncer
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: $SOURCE_KAFKA_BOOTSTRAP_SEVERS
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-s3-syncer
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-s3-syncer
      CONNECT_CONFIG_STORAGE_TOPIC: kafka-s3-syncer-configs
      CONNECT_OFFSET_STORAGE_TOPIC: kafka-s3-syncer-offsets
      CONNECT_STATUS_STORAGE_TOPIC: kafka-s3-syncer-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: false
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 3
      CONNECT_PLUGIN_PATH: /usr/share/java
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
EOF
# valid, format and print yaml with yq
yq . docker-compose.yml
docker compose up -d --wait
docker compose logs -f kafka-s3-syncer
# docker compose down # stop and remove container

左滑查看更多

上述脚本执行后,命令窗口将不再返回,而是会持续输出容器日志,因此下一步操作需要新开一个命令行窗口。

  • 官方文档:

    https://docs.confluent.io/platform/current/installation/docker/config-reference.html#kconnect-long-configuration

07

配置并启动 S3 Sink Connector

在第 5 节的操作中,我们已经将 S3 Sink Connector 安装到了 Kafka Connect 的 Docker 镜像中,但是还需要显式地配置并启动它。新开一个命令行窗口,先执行一遍《实操步骤(1): 全局配置》,声明全局变量,然后执行以下脚本:

# 实操步骤(8): 配置并启动 S3 Sink Connector
cat << EOF > s3-sink-connector.json
{
  "name": "s3-sink-connector",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "topics": "$SOURCE_TOPICS_LIST",
    "s3.region": "$REGION",
    "s3.bucket.name": "$S3_BUCKET",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
  }
}
EOF
# valid, format and print json with jq
jq . s3-sink-connector.json
# delete connector configs if exsiting
curl -X DELETE localhost:8083/connectors/s3-sink-connector
# submit connector configs
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @s3-sink-connector.json
# start connector
curl -X POST localhost:8083/connectors/s3-sink-connector/start
# check connector status
# very useful! if connector has errors, it will show in message.
curl -s http://localhost:8083/connectors/s3-sink-connector/status | jq

左滑查看更多

08

配置并启动 S3 Source Connector

同上,在第 5 节的操作中,我们已经将 S3 Source Connector 安装到了 Kafka Connect 的 Docker 镜像中,同样需要显式地配置并启动它:

# 实操步骤(9): 配置并启动 S3 Source Connector
cat << EOF > s3-source-connector.json
{
  "name": "s3-source-connector",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "confluent.topic.bootstrap.servers": "$SOURCE_KAFKA_BOOTSTRAP_SEVERS",
    "mode": "RESTORE_BACKUP",
    "topics.dir": "topics",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "topic.regex.list": "$TOPIC_REGEX_LIST",
    "transforms": "mapping",
    "transforms.mapping.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.mapping.regex": "$SOURCE_TOPICS_REGEX",
    "transforms.mapping.replacement": "$SINK_TOPICS_REPLACEMENT",
    "s3.poll.interval.ms": "60000",
    "s3.bucket.name": "$S3_BUCKET",
    "s3.region": "$REGION"
  }
}
EOF
# valid, format and print json with jq
jq . s3-source-connector.json
# delete connector configs if exsiting
curl -X DELETE localhost:8083/connectors/s3-source-connector
# submit connector configs
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @s3-source-connector.json
# start connector
curl -X POST localhost:8083/connectors/s3-source-connector/start
# check connector status
# very useful! if connector has errors, it will show in message.
curl -s http://localhost:8083/connectors/s3-source-connector/status | jq

左滑查看更多

至此,整个环境搭建完毕,一个以 S3 作为中转媒介的 MSK 数据导出、导入、备份、还原链路已经处于运行状态。

09

 测试

现在,我们来验证一下整个链路是否能正常工作。首先,使用 kafka-console-consumer.sh 监控 source-topic-1 和 sink-topic-1 两个 Topic,然后使用脚本向 source-topic-1 持续写入数据,如果在 sink-topic-1 看到了相同的数据输出,就说明数据成功地从 source-topic-1 导出然后又导入到了 sink-topic-1 中,相应的,在 S3 存储桶中也能看到“沉淀”的数据文件。

 9.1 打开 Source Topic

新开一个命令行窗口,先执行一遍《实操步骤(1): 全局配置》,声明全局变量,然后使用如下命令持续监控 source-topic-1 中的数据:

# 实操步骤(10): 打开 Source Topic
kafka-console-consumer.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --topic ${SOURCE_TOPICS_LIST%%,*}

左滑查看更多

9.2 打开 Sink Topic

新开一个命令行窗口,先执行一遍《实操步骤(1): 全局配置》,声明全局变量,然后使用如下命令持续监控 sink-topic-1 中的数据:

# 实操步骤(11): 打开 Sink Topic
kafka-console-consumer.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --topic ${SINK_TOPICS_LIST%%,*}

左滑查看更多

9.3 向 Source Topic 写入数据

新开一个命令行窗口,先执行一遍《实操步骤(1): 全局配置》,声明全局变量,然后使用如下命令向 source-topic-1 中写入数据:

# 实操步骤(12): 向 Source Topic 写入数据
# download a public dataset
wget https://data.ny.gov/api/views/5xaw-6ayf/rows.json?accessType=DOWNLOAD -O /tmp/sample.raw.json
# extract pure json data
jq -c .data /tmp/sample.raw.json > /tmp/sample.json
# feeding json records to kafka
for i in {1..100}; do
    kafka-console-producer.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --topic ${SOURCE_TOPICS_LIST%%,*} < /tmp/sample.json
done

左滑查看更多

9.4 现象与结论

执行上述写入操作后,从监控 source-topic-1 的命令行窗口中可以很快看到写入的数据,这说明 Source 端 MSK 已经开始持续产生数据了,随后(约 1 分钟),即可在监控 sink-topic-1 的命令行窗口中看到相同的输出数据,这说明目标端的数据同步也已开始正常工作。此时,打开 S3 的存储桶会发现大量 Json 文件,这些 Json 是由 S3 Sink Connector 从 source-topic-1 导出并存放到 S3 上的,然后 S3 Source Connector 又读取了这些 Json 并写入到了 sink-topic-1 中,至此,整个方案的演示与验证工作全部结束。

10

清理

在验证过程中,我们可能需要多次调整并重试,每次重试最好恢复到初始状态,以下脚本会帮助我们清理所有已创建的资源:

# 实操步骤(13): 清理操作
docker compose down
aws s3 rm --recursive s3://$S3_BUCKET || aws s3 mb s3://$S3_BUCKET
kafka-topics.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --delete --topic 'sink.*|source.*|kafka-s3-syncer.*|_confluent-command'
kafka-topics.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --list
kafka-topics.sh --bootstrap-server $SINK_KAFKA_BOOTSTRAP_SEVERS --delete --topic 'sink.*|source.*|kafka-s3-syncer.*'
kafka-topics.sh --bootstrap-server $SINK_KAFKA_BOOTSTRAP_SEVERS --list

左滑查看更多

11

小结

本方案主要定位于轻便易用,在 S3 Sink Connector 和 S3 Source Connector 中还有很多与性能、吞吐量相关的配置,例如:s3.part.size, flush.size, s3.poll.interval.ms, tasks.max 等,读者可以在实际需要自行调整,此外, Kafka Connect 也可以方便地迁移到 Kubernetes 或 Amazon MSK Connect 中以实现集群化部署。

本篇作者

Laurence

亚马逊云科技资深解决方案架构师,多年系统开发与架构经验,对大数据、云计算、企业级应用、SaaS、分布式存储和领域驱动设计有丰富的实践经验,著有《大数据平台架构与原型实现:数据中台建设实战》一书。

77d08fe9155e261a5983ea8547b4e6f9.gif

星标不迷路,开发更极速!

关注后记得星标「亚马逊云开发者」

cec10d1145388582de6eb1a78ed40402.gif

听说,点完下面4个按钮

就不会碰到bug了!

5fb3a289d04ea2a1d545278154fbe0d4.gif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/119710.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

04、SpringBoot + 微信支付 --- 内网穿透ngrok(安装、使用)

Native 下单 1、内网穿透 ngrok 1-1&#xff1a;注册下载 下载 2-2&#xff1a;使用方式 直接在该目录cmd打开 第一次时候这个ngrok时&#xff0c;需要为计算机做授权 授权命令&#xff1a; ngrok config add-authtoken 2XmL8EfYQe6uVAjM9Iami0pWogd_5ztKmSxHs6UeAQn9RQB…

python 之异常处理结构

文章目录 常见的异常处理表现形式1. SyntaxError2. NameError3. TypeError4. IndexError5. KeyError6. ZeroDivisionError7. FileNotFoundErrortry……except …… 结构1. try 块2. except 块示例&#xff1a;多个except块try……except ……else 结构结构说明&#xff1a;示例…

AVS3:双向梯度修正BGC

双向梯度修正&#xff08;Bi-directional Gradient Correction&#xff0c;BGC&#xff09;是利用双向参考块间的差值对预测值进行修正的技术。 BGC仅用于双向预测CU&#xff0c;设两个方向得到的单向预测值分别为pred0和pred1&#xff0c;修正前的双向预测值为predBI&#xf…

Elasticsearch:搜索架构

Elasticsearch 全文检索的复杂性 为了理解为什么全文搜索是一个很难解决的问题&#xff0c;让我们想一个例子。 假设你正在托管一个博客发布网站&#xff0c;其中包含数亿甚至数十亿的博客文章&#xff0c;每个博客文章包含数百个单词&#xff0c;类似于 CSDN。 执行全文搜索…

win版redis详细安装教程

一、下载 github下载地址 https://github.com/MicrosoftArchive/redis/releases 可选择&#xff1a;下载msi包或zip压缩包 这里我选择的是zip压缩包&#xff0c;直接通过cmd命令窗口操作即可。 二、安装步骤 1、解压Redis压缩包 选中压缩包&#xff0c;右键选择解压&#…

【计算机网络】数据链路层-MAC和ARP协议

文章目录 1. 认识以太网2. MAC协议MAC帧的格式MAC地址和IP地址的区别MTU 3. 局域网通信原理碰撞检测和避免 4. ARP协议ARP数据报的格式ARP缓存 1. 认识以太网 网络层解决的是跨网络点到点传输的问题&#xff0c;数据链路层解决的是同一网络中的通信。 数据链路层负责在同一局域…

ARMday01(计算机理论、ARM理论)

计算机理论 计算机组成 输入设备、输出设备、运算器、控制器、存储器 1.输入设备&#xff1a;将编写好的软件代码以及相关的数据输送到计算机中&#xff0c;转换成计算机能够识别、处理和存储的数据形式 键盘、鼠标、手柄、扫描仪、 2.输出设备&#xff1a;将计算机处理好的数…

jacoco和sonar

目录 jacoco 引入依赖 构建配置修改 单元测试 生成报告 查看报告 报告说明 1. Instructions 2. Branches 3. Cyclomatic Complexity 4. Lines 5. Methods 6. Classes sonar7.7 基础环境 需要下载软件 解压文件并配置 运行启动 jacoco 引入依赖 <dep…

系列一、Spring + SpringMVC + MyBatis整合

一、概述 整合 Spring、SpringMVC、MyBatis。 二、整合步骤 2.1、pom <dependencies><!-- 普通maven项目中使用Sl4j注解 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1…

MFC 基础篇(一)

目录 一.SDK编程 二.为什么要学MFC&#xff1f; 三.MFC能做什么&#xff1f; 四.MFC开发环境搭建 五.MFC项目创建 六.消息映射机制 一.SDK编程 Application Programming Interface 应用程序编程接口。 Software Development Kit 软件开发工具包&#xff0c;一般会包括A…

自动驾驶高效预训练--降低落地成本的新思路(AD-PT)

自动驾驶高效预训练--降低落地成本的新思路 1. 之前的方法2. 主要工作——面向自动驾驶的点云预训练2.1. 数据准备 出发点&#xff1a;通过预训练的方式&#xff0c;可以利用大量无标注数据进一步提升3D检测 https://arxiv.org/pdf/2306.00612.pdf 1. 之前的方法 1.基于对比学…

图像标注工具lableImg安装出错怎么办?

我们要训练自己的图像识别模型&#xff0c;首先要进行图像的标注。labelimg就是一款可视化的图像标注工具。它是用Python编写的&#xff0c;通过Qt实现其图形界面&#xff0c;尽管它只支持矩形框标注&#xff0c;但因跨平台&#xff0c;支持Linux、Mac OS、Windows&#xff0c;…

Pycharm加载项目时异常,看不到自己的项目文件

最近看到一个朋友问&#xff0c;他把项目导入pycharm为什么项目里的包不在项目里显示&#xff0c;只在projects file里显示&#xff1f;问题截图如下&#xff1a; Project里看不到自己的项目文件 只能在Project Files里看到自己的项目文件 问题解答 我也是偶然发现的这个方案…

思维模型 首因效应

本系列文章 主要是 分享 思维模型&#xff0c;涉及各个领域&#xff0c;重在提升认知。先入为主&#xff0c;一见钟情。 1 首因效应的应用 1.1 面试中的首因效应 小李是一名应届毕业生&#xff0c;他准备参加一家知名互联网公司的面试。在面试前&#xff0c;他做了充分的准备…

SQL注入漏洞及五大手法

SQL注入漏洞 文章目录 SQL注入漏洞万能用户名 SQL注入分类两大基本类型五大手法提交参数方式注入点的位置 注入点判断SQL注入的危害sql漏洞挖掘Mysql库中的注释 SQL注入基本手法联合查询条件 报错注入group byextractvalueupdataxml 布尔盲注延时注入 案例获取cms网站后台管理员…

云尘 命令执行系列

第一题 system <?php include "flag.php";if (isset($_POST[cmd])) {system($_POST[cmd]); }show_source(__FILE__);代码如上 system($_POST[cmd]); POST请求发送一个名为 cmd 的参数&#xff0c;然后将该参数的值传递给系统命令执行函数 system()&#xff0c…

vue + axios + mock

参考来源&#xff1a;Vue mock.js模拟数据实现首页导航与左侧菜单功能_vue.js_AB教程网 记录步骤&#xff1a;在参考资料来源添加axios步骤 1、安装mock依赖 npm install mock -D //只在开发环境使用 下载完成后&#xff0c;项目文件package.json中的devDependencies就会加…

支持内录系统声音的Mac录屏软件Omi Recorder

Screen Recorder by Omi是一款功能强大的屏幕录制应用程序。它可用于在Windows和Mac计算机上捕获屏幕&#xff0c;以便进行演示、教程、游戏录制、视频编辑等各种用途。 以下是该应用程序的一些主要特点&#xff1a; 支持高清录制&#xff1a;Omi Screen Recorder可以以高达6…

Appium 结合 TestNG 并行执行测试

Appium 测试框架可以让我们使用不同的编程语言&#xff08;Java、Python、Ruby、JavaScript&#xff09;测试不同移动端平台的应用&#xff08;Android、iOS&#xff09;&#xff0c;目前也是最火的移动端测试框架。这篇文章会带着大家学习到如何在不同设备中并行执行测试。 并…

没网络也能安装.Net 3.5!如何脱机安装.NET Framework 3.5

.NET框架是由微软制定的一个软件框架。它有助于在Windows上运行控制台、Web或移动应用程序。此有用的工具适用于Windows设备。 如何脱机安装.NET Framework 3.5 如果你拥有Windows 10、8、8.1或7,有时第三方软件可能会导致问题。你可能会在图片中看到这样的问题。 看这张照片…