Debezium日常分享系列之:Debezium Engine

Debezium日常分享系列之:Debezium Engine

  • 依赖
  • 打包项目
  • 在代码中
  • 输出消息格式
  • 消息转换
  • 消息转换谓词
  • 高级记录使用
  • 引擎属性
  • 异步引擎属性
  • 数据库模式历史属性
  • 处理故障

Debezium连接器通常通过部署到Kafka Connect服务来运行,并配置一个或多个连接器来监视上游数据库,并为上游数据库中的所有更改生成数据变更事件。这些数据变更事件被写入Kafka,可以由许多不同的应用程序独立消费。Kafka Connect提供了出色的容错性和可伸缩性,因为它作为分布式服务运行,并确保所有注册和配置的连接器始终运行。例如,即使集群中的一个Kafka Connect端点关闭,剩余的Kafka Connect端点也会重新启动之前在已终止端点上运行的任何连接器,从而最大程度地减少停机时间并消除管理活动。

并非每个应用程序都需要这种级别的容错性和可靠性,他们可能不想依赖外部的Kafka代理和Kafka Connect服务。相反,一些应用程序更愿意直接在应用程序空间中嵌入Debezium连接器。它们仍然需要相同的数据变更事件,但更希望连接器直接将其发送到应用程序而不是在Kafka中持久化。
这个debezium-api模块定义了一个小的API,允许应用程序使用Debezium Engine轻松配置和运行Debezium连接器。

从2.6.0版本开始,Debezium提供了两个DebeziumEngine接口的实现。较旧的EmbeddedEngine实现运行一个只使用一个任务的连接器。连接器按顺序发出所有记录。这是默认的实现。

从2.6.0版本开始,还提供了一个新的AsyncEmbeddedEngine实现。这个实现也只运行一个连接器,但它可以在多个线程中处理记录,并运行多个任务,如果连接器支持的话(目前只有SQL Server和MongoDB的连接器支持在一个连接器中运行多个任务)。由于这两个引擎实现了相同的接口并共享相同的API,下面的代码示例对于任何引擎都是有效的。这两个实现支持相同的配置选项。

然而,新的AsyncEmbeddedEngine提供了一些用于设置和优化并行处理的新配置选项。

依赖

要使用Debezium Engine模块,将debezium-api模块添加到应用程序的依赖项中。还应将debezium-embedded模块添加到依赖项中,这是该API的一个开箱即用的实现。对于Maven,这需要将以下内容添加到应用程序的POM文件中:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

其中${version.debezium}可以是您使用的Debezium版本,也可以是一个包含Debezium版本字符串的Maven属性的值。

同样,为您的应用程序将使用的每个Debezium连接器添加依赖项。例如,可以将以下内容添加到您的应用程序的Maven POM文件中,以便您的应用程序可以使用MySQL连接器:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>

或者对于 MongoDB 连接器:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mongodb</artifactId>
    <version>${version.debezium}</version>
</dependency>

本文档的其余部分介绍了如何在应用程序中嵌入 MySQL 连接器。其他连接器的使用方式类似,但连接器特定的配置、主题和事件除外。

打包项目

Debezium使用SPI通过ServiceLoader加载实现。实现可以基于连接器类型,也可以是自定义实现。

有些接口有多个实现。例如,io.debezium.snapshot.spi.SnapshotLock在核心中有一个默认实现,并且针对每个连接器有特定的实现。为了确保Debezium可以定位所需的实现,必须显式地配置构建工具以合并META-INF/services文件。

例如,如果使用的是Maven shade插件,请添加ServicesResourceTransformer转换器,如下例所示:

...
<configuration>
 <transformers>
    ...
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
    ...
 </transformers>
...
</configuration>

或者,如果您使用 Maven Assembly 插件,则可以使用 metaInf-services 容器描述符处理程序。

在代码中

您的应用程序需要为每个要运行的连接器实例设置一个嵌入式引擎。io.debezium.engine.DebeziumEngine<‍R‍>类作为一个易于使用的包装器,完全管理连接器的生命周期。您可以使用它的构建器API创建DebeziumEngine实例,提供以下内容:

  • 您希望以哪种格式接收消息,例如JSON、Avro或Kafka Connect SourceRecord(见链接)
  • 配置属性(可能从属性文件中加载),用于定义引擎和连接器的环境
  • 一个方法,该方法将被调用以处理连接器产生的每个数据变更事件

