Elasticsearch Source Code Analysis - 04: Cluster State Publication

Cluster State Publication

The cluster module encapsulates the work performed at the cluster level: cluster health, cluster-level metadata management, allocating shards to nodes, node management, and so on. After a cluster task runs it may produce a new cluster state; when that happens, the master node broadcasts the new state to the other nodes.

The cluster state is encapsulated in ClusterState and supports incremental (diff-based) synchronization.

The main occasions on which cluster tasks are submitted are:

  • Index creation, deletion, opening and closing
  • Changes to index templates, mappings and aliases
  • The gateway module publishing the elected cluster state
  • Snapshots
  • Shard allocation
  • Cluster node changes, etc.

The entry point for submitting a cluster task is ClusterService#submitStateUpdateTask; the first argument is the source of the event, the second is the concrete task to execute:

public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener>
        void submitStateUpdateTask(String source, T updateTask) {
        submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
    }
    
public <T> void submitStateUpdateTask(String source, T task,
                                          ClusterStateTaskConfig config,
                                          ClusterStateTaskExecutor<T> executor,
                                          ClusterStateTaskListener listener) {
        submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
    }

public <T> void submitStateUpdateTasks(final String source,
                                           final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
                                           final ClusterStateTaskExecutor<T> executor) {
        masterService.submitStateUpdateTasks(source, tasks, config, executor);
    }

The most representative task type is ClusterStateUpdateTask, which implements ClusterStateTaskConfig, ClusterStateTaskExecutor and ClusterStateTaskListener:

public abstract class ClusterStateUpdateTask
        implements ClusterStateTaskConfig, ClusterStateTaskExecutor<ClusterStateUpdateTask>, ClusterStateTaskListener {

ClusterStateTaskConfig carries the task's configuration, namely its timeout and priority:

TimeValue timeout();
Priority priority();

ClusterStateTaskExecutor defines the work to be performed; its central method is execute:

 ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;

When a task runs it is passed the current cluster state. If the task produces a new cluster state it returns the new one; otherwise it returns the original state unchanged.

ClusterStateTaskListener provides the callbacks invoked after the task has been submitted:

/**
     * A callback called when execute fails.
     */
    void onFailure(String source, Exception e);

    /**
     * called when the task was rejected because the local node is no longer master.
     * Used only for tasks submitted to {@link MasterService}.
     */
    default void onNoLongerMaster(String source) {
        onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
    }

    /**
     * Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} have been processed
     * properly by all listeners.
     */
    default void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
    }
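
Putting the three interfaces together, a typical submission looks roughly like the following minimal sketch (not taken from the Elasticsearch source; clusterService is assumed to be an injected ClusterService instance):

// A minimal sketch, assuming an injected ClusterService instance named clusterService.
clusterService.submitStateUpdateTask("demo-source", new ClusterStateUpdateTask() {
    @Override
    public ClusterState execute(ClusterState currentState) {
        // return a modified copy to trigger publication,
        // or return currentState unchanged to skip publication
        return currentState;
    }

    @Override
    public void onFailure(String source, Exception e) {
        // invoked when the task fails, e.g. because the local node is no longer master
    }
});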

MasterService is responsible for managing and running cluster tasks; only the master node submits cluster tasks to its internal queue and runs the queued tasks:

public <T> void submitStateUpdateTasks(final String source,
                                           final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
                                           final ClusterStateTaskExecutor<T> executor) {
        if (!lifecycle.started()) {
            return;
        }
        final ThreadContext threadContext = threadPool.getThreadContext();
        final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
        try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
            threadContext.markAsSystemContext();
            // wrap each task into an UpdateTask
            List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
                .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
                .collect(Collectors.toList());
            // submit the batched tasks
            taskBatcher.submitTasks(safeTasks, config.timeout());
        } catch (EsRejectedExecutionException e) {
            // ignore cases where we are shutting down..., there is really nothing interesting
            // to be done here...
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }
    }
public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
        if (tasks.isEmpty()) {
            return;
        }
        final BatchedTask firstTask = tasks.get(0);
        assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
            "tasks submitted in a batch should share the same batching key: " + tasks;
        // convert to an identity map to check for dups based on task identity
        final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
            BatchedTask::getTask,
            Function.identity(),
            (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
            IdentityHashMap::new));

        synchronized (tasksPerBatchingKey) {
            // look up (or create) the task set for this batchingKey
            LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
                k -> new LinkedHashSet<>(tasks.size()));
            // make sure no task with the same identity is already queued for this batchingKey
            for (BatchedTask existing : existingTasks) {
                // check that there won't be two tasks with the same identity for the same batching key
                BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
                if (duplicateTask != null) {
                    throw new IllegalStateException("task [" + duplicateTask.describeTasks(
                        Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");
                }
            }
            existingTasks.addAll(tasks);
        }
        // schedule execution
        if (timeout != null) {
            threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
        } else {
            threadExecutor.execute(firstTask);
        }
    }

There is de-duplication logic here: tasks that share the same ClusterStateTaskExecutor instance are executed only once as a batch, and the remaining identical task instances are simply assigned the same execution result. Duplicate tasks are identified by the task object itself (object identity); de-duplication does not delete the duplicates, it gives them the same result once the batch has finished.

The ClusterStateTaskExecutor can be the same in two situations: either the submitted batch itself contains the duplicate, or a previously submitted task with the same executor already exists but has not run yet. In that case the newly submitted tasks are appended to the corresponding list and the whole list is executed only once.
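
The identity check is simply the Collectors.toMap call with an IdentityHashMap shown above. A self-contained illustration in plain Java (not ES code) of why object identity rather than equals() is what matters:

import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class IdentityDedupSketch {
    public static void main(String[] args) {
        String a = new String("task");
        String b = new String("task");   // equals(a), but a different instance

        // two distinct instances are accepted, even though they are equal
        collect(List.of(a, b));
        System.out.println("distinct instances accepted");

        // the very same instance submitted twice is rejected
        try {
            collect(List.of(a, a));
        } catch (IllegalStateException e) {
            System.out.println("duplicate rejected: " + e.getMessage());
        }
    }

    static <T> Map<T, T> collect(List<T> tasks) {
        return tasks.stream().collect(Collectors.toMap(
            Function.identity(),
            Function.identity(),
            (x, y) -> { throw new IllegalStateException("cannot add duplicate task: " + x); },
            IdentityHashMap::new));
    }
}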

Each task is wrapped in an UpdateTask:

class UpdateTask extends BatchedTask {
            final ClusterStateTaskListener listener;

            UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
                       ClusterStateTaskExecutor<?> executor) {
                super(priority, source, executor, task);
                this.listener = listener;
            }

            @Override
            public String describeTasks(List<? extends BatchedTask> tasks) {
                return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
                    tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
            }
        }

Once submitted to the thread pool, the run method is invoked:

@Override
public void run() {
    // run the task if it has not been processed yet
    runIfNotProcessed(this);
}
void runIfNotProcessed(BatchedTask updateTask) {
        // tasks with the same batching key are processed only once, as one batch
        if (updateTask.processed.get() == false) {
            final List<BatchedTask> toExecute = new ArrayList<>();
            final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
            synchronized (tasksPerBatchingKey) {
                // take the pending task list for this batching key
                LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
                if (pending != null) {
                    for (BatchedTask task : pending) {
                        if (task.processed.getAndSet(true) == false) {
                            logger.trace("will process {}", task);
                            // build the list of tasks to execute
                            toExecute.add(task);
                            processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
                        } else {
                            logger.trace("skipping {}, already processed", task);
                        }
                    }
                }
            }

            if (toExecute.isEmpty() == false) {
                final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
                    String tasks = updateTask.describeTasks(entry.getValue());
                    return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
                }).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
                // run the batch
                run(updateTask.batchingKey, toExecute, tasksSummary);
            }
        }
    }
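
The "processed only once" guarantee comes from the AtomicBoolean flag on each BatchedTask. A schematic, self-contained illustration of that pattern in plain Java (not ES code):

import java.util.concurrent.atomic.AtomicBoolean;

public class ProcessOnceSketch {
    public static void main(String[] args) {
        AtomicBoolean processed = new AtomicBoolean(false);
        Runnable attempt = () -> {
            // getAndSet flips the flag atomically, so only the first caller processes the task
            if (processed.getAndSet(true) == false) {
                System.out.println("processing task");
            } else {
                System.out.println("skipping, already processed");
            }
        };
        attempt.run();   // prints "processing task"
        attempt.run();   // prints "skipping, already processed"
    }
}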

The logic that runs the tasks and publishes the cluster state lives in MasterService:

@Override
        protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
            ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
            List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
            // run the tasks and publish the resulting cluster state
            runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
        }
private void runTasks(TaskInputs taskInputs) {
        final String summary = taskInputs.summary;
        if (!lifecycle.started()) {
            logger.debug("processing [{}]: ignoring, master service not started", summary);
            return;
        }

        logger.debug("executing cluster state update for [{}]", summary);
        // the previous cluster state
        final ClusterState previousClusterState = state();
        // only run on the elected master node
        if (!previousClusterState.nodes().isLocalNodeElectedMaster() && taskInputs.runOnlyWhenMaster()) {
            logger.debug("failing [{}]: local node is no longer master", summary);
            taskInputs.onNoLongerMaster();
            return;
        }

        final long computationStartTime = threadPool.relativeTimeInMillis();
        // execute the tasks and compute the new cluster state
        final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
        taskOutputs.notifyFailedTasks();
        final TimeValue computationTime = getTimeSince(computationStartTime);
        logExecutionTime(computationTime, "compute cluster state update", summary);

        if (taskOutputs.clusterStateUnchanged()) {
            final long notificationStartTime = threadPool.relativeTimeInMillis();
            taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
            final TimeValue executionTime = getTimeSince(notificationStartTime);
            logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
        } else { // the cluster state has changed
            final ClusterState newClusterState = taskOutputs.newClusterState;
            if (logger.isTraceEnabled()) {
                logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
            } else {
                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
            }
            final long publicationStartTime = threadPool.relativeTimeInMillis();
            try {
                ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
                // new cluster state, notify all listeners
                final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
                if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                    String nodesDeltaSummary = nodesDelta.shortSummary();
                    if (nodesDeltaSummary.length() > 0) {
                        logger.info("{}, term: {}, version: {}, delta: {}",
                            summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary);
                    }
                }

                logger.debug("publishing cluster state version [{}]", newClusterState.version());
                // publish the cluster state
                publish(clusterChangedEvent, taskOutputs, publicationStartTime);
            } catch (Exception e) {
                handleException(summary, publicationStartTime, newClusterState, e);
            }
        }
    }

Before running the tasks, the method checks whether the local node is the elected master, because only the master may run cluster tasks; the tasks are then executed against the previous cluster state to produce a new one.

The tasks are executed, their results are collected, and the new cluster state is generated:

private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {
        // execute the submitted tasks and return the resulting cluster state
        ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState);
        // bump the version numbers to build the final new cluster state
        ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
        return new TaskOutputs(taskInputs, previousClusterState, newClusterState, getNonFailedTasks(taskInputs, clusterTasksResult),
            clusterTasksResult.executionResults);
    }

The task list is extracted and the executor's execute method is invoked:

private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) {
        ClusterTasksResult<Object> clusterTasksResult;
        try {
            List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
            // execute the tasks and return the new cluster state
            clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
            if (previousClusterState != clusterTasksResult.resultingState &&
                previousClusterState.nodes().isLocalNodeElectedMaster() &&
                (clusterTasksResult.resultingState.nodes().isLocalNodeElectedMaster() == false)) {
                throw new AssertionError("update task submitted to MasterService cannot remove master");
            }
        } catch (Exception e) {
           ......
               clusterTasksResult = ClusterTasksResult.builder()
                .failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e)
                .build(previousClusterState);
        }
        ......
        return clusterTasksResult;
    }

Let's take the gateway recovering the cluster state as an example:

@Override
    public final ClusterTasksResult<ClusterStateUpdateTask> execute(ClusterState currentState, List<ClusterStateUpdateTask> tasks)
            throws Exception {
        // run the state-change task and return the cluster state it produced
        ClusterState result = execute(currentState);
        return ClusterTasksResult.<ClusterStateUpdateTask>builder().successes(tasks).build(result);
    }

		@Override
        public void onSuccess(final ClusterState recoveredState) {
            logger.trace("successful state recovery, importing cluster state...");
            clusterService.submitStateUpdateTask("local-gateway-elected-state",
                new RecoverStateUpdateTask() {
                @Override
                public ClusterState execute(final ClusterState currentState) {
                    final ClusterState updatedState = ClusterStateUpdaters.mixCurrentStateAndRecoveredState(currentState, recoveredState);
                    return super.execute(ClusterStateUpdaters.recoverClusterBlocks(updatedState));
                }
            });
        }

@Override
        public ClusterState execute(final ClusterState currentState) {
            if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                logger.debug("cluster is already recovered");
                return currentState;
            }
            // state recovery is complete
            final ClusterState newState = Function.<ClusterState>identity()
                    .andThen(ClusterStateUpdaters::updateRoutingTable)
                    .andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
                    .apply(currentState);
            // start allocating shards
            return allocationService.reroute(newState, "state recovered");
        }

A new cluster state is produced and shard allocation begins. The final new cluster state is then built from the previous state and the freshly computed result:

private ClusterState patchVersions(ClusterState previousClusterState, ClusterTasksResult<?> executionResult) {
        // the newly computed cluster state
        ClusterState newClusterState = executionResult.resultingState;

        if (previousClusterState != newClusterState) {
            // only the master controls the version numbers
            // increment the cluster state version
            Builder builder = incrementVersion(newClusterState);
            // the routing table changed, i.e. the shard-to-node assignments changed
            if (previousClusterState.routingTable() != newClusterState.routingTable()) {
                builder.routingTable(RoutingTable.builder(newClusterState.routingTable())
                    .version(newClusterState.routingTable().version() + 1).build());
            }
            // the cluster metadata changed
            if (previousClusterState.metadata() != newClusterState.metadata()) {
                builder.metadata(Metadata.builder(newClusterState.metadata()).version(newClusterState.metadata().version() + 1));
            }

            newClusterState = builder.build();
        }

        return newClusterState;
    }

Back in MasterService#runTasks, the new cluster state has been generated and returned. It is compared with the previous state; if it changed, the publication phase starts and the latest cluster state is broadcast to all nodes:

// publish the cluster state
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
        final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() {
            @Override
            protected boolean blockingAllowed() {
                return isMasterUpdateThread() || super.blockingAllowed();
            }
        };
        // publish the cluster state
        clusterStatePublisher.publish(clusterChangedEvent, fut, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()));

        // indefinitely wait for publication to complete
        try {
            FutureUtils.get(fut);
            onPublicationSuccess(clusterChangedEvent, taskOutputs);
        } catch (Exception e) {
            onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e);
        }
    }
@Override
    public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener) {
        // the new cluster state
        ClusterState newState = clusterChangedEvent.state();
        assert newState.getNodes().isLocalNodeElectedMaster() : "Shouldn't publish state when not master " + clusterChangedEvent.source();

        try {

            // state got changed locally (maybe because another master published to us)
            if (clusterChangedEvent.previousState() != this.committedState.get()) {
                throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update");
            }

            pendingStatesQueue.addPending(newState);
            // publish the cluster state
            publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
        } catch (FailedToCommitClusterStateException t) {
            // cluster service logs a WARN message
            logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
                newState.version(), electMaster.minimumMasterNodes());

            synchronized (stateMutex) {
                pendingStatesQueue.failAllStatesAndClear(
                    new ElasticsearchException("failed to publish cluster state"));

                rejoin("zen-disco-failed-to-publish");
            }

            publishListener.onFailure(t);
            return;
        }

        final DiscoveryNode localNode = newState.getNodes().getLocalNode();
        final AtomicBoolean processedOrFailed = new AtomicBoolean();
        pendingStatesQueue.markAsCommitted(newState.stateUUID(),
            new PendingClusterStatesQueue.StateProcessedListener() {
                @Override
                public void onNewClusterStateProcessed() {
                    processedOrFailed.set(true);
                    publishListener.onResponse(null);
                    ackListener.onNodeAck(localNode, null);
                }

                @Override
                public void onNewClusterStateFailed(Exception e) {
                    processedOrFailed.set(true);
                    publishListener.onFailure(e);
                    ackListener.onNodeAck(localNode, e);
                    logger.warn(() -> new ParameterizedMessage(
                            "failed while applying cluster state locally [{}]", clusterChangedEvent.source()), e);
                }
            });

        synchronized (stateMutex) {
            if (clusterChangedEvent.previousState() != this.committedState.get()) {
                publishListener.onFailure(
                        new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes")
                );
                return;
            }
            // the state has been published to the cluster via the two-phase commit, but not every node is guaranteed to have succeeded; now apply the committed state locally
            boolean sentToApplier = processNextCommittedClusterState("master " + newState.nodes().getMasterNode() +
                " committed version [" + newState.version() + "] source [" + clusterChangedEvent.source() + "]");
            if (sentToApplier == false && processedOrFailed.get() == false) {
                assert false : "cluster state published locally neither processed nor failed: " + newState;
                logger.warn("cluster state with version [{}] that is published locally has neither been processed nor failed",
                    newState.version());
                publishListener.onFailure(new FailedToCommitClusterStateException("cluster state that is published locally has neither " +
                        "been processed nor failed"));
            }
        }
    }

First the list of target nodes is prepared, excluding the local node. The cluster state to publish is then built either as a full state or as a diff, serialized, and compressed so it can be sent out:

public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes,
                        final Discovery.AckListener ackListener) throws FailedToCommitClusterStateException {
        final DiscoveryNodes nodes;
        final SendingController sendingController;
        final Set<DiscoveryNode> nodesToPublishTo;
        final Map<Version, BytesReference> serializedStates;
        final Map<Version, BytesReference> serializedDiffs;
        final boolean sendFullVersion;
        try {
            // the nodes the state must be sent to
            nodes = clusterChangedEvent.state().nodes();
            nodesToPublishTo = new HashSet<>(nodes.getSize());
            DiscoveryNode localNode = nodes.getLocalNode();
            final int totalMasterNodes = nodes.getMasterNodes().size();
            for (final DiscoveryNode node : nodes) {
                if (node.equals(localNode) == false) {
                    nodesToPublishTo.add(node);
                }
            }
            sendFullVersion = !discoverySettings.getPublishDiff() || clusterChangedEvent.previousState() == null;
            // serialized full states, keyed by node version
            serializedStates = new HashMap<>();
            // serialized diffs, keyed by node version
            serializedDiffs = new HashMap<>();

            // we build these early as a best effort not to commit in the case of error.
            // sadly this is not water tight as it may that a failed diff based publishing to a node
            // will cause a full serialization based on an older version, which may fail after the
            // change has been committed.
            // build the serialized full states and diffs
            buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(),
                    nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs);
            // handler for the responses of the publish requests
            final BlockingClusterStatePublishResponseHandler publishResponseHandler =
                new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener);
            sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes,
                totalMasterNodes, publishResponseHandler);
        } catch (Exception e) {
            throw new FailedToCommitClusterStateException("unexpected error while preparing to publish", e);
        }

        try {
            // publish
            innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion, serializedStates,
                serializedDiffs);
        } catch (FailedToCommitClusterStateException t) {
            throw t;
        } catch (Exception e) {
            // try to fail committing, in cause it's still on going
            if (sendingController.markAsFailed("unexpected error", e)) {
                // signal the change should be rejected
                throw new FailedToCommitClusterStateException("unexpected error", e);
            } else {
                throw e;
            }
        }
    }

Full states are kept in serializedStates and diffs in serializedDiffs. Every cluster state carries its own unique version number, and when publishing, only the incremental content between adjacent versions needs to be sent.

The state to send is built as follows: if the node did not exist in the previously published cluster state, or full publication is configured, a full state is built; otherwise a diff is built. The result is then serialized and compressed:

private void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState, Set<DiscoveryNode> nodesToPublishTo,
                                             boolean sendFullVersion, Map<Version, BytesReference> serializedStates,
                                             Map<Version, BytesReference> serializedDiffs) {
        Diff<ClusterState> diff = null;
        for (final DiscoveryNode node : nodesToPublishTo) {
            try {
                // send a full state
                if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
                    // will send a full reference
                    if (serializedStates.containsKey(node.getVersion()) == false) {
                        serializedStates.put(node.getVersion(), serializeFullClusterState(clusterState, node.getVersion()));
                    }
                } else {
                    // send a diff
                    // will send a diff
                    if (diff == null) {
                        diff = clusterState.diff(previousState);
                    }
                    if (serializedDiffs.containsKey(node.getVersion()) == false) {
                        serializedDiffs.put(node.getVersion(), serializeDiffClusterState(diff, node.getVersion()));
                    }
                }
            } catch (IOException e) {
                throw new ElasticsearchException("failed to serialize cluster_state for publishing to node {}", e, node);
            }
        }
    }
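
The real diff machinery is ClusterState.diff on the master and ClusterState.readDiffFrom / Diff.apply on the receiving node, both of which appear in this section. As a purely schematic, self-contained illustration of the underlying idea (send only the delta between two adjacent versions and rebuild the full state on the other side), here is a plain-Java sketch using maps; the real Diff implementation also handles removals and nested structures:

import java.util.HashMap;
import java.util.Map;

public class DiffPublicationSketch {
    public static void main(String[] args) {
        Map<String, String> previous = Map.of("index-1", "node-A", "index-2", "node-B");
        Map<String, String> current  = Map.of("index-1", "node-A", "index-2", "node-C");

        // master side: compute the delta once
        Map<String, String> diff = new HashMap<>();
        current.forEach((k, v) -> {
            if (!v.equals(previous.get(k))) {
                diff.put(k, v);
            }
        });

        // receiver side: apply the delta to the locally known previous state
        Map<String, String> rebuilt = new HashMap<>(previous);
        rebuilt.putAll(diff);
        System.out.println(rebuilt.equals(current));   // true
    }
}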

ES publishes the state with a two-phase commit. In the first phase (send/push) the state data is sent to the nodes but not applied; if more than half of the master-eligible nodes acknowledge, the second phase (commit) sends the commit request. Two-phase commit cannot guarantee that a node applies the state correctly after receiving the commit request: it only guarantees that the commit request was sent, not whether applying the state on any individual node succeeded or failed.
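
Before looking at the code, here is a schematic, self-contained sketch of the ack-then-commit rule that the SendingController implements below. This is plain Java, not ES code, and the quorum size of 2 is just an assumption standing in for minimum_master_nodes:

import java.util.ArrayList;
import java.util.List;

public class TwoPhaseCommitSketch {
    public static void main(String[] args) {
        int neededMastersToCommit = 2;                 // assumed quorum
        List<String> sendAckedBeforeCommit = new ArrayList<>();
        boolean committed = false;

        String[] acksInArrivalOrder = {"node-2", "node-3", "node-1"};
        for (String node : acksInArrivalOrder) {
            if (committed) {
                // phase two already started: late ackers get the commit right away
                System.out.println("commit sent immediately to " + node);
            } else {
                sendAckedBeforeCommit.add(node);
                if (--neededMastersToCommit == 0) {
                    committed = true;
                    // phase two: send commit to every node that acked during phase one
                    sendAckedBeforeCommit.forEach(n -> System.out.println("commit sent to " + n));
                    sendAckedBeforeCommit.clear();
                }
            }
        }
    }
}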

  • Push phase: sending the cluster state data
private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
                              final SendingController sendingController, final Discovery.AckListener ackListener,
                              final boolean sendFullVersion, final Map<Version, BytesReference> serializedStates,
                              final Map<Version, BytesReference> serializedDiffs) {

        final ClusterState clusterState = clusterChangedEvent.state();
        final ClusterState previousState = clusterChangedEvent.previousState();
        // publish timeout
        final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
        // publication start time
        final long publishingStartInNanos = System.nanoTime();
        // iterate over the nodes and asynchronously send the full state or the diff
        for (final DiscoveryNode node : nodesToPublishTo) {
            // try and serialize the cluster state once (or per version), so we don't serialize it
            // per node when we send it over the wire, compress it while we are at it...
            // we don't send full version if node didn't exist in the previous version of cluster state
            // send the full state
            if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
                sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
            } else {
                // send the diff
                sendClusterStateDiff(clusterState, serializedDiffs, serializedStates, node, publishTimeout, sendingController);
            }
        }
        // wait for the commit: phase one finishes when enough acks have arrived or the timeout is reached
        sendingController.waitForCommit(discoverySettings.getCommitTimeout());

        final long commitTime = System.nanoTime() - publishingStartInNanos;

        ackListener.onCommit(TimeValue.timeValueNanos(commitTime));

        try {
            long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - commitTime);
            final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
            sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
            if (sendingController.getPublishingTimedOut()) {
                DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
                // everyone may have just responded
                if (pendingNodes.length > 0) {
                    logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})",
                        clusterState.version(), publishTimeout, pendingNodes);
                }
            }
            // The failure is logged under debug when a sending failed. we now log a summary.
            Set<DiscoveryNode> failedNodes = publishResponseHandler.getFailedNodes();
            if (failedNodes.isEmpty() == false) {
                logger.warn("publishing cluster state with version [{}] failed for the following nodes: [{}]",
                    clusterChangedEvent.state().version(), failedNodes);
            }
        } catch (InterruptedException e) {
            // ignore & restore interrupt
            Thread.currentThread().interrupt();
        }
    }

Whether a full state or a diff is being sent, the call eventually reaches sendClusterStateToNode:

private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes,
                                        final DiscoveryNode node,
                                        final TimeValue publishTimeout,
                                        final SendingController sendingController,
                                        final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {
        try {
            // send via the underlying transport layer
            transportService.sendRequest(node, SEND_ACTION_NAME,
                    new BytesTransportRequest(bytes, node.getVersion()),
                    stateRequestOptions,
                    new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                        @Override
                        public void handleResponse(TransportResponse.Empty response) {
                            // publication timed out
                            if (sendingController.getPublishingTimedOut()) {
                                logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node,
                                    clusterState.version(), publishTimeout);
                            }
                            // check whether a majority has acked, then trigger the commit
                            sendingController.onNodeSendAck(node);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
                                logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
                                sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
                            } else {
                                logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
                                sendingController.onNodeSendFailed(node, exp);
                            }
                        }
                    });
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
            sendingController.onNodeSendFailed(node, e);
        }
    }

The data is sent asynchronously via transportService.sendRequest. The RPC action is internal:discovery/zen/publish/send, and the handler registered on the receiving node is SendClusterStateRequestHandler:

// handler for the send (push) phase
        transportService.registerRequestHandler(SEND_ACTION_NAME, ThreadPool.Names.SAME, false, false, BytesTransportRequest::new,
            new SendClusterStateRequestHandler());

private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {

        @Override
        public void messageReceived(BytesTransportRequest request, final TransportChannel channel, Task task) throws Exception {
            // handle the incoming cluster state
            handleIncomingClusterStateRequest(request, channel);
        }
    }

protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
        Compressor compressor = CompressorFactory.compressor(request.bytes());
        StreamInput in = request.bytes().streamInput();
        final ClusterState incomingState;
        synchronized (lastSeenClusterStateMutex) {
            try {
                if (compressor != null) {
                    in = compressor.streamInput(in);
                }
                in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
                in.setVersion(request.version());
                // If true we received full cluster state - otherwise diffs
                if (in.readBoolean()) {
                    incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
                    fullClusterStateReceivedCount.incrementAndGet();
                    logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
                        request.bytes().length());
                } else if (lastSeenClusterState != null) {
                    Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());
                    incomingState = diff.apply(lastSeenClusterState);
                    compatibleClusterStateDiffReceivedCount.incrementAndGet();
                    logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
                        incomingState.version(), incomingState.stateUUID(), request.bytes().length());
                } else {
                    logger.debug("received diff for but don't have any local cluster state - requesting full state");
                    throw new IncompatibleClusterStateVersionException("have no local cluster state");
                }
            } catch (IncompatibleClusterStateVersionException e) {
                incompatibleClusterStateDiffReceivedCount.incrementAndGet();
                throw e;
            } catch (Exception e) {
                logger.warn("unexpected error while deserializing an incoming cluster state", e);
                throw e;
            } finally {
                IOUtils.close(in);
            }
            // notify the incoming-cluster-state listener
            incomingClusterStateListener.onIncomingClusterState(incomingState);
            lastSeenClusterState = incomingState;
        }
        // send back an empty response
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }

The received cluster state is stored and an empty response is returned.

Back in the master's send callback, the controller checks whether enough acknowledgements have arrived:

public synchronized void onNodeSendAck(DiscoveryNode node) {
            if (committed) { // already committed: send the commit to this node right away
                assert sendAckedBeforeCommit.isEmpty();
                sendCommitToNode(node, clusterState, this);
            } else if (committedOrFailed()) {
                logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", node, clusterState.version());
            } else {
                // we're still waiting
                sendAckedBeforeCommit.add(node);
                if (node.isMasterNode()) {
                    checkForCommitOrFailIfNoPending(node);
                }
            }
        }

        // check the number of acking master nodes; once a majority has acked, commit
        private synchronized void checkForCommitOrFailIfNoPending(DiscoveryNode masterNode) {
            logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])",
                    masterNode, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
            neededMastersToCommit--;
            if (neededMastersToCommit == 0) {
                if (markAsCommitted()) {
                    for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) {
                        sendCommitToNode(nodeToCommit, clusterState, this);
                    }
                    sendAckedBeforeCommit.clear();
                }
            }
            decrementPendingMasterAcksAndChangeForFailure();
        }
  • Commit phase

Once enough acknowledgements have been received, the commit logic starts:

private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {
        try {
            logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]",
                clusterState.stateUUID(), clusterState.version(), node);
            transportService.sendRequest(node, COMMIT_ACTION_NAME,
                    new CommitClusterStateRequest(clusterState.stateUUID()),
                    stateRequestOptions,
                    new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                        @Override
                        public void handleResponse(TransportResponse.Empty response) {
                            if (sendingController.getPublishingTimedOut()) {
                                logger.debug("node {} responded to cluster state commit [{}]", node, clusterState.version());
                            }
                            sendingController.getPublishResponseHandler().onResponse(node);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            logger.debug(() -> new ParameterizedMessage("failed to commit cluster state (uuid [{}], version [{}]) to {}",
                                    clusterState.stateUUID(), clusterState.version(), node), exp);
                            sendingController.getPublishResponseHandler().onFailure(node, exp);
                        }
                    });
        } catch (Exception t) {
            logger.warn(() -> new ParameterizedMessage("error sending cluster state commit (uuid [{}], version [{}]) to {}",
                    clusterState.stateUUID(), clusterState.version(), node), t);
            sendingController.getPublishResponseHandler().onFailure(node, t);
        }
    }

The commit is likewise sent as an RPC request through transportService; the internal action is internal:discovery/zen/publish/commit.

The handler registered on the receiving node is CommitClusterStateRequestHandler:

// handler for the commit phase
        transportService.registerRequestHandler(COMMIT_ACTION_NAME, ThreadPool.Names.SAME, false, false, CommitClusterStateRequest::new,
            new CommitClusterStateRequestHandler());

// handles the cluster state commit
    private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {
        @Override
        public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel, Task task) throws Exception {
            handleCommitRequest(request, channel);
        }
    }

The receiving node then applies the cluster state:

@Override
    public void onClusterStateCommitted(String stateUUID, ActionListener<Void> processedListener) {
        // mark the pending state as committed
        final ClusterState state = pendingStatesQueue.markAsCommitted(stateUUID,
            new PendingClusterStatesQueue.StateProcessedListener() {
                @Override
                public void onNewClusterStateProcessed() {
                    processedListener.onResponse(null);
                }

                @Override
                public void onNewClusterStateFailed(Exception e) {
                    processedListener.onFailure(e);
                }
            });
        if (state != null) {
            synchronized (stateMutex) {
                // apply the new cluster state
                processNextCommittedClusterState("master " + state.nodes().getMasterNode() +
                    " committed version [" + state.version() + "]");
            }
        }
    }

 // the node applies the new cluster state via the cluster applier
        clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",
            this::clusterState,
            new ClusterApplyListener() {
                @Override
                public void onSuccess(String source) {
                    try {
                        pendingStatesQueue.markAsProcessed(newClusterState);
                    } catch (Exception e) {
                        onFailure(source, e);
                    }
                }

                @Override
                public void onFailure(String source, Exception e) {
                    logger.error(() -> new ParameterizedMessage("unexpected failure applying [{}]", reason), e);
                    try {
                        // TODO: use cluster state uuid instead of full cluster state so that we don't keep reference to CS around
                        // for too long.
                        pendingStatesQueue.markAsFailed(newClusterState, e);
                    } catch (Exception inner) {
                        inner.addSuppressed(e);
                        logger.error(() -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
                   }
                }
            });

Eventually this reaches ClusterApplierService#runTask:

private void runTask(UpdateTask task) {
        if (!lifecycle.started()) {
            logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
            return;
        }

        logger.debug("processing [{}]: execute", task.source);
        // the previously applied cluster state
        final ClusterState previousClusterState = state.get();
        // task start time
        long startTimeMS = currentTimeInMillis();
        // a simple stopwatch used to time the individual steps
        final StopWatch stopWatch = new StopWatch();
        final ClusterState newClusterState;
        try {
            try (Releasable ignored = stopWatch.timing("running task [" + task.source + ']')) {
                newClusterState = task.apply(previousClusterState);
            }
        } catch (Exception e) {
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
            logger.trace(() -> new ParameterizedMessage(
                "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
                executionTime, previousClusterState.version(), task.source, previousClusterState), e);
            warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
            task.listener.onFailure(task.source, e);
            return;
        }

        if (previousClusterState == newClusterState) {
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
            logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
            warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
            task.listener.onSuccess(task.source);
        } else {
            if (logger.isTraceEnabled()) {
                logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), task.source,
                    newClusterState);
            } else {
                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
            }
            try {
                // apply the state change
                applyChanges(task, previousClusterState, newClusterState, stopWatch);
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
                logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
                    executionTime, newClusterState.version(),
                    newClusterState.stateUUID());
                warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
                task.listener.onSuccess(task.source);
            } catch (Exception e) {
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
                assert applicationMayFail();
                task.listener.onFailure(task.source, e);
            }
        }
    }

All cluster state appliers are iterated and each applier's applyClusterState method is invoked with the change:

// invoke the cluster state appliers
callClusterStateAppliers(clusterChangedEvent, stopWatch);
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
        clusterStateAppliers.forEach(applier -> {
            logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
            try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) {
                applier.applyClusterState(clusterChangedEvent);
            }
        });
    }

All cluster state listeners are then iterated and their clusterChanged callbacks are invoked:

// invoke the cluster state listeners
callClusterStateListeners(clusterChangedEvent, stopWatch);
// run the callbacks after the cluster state change
    private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
        Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> {
            try {
                logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
                try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
                    listener.clusterChanged(clusterChangedEvent);
                }
            } catch (Exception ex) {
                logger.warn("failed to notify ClusterStateListener", ex);
            }
        });
    }
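
Other modules plug into this stage by registering listeners on the ClusterService. A minimal sketch (again assuming an injected ClusterService instance named clusterService) of a listener reacting to node changes:

// A minimal sketch, assuming an injected ClusterService instance named clusterService.
clusterService.addListener(event -> {
    if (event.nodesChanged()) {
        // react to nodes joining or leaving, e.g. inspect event.nodesDelta()
    }
});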

Back on the master node, the handleResponse and handleException callbacks of the commit request share the same handling logic: each one counts the latch down by one. If some node fails to apply the state, no repair logic is executed for it.
