深入解析 Flink CDC 增量快照读取机制

一、Flink-CDC 1.x 痛点

Flink CDC 1.x 使用 Debezium 引擎集成来实现数据采集,支持全量加增量模式,确保数据的一致性。然而,这种集成存在一些痛点需要注意:

  1. 一致性通过加锁保证:在保证数据一致性时,Debezium 需要对读取的库或表加锁。全局锁可能导致数据库出现挂起情况,而表级锁会影响表的写操作。

  2. 只支持单并发读取:Flink CDC 1.x版本只支持单并发读取,对于大表读取非常耗时。如果需要读取的数据量较大,可能会导致性能瓶颈。

  3. 全量读取阶段不支持 checkpoint:CDC 的initial模式下读取分为两个阶段,全量和增量。然而,在全量读取阶段,不支持 checkpoint 的功能。如果出现故障,必须重新进行全量读取操作。

1.1、全局锁

在 Flink CDC 1.x 中,全量读取时的锁机制流程如下:

  1. 开始全量读取:当 Flink CDC 启动全量读取任务时,它会与 MySQL 数据库建立连接,并开始读取源表的数据。

  2. 获取读取的锁:为了保证数据的一致性,Flink CDC 在全量读取过程中需要获取读取的锁。在默认情况下,Flink CDC 使用全局锁(Global Lock)来确保数据的一致性。

  3. 全局锁的获取:Flink CDC 通过向 MySQL 数据库发送命令来获取全局锁。全局锁将阻塞其他对源表进行写操作的事务,确保在全量读取期间不会有数据的变更。

  4. 全量读取数据:一旦获得全局锁,Flink CDC 开始进行全量读取。它会扫描源表的所有数据,并将其传输到目标系统(如 Doris)进行加载和处理。

  5. 释放全局锁:当全量读取完成后,Flink CDC 会释放全局锁,允许其他事务对源表进行写操作。

全局锁的获取可能会导致一些潜在的问题:

  1. 长时间锁定:全局锁通常需要在全量读取过程中长时间持有,这可能会对其他业务操作产生影响。如果全量读取任务的持续时间较长,其他事务可能需要等待较长时间才能执行读写操作。
  2. 性能影响:获取全局锁可能导致性能下降。当全局锁被获取时,其他事务需要等待锁的释放,这可能导致并发性下降,特别是在高负载的情况下。长时间的等待可能会导致数据库挂起(hang),影响整体系统的吞吐量和响应时间。

1.2、表级锁

在 Flink CDC 1.x 中,全量读取表时的表锁机制流程如下:

  1. 开始全量读取:当 Flink CDC 启动全量读取任务时,它会与 MySQL 数据库建立连接,并准备开始读取源表的数据。

  2. 获取表级锁:为了确保数据的一致性,在全量读取期间需要获取源表的表级锁。表级锁将阻塞其他事务对源表进行写操作,以保证读取过程中数据不会发生变化。

  3. 发起锁请求:Flink CDC 向 MySQL 数据库发送请求,尝试获取源表的表级锁。这个请求将被发送到 MySQL 的锁管理器。

  4. 等待锁释放:如果源表的表级锁已经被其他事务占用,Flink CDC 将等待锁释放的信号。在等待期间,Flink CDC 将一直保持连接并监测锁的状态。

  5. 获取锁成功:一旦源表的表级锁被成功获取,Flink CDC 可以开始进行全量数据的读取操作。它会扫描源表的所有数据,并将其传输到目标系统进行加载和处理。

  6. 释放表级锁:当全量读取完成后,Flink CDC 会释放源表的表级锁,允许其他事务对源表进行写操作。

表级锁的获取和释放可能会带来一些潜在的问题:

  1. 数据一致性问题:表级锁在全量读取期间会锁定整张表,以保证数据的一致性。然而,在某些情况下,如果全量读取过程中出现了长时间的阻塞或异常情况,可能会导致数据一致性问题。
  2. 长时间锁定:表级锁通常需要在读取过程中长时间持有,特别是在全量读取时。这可能会对其他事务产生长时间的阻塞,影响系统的响应性能。

二、Flink-CDC 2.x 新特性

Flink 2.x不仅引入了增量快照读取机制,还带来了一些其他功能的改进。以下是对Flink 2.x的主要功能的介绍:

  1. 增量快照读取:Flink 2.x引入了增量快照读取机制,这是一种全新的数据读取方式。该机制支持并发读取和以chunk为粒度进行checkpoint。在增量快照读取过程中,Flink首先根据表的主键将其划分为多个块(chunk),然后将这些块分配给多个读取器并行读取数据。这一机制极大地提高了数据读取的效率。
  2. 精确一次性处理:Flink 2.x引入了Exactly-Once语义,确保数据处理结果的精确一次性。MySQL CDC 连接器是Flink的Source连接器,可以利用Flink的checkpoint机制来确保精确一次性处理。
  3. 动态加表:Flink 2.x支持动态加表,通过使用savepoint来复用之前作业的状态,解决了动态加表的问题。
  4. 无主键表的处理:Flink 2.x对无主键表的读取和处理进行了优化。在无主键表中,Flink可以通过一些额外的字段来识别数据记录的唯一性,从而实现准确的数据读取和处理。

本文主要介绍了Flink 2.x引入的重要特性之一:增量快照读取机制。该机制带来了并发读取、chunk粒度的checkpoint等优势,提升了数据读取的效率。

三、增量快照读取机制

3.1、功能

增量快照读取基本功能:

  1. 并发读取:在增量快照读取期间,源(Source)可以支持并发读取。这意味着多个读取器可以同时读取数据,从而提高读取的速度和效率。
  2. Chunk级别的checkpoint:增量快照读取期间,源可以进行chunk级别的checkpoint。这意味着在读取过程中,可以对数据进行更细粒度的检查点,提高故障恢复的准确性和效率。
  3. 全量增量无锁读取算法:相比于旧的快照机制,全量快照读取不需要源具有数据库锁权限。这降低了对数据库的依赖和权限要求,简化了配置和部署的过程。

3.2、并发读取

增量快照读取的并行读取功能利用了Flink的Source并行度来控制源的并行度。你可以通过设置作业的并行度(parallelism.default)来实现。

在SQL CLI中,可以使用以下命令进行设置:

Flink SQL> SET 'parallelism.default' = 4;

通过将并行度设置为4,Flink CDC Source算子将占用4个slot来并行读取数据。这样可以最大程度地利用系统资源,提高数据读取的效率和速度。

3.3、Chunk级别的checkpoint

3.3.1、Chunk

为了充分利用并行Source,MySQL CDC Source在增量快照读取过程中使用主键列将表划分为多个分片(chunk)。默认情况下,MySQL CDC Source会识别表的主键列,并使用主键中的第一列作为分片列。如果表中没有主键,增量快照读取将失败。你可以通过禁用scan.incremental.snapshot.enabled来回退到旧的快照读取机制。

对于数值和自动增量拆分列,MySQL CDC Source会按照固定步长高效地拆分块。例如,如果你有一个主键列为id的表,类型为自动增量的BIGINT,最小值为0,最大值为100,并设置表选项scan.incremental.snapshot.chunk.size的值为25,那么表将被拆分为以下块:

(-∞, 25),
[25, 50),
[50, 75),
[75, 100),
[100, +∞)

对于其他类型的主键列,MySQL CDC Source执行类似以下形式的语句来获取每个块的低值和高值:SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > 'uuid-001' limit 25),然后将块集分割如下:

(-∞, 'uuid-001'),
['uuid-001', 'uuid-009'),
['uuid-009', 'uuid-abc'),
['uuid-abc', 'uuid-def'),
[uuid-def, +∞).

通过这种分片方式,MySQL CDC Source可以高效地划分表数据,以实现并行的增量快照读取。每个读取器将负责读取和处理一个或多个分片的数据,从而提高整体的读取性能和效率。

注意,scan.incremental.snapshot.chunk.size的默认值为8096

3.3.2、原理

在 Flink CDC 中实现 Chunk 级别的 checkpoint 本质是使用 Flink 的 Checkpointing 机制和相应的配置,启用 Chunk 级别的 checkpoint 后,Flink CDC 将在每个 Chunk 完成读取后进行一次 checkpoint,以确保数据的一致性和容错性。

注意,Flink 的 checkpoint 机制包括两种类型的 checkpoint:时间驱动和计数驱动。但Flink CDC 中 Chunk 级别的 checkpoint 并不是直接利用Flink 计数驱动的 checkpoint 来实现的,相反,它是 Flink CDC 根据自身的机制自己实现的。它提供了在每个 Chunk 完成读取时进行一次 checkpoint 的能力,以实现更细粒度的数据一致性和容错性保障。

3.4、全量增量无锁读取算法【重点】

3.4.1、原理

3.4.1.1、全量无锁读取算法流程
  1. 首先,FlinkCDC 会先根据主键和粒度将要读取的表划分为多个分片(chunk)。

  2. 每个 MySQL CDC Source 负责读取一个分片,多个Source 可以并发读取多个chunk,完成当前分片处理后才可以读取下一个分片,直到读取完所有分片。

  3. 在读取每个分片时,FlinkCDC 使用一种名为偏移信号算法的方法来获取快照区块的最终一致输出。以下是该算法的简要步骤:

    • (1) 在读取chunk数据前先记录当前的 binlog 位置,即 LOW 偏移量。

    • (2) 执行语句 SELECT * FROM MyTable WHERE id > chunk_low AND id <= chunk_high,读取chunk分片内的数据并缓存至快照区块。

    • (3) 读取完chunk后再次记录当前的 binlog 位置记录,即 HIGH 偏移量,如下图:
      在这里插入图片描述

    • (4) 读取binlog:从 LOW 偏移量到 HIGH 偏移量之间的 binlog 记录,读取到的数据 append 到上个队列后面,并将此时binlog的最终offset保存至最后,如下图:

    在这里插入图片描述

    • (5) 检查读取到的 binlog 每条记录,如果属于chunk分片范围,则对之前缓存的chunk队列里的数据进行修正,最后将修正后的记录作为快照区块的最终输出,如下图:

    在这里插入图片描述

    • (6) 将此次chunk的元信息【lw,hw等】保存至MySqlSourceReader 进行备份【checkponit阶段也会保存此数据】,为后续增量读取做准备。
  4. 当所有chunk都被消费完毕后,即全量阶段同步完毕,此时将结束Source的并发读取,改为单线程读取binlog日志进行后续同步,此步骤在3.4.1.2、增量无锁读取算法流程。


  • 为了方便理解举例:表当前总数据为9条,Chunk切分粒度scan.incremental.snapshot.chunk.size=5;作业的并发数为2,故Mysql CDC Source 会有两个Task并行读取Chunk01,Chunk02,读取过程如下:

在这里插入图片描述

  • chunk01的数据流转过程如下:由于update#6、update#9 不属于chunk01分片范围故不做处理。

在这里插入图片描述

  • chunk02的数据流转过程如下:update#9、delete#7属于切片范围故修正缓存数据,而 update#4 不属于chunk02分片范围故不做处理。

在这里插入图片描述

FAQ[常见问题]:

  • chunk01 与 chunk02阶段有重叠部分,即 update#9,是否会影响数据准确性?
    • 答:不会,因为chunk只会对属于该分片范围的数据进行处理,故不会重复执行。
  • chunk01 与 chunk02 均未处理 update#4 日志,是否会影响数据准确性?
    • 答:不会,因为当所有chunk阶段结束后,MySqlSourceEnumerator调查员会根据所有chunk中的min(lw) 再次读取binlog,选择性补全数据,具体细节在:3.4.1.2、增量无锁读取算法流程
  • chun02 没有读取update#6的日志,是否会影响数据准确性?
    • 答:不会,因为update#6的日志 < lw,说明chunk02在lw时已经读取到了update#6后的最新数据,故不会影响数据准确性。
3.4.1.2、增量无锁读取算法流程
  1. 当全量阶段同步完毕后, MySqlSourceReader 会将每个 chunk 的 lw,hw等元数据汇报给 MySqlSourceEnumerator调查员,如下图:

在这里插入图片描述

  1. MySqlSourceEnumerator调查员取所有chunk中最小的lw 作为offset 来读取binlog日志,如下图:

    在这里插入图片描述

  2. 当一个 binlog 记录属于一个分片的主键范围内时,如果该记录在这个分片的 hw 之后,则该记录应该发送给下游,如下图:update#6、update#9虽然数据chunk02分片范围但<=hw 故舍弃;而update#4属于chunk01分片范围 且 >hw 代表缺失该条记录故发送至下游。

在这里插入图片描述

  1. 当一个 binlog 记录已经处于所有chunk中最大的hw时,即表示日志记录已经进入 Pure Binlog Phase,对于这样的 binlog 记录,不需进行比较,直接发送给下游,如下图:

在这里插入图片描述

至此增量无锁读取算法流程完毕

3.4.2、源码分析

  • MySql cdc 类图关系如下:

在这里插入图片描述

  • 快照读取chunk分片逻辑:MySqlSnapshotSplitReadTask#doExecute
protected SnapshotResult doExecute(
            ChangeEventSourceContext context,
            SnapshotContext snapshotContext,
            SnapshottingTask snapshottingTask)
            throws Exception {
        final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
                (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
        ctx.offset = offsetContext;
        final SignalEventDispatcher signalEventDispatcher =
                new SignalEventDispatcher(
                        offsetContext.getPartition(),
                        topicSelector.topicNameFor(snapshotSplit.getTableId()),
                        dispatcher.getQueue());

        final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 1 - Determining low watermark {} for split {}",
                lowWatermark,
                snapshotSplit);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setLowWatermark(lowWatermark);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);

        LOG.info("Snapshot step 2 - Snapshotting data");
        createDataEvents(ctx, snapshotSplit.getTableId());

        final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 3 - Determining high watermark {} for split {}",
                highWatermark,
                snapshotSplit);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setHighWatermark(highWatermark);

        return SnapshotResult.completed(ctx.offset);
}
  • chunk分片数据读取后进行格式处理归一逻辑:RecordUtils#normalizedSplitRecords
/**
     * Normalize the records of snapshot split which represents the split records state on high
     * watermark. data input: [low watermark event] [snapshot events ] [high watermark event]
     * [binlog events] [binlog-end event] data output: [low watermark event] [normalized events]
     * [high watermark event]
     */
    public static List<SourceRecord> normalizedSplitRecords(
            MySqlSnapshotSplit snapshotSplit,
            List<SourceRecord> sourceRecords,
            SchemaNameAdjuster nameAdjuster) {
        List<SourceRecord> normalizedRecords = new ArrayList<>();
        Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
        List<SourceRecord> binlogRecords = new ArrayList<>();
        if (!sourceRecords.isEmpty()) {

            SourceRecord lowWatermark = sourceRecords.get(0);
            checkState(
                    isLowWatermarkEvent(lowWatermark),
                    String.format(
                            "The first record should be low watermark signal event, but is %s",
                            lowWatermark));
            SourceRecord highWatermark = null;
            int i = 1;
            for (; i < sourceRecords.size(); i++) {
                SourceRecord sourceRecord = sourceRecords.get(i);
                if (!isHighWatermarkEvent(sourceRecord)) {
                    snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
                } else {
                    highWatermark = sourceRecord;
                    i++;
                    break;
                }
            }

            if (i < sourceRecords.size() - 1) {
                List<SourceRecord> allBinlogRecords =
                        sourceRecords.subList(i, sourceRecords.size() - 1);
                for (SourceRecord binlog : allBinlogRecords) {
                    if (isDataChangeRecord(binlog)) {
                        Object[] key =
                                getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster);
                        // 当获取chunk lw hw 的binlog后会先判断是否数据chunk的区间内,只有负责chunk区间内的数据才会被更正
                        if (splitKeyRangeContains(
                                key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) {
                            binlogRecords.add(binlog);
                        }
                    }
                }
            }
            checkState(
                    isHighWatermarkEvent(highWatermark),
                    String.format(
                            "The last record should be high watermark signal event, but is %s",
                            highWatermark));
            // chunk数据修正逻辑函数:upsertBinlog
            normalizedRecords =
                    upsertBinlog(
                            snapshotSplit,
                            lowWatermark,
                            highWatermark,
                            snapshotRecords,
                            binlogRecords);
        }
        return normalizedRecords;
    }
  • chunk数据修正逻辑:RecordUtils#upsertBinlog

    private static List<SourceRecord> upsertBinlog(
                MySqlSplit split,
                SourceRecord lowWatermarkEvent,
                SourceRecord highWatermarkEvent,
                Map<Struct, SourceRecord> snapshotRecords,
                List<SourceRecord> binlogRecords) {
            final List<SourceRecord> normalizedBinlogRecords = new ArrayList<>();
            normalizedBinlogRecords.add(lowWatermarkEvent);
            // upsert binlog events to snapshot events of split
            if (!binlogRecords.isEmpty()) {
                for (SourceRecord binlog : binlogRecords) {
                    Struct key = (Struct) binlog.key();
                    Struct value = (Struct) binlog.value();
                    if (value != null) {
                        Envelope.Operation operation =
                                Envelope.Operation.forCode(
                                        value.getString(Envelope.FieldName.OPERATION));
                        switch (operation) {
                            case UPDATE:
                                Envelope envelope = Envelope.fromSchema(binlog.valueSchema());
                                Struct source = value.getStruct(Envelope.FieldName.SOURCE);
                                Struct updateAfter = value.getStruct(Envelope.FieldName.AFTER);
                                Instant ts =
                                        Instant.ofEpochMilli(
                                                (Long) source.get(Envelope.FieldName.TIMESTAMP));
                                SourceRecord record =
                                        new SourceRecord(
                                                binlog.sourcePartition(),
                                                binlog.sourceOffset(),
                                                binlog.topic(),
                                                binlog.kafkaPartition(),
                                                binlog.keySchema(),
                                                binlog.key(),
                                                binlog.valueSchema(),
                                                envelope.read(updateAfter, source, ts));
                                snapshotRecords.put(key, record);
                                break;
                            case DELETE:
                                snapshotRecords.remove(key);
                                break;
                            case CREATE:
                                snapshotRecords.put(key, binlog);
                                break;
                            case READ:
                                throw new IllegalStateException(
                                        String.format(
                                                "Binlog record shouldn't use READ operation, the the record is %s.",
                                                binlog));
                        }
                    }
                }
            }
            normalizedBinlogRecords.addAll(snapshotRecords.values());
            normalizedBinlogRecords.add(highWatermarkEvent);
            return normalizedBinlogRecords;
        }
    
  • 全量快照结束后MySqlSourceReader 整合各个split,汇报给MySqlSourceEnumerator逻辑:handleSourceEvents

@Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
            FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
            LOG.debug(
                    "The subtask {} receives ack event for {} from enumerator.",
                    subtaskId,
                    ackEvent.getFinishedSplits());
            for (String splitId : ackEvent.getFinishedSplits()) {
                this.finishedUnackedSplits.remove(splitId);
            }
        } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
            // report finished snapshot splits
            LOG.debug(
                    "The subtask {} receives request to report finished snapshot splits.",
                    subtaskId);
            reportFinishedSnapshotSplitsIfNeed();
        } else if (sourceEvent instanceof BinlogSplitMetaEvent) {
            LOG.debug(
                    "The subtask {} receives binlog meta with group id {}.",
                    subtaskId,
                    ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
            fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
        } else {
            super.handleSourceEvents(sourceEvent);
        }
    }

    private void reportFinishedSnapshotSplitsIfNeed() {
        if (!finishedUnackedSplits.isEmpty()) {
            final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
            for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
                finishedOffsets.put(split.splitId(), split.getHighWatermark());
            }
            FinishedSnapshotSplitsReportEvent reportEvent =
                    new FinishedSnapshotSplitsReportEvent(finishedOffsets);
            context.sendSourceEventToCoordinator(reportEvent);
            LOG.debug(
                    "The subtask {} reports offsets of finished snapshot splits {}.",
                    subtaskId,
                    finishedOffsets);
        }
    }
  • MySqlSourceEnumerator 收到全量快照结束后处理逻辑:createBinlogSplit

当 MySqlSourceEnumerator 将所有 split 的 hw 收齐之后,会创建一个 binlog split,该分片包含了需要读取 binlog 的起始位置(所有分片 hw 的最小值)和所有分片的 hw 信息。

private MySqlBinlogSplit createBinlogSplit() {
        final List<MySqlSnapshotSplit> assignedSnapshotSplit =
                snapshotSplitAssigner.getAssignedSplits().values().stream()
                        .sorted(Comparator.comparing(MySqlSplit::splitId))
                        .collect(Collectors.toList());

        Map<String, BinlogOffset> splitFinishedOffsets =
                snapshotSplitAssigner.getSplitFinishedOffsets();
        final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

        BinlogOffset minBinlogOffset = null;
        for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
            // find the min binlog offset
            BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
            if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
                minBinlogOffset = binlogOffset;
            }
            finishedSnapshotSplitInfos.add(
                    new FinishedSnapshotSplitInfo(
                            split.getTableId(),
                            split.splitId(),
                            split.getSplitStart(),
                            split.getSplitEnd(),
                            binlogOffset));
        }

        // the finishedSnapshotSplitInfos is too large for transmission, divide it to groups and
        // then transfer them

        boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
        return new MySqlBinlogSplit(
                BINLOG_SPLIT_ID,
                minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
                BinlogOffset.NO_STOPPING_OFFSET,
                divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
                new HashMap<>(),
                finishedSnapshotSplitInfos.size());
    }
  • 增量阶段逻辑:shouldEmit

MySqlSourceEnumerator 将 binlog 分片分配给 MySqlSourceReader 时,任务从全量阶段转变为增量阶段。MySqlSourceReader 在读取 binlog 数据后,使用 shouldEmit 来判断是否应该将该记录发送给下游。

/**
     * Returns the record should emit or not.
     *
     * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event that
     * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
     * since the offset is after its high watermark.
     *
     * <pre> E.g: the data input is :
     *    snapshot-split-0 info : [0,    1024) highWatermark0
     *    snapshot-split-1 info : [1024, 2048) highWatermark1
     *  the data output is:
     *  only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,
     *  only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
     * </pre>
     */
    private boolean shouldEmit(SourceRecord sourceRecord) {
        if (isDataChangeRecord(sourceRecord)) {
            TableId tableId = getTableId(sourceRecord);
            BinlogOffset position = getBinlogPosition(sourceRecord);
           	// 判断是否处于纯净的binlog区域
            if (hasEnterPureBinlogPhase(tableId, position)) {
                return true;
            }
            // only the table who captured snapshot splits need to filter
            if (finishedSplitsInfo.containsKey(tableId)) {
                RowType splitKeyType =
                        ChunkUtils.getSplitType(
                                statefulTaskContext.getDatabaseSchema().tableFor(tableId));
                Object[] key =
                        getSplitKey(
                                splitKeyType,
                                sourceRecord,
                                statefulTaskContext.getSchemaNameAdjuster());
                for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
                    if (RecordUtils.splitKeyRangeContains(
                                    key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                            && position.isAfter(splitInfo.getHighWatermark())) { // 判断该binlog是否属于chunk区间且是否>该chunk的hw
                        return true;
                    }
                }
            }
            // not in the monitored splits scope, do not emit
            return false;
        }
        // always send the schema change event and signal event
        // we need record them to state of Flink
        return true;
    }

	private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
        // the existed tables those have finished snapshot reading
        if (maxSplitHighWatermarkMap.containsKey(tableId)
                && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
            return true;
        }
        // capture dynamically new added tables
        // TODO: there is still very little chance that we can't capture new added table.
        //  That the tables dynamically added after discovering captured tables in enumerator
        //  and before the lowest binlog offset of all table splits. This interval should be
        //  very short, so we don't support it for now.
        return !maxSplitHighWatermarkMap.containsKey(tableId)
                && capturedTableFilter.isIncluded(tableId);
    }

四、相关文档

  • 官方文档
  • Flink CDC 设计文档
  • FAQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/271760.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LH7904C高压线太阳能警示灯

适用场所&#xff1a; 适用于高压线,塔吊,路政,船舶,种植,塔机,航海航道等场所起警示作用。 产品特点&#xff1a; 光控无开关&#xff0c;白天不闪&#xff0c;昏暗环境自动闪烁&#xff0c;无需手动操作&#xff0c;省时省事; 采用红色LED作光源&#xff0c;亮度高&#…

边缘计算云边端全览—边缘计算系统设计与实践【文末送书-10】

文章目录 一.边缘计算1.1边缘计算的典型应用 二.边缘计算 VS 云计算三.边缘计算系统设计与实践【文末送书-10】3.1 粉丝福利&#xff1a;文末推荐与福利免费包邮送书&#xff01; 一.边缘计算 边缘计算是指在靠近物或数据源头的一侧&#xff0c;采用网络、计算、存储、应用核心…

camunda-modeler画图入门

软件下载 camunda-modeler是camunda的工作流绘制桌面工具 5.9.0和5.18.0版本下载地址 https://storage.googleapis.com/downloads-camunda-cloud-release/camunda-modeler/5.9.0/camunda-modeler-5.9.0-win-x64.ziphttps://storage.googleapis.com/downloads-camunda-cloud-…

苹果证书p12和描述文件的创建方法

​ 苹果证书p12和描述文件的创建方法 在2020年之前&#xff0c;我们在使用appuploder创建苹果证书的时候&#xff0c;只需要注册苹果开发者账号&#xff0c;但不需要缴费成为开发者。 在2020年之后&#xff0c;需要先缴费成为苹果开发者。 假如你还没有注册苹果开发者账号&…

右值引用和移动语义以及C++11新增的类功能

正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 右值引用和左值引用 传统的C语法中就有引用的语法&#xff0c;而C11中新增了的右值引用语法特…

FC忍者神龟格斗可视化hack源码

[FC][忍者神龟格斗][最佳可视化][Final] 时间:2023.12.22 作者:FlameCyclone 内容: 1.可视化功能菜单 (1)菜单按键操作 1.上下键: 移动选项 2.左右键: 翻页 3.选择键: 翻转功能开关 4.开始键: 退出菜单 5.B键: 启用功能 …

如何进行实例管理

目录 修改实例规格 修改网络带宽 网站的访问量每天都比较高&#xff0c;网站明显变慢了&#xff0c;这是怎么回事&#xff1f; 这说明你的网站的并发访问能力已经不足了&#xff0c;并发访问是指同一时间&#xff0c;多个用户请求访问同一个域名下的资源或服务&#xff0c;请…

RHCE9学习指南 第10章 ACL权限

10.1 ACL介绍及基本用法 前面讲权限时是对u、u、o来设置权限的。假如有如图10-1所示的需求。 图10-1 为三个用户设置权限 有一个目录aa&#xff0c;要求tom、bob、mary具有不同的权限&#xff0c;利用前面讲过的知识是完全可以实现的。 所有者设置为tom&#xff0c;把所有者权…

目标追踪:使用ByteTrack进行目标检测和跟踪

BYTE算法是一种简单而有效的关联方法&#xff0c;通过关联几乎每个检测框而不仅仅是高分的检测框来跟踪对象。这篇博客的目标是介绍ByteTrack以及多目标跟踪&#xff08;MOT&#xff09;的技术。我们还将介绍在样本视频上使用ByteTrack跟踪运行YOLOv8目标检测。 多目标跟踪&…

【Python微信机器人】第六七篇: 封装32位和64位Python hook框架实战打印微信日志

目录修整 目前的系列目录(后面会根据实际情况变动): 在windows11上编译python将python注入到其他进程并运行注入Python并使用ctypes主动调用进程内的函数和读取内存结构体调用汇编引擎实战发送文本和图片消息(支持32位和64位微信)允许Python加载运行py脚本且支持热加载利用汇…

什么是数据可视化?数据可视化的流程与步骤

前言 数据可视化将大大小小的数据集转化为更容易被人脑理解和处理的视觉效果。可视化在我们的日常生活中非常普遍&#xff0c;但它们通常以众所周知的图表和图形的形式出现。正确的数据可视化以有意义和直观的方式为复杂的数据集提供关键的见解。 数据可视化定义 数据可视化…

「仙逆」王林夺舍身份曝光,火焚国火兽危机,两位始祖保护王林

Hello,小伙伴们&#xff0c;我是拾荒君。 《仙逆》第16集超前爆料&#xff0c;本次猛料&#xff0c;王林的天逆珠吞噬了火兽之王&#xff0c;使他的火属性达到了大圆满的境界。在封印屏障的保护下&#xff0c;他成功地逃脱了火兽的追击。然而&#xff0c;如今火兽数量众多&…

【视觉实践】使用Mediapipe进行手势识别

目录 1 Mediapipe 2 Solutions 3 安装依赖库 4 实践 1 Mediapipe Mediapipe是google的一个开源项目,可以提供开源的、跨平台的常用机器学习(machine learning,ML)方案。MediaPipe是一个用于构建机器学习管道的框架,用于处理视频、音频等时间序列数据。与资源消耗型的机…

易天新引进DELL Z9432F-ON交换机设备,网络通信再迎新风采

随着信息技术的飞速发展&#xff0c;网络通信已经成为现代社会中不可或缺的一部分。在这个数字化时代&#xff0c;企业对于高效、可靠的网络设备需求日益增加。为了满足企业日益增长的网络需求和为客户提供更好的服务&#xff0c;易天引进了DELL Z9432F-ON交换机设备&#xff0…

系统管理在工业物联网中的应用——青创智通工业物联网

工业物联网系统是一个复杂的大规模系统&#xff0c;涉及到众多的设备和系统&#xff0c;因此其管理面临诸多挑战。首先&#xff0c;设备和系统的多样性使得互通性成为一个难题&#xff0c;不同厂商的设备和系统之间的兼容性难以保证。其次&#xff0c;工业物联网系统的数据量庞…

Java开发框架和中间件面试题(5)

44.Tomcat一个请求的处理流程&#xff1f; 假设来自客户的请求为&#xff1a; http&#xff1a;//localhost&#xff1a;8080/test/index.jsp请求被发送到本机端口8080&#xff0c;被在那里侦听Copote HTTP/1.1 Connector,然后 1.Connector把该请求交给它所在的Service的Engi…

Antd Cascader 组件指定 placement 弹出位置无效

最近在使用 Antd Cascader 组件时&#xff0c;发现指定 placement 弹出位置无效&#xff0c;查看官方文档也没有找到相关的说明&#xff0c;经过一番搜索&#xff0c;终于发现了问题所在。 问题复现 代码示例&#xff1a; <Form.Item name"intention" label&quo…

7_js_dom编程入门1

Objective&#xff08;本课目标&#xff09; 掌握获取页面元素的常用方法 掌握事件触发案例 能够区分innerText和innerHTML的区别 综合案例训练 1 DOM 介绍 1.1 什么是DOM 文档对象模型&#xff08;Document Object Model&#xff0c;简称DOM&#xff09;&#xff0c;是 …

分享3种常用的前端埋点方式

只有了解用户&#xff0c;我们才能服务好用户&#xff0c;而最接近用户的我们&#xff0c;自然要承担起更多的责任。 那么在一个企业中&#xff0c;我们要如何去了解用户呢&#xff1f; 最直接有效的方式就是了解用户的行为&#xff0c;了解用户在网站中做了什么&#xff0c;呆…

工业交换机的冗余电源设计

在市场上&#xff0c;尤其是在工业级交换机上&#xff0c;我们经常能看到一个支持冗余电源的选项。在大多数工业现场中&#xff0c;我们都知道网络的稳定性是非常关键的。而且&#xff0c;像光伏和煤矿这样的行业经常位于偏远地区&#xff0c;环境条件恶劣。因此&#xff0c;在…