[Flink CDC] Schema Evolution in Flink CDC: Source Code Analysis and Flow Chart

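Flink CDC version: 3.2.1
Note: this article starts at the point where SchemaOperator receives a schema change event. Schema change events are produced on the source side, which is not covered here.
It may help to look at the flow chart first and then study the source code.
Reference article:
Flink CDC 3.0 dynamic schema changes: source code analysis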

I. Source Code Analysis

The walkthrough uses a Doris sink as the example:

SchemaOperator

org.apache.flink.cdc.runtime.operators.schema.SchemaOperator
processElement checks whether the incoming event is a SchemaChangeEvent and, if so, calls processSchemaChangeEvents:

/**  
 * This method is guaranteed to not be called concurrently with other methods of the operator. 
 */
@Override  
public void processElement(StreamRecord<Event> streamRecord)  
        throws InterruptedException, TimeoutException, ExecutionException {  
    Event event = streamRecord.getValue();  
    if (event instanceof SchemaChangeEvent) {  // (0)
        processSchemaChangeEvents((SchemaChangeEvent) event);  
    } else if (event instanceof DataChangeEvent) {  // (13)
        processDataChangeEvents(streamRecord, (DataChangeEvent) event);  
    } else {  
        throw new RuntimeException("Unknown event type in Stream record: " + event);  
    }  
}

processSchemaChangeEvents calls handleSchemaChangeEvent and then refreshes the cached schemas:

private void processSchemaChangeEvents(SchemaChangeEvent event)  
        throws InterruptedException, TimeoutException, ExecutionException {  
    TableId tableId = event.tableId();  
    LOG.info(  
            "{}> Table {} received SchemaChangeEvent {} and start to be blocked.",  
            subTaskId,  
            tableId,  
            event);  
    handleSchemaChangeEvent(tableId, event);  
    // Update caches  
    originalSchema.put(tableId, getLatestOriginalSchema(tableId));  
    schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId));  
  
    List<TableId> optionalRoutedTable = getRoutedTables(tableId);  
    if (!optionalRoutedTable.isEmpty()) {  
        tableIdMappingCache  
                .get(tableId)  
                .forEach(routed -> evolvedSchema.put(routed, getLatestEvolvedSchema(routed)));  
    } else {  
        evolvedSchema.put(tableId, getLatestEvolvedSchema(tableId));  
    }  
}

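handleSchemaChangeEvent calls requestSchemaChange to ask the registry to change the schema.

response.isAccepted() means the schema registry has accepted the change request. Inside that branch comes the key step: output.collect(new StreamRecord<>(new FlushEvent(tableId)));. The FlushEvent emitted here is consumed by the sink writer: it tells the writer to flush, that is, to write all data received so far into the sink database, much like a JDBC commit.
A FlushEvent carries nothing but the tableId; what matters is its type. Its Javadoc reads:

  • An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it start flushing.

In other words, FlushEvent is a special marker travelling in the data stream. When DataSinkWriterOperator receives it, it flushes, writing every record received up to that point into the target database. The order of records in the stream can be pictured as follows (the rightmost item was emitted first):
    data after the schema change --> FlushEvent (newly inserted) --> data before the schema change

After emitting the FlushEvent, the operator calls requestSchemaChangeResult, which blocks in a while loop. In short, until every writer has finished writing the data that preceded the FlushEvent (the data with the old schema), the operator stays blocked and sends no data with the new schema downstream.

Finally, finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e))); forwards the events produced in SchemaRegistryRequestHandler#applySchemaChange. These are derived from the original SchemaChangeEvent according to Flink CDC's schema.change.behavior setting, whose values are shown below:
![[image-20250106113512324.png]]

How these events are used once they are sent downstream has not been investigated yet.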

private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)  
        throws InterruptedException, TimeoutException {  
  
    if (schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION  
            && schemaChangeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {  
        // CreateTableEvent should be applied even in EXCEPTION mode  
        throw new RuntimeException(  
                String.format(  
                        "Refused to apply schema change event %s in EXCEPTION mode.",  
                        schemaChangeEvent));  
    }  
  
    // The request will block if another schema change event is being handled  
    SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);  // (1)
    if (response.isAccepted()) {   // (3)
        LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId);  
        output.collect(new StreamRecord<>(new FlushEvent(tableId)));  // (4)
        List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents();  
        schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());  
  
        // The request will block until flushing finished in each sink writer  
        SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult();  // (5) 
        List<SchemaChangeEvent> finishedSchemaChangeEvents =  
                schemaEvolveResponse.getFinishedSchemaChangeEvents();  
  
        // Update evolved schema changes based on apply results  
        finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));  
    } else if (response.isDuplicate()) {  
        LOG.info(  
                "{}> Schema change event {} has been handled in another subTask already.",  
                subTaskId,  
                schemaChangeEvent);  
    } else if (response.isIgnored()) {  
        LOG.info(  
                "{}> Schema change event {} has been ignored. No schema evolution needed.",  
                subTaskId,  
                schemaChangeEvent);  
    } else {  
        throw new IllegalStateException("Unexpected response status " + response);  
    }  
}
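The role of the FlushEvent as an ordering marker can be illustrated with a toy model. The sketch below is not Flink CDC code: BufferingWriterSketch, the FLUSH marker string and the two lists are hypothetical stand-ins for a sink writer, a FlushEvent and the target database. It only shows that everything received before the marker is written out when the marker arrives, while later records stay buffered.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a sink writer; not the Flink CDC implementation. */
class BufferingWriterSketch {
    /** Plays the role of a FlushEvent travelling in the data stream. */
    static final String FLUSH = "FLUSH";

    /** Records received but not yet written to the "database". */
    final List<String> buffer = new ArrayList<>();
    /** Records already written to the "database". */
    final List<String> written = new ArrayList<>();
    int flushCount = 0;

    /** Buffers ordinary records; the FLUSH marker forces out everything buffered so far. */
    void process(String record) {
        if (FLUSH.equals(record)) {
            written.addAll(buffer);
            buffer.clear();
            flushCount++;
        } else {
            buffer.add(record);
        }
    }
}
```

Feeding the writer "old-1", "old-2", FLUSH, "new-1" in that order leaves both old records in written and only "new-1" in buffer, which is the guarantee the schema change relies on.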

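requestSchemaChange is a blocking method (while (true)): it keeps sending a SchemaChangeRequest until the response is no longer busy, or throws a TimeoutException once rpcTimeOutInMillis has elapsed. The request carries the tableId, the schema change event and the subTaskId: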

private SchemaChangeResponse requestSchemaChange(  
        TableId tableId, SchemaChangeEvent schemaChangeEvent)  
        throws InterruptedException, TimeoutException {  
    long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;  
    while (true) {  
        SchemaChangeResponse response =  
                sendRequestToCoordinator(  
                        new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId));  
        if (response.isRegistryBusy()) {  // (2)
            if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {  
                LOG.info(  
                        "{}> Schema Registry is busy now, waiting for next request...",  
                        subTaskId);  
                Thread.sleep(1000);  
            } else {  
                throw new TimeoutException("TimeOut when requesting schema change");  
            }  
        } else {  
            return response;  
        }  
    }  
}
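The retry-while-busy pattern used here can be written as a small generic helper. This is a minimal sketch under assumptions: RetryUntilNotBusy, poll and its parameters are hypothetical names, and the request is modelled as a plain Supplier rather than an RPC to the coordinator.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical helper illustrating the polling loop; not part of Flink CDC. */
class RetryUntilNotBusy {

    /**
     * Repeats the request until the response is not "busy", sleeping between attempts.
     * Throws TimeoutException if the response is still busy once the deadline has passed.
     */
    static <T> T poll(
            Supplier<T> request, Predicate<T> isBusy, long timeoutMillis, long sleepMillis)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            T response = request.get();
            if (!isBusy.test(response)) {
                return response;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new TimeoutException("Timed out while the coordinator stayed busy");
            }
            Thread.sleep(sleepMillis);
        }
    }
}
```

As in the original method, the deadline is computed once before the loop, so the timeout bounds the total waiting time rather than each individual attempt.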

The toCoordinator.sendRequestToCoordinator call used below belongs to org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway, an interface inside Flink itself.
Its implementations are:
(1) org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway
(2) org.apache.flink.runtime.taskexecutor.rpc.RpcTaskOperatorEventGateway
Their internals are not explored here.
The request ultimately arrives at SchemaRegistry#handleCoordinationRequest.

private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse>  
        RESPONSE sendRequestToCoordinator(REQUEST request) {  
    try {  
        CompletableFuture<CoordinationResponse> responseFuture =  
                toCoordinator.sendRequestToCoordinator(  
                        getOperatorID(), new SerializedValue<>(request));  
        return CoordinationResponseUtils.unwrap(responseFuture.get());  
    } catch (Exception e) {  
        throw new IllegalStateException(  
                "Failed to send request to coordinator: " + request.toString(), e);  
    }  
}

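requestSchemaChangeResult does something very simple: it polls until the coordinator returns a final result. Once the while loop exits and the method returns, the sink side has flushed all the old data. Until then SchemaOperator sends no new data downstream, because everything after the FlushEvent is data written with the changed schema.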

private SchemaChangeResultResponse requestSchemaChangeResult()  
        throws InterruptedException, TimeoutException {  
    CoordinationResponse coordinationResponse =  
            sendRequestToCoordinator(new SchemaChangeResultRequest());  
    long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;  
    while (coordinationResponse instanceof SchemaChangeProcessingResponse) {  // (6) (7)
        if (System.currentTimeMillis() < nextRpcTimeOutMillis) {  
            Thread.sleep(1000);  
            coordinationResponse = sendRequestToCoordinator(new SchemaChangeResultRequest());  
        } else {  
            throw new TimeoutException("TimeOut when requesting release upstream");  
        }  
    }  
    return ((SchemaChangeResultResponse) coordinationResponse);  
}

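The toCoordinator.sendRequestToCoordinator call behind this request again goes through Flink's internal RPC path, which is not examined here.
This request is also received by org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry#handleCoordinationRequest, which handles it in the if (request instanceof SchemaChangeResultRequest) branch.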

SchemaRegistry

org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry

Requests sent through toCoordinator.sendRequestToCoordinator are received by handleCoordinationRequest. A SchemaChangeRequest enters the request instanceof SchemaChangeRequest branch and is passed to handleSchemaChangeRequest.

@Override  
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(  
        CoordinationRequest request) {  
    CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();  
    runInEventLoop(  
            () -> {  
                try {  
                    if (request instanceof SchemaChangeRequest) {  
                        SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;  
                        requestHandler.handleSchemaChangeRequest(  
                                schemaChangeRequest, responseFuture);  
                    } else if (request instanceof SchemaChangeResultRequest) {  
                        requestHandler.getSchemaChangeResult(responseFuture);  
                    } else if (request instanceof GetEvolvedSchemaRequest) {  
                        handleGetEvolvedSchemaRequest(  
                                ((GetEvolvedSchemaRequest) request), responseFuture);  
                    } else if (request instanceof GetOriginalSchemaRequest) {  
                        handleGetOriginalSchemaRequest(  
                                (GetOriginalSchemaRequest) request, responseFuture);  
                    } else {  
                        throw new IllegalArgumentException(  
                                "Unrecognized CoordinationRequest type: " + request);  
                    }  
                } catch (Throwable t) {  
                    context.failJob(t);  
                    throw t;  
                }  
            },  
            "handling coordination request %s",  
            request);  
    return responseFuture;  
}
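The shape of handleCoordinationRequest (create a future, hand the work to a single-threaded event loop, return the future immediately) can be sketched in isolation. Everything below is a hypothetical stand-in: EventLoopCoordinatorSketch and its request types are invented for illustration, and where the real method fails the job on error, this sketch only completes the future exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical coordinator sketch; not the SchemaRegistry implementation. */
class EventLoopCoordinatorSketch implements AutoCloseable {
    /** One thread, so requests are handled strictly one after another. */
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    /** Only touched from the event-loop thread, so no extra locking is needed. */
    private int handled = 0;

    /** Returns immediately; the future is completed later by the event loop. */
    CompletableFuture<String> handleRequest(Object request) {
        CompletableFuture<String> responseFuture = new CompletableFuture<>();
        eventLoop.execute(
                () -> {
                    try {
                        handled++;
                        if (request instanceof Integer) {
                            responseFuture.complete("int request #" + handled);
                        } else if (request instanceof String) {
                            responseFuture.complete("string request #" + handled);
                        } else {
                            throw new IllegalArgumentException(
                                    "Unrecognized request type: " + request);
                        }
                    } catch (Throwable t) {
                        // Assumption of this sketch: report the failure through the future.
                        responseFuture.completeExceptionally(t);
                    }
                });
        return responseFuture;
    }

    @Override
    public void close() {
        eventLoop.shutdown();
    }
}
```

Because a single thread drains the queue, the handler never sees two requests at once, which is why per-request state can be kept in plain fields.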

SchemaRegistry#handleEventFromOperator handles the FlushSuccessEvent sent by DataSinkWriterOperator#handleFlushEvent. The actual logic is again delegated to the handler: SchemaRegistryRequestHandler#flushSuccess.

  
@Override  
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {  
    runInEventLoop(  
            () -> {  
                try {  
                    if (event instanceof FlushSuccessEvent) {  
                        FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;  
                        LOG.info(  
                                "Sink subtask {} succeed flushing for table {}.",  
                                flushSuccessEvent.getSubtask(),  
                                flushSuccessEvent.getTableId().toString());  
                        requestHandler.flushSuccess(  
                                flushSuccessEvent.getTableId(),  
                                flushSuccessEvent.getSubtask(),  
                                currentParallelism);  
                    } else if (event instanceof SinkWriterRegisterEvent) {  
                        requestHandler.registerSinkWriter(  
                                ((SinkWriterRegisterEvent) event).getSubtask());  
                    } else {  
                        throw new FlinkException("Unrecognized Operator Event: " + event);  
                    }  
                } catch (Throwable t) {  
                    context.failJob(t);  
                    throw t;  
                }  
            },  
            "handling event %s from subTask %d",  
            event,  
            subtask);  
}

SchemaRegistryRequestHandler

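SchemaRegistryRequestHandler does the actual work on behalf of SchemaRegistry. Its schemaChangeStatus field records the handler's current state.
pendingSubTaskIds records the IDs of the subtasks whose requests are waiting to be handled; each parallel instance of the operator has its own subtask ID.
When a request arrives while the handler is IDLE:
(1) pendingSubTaskIds is empty -> continue.
(2) The head of pendingSubTaskIds equals the requester's subTaskId -> remove the head and continue.
(3) pendingSubTaskIds is non-empty and its head is a different subtask -> return SchemaChangeResponse.busy() right away. This busy response is what SchemaOperator checks with response.isRegistryBusy().
If the handler is not IDLE at all, the requester is likewise added to the pending list and answered with busy.
When the handler continues:
calculateDerivedSchemaChangeEvents transforms the event according to Flink CDC's schema evolution policy; for example, it can cause an event to be ignored by returning an empty list.

`schema.change.behavior` is of enum type, and could be set to `exception`, `evolve`, `try_evolve`, `lenient` or `ignore`.

The handler's state is then set to WAITING_FOR_FLUSH and a response with ResponseCode.ACCEPTED is returned, at which point control goes back to SchemaOperator#handleSchemaChangeEvent.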
The SchemaRegistryRequestHandler#handleSchemaChangeRequest method:

  
/**
 * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing.
 *
 * @param request the received SchemaChangeRequest
 */
public void handleSchemaChangeRequest(
        SchemaChangeRequest request, CompletableFuture<CoordinationResponse> response) {

    // We use requester subTask ID as the pending ticket, because there will be at most 1 schema
    // change requests simultaneously from each subTask
    int requestSubTaskId = request.getSubTaskId();

    synchronized (schemaChangeRequestLock) {
        // Make sure we handle the first request in the pending list to avoid out-of-order
        // waiting and blocks checkpointing mechanism.
        if (schemaChangeStatus == RequestStatus.IDLE) {
            if (pendingSubTaskIds.isEmpty()) {  
                LOG.info(  
                        "Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.",  
                        request.getSchemaChangeEvent(),  
                        request.getTableId().toString(),  
                        requestSubTaskId);  
            } else if (pendingSubTaskIds.get(0) == requestSubTaskId) {  
                LOG.info(  
                        "Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.",  
                        request.getSchemaChangeEvent(),  
                        request.getTableId().toString(),  
                        requestSubTaskId);  
                pendingSubTaskIds.remove(0);  
            } else {  
                LOG.info(  
                        "Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).",  
                        request.getSchemaChangeEvent(),  
                        request.getTableId().toString(),  
                        requestSubTaskId,  
                        pendingSubTaskIds);  
                if (!pendingSubTaskIds.contains(requestSubTaskId)) {  
                    pendingSubTaskIds.add(requestSubTaskId);  
                }  
                response.complete(wrap(SchemaChangeResponse.busy()));  // (2) 
                return;  
            }  
  
            SchemaChangeEvent event = request.getSchemaChangeEvent();  
  
            // If this schema change event has been requested by another subTask, ignore it.  
            if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {  
                LOG.info("Event {} has been addressed before, ignoring it.", event);  
                clearCurrentSchemaChangeRequest();  
                LOG.info(  
                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",  
                        request);  
                response.complete(wrap(SchemaChangeResponse.duplicate()));  
                return;  
            }  
            schemaManager.applyOriginalSchemaChange(event);  
            List<SchemaChangeEvent> derivedSchemaChangeEvents =  
                    calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());  // (14)
  
            // If this schema change event is filtered out by LENIENT mode or merging table
            // route strategies, ignore it.
            if (derivedSchemaChangeEvents.isEmpty()) {
                LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);  
                clearCurrentSchemaChangeRequest();  
                LOG.info(  
                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",  
                        request);  
  
                response.complete(wrap(SchemaChangeResponse.ignored()));  
                return;  
            }  
  
            LOG.info(  
                    "SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.");  
            // This request has been accepted.  
            schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;  // (3)
            currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);  
  
            response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));  // (3) 
        } else {  
            LOG.info(  
                    "Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",  
                    request,  
                    requestSubTaskId,  
                    pendingSubTaskIds);  
            if (!pendingSubTaskIds.contains(requestSubTaskId)) {  
                pendingSubTaskIds.add(requestSubTaskId);  
            }  
            response.complete(wrap(SchemaChangeResponse.busy()));  // (2) 
        }  
    }  
}
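The ticket logic around pendingSubTaskIds is easier to see with the schema handling stripped away. The following is a simplified sketch, not the real handler: PendingTicketSketch, Reply, request and finish are hypothetical names, and the four-state status is reduced to a single idle flag.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reduction of the pending-list logic; not the Flink CDC handler. */
class PendingTicketSketch {
    enum Reply { ACCEPTED, BUSY }

    /** Subtasks waiting for their turn, in arrival order. */
    private final List<Integer> pendingSubTaskIds = new ArrayList<>();
    /** Simplification: true when no request is currently being processed. */
    private boolean idle = true;

    /** Accepts the request only if the handler is idle and the requester is first in line. */
    synchronized Reply request(int subTaskId) {
        boolean firstInLine =
                pendingSubTaskIds.isEmpty() || pendingSubTaskIds.get(0) == subTaskId;
        if (idle && firstInLine) {
            if (!pendingSubTaskIds.isEmpty()) {
                pendingSubTaskIds.remove(0); // remove the head ticket by index
            }
            idle = false;
            return Reply.ACCEPTED;
        }
        // Otherwise take a ticket (once) and ask the caller to retry later.
        if (!pendingSubTaskIds.contains(subTaskId)) {
            pendingSubTaskIds.add(subTaskId);
        }
        return Reply.BUSY;
    }

    /** Marks the current request as done so that the next ticket holder can be served. */
    synchronized void finish() {
        idle = true;
    }

    synchronized List<Integer> pending() {
        return new ArrayList<>(pendingSubTaskIds);
    }
}
```

If subtask 0 is accepted and subtasks 1 and 2 then get busy replies, the pending list becomes [1, 2]. After finish(), a retry from subtask 2 is still refused, and only subtask 1 is accepted, which preserves arrival order.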

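The SchemaRegistryRequestHandler#getSchemaChangeResult method:
It simply checks the member variable SchemaRegistryRequestHandler#schemaChangeStatus:

  • FINISHED -> reset the handler to IDLE and return a SchemaChangeResultResponse carrying the finished events.
  • Not FINISHED -> return a SchemaChangeProcessingResponse; on receiving it, SchemaOperator#requestSchemaChangeResult keeps looping in its while loop and stays blocked.
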
public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> response) {  
    Preconditions.checkState(  
            schemaChangeStatus != RequestStatus.IDLE,  
            "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");  
    if (schemaChangeStatus == RequestStatus.FINISHED) {  // (12)
        schemaChangeStatus = RequestStatus.IDLE;  
        LOG.info(  
                "SchemaChangeStatus switched from FINISHED to IDLE for request {}",  
                currentDerivedSchemaChangeEvents);  
  
        // This request has been finished, return it and prepare for the next request  
        List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();  
        SchemaChangeResultResponse resultResponse =  
                new SchemaChangeResultResponse(finishedEvents);  
        response.complete(wrap(resultResponse));  
    } else {  
        // Still working on schema change request, waiting it  
        response.complete(wrap(new SchemaChangeProcessingResponse()));  
    }  
}

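The flushSuccess method handles the FlushSuccessEvent returned by DataSinkWriterOperator. This part is a little harder to follow.
activeSinkWriters records the subtask indexes of all registered sink writers. The writer's parallelism may be greater than 1, and each FlushSuccessEvent comes from just one of those writers, so the handler must wait until every writer has flushed before it can be sure that all data from before the schema change has been written to the database.
(1) if (activeSinkWriters.size() < parallelism) covers the case where not all writers have registered yet; the method returns and keeps waiting.
(2) if (flushedSinkWriters.equals(activeSinkWriters)) means every writer has flushed. The handler's state is then set to RequestStatus.APPLYING, meaning it is now applying the schema change, and applySchemaChange is submitted for execution.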

/**
 * Record flushed sink subtasks after receiving FlushSuccessEvent.
 *
 * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about
 * @param sinkSubtask the sink subtask succeed flushing
 */
public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
    flushedSinkWriters.add(sinkSubtask);  
    if (activeSinkWriters.size() < parallelism) {  
        LOG.info(  
                "Not all active sink writers have been registered. Current {}, expected {}.",  
                activeSinkWriters.size(),  
                parallelism);  
        return;  
    }  
    if (flushedSinkWriters.equals(activeSinkWriters)) {  
        Preconditions.checkState(  
                schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH,  
                "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not "  
                        + schemaChangeStatus);  
  
        schemaChangeStatus = RequestStatus.APPLYING;  // (9)
        LOG.info(  
                "All sink subtask have flushed for table {}. Start to apply schema change.",  
                tableId.toString());  
        schemaChangeThreadPool.submit(  
                () -> applySchemaChange(tableId, currentDerivedSchemaChangeEvents));  
    }  
}
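The two checks in flushSuccess amount to comparing two sets of subtask indexes. A minimal sketch follows, assuming hypothetical names (FlushAckTrackerSketch and a boolean return value in place of the state change and thread-pool submission):

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical tracker for flush acknowledgements; not the Flink CDC handler. */
class FlushAckTrackerSketch {
    private final Set<Integer> activeSinkWriters = new HashSet<>();
    private final Set<Integer> flushedSinkWriters = new HashSet<>();

    /** Called once per sink writer subtask when it starts up. */
    void registerSinkWriter(int subtask) {
        activeSinkWriters.add(subtask);
    }

    /**
     * Records one acknowledgement. Returns true only when all expected writers have
     * registered and every registered writer has acknowledged the flush.
     */
    boolean flushSuccess(int subtask, int parallelism) {
        flushedSinkWriters.add(subtask);
        if (activeSinkWriters.size() < parallelism) {
            return false; // some writers have not registered yet, keep waiting
        }
        return flushedSinkWriters.equals(activeSinkWriters);
    }
}
```

With two registered writers, the first acknowledgement returns false and the second returns true, the point at which the real handler would switch to APPLYING.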

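The SchemaRegistryRequestHandler#applySchemaChange method:
Its core is metadataApplier.applySchemaChange(changeEvent), which performs the schema change on the external system, followed by schemaManager.applyEvolvedSchemaChange(changeEvent), which records the evolved schema. The Javadoc of the org.apache.flink.cdc.common.sink.MetadataApplier interface says:

  • {@code MetadataApplier} is used to apply metadata changes to external systems.

So the schema change is applied to an external system, which here is the sink-side database. An implementation typically takes the requested change, builds the SQL and sends it to the database for execution.

Finally, the handler's state is set to RequestStatus.FINISHED.
It may look as if the FlushSuccessEvent is never propagated back to SchemaOperator, but it does not need to be: SchemaOperator keeps polling SchemaRegistry through SchemaOperator#requestSchemaChangeResult.
SchemaRegistry decides what to return from the handler's state in SchemaRegistryRequestHandler#getSchemaChangeResult. Once the state is RequestStatus.FINISHED, SchemaRegistry completes the CompletableFuture with something other than a SchemaChangeProcessingResponse, so SchemaOperator leaves its blocking loop and resumes sending data downstream.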

  
/**
 * Apply the schema change to the external system.
 *
 * @param tableId the table need to change schema
 * @param derivedSchemaChangeEvents list of the schema changes
 */
private void applySchemaChange(
        TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {  
    for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {  
        if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {  
            if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {  
                currentIgnoredSchemaChanges.add(changeEvent);  
                continue;  
            }  
        }  
        if (!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) {  
            LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId);  
            currentIgnoredSchemaChanges.add(changeEvent);  
        } else {  
            try {  
                metadataApplier.applySchemaChange(changeEvent);  
                LOG.info("Applied schema change {} to table {}.", changeEvent, tableId);  
                schemaManager.applyEvolvedSchemaChange(changeEvent);  
                currentFinishedSchemaChanges.add(changeEvent);  
            } catch (Throwable t) {  
                LOG.error(  
                        "Failed to apply schema change {} to table {}. Caused by: {}",  
                        changeEvent,  
                        tableId,  
                        t);  
                if (!shouldIgnoreException(t)) {  
                    currentChangeException = t;  
                    break;  
                } else {  
                    LOG.warn(  
                            "Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",  
                            changeEvent,  
                            t);  
                }  
            }  
        }  
    }  
    Preconditions.checkState(  
            schemaChangeStatus == RequestStatus.APPLYING,  
            "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "  
                    + schemaChangeStatus);  
    schemaChangeStatus = RequestStatus.FINISHED;  
    LOG.info(  
            "SchemaChangeStatus switched from APPLYING to FINISHED for request {}.",  
            currentDerivedSchemaChangeEvents);  
}

SchemaRegistryRequestHandler.RequestStatus is the enum of the handler's states. The state transitions are documented in the source comment:

// Schema change event state could transfer in the following way:  
//  
//      -------- B --------  
//      |                 |  
//      v                 |  
//  --------           ---------------------  
//  | IDLE | --- A --> | WAITING_FOR_FLUSH |  
//  --------           ---------------------  
//     ^                        |  
//      E                       C  
//       \                      v  
//  ------------          ------------  
//  | FINISHED | <-- D -- | APPLYING |  
//  ------------          ------------  
//  
//  A: When a request came to an idling request handler.  
//  B: When current request is duplicate or ignored by LENIENT / routed table merging  
// strategies.  
//  C: When schema registry collected enough flush success events, and actually started to apply  
// schema changes.  
//  D: When schema change application finishes (successfully or with exceptions)  
//  E: When current schema change request result has been retrieved by SchemaOperator, and ready  
// for the next request.  
private enum RequestStatus {  
    IDLE,  
    WAITING_FOR_FLUSH,  
    APPLYING,  
    FINISHED  
}
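The transition diagram can also be expressed as an explicit table of allowed transitions. This is only an illustrative sketch: the real handler assigns schemaChangeStatus directly and guards some steps with Preconditions.checkState, whereas the hypothetical RequestStatusMachineSketch below validates every transition against the edges A to E of the comment.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical state machine mirroring the documented transitions A to E. */
class RequestStatusMachineSketch {
    enum Status { IDLE, WAITING_FOR_FLUSH, APPLYING, FINISHED }

    private static final Map<Status, Set<Status>> ALLOWED = new EnumMap<>(Status.class);

    static {
        ALLOWED.put(Status.IDLE, EnumSet.of(Status.WAITING_FOR_FLUSH)); // A
        ALLOWED.put(Status.WAITING_FOR_FLUSH, EnumSet.of(Status.IDLE, Status.APPLYING)); // B, C
        ALLOWED.put(Status.APPLYING, EnumSet.of(Status.FINISHED)); // D
        ALLOWED.put(Status.FINISHED, EnumSet.of(Status.IDLE)); // E
    }

    private Status current = Status.IDLE;

    Status current() {
        return current;
    }

    /** Moves to the next state, rejecting any edge that is not in the diagram. */
    void transitionTo(Status next) {
        if (!ALLOWED.get(current).contains(next)) {
            throw new IllegalStateException("Illegal transition " + current + " -> " + next);
        }
        current = next;
    }
}
```

Walking IDLE, WAITING_FOR_FLUSH, APPLYING, FINISHED and back to IDLE succeeds, while jumping from IDLE straight to APPLYING throws an IllegalStateException.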

Next, the event handling on the sink side:

DataSinkWriterOperator

org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator

The org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator#processElement method:

The key part is the handling of FlushEvent.

  
@Override  
public void processElement(StreamRecord<Event> element) throws Exception {  
    Event event = element.getValue();  
  
    // FlushEvent triggers flush  
    if (event instanceof FlushEvent) {  
        handleFlushEvent(((FlushEvent) event));  
        return;  
    }  
  
    // CreateTableEvent marks the table as processed directly  
    if (event instanceof CreateTableEvent) {  
        processedTableIds.add(((CreateTableEvent) event).tableId());  
        this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()  
                .processElement(element);  
        return;  
    }  
  
    // Check if the table is processed before emitting all other events, because we have to make
    // sure that sink have a view of the full schema before processing any change events,
    // including schema changes.
    ChangeEvent changeEvent = (ChangeEvent) event;
    if (!processedTableIds.contains(changeEvent.tableId())) {  
        emitLatestSchema(changeEvent.tableId());  
        processedTableIds.add(changeEvent.tableId());  
    }  
    processedTableIds.add(changeEvent.tableId());  
    this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()  
            .processElement(element);  
}

handleFlushEvent does only two things:

  • flush: write all data received so far into the target database (comparable to a JDBC commit).
  • send an event: report FlushSuccess. For notifyFlushSuccess, see the SchemaEvolutionClient class.

private void handleFlushEvent(FlushEvent event) throws Exception {  
    copySinkWriter.flush(false);  // (8) 
    schemaEvolutionClient.notifyFlushSuccess(  
            getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());  // (9)
}

SchemaEvolutionClient

org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient

The org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient#notifyFlushSuccess method:
It sends a FlushSuccessEvent to the handleEventFromOperator method of SchemaRegistry.

public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException {  
    toCoordinator.sendOperatorEventToCoordinator(  
            schemaOperatorID, new SerializedValue<>(new FlushSuccessEvent(subtask, tableId)));  
}

TaskOperatorEventGateway

org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway
The toCoordinator used by SchemaOperator and by DataSinkWriterOperator is an object of this type.

/*
Gateway to send an OperatorEvent or CoordinationRequest from a Task to the OperatorCoordinator JobManager side.
This is the first step in the chain of sending Operator Events and Requests from Operator to Coordinator. Each layer adds further context, so that the inner layers do not need to know about the complete context, which keeps dependencies small and makes testing easier.
      
OperatorEventGateway takes the event, enriches the event with the OperatorID, and forwards it to:
TaskOperatorEventGateway enriches the event with the ExecutionAttemptID and forwards it to the:
JobMasterOperatorEventGateway which is RPC interface from the TaskManager to the JobManager.
*/
public interface TaskOperatorEventGateway {  
  
    /**
     * Sends an event from the operator (identified by the given operator ID) to the operator
     * coordinator (identified by the same ID).
     */
    void sendOperatorEventToCoordinator(OperatorID operator, SerializedValue<OperatorEvent> event);

    /**
     * Sends a request from current operator to a specified operator coordinator which is identified
     * by the given operator ID and return the response.
     */
    CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
            OperatorID operator, SerializedValue<CoordinationRequest> request);
}

MetadataApplier

org.apache.flink.cdc.common.sink.MetadataApplier
This interface is responsible for turning schema change events into DDL and sending it to the target sink database for execution.

/** {@code MetadataApplier} is used to apply metadata changes to external systems. */  
@PublicEvolving  
public interface MetadataApplier extends Serializable {  
  
    /** Apply the given {@link SchemaChangeEvent} to external systems. */  
    void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws SchemaEvolveException;  // (10) 
  
    /** Sets enabled schema evolution event types of current metadata applier. */  
    default MetadataApplier setAcceptedSchemaEvolutionTypes(  
            Set<SchemaChangeEventType> schemaEvolutionTypes) {  
        return this;  
    }  
  
    /** Checks if this metadata applier should this event type. */  
    default boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {  
        return true;  
    }  
  
    /** Checks what kind of schema change events downstream can handle. */  
    default Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {  
        return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());  
    }  
}

DorisMetadataApplier

org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier implements MetadataApplier.

Taking org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier#applySchemaChange as the example:

// (10)
@Override  
public void applySchemaChange(SchemaChangeEvent event) throws SchemaEvolveException {  
    try {  
        // send schema change op to doris  
        if (event instanceof CreateTableEvent) {  
            applyCreateTableEvent((CreateTableEvent) event);  
        } else if (event instanceof AddColumnEvent) {  
            applyAddColumnEvent((AddColumnEvent) event);  
        } else if (event instanceof DropColumnEvent) {  
            applyDropColumnEvent((DropColumnEvent) event);  
        } else if (event instanceof RenameColumnEvent) {  
            applyRenameColumnEvent((RenameColumnEvent) event);  
        } else if (event instanceof AlterColumnTypeEvent) {  
            applyAlterColumnTypeEvent((AlterColumnTypeEvent) event);  
        } else {  
            throw new UnsupportedSchemaChangeEventException(event);  
        }  
    } catch (Exception ex) {  
        throw new SchemaEvolveException(event, ex.getMessage(), null);  
    }  
}

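Take applyAddColumnEvent as an example.
It only performs some conversion: each added column is turned into a FieldSchema and handed to schemaChangeManager.addColumn.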

private void applyAddColumnEvent(AddColumnEvent event)  
        throws IOException, IllegalArgumentException {  
    TableId tableId = event.tableId();  
    List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns();  
    for (AddColumnEvent.ColumnWithPosition col : addedColumns) {  
        Column column = col.getAddColumn();  
        FieldSchema addFieldSchema =  
                new FieldSchema(  
                        column.getName(),  
                        buildTypeString(column.getType()),  
                        column.getDefaultValueExpression(),  
                        column.getComment());  
        schemaChangeManager.addColumn(  
                tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);  
    }  
}

SchemaChangeManager

org.apache.doris.flink.sink.schema.SchemaChangeManager

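The org.apache.doris.flink.sink.schema.SchemaChangeManager#addColumn method:
SchemaChangeHelper builds the SQL statement, and the schemaChange method sends that SQL to the database for execution. If the column already exists, the method logs a warning and returns true without sending any DDL.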

public boolean addColumn(String database, String table, FieldSchema field)  
        throws IOException, IllegalArgumentException {  
    if (checkColumnExists(database, table, field.getName())) {  
        LOG.warn(  
                "The column {} already exists in table {}, no need to add it again",  
                field.getName(),  
                table);  
        return true;  
    }  
    String tableIdentifier = getTableIdentifier(database, table);  
    String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field);  
    return schemaChange(  
            database, table, buildRequestParam(false, field.getName()), addColumnDDL);  
}
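The existence check makes addColumn safe to call more than once for the same column, which would matter if the same event were ever applied twice. The following minimal sketch shows that check-then-alter pattern against an in-memory stand-in. `FakeTable` and its fields are hypothetical and only model the behaviour. They are not part of the Doris connector.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** In-memory stand-in for a table; hypothetical, for illustration only. */
class FakeTable {
    final Set<String> columns = new LinkedHashSet<>();
    final List<String> executedDdl = new ArrayList<>();

    /** Adds the column unless it already exists; returns true in both cases. */
    boolean addColumn(String name, String type) {
        if (columns.contains(name)) {
            // Already present: report success without sending DDL again.
            return true;
        }
        executedDdl.add("ALTER TABLE t ADD COLUMN " + name + " " + type);
        columns.add(name);
        return true;
    }

    public static void main(String[] args) {
        FakeTable table = new FakeTable();
        table.addColumn("age", "INT");
        table.addColumn("age", "INT"); // second call is a no-op
        System.out.println(table.executedDdl.size()); // 1
    }
}
```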

SchemaChangeHelper

org.apache.doris.flink.sink.schema.SchemaChangeHelper

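org.apache.doris.flink.sink.schema.SchemaChangeHelper#buildAddColumnDDL
builds the SQL from the ADD_DDL string template, then appends a DEFAULT clause when a default value is present, followed by the column comment: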

// (11)
private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
public static String buildAddColumnDDL(String tableIdentifier, FieldSchema fieldSchema) {  
    String name = fieldSchema.getName();  
    String type = fieldSchema.getTypeString();  
    String defaultValue = fieldSchema.getDefaultValue();  
    String comment = fieldSchema.getComment();  
    StringBuilder addDDL =  
            new StringBuilder(  
                    String.format(  
                            ADD_DDL,  
                            DorisSchemaFactory.quoteTableIdentifier(tableIdentifier),  
                            DorisSchemaFactory.identifier(name),  
                            type));  
    if (defaultValue != null) {  
        addDDL.append(" DEFAULT ").append(DorisSchemaFactory.quoteDefaultValue(defaultValue));  
    }  
    commentColumn(addDDL, comment);  
    return addDDL.toString();  
}
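To make the resulting SQL concrete, here is a self-contained sketch of the same template-based approach. The quoting helpers are simplified assumptions written for this example (backticks around identifiers, single quotes around the default value and comment). They do not reproduce the actual DorisSchemaFactory or commentColumn logic, and `AddColumnDdlSketch` is a hypothetical name.

```java
class AddColumnDdlSketch {
    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";

    /** Assumed quoting rule: wrap an identifier in backticks. */
    static String identifier(String name) {
        return "`" + name + "`";
    }

    /** Assumed quoting rule: quote each dot-separated part of "db.table". */
    static String quoteTableIdentifier(String tableIdentifier) {
        String[] parts = tableIdentifier.split("\\.");
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                sb.append('.');
            }
            sb.append(identifier(parts[i]));
        }
        return sb.toString();
    }

    /** Assumed literal rule: single quotes, with embedded quotes backslash-escaped. */
    static String quoteLiteral(String value) {
        return "'" + value.replace("'", "\\'") + "'";
    }

    static String build(String table, String name, String type, String defaultValue, String comment) {
        StringBuilder ddl = new StringBuilder(
                String.format(ADD_DDL, quoteTableIdentifier(table), identifier(name), type));
        if (defaultValue != null) {
            ddl.append(" DEFAULT ").append(quoteLiteral(defaultValue));
        }
        if (comment != null && !comment.isEmpty()) {
            ddl.append(" COMMENT ").append(quoteLiteral(comment));
        }
        return ddl.toString();
    }

    public static void main(String[] args) {
        System.out.println(build("db.users", "age", "INT", "0", "user age"));
        // prints: ALTER TABLE `db`.`users` ADD COLUMN `age` INT DEFAULT '0' COMMENT 'user age'
    }
}
```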

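Process summary:

  • SchemaOperator receives a SchemaChangeEvent and sends a SchemaChangeRequest to SchemaRegistry.
  • Inside SchemaRegistry, requests are executed by SchemaRegistryRequestHandler ("the handler" below). The handler holds a state field, schemaChangeStatus, which it uses to decide whether an earlier request is still being processed. If so, it returns a busy response. Otherwise it returns an accept response and changes its state from RequestStatus.IDLE to RequestStatus.WAITING_FOR_FLUSH.
  • If SchemaOperator receives busy, it sleeps and then sends the request again, blocking until it receives accept. It then emits a FlushEvent downstream and sends a SchemaChangeResultRequest to SchemaRegistry to wait for the result. If the reply is a SchemaChangeProcessingResponse, the schema change is considered unfinished, so SchemaOperator sleeps and asks again, blocking until it receives a reply that is not a SchemaChangeProcessingResponse. While blocked, it sends no data with the new schema downstream.
  • When SchemaRegistry receives a SchemaChangeResultRequest, the handler checks schemaChangeStatus. If it is not RequestStatus.FINISHED, the handler returns a SchemaChangeProcessingResponse.
  • DataSinkWriterOperator receives the FlushEvent and performs a flush, writing all data already received under the old schema to the database. It then sends a FlushSuccessEvent to SchemaRegistry.
  • The handler in SchemaRegistry collects the FlushSuccessEvents. Once it has received one from every subtask, it sets its state to RequestStatus.APPLYING and uses the MetadataApplier to apply the schema change to the sink-side (external) database. When that completes, it sets its state to RequestStatus.FINISHED.
  • When SchemaOperator sends a SchemaChangeResultRequest again and the handler's state is RequestStatus.FINISHED, SchemaRegistry returns a reply that is not a SchemaChangeProcessingResponse. SchemaOperator stops blocking and resumes sending data, now with the new schema, downstream.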
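
The handler's state transitions described in the steps above can be modelled as a small single-threaded state machine. This is only a sketch under stated assumptions: `HandlerSketch` and its method names are hypothetical, boolean return values stand in for the response classes, and the reset to IDLE after the result is delivered is an assumption of this sketch that the summary does not spell out.

```java
/** Hypothetical, single-threaded model of the handler states; not Flink CDC code. */
class HandlerSketch {
    enum Status { IDLE, WAITING_FOR_FLUSH, APPLYING, FINISHED }

    private Status status = Status.IDLE;
    private final int sinkSubtasks;
    private int flushed = 0;

    HandlerSketch(int sinkSubtasks) {
        this.sinkSubtasks = sinkSubtasks;
    }

    Status status() {
        return status;
    }

    /** Returns true for "accept" and false for "busy". */
    boolean requestSchemaChange() {
        if (status != Status.IDLE) {
            return false; // an earlier request is still in progress
        }
        status = Status.WAITING_FOR_FLUSH;
        flushed = 0;
        return true;
    }

    /** Called once per sink subtask after it has flushed old-schema data. */
    void flushSuccess() {
        if (status != Status.WAITING_FOR_FLUSH) {
            throw new IllegalStateException("Unexpected flush in state " + status);
        }
        flushed++;
        if (flushed == sinkSubtasks) {
            status = Status.APPLYING;
            // The real handler would invoke the MetadataApplier at this point.
            status = Status.FINISHED;
        }
    }

    /** Returns false while still processing, true once the change is finished. */
    boolean pollResult() {
        if (status != Status.FINISHED) {
            return false; // models SchemaChangeProcessingResponse
        }
        status = Status.IDLE; // assumption: ready for the next request
        return true;
    }
}
```

With two sink subtasks, pollResult() keeps returning false until flushSuccess() has been called twice, which matches the blocking loop on the SchemaOperator side.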

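II. Flowchart

[Figure: flowchart of the schema evolution process]

The step numbers in the figure below are marked in the source code as comments such as // (10), so they can be searched for there.
[Figure: flowchart with numbered steps]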