以下是一个配置和运行嵌入式引擎的示例代码:

// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.id", "85744");
props.setProperty("topic.prefix", "my-app-connector");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");

// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        }).build()
    ) {
    // Run the engine asynchronously ...
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);

    // Do something else or wait for a signal or an event
}
// Engine is stopped when the main code is finished

让我们更详细地研究这段代码,从我们在这里重复的前几行开始:

// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");

这将创建一个新的标准Properties对象,用于设置引擎所需的几个字段,无论使用哪个连接器。第一个字段是引擎的名称,它将在连接器产生的源记录和内部状态中使用,因此在应用程序中使用一些有意义的名称。
connector.class字段定义了扩展Kafka Connect org.apache.kafka.connect.source.SourceConnector抽象类的类名;在此示例中,我们指定了Debezium的MySqlConnector类。

当Kafka Connect连接器运行时,它会从源中读取信息,并定期记录定义了它已经处理了多少信息的"偏移量"。如果连接器重新启动,它将使用最后记录的偏移量来确定在源信息中应该从哪里恢复读取。由于连接器不知道也不关心偏移量的存储方式,因此引擎需要提供一种存储和恢复这些偏移量的方式。我们的配置的下几个字段指定了我们的引擎应该使用FileOffsetBackingStore类将偏移量存储在本地文件系统上的/path/to/storage/offset.dat文件中(文件可以任意命名和存储在任何位置)。此外,尽管连接器在生成每个源记录时记录偏移量,但引擎会定期将偏移量刷新到后备存储(在我们的示例中,每分钟刷新一次)。这些字段可以根据您的应用程序需要进行调整。

接下来的几行定义了特定于连接器的字段(在每个连接器文档中有记录),在我们的示例中是MySqlConnector连接器:

 /* begin connector properties */
    props.setProperty("database.hostname", "localhost")
    props.setProperty("database.port", "3306")
    props.setProperty("database.user", "mysqluser")
    props.setProperty("database.password", "mysqlpw")
    props.setProperty("database.server.id", "85744")
    props.setProperty("topic.prefix", "my-app-connector")
    props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory")
    props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat")

在这里,我们设置了MySQL数据库服务器运行的主机机器的名称和端口号,并定义了将用于连接到MySQL数据库的用户名和密码。请注意,对于MySQL,用户名和密码应对应于已被授予以下MySQL权限的MySQL数据库用户:

  • SELECT
  • RELOAD
  • SHOW DATABASES
  • REPLICATION SLAVE
  • REPLICATION CLIENT

在读取数据库的一致快照时,需要前三个权限。最后两个权限允许数据库读取通常用于MySQL复制的服务器的binlog。

该配置还包括一个用于MySQL的数值标识符。由于MySQL的binlog是MySQL复制机制的一部分,因此为了读取binlog,MySqlConnector实例必须加入MySQL服务器组,这意味着该服务器ID必须是1到232-1之间的任意整数。在我们的代码中,我们将其设置为一个相当大但有些随机的值,仅供我们的应用程序使用。

该配置还指定了MySQL服务器的逻辑名称。连接器将此逻辑名称包含在其生成的每个源记录的主题字段中,使您的应用程序能够区分这些记录的来源。我们的示例使用了一个名为"products"的服务器名称,这可能是因为数据库包含产品信息。当然,您可以为您的应用程序命名任何有意义的名称。

当MySqlConnector类运行时,它会读取MySQL服务器的binlog,其中包括对由服务器托管的数据库所做的所有数据更改和模式更改。由于所有数据更改都是基于拥有表格的模式结构化的,因此连接器需要跟踪所有模式更改,以便可以正确解码更改事件。连接器记录模式信息,以便如果连接器重新启动并恢复从最后记录的偏移量读取,它知道该偏移量时数据库模式的确切外观。连接器如何记录数据库模式历史记录在我们的配置的最后两个字段中定义,即我们的连接器应该使用FileSchemaHistory类将数据库模式历史更改存储在本地文件系统上的/path/to/storage/schemahistory.dat文件中(同样,此文件可以任意命名和存储在任何位置)。

最后,使用build()方法构建不可变配置。(顺便说一下,我们可以使用Configuration.read(…)方法之一从属性文件中读取配置,而不是通过编程方式构建它。)

现在我们有了一个配置,我们可以创建引擎。以下是相关的代码行:

// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        })
        .build()) {
}

所有的更改事件都将传递给给定的处理方法,该方法必须与java.util.function.Consumer<‍R‍>函数接口的签名匹配,其中<‍R‍>必须与调用create()时指定的格式类型匹配。请注意,您的应用程序的处理函数不应抛出任何异常;如果抛出异常,引擎将记录方法抛出的任何异常,并继续处理下一个源记录,但您的应用程序将没有机会处理导致异常的特定源记录,这意味着您的应用程序可能与数据库不一致。

此时,我们有一个已配置并准备运行的DebeziumEngine对象,但它什么也不做。DebeziumEngine设计为由Executor或ExecutorService异步执行:

// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

// Do something else or wait for a signal or an event

您的应用程序可以通过调用其 close() 方法来安全、优雅地停止引擎:

// At some later time ...
engine.close();

或者,由于引擎支持Closeable接口,当离开try块时,它将被自动调用。
引擎的连接器将停止从源系统读取信息,将所有剩余的更改事件转发给处理函数,并将最新的偏移量刷新到偏移量存储中。只有在所有这些操作完成后,引擎的run()方法才会返回。如果您的应用程序需要在退出之前等待引擎完全停止,您可以使用ExecutorService的shutdown和awaitTermination方法来实现:

try {
    executor.shutdown();
    while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        logger.info("Waiting another 5 seconds for the embedded engine to shut down");
    }
}
catch ( InterruptedException e ) {
    Thread.currentThread().interrupt();
}

或者,您可以在创建DebeziumEngine时注册CompletionCallback作为回调函数,以便在引擎终止时得到通知。

请记住,当JVM关闭时,它只会等待非守护线程。因此,当您在守护线程上运行引擎时,如果您的应用程序退出,请确保等待引擎进程完成。

为了确保优雅和完全的关闭,并确保每个源记录仅发送一次到应用程序,您的应用程序应始终正确停止引擎。例如,不要依赖于关闭ExecutorService,因为这会中断运行的线程。虽然当线程被中断时,DebeziumEngine确实会终止,但引擎可能无法完全终止,并且当您的应用程序重新启动时,它可能会看到在关闭之前处理的一些相同的源记录。

正如前面提到的,DebeziumEngine接口有两个实现。这两个实现使用相同的API,前面的代码示例对两个版本都有效。唯一的例外是创建DebeziumEngine实例。正如在介绍中提到的,默认情况下使用EmbeddedEngine实现。因此,DebeziumEngine.create(Json.class)方法在内部使用EmbeddedEngine实例。

如果您想使用新的AsyncEmbeddedEngine实例,可以使用以下方法:DebeziumEngine#create(KeyValueHeaderChangeEventFormat<‍K, V, H‍> format, String builderFactory)

例如,要创建一个使用AsyncEmbeddedEngine并以JSON作为其键、值和标头格式的嵌入式引擎,您可以使用以下代码:

try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine
        .create(KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class),
                "io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory")
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        }).build()
    ) {
    // Also run the engine asynchronously ...
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);

    // Do something else or wait for a signal or an event
}

输出消息格式

DebeziumEngine#create()可以接受多个不同的参数,这些参数会影响消息被消费者接收的格式。允许的值为:

  • Connect.class - 输出值是包装Kafka Connect的SourceRecord的变更事件
  • Json.class - 输出值是键和值对,编码为JSON字符串
  • JsonByteArray.class - 输出值是键和值对,格式化为JSON并编码为UTF-8字节数组
  • Avro.class - 输出值是以Avro序列化记录编码的键和值对
  • CloudEvents.class - 输出值是编码为 消息的键和值对

在调用DebeziumEngine#create()时也可以指定标头格式。允许的值为:

  • Json.class - 标头值被编码为JSON字符串
  • JsonByteArray.class - 标头值被格式化为JSON并编码为UTF-8字节数组

在内部,引擎将数据转换委托给Kafka Connect或Apicurio转换器实现,使用最适合执行转换的算法。可以使用引擎属性对转换器进行参数化以修改其行为。JSON输出格式的示例:

final Properties props = new Properties();
...
props.setProperty("converter.schemas.enable", "false"); // don't include schema in message
...
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
    .using(props)
    .notifying((records, committer) -> {

        for (ChangeEvent<String, String> r : records) {
            System.out.println("Key = '" + r.key() + "' value = '" + r.value() + "'");
            committer.markProcessed(r);
        }
...

其中 ChangeEvent 数据类型是键/值对。

消息转换

在将消息传递给处理程序之前,可以通过Kafka Connect的简单消息转换(SMT)管道运行它们。每个SMT可以将消息保持不变、修改消息或过滤消息。使用属性transforms配置链。属性包含要应用的转换的逗号分隔的逻辑名称列表。然后,属性transforms.<‍logical_name‍>.type为每个转换定义了实现类的名称,transforms.<‍logical_name‍>.*配置选项将传递给转换。
配置示例:

final Properties props = new Properties();
...
props.setProperty("transforms", "filter, router");                                               // (1)
props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");  // (2)
props.setProperty("transforms.router.regex", "(.*)");                                            // (3)
props.setProperty("transforms.router.replacement", "trf$1");                                     // (3)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");      // (4)

定义了两个转换 - 过滤器和路由器

路由器转换的实现是 org.apache.kafka.connect.transforms.RegexRouter

路由器转换有两个配置选项 - 正则表达式和替换

过滤器转换的实现是 io.debezium.embedded.ExampleFilterTransform

消息转换谓词

谓词可以应用于转换,以使转换成为可选的。

配置示例如下

final Properties props = new Properties();
...
props.setProperty("transforms", "filter");                                                 // (1)
props.setProperty("predicates", "headerExists");                                           // (2)
props.setProperty("predicates.headerExists.type", "org.apache.kafka.connect.transforms.predicates.HasHeaderKey"); //(3)
props.setProperty("predicates.headerExists.name", "header.name");                          // (4)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");// (5)
props.setProperty("transforms.filter.predicate", "headerExists");                          // (6)
props.setProperty("transforms.filter.negate", "true"); 

定义了一个转换 - 过滤器

定义了一个谓词 - headerExists

headerExists 谓词的实现是 org.apache.kafka.connect.transforms.predicates.HasHeaderKey

headerExists 谓词有一个配置选项 - name

过滤器转换的实现是 io.debezium.embedded.ExampleFilterTransform

过滤器转换需要谓词 headerExists

过滤器转换期望谓词的值被否定,从而使谓词确定标头是否不存在

高级记录使用

对于某些用例,例如尝试批量写入记录或针对异步 API 时,上面描述的功能接口可能具有挑战性。在这些情况下,使用 io.debezium.engine.DebeziumEngine.ChangeConsumer. 接口可能会更容易。

此接口具有单个函数,其签名如下:

/**
  * Handles a batch of records, calling the {@link RecordCommitter#markProcessed(Object)}
  * for each record and {@link RecordCommitter#markBatchFinished()} when this batch is finished.
  * @param records the records to be processed
  * @param committer the committer that indicates to the system that we are finished
  */
 void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException;

如Javadoc中所提到的,RecordCommitter对象将在每个记录和每个批次完成时被调用。RecordCommitter接口是线程安全的,这允许对记录进行灵活的处理。

您可以选择重写已处理的记录的偏移量。这可以通过首先调用RecordCommitter#buildOffsets()构建一个新的Offsets对象,使用Offsets#set(String key, Object value)更新偏移量,然后使用更新后的Offsets调用RecordCommitter#markProcessed(SourceRecord record, Offsets sourceOffsets)来完成。

要使用ChangeConsumer API,您必须将接口的实现传递给通知API,如下所示:

class MyChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
  public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
    ...
  }
}
// Create the engine with this configuration ...
DebeziumEngine<RecordChangeEvent<SourceRecord>> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(props)
        .notifying(new MyChangeConsumer())
        .build();

如果使用 JSON 格式(等效格式也适用于其他格式),则代码将如下所示:

class JsonChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
  public void handleBatch(List<ChangeEvent<String, String>> records,
    RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {
    ...
  }
}
// Create the engine with this configuration ...
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(new JsonChangeConsumer())
        .build();

引擎属性

除非有默认值,否则以下配置属性是必需的(为了文本格式化,Java 类的包名称被替换为 <…​>)。

属性默认值描述
name连接器实例的唯一名称。
connector.class连接器的 Java 类的名称
offset.storage负责连接器偏移持久性的 Java 类的名称。
offset.storage.file.filename存储偏移量的文件的路径。
offset.storage.topic要存储偏移量的 Kafka 主题的名称。
offset.storage.partitions创建偏移量存储主题时使用的分区数。
offset.storage.replication.factor创建偏移存储主题时使用的复制因子。
offset.commit.policy提交策略的 Java 类的名称。它根据处理的事件数和自上次提交以来经过的时间定义何时触发偏移提交。默认是基于时间间隔的定期提交策略。
offset.flush.interval.ms60000尝试提交偏移的间隔。默认值为 1 分钟。
offset.flush.timeout.ms5000在取消该过程并恢复将来尝试提交的偏移数据之前,等待记录刷新和分区要提交到偏移存储的最大毫秒数。默认值为 5 秒。
errors.max.retries-1失败前连接错误的最大重试次数(-1 = 无限制,0 = 禁用,> 0 = 重试次数)。
errors.retry.delay.initial.ms300遇到连接错误时重试的初始延迟(以毫秒为单位)。每次重试时此值将加倍,但不会超过 errors.retry.delay.max.ms。
errors.retry.delay.max.ms10000遇到连接错误时重试之间的最大延迟(以毫秒为单位)。

异步引擎属性

属性默认值描述
record.processing.threads根据工作负载和可用 CPU 核心数按需分配线程。可用于处理更改事件记录的线程数。如果未指定任何值(默认值),则引擎将使用 Java ThreadPoolExecutor 根据当前工作负载动态调整线程数。最大线程数是给定计算机上的 CPU 核心数。如果指定了值,则引擎将使用 Java 固定线程池方法创建具有指定线程数的线程池。要使用给定计算机上的所有可用核心,请设置占位符值 AVAILABLE_CORES。
record.processing.shutdown.timeout.ms1000调用任务关闭后等待处理已提交记录的最长时间(以毫秒为单位)。
record.processing.orderORDERED确定应如何生成记录。ORDERED记录按顺序处理;也就是说,它们按从数据库获取的顺序生成。UNORDERED记录按非顺序处理;也就是说,它们可以按与源数据库不同的顺序生成。UNORDERED 选项的非顺序处理可实现更好的吞吐量,因为记录在任何 SMT 处理和消息序列化完成后立即生成,而无需等待其他记录。当向引擎提供 ChangeConsumer 方法时,此选项不起作用。
record.processing.with.serial.consumerfalse指定是否应从提供的 Consumer 创建默认的 ChangeConsumer,从而导致串行 Consumer 处理。如果您在使用 API 创建引擎时指定了 ChangeConsumer 接口,则此选项无效。
task.management.timeout.ms180,000 (3 min)引擎等待任务生命周期管理操作(启动和停止)完成的时间(以毫秒为单位)。

数据库模式历史属性

一些连接器还需要一组额外的属性来配置数据库模式历史记录:

  • MySQL
  • SQL Server
  • Oracle
  • Db2

如果没有正确配置数据库模式历史记录,则连接器将拒绝启动。默认配置需要可用的Kafka集群。对于其他部署,可使用基于文件的数据库模式历史记录存储实现。

属性默认值描述
schema.history.internal负责持久保存数据库模式历史的 Java 类的名称。
schema.history.internal.file.filename存储数据库架构历史记录的文件的路径。
schema.history.internal.kafka.topic存储数据库架构历史记录的 Kafka 主题。
schema.history.internal.kafka.bootstrap.servers要连接的 Kafka 集群服务器的初始列表。集群提供用于存储数据库架构历史记录的主题。

处理故障

当引擎执行时,其连接器会主动记录每个源记录中的源偏移,并且引擎会定期将这些偏移刷新到持久存储中。当应用程序和引擎正常关闭或崩溃时,重新启动后,引擎及其连接器将从最后记录的偏移处恢复读取源信息。

那么,当嵌入式引擎正在运行时应用程序发生故障会发生什么?结果是,在重新启动后,应用程序很可能会收到一些之前在崩溃之前已经处理过的源记录。这取决于引擎多久将偏移刷新到其存储中(通过offset.flush.interval.ms属性)以及特定连接器在一个批次中返回多少个源记录。最理想的情况是每次都刷新偏移量(例如,将offset.flush.interval.ms设置为0),但即使这样,嵌入式引擎仍然只会在从连接器接收到每个源记录批次后刷新偏移量。
例如,MySQL连接器使用max.batch.size来指定批次中可能出现的源记录的最大数量。即使将offset.flush.interval.ms设置为0,当应用程序在崩溃后重新启动时,可能会看到最多n个重复记录,其中n是批次的大小。如果将offset.flush.interval.ms属性设置得更高,则应用程序可能会看到最多n * m个重复记录,其中n是批次的最大大小,m是在单个偏移刷新间隔期间可能累积的批次数。(显然,可以将嵌入式连接器配置为不进行批处理并始终刷新偏移量,从而使应用程序永远不会接收到任何重复的源记录。但是,这会大大增加开销并降低连接器的吞吐量。)

总的来说,当使用嵌入式连接器时,应用程序在正常操作期间(包括在正常关闭后重新启动)将仅接收到每个源记录一次,但在崩溃或不正确关闭后重新启动后,需要容忍接收到重复事件。如果应用程序需要更严格的确切一次性行为,那么应该使用完整的Debezium平台,该平台可以提供确切一次性保证(即使在崩溃和重新启动后)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/920786.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于单片机的多功能跑步机控制系统

本设计基于单片机的一种多功能跑步机控制系统。该系统以STM32单片机为主控制器&#xff0c;由七个电路模块组成&#xff0c;分别是&#xff1a;单片机模块、电机控制模块、心率检测模块、音乐播放模块、液晶显示模块、语音控制模块、电源模块。其中&#xff0c;单片机模块是整个…

测试工程师如何在面试中脱颖而出

目录 1.平时工作中是怎么去测的&#xff1f; 2.B/S架构和C/S架构区别 3.B/S架构的系统从哪些点去测&#xff1f; 4.你为什么能够做测试这一行&#xff1f;&#xff08;根据个人情况分析理解&#xff09; 5.你认为测试的目的是什么&#xff1f; 6.软件测试的流程&#xff…

PHM技术:基于支持向量机的智能故障诊断 | 行星齿轮箱智能故障诊断

目录 1.数据获取 2.特征提取与选择 3.健康状态识别 1.数据获取 用的行星齿轮箱数据采集自图1中的多级齿轮传动系统实验台中&#xff0c;在实验过程中&#xff0c;分别模拟了8种行星齿轮箱的健康状态&#xff0c;包括正常、第一级太阳轮点蚀、第一级太阳轮齿根裂纹、第一级…

【划分型 DP-约束划分个数】【hard】【阿里笔试】力扣1278. 分割回文串 III

给你一个由小写字母组成的字符串 s&#xff0c;和一个整数 k。 请你按下面的要求分割字符串&#xff1a; 首先&#xff0c;你可以将 s 中的部分字符修改为其他的小写英文字母。 接着&#xff0c;你需要把 s 分割成 k 个非空且不相交的子串&#xff0c;并且每个子串都是回文串…

国标GB28181视频平台EasyCVR视频融合平台H.265/H.264转码业务流程

在当今数字化、网络化的视频监控领域&#xff0c;大中型项目对于视频监控管理平台的需求日益增长&#xff0c;特别是在跨区域、多设备、高并发的复杂环境中。EasyCVR视频监控汇聚管理平台正是为了满足这些需求而设计的&#xff0c;它不仅提供了全面的管理功能&#xff0c;还支持…

相机触发模式

参考自&#xff1a;相机触发模式_硬触发和软触发的区别-CSDN博客 一、图像采集模式分类 相机的图像采集模式分为内触发模式与外触发模式。其中内触发模式包含连续采集、单帧采集两种形式&#xff1b;外触发模式包含软件外触发、硬件外触发。本文以海康相机的软件平台作介绍&a…

【数据结构】【线性表】【练习】反转链表

申明 该题源自力扣题库19&#xff0c;文章内容&#xff08;代码&#xff0c;图表等&#xff09;均原创&#xff0c;侵删&#xff01; 题目 给你单链表的头指针head以及两个整数left和right&#xff0c;其中left<right&#xff0c;请你反转从位置left到right的链表节点&…

Applied Intelligence投稿

一、关于手稿格式&#xff1a; 1、该期刊是一个二区的&#xff0c;模板使用Springer nature格式&#xff0c; 期刊投稿要求&#xff0c;详细期刊投稿指南&#xff0c;大部分按Soringernature模板即可&#xff0c;图片表格声明参考文献命名要求需注意。 2、参考文献&#xff…

【Google Cloud】Private Service Connect 托管式服务

简介 Private Service Connect 是什么 Private Service Connect 是 Google Cloud&#xff08;原名 GCP&#xff09;Virtual Private Cloud&#xff08;VPC&#xff09;的一项功能。 该功能主要用于以下两个场景&#xff1a; 使用私有 IP 访问 Google Cloud 的 API。将用户自…

JDK、MAVEN与IDEA的安装与配置

1.认识JDK、MAVEN与IDEA JDK 提供了编译和运行Java程序的基本环境。Maven 帮助管理项目的构建和依赖。IDEA 提供了一个强大的开发环境&#xff0c;使得编写、调试和运行Java程序更加高效。 2. 安装与环境配置 2.1 官网地址 选择你需要的版本下载&#xff1a; MAVEN下载传送…

MySQL深入:B+树的演化、索引和索引结构

提示&#xff1a;内容是读《MySQL技术内幕&#xff1a;InnoDB存储引擎》&#xff0c;笔记摘要 文章目录 二叉查找树平衡二叉树(AVL) B树(BTree)B树(BTree)InnoDB B树索引索引结构&#xff08;InnoDB B树&#xff09;B树存放的数据量 二叉查找树 在二叉查找树中&#xff0c;左子…

FairGuard游戏加固实机演示

此前&#xff0c;FairGuard对市面上部分游戏遭遇破解的案例进行了详细分析&#xff0c;破解者会采用静态分析与动态调试相结合的手段&#xff0c;逆向分析出代码逻辑并对其进行篡改&#xff0c;实现作弊功能&#xff0c;甚至是对游戏资源文件进行篡改&#xff0c;从而制售外挂。…

聊一聊Elasticsearch的索引数据搜索过程

与向索引写入数据的时候必须是主分片来承担不同。搜索的时候&#xff0c;主分片和副本分片均可以承担&#xff0c;最终选用主分片还是副本分片是通过轮询的方式来进行选择的。 索引数据的搜索过程&#xff0c;依据有无路由值&#xff0c;分为两种&#xff1a;不带路由值的搜索…

视频修复技术和实时在线处理

什么是视频修复&#xff1f; 视频修复技术的目标是填补视频中的缺失部分&#xff0c;使视频内容连贯合理。这项技术在对象移除、视频修复和视频补全等领域有着广泛的应用。传统方法通常需要处理整个视频&#xff0c;导致处理速度慢&#xff0c;难以满足实时处理的需求。 技术发…

自动化运维-检测Linux服务器CPU、内存、负载、IO读写、机房带宽和服务器类型等信息脚本

前言&#xff1a;以上脚本为今年8月1号发布的&#xff0c;当时是没有任何问题&#xff0c;但现在脚本里网络速度测试py文件获取不了了&#xff0c;测速这块功能目前无法实现&#xff0c;后面我会抽时间来研究&#xff0c;大家如果有建议也可以分享下。 脚本内容&#xff1a; #…

C语言-11-18笔记

1.C语言数据类型 类型存储大小值范围char1 字节-128 到 127 或 0 到 255unsigned char1 字节0 到 255signed char1 字节-128 到 127int2 或 4 字节-32,768 到 32,767 或 -2,147,483,648 到 2,147,483,647unsigned int2 或 4 字节0 到 65,535 或 0 到 4,294,967,295short2 字节…

【HashMap篇】HashMap实现原理|put方法|扩容机制|寻址算法|1.7情况下的多线程死循环问题

目录 一、二叉树、红黑树、散列表简单介绍 1.二叉树 &#xff08;1&#xff09;什么是二叉树 &#xff08;2&#xff09;什么是二叉搜索树 2.红黑树 &#xff08;1&#xff09;什么是红黑树 3.散列表 &#xff08;1&#xff09;什么是散列表 &#xff08;2&#xff09;…

AI 大模型重塑软件开发的未来

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

idea 配置 leetcode插件 代码模版

开启自定义模版 codeFileName&#xff1a; $!velocityTool.camelCaseName(${question.titleSlug})Code Template&#xff1a; ${question.content} package leetcode.editor.cn; /*** ${question.title}* author lww* since $!velocityTool.date()*/ public class $!velocit…

微软Microsoft有许多耳熟能详的软件?

微软有许多耳熟能详的软件&#xff0c;以下是一些比较有代表性的&#xff1a; 一、软件类 操作系统&#xff1a; Windows 系列&#xff1a;这是微软最为著名且广泛使用的操作系统&#xff0c;如 Windows 10、Windows 11 等。它为全球绝大多数个人电脑提供了操作平台&#xff0c…