ES源码五:写操作流程(从Es到底层Luence,全网最细的一篇,全是硬货)

今天是玩转es的一天

创建索引

image.png

写入文档

image.png

入口BaseRestHandler

BaseRestHandler是Rest请求的入口,你可以理解为spring mvc里面的controller一样
image.png
prepareRequest是一个抽象方法,实际上是由各种Rest*Action来重写的,比如这里我们是对索引文档的处理,会执行到RestIndexAction.prepareRequest

**RestIndexAction.**prepareRequest

这个方法最终会被文档写入的RestIndexAction处理

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    // todo 1. 索引操作请求
    IndexRequest indexRequest;
    // todo 2. 拿出Rest请求中的{Type} 信息,一般情况咱们 这里都是_doc
    final String type = request.param("type");
    if (type != null && type.equals(MapperService.SINGLE_MAPPING_NAME) == false) {
        deprecationLogger.deprecate(DeprecationCategory.TYPES, "index_with_types", TYPES_DEPRECATION_MESSAGE);
        indexRequest = new IndexRequest(request.param("index"), type, request.param("id"));
    } else {
        // todo 3.创建索取请求对象,参数“索取结构的名称”
        indexRequest = new IndexRequest(request.param("index"));
        // todo 4. 索引文档的Id
        indexRequest.id(request.param("id"));
    }

    // todo 5. 索取操作指定的路由信息,默认情况下可以不指定,指定之后,你的寻找分片的操作,会使用指定的routing值做计算
    indexRequest.routing(request.param("routing"));
    // todo 6. ES内部提供了一套ingest逻辑,类似于ETL 数据清洗
    /**
         * todo
         *  如果你的索取请求配置了pipline的话,那么在服务端会先对数据进行“数据清洗”的操作
         *  索取请求 会进入到管道中,管道中有多个“Filter”,对你的doc进行数据整理,比如说
         *  doc并没有指定 “时间戳”的话,那么你可以在这一步给doc添加上“时间戳”
         *  功能远不止这些,详细的用法大家可以找官网看资料
         *  在服务端pipline是单独“存储”,每个pipeline都有自己的名称
         */
    indexRequest.setPipeline(request.param("pipeline"));
    // todo 7.设置索引请求的数据
    /**
         * todo
         *  参数一:二进制数据引用的对象,通过它可以读取到 索取请求 的消息体的二进制数据
         *  参数二:二进制数据类型是上面,一般情况这里都是json
         */
    indexRequest.source(request.requiredContent(), request.getXContentType());
    // todo 8. 超时设置,默认情况都是1分钟
    indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
    // todo 9. 刷盘策略
    /**
         * todo
         *  索取操作的大概流程:请求先写入到ES buffer中,之后再写translog。默认情况下 buffer 内的数据 1秒钟 刷一磁盘
         *  刷盘做什么那?其实就是调用Lucene的IndexWriter,将buffer内的indexRequest数据,写入到Lucene的段缓冲区中(os cache)
         *  refreshPolicy:你可以通过这个字段,来决策 你这个请求 是不是要强制刷盘
         *  buffer 什么时候会刷盘那? 1. 强刷 2.满了(500mb)3.1秒钟会刷1次
         */
    indexRequest.setRefreshPolicy(request.param("refresh"));
    // todo 10. 这2个参数是一个过期参数, es现在有新的字段去控制“并发修改了”
    indexRequest.version(RestActions.parseVersion(request));
    indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
    // todo 11. 类似于学号,每个文档都会有一个独一无二的学号
    indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));
    // todo 12. 类似于班级,在ES中Shard中,可以有1个PrimaryShard和多个ReplicationShard,当PrimaryShard发生变化之后,PrimaryTerm就会自增
    indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));
    indexRequest.setRequireAlias(request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, indexRequest.isRequireAlias()));
    // todo 13. 操作类型
    String sOpType = request.param("op_type");
    // todo 14. 默认等待多少个存活分片,本次的 索取请求 才可以去写那
    /**
         * todo
         *  默认情况下1个分片存活即可,即PrimaryShard是在线的就行
         *  如果你指定成2的话,那么需要有一个PrimaryShard在线和一个ReplicationShard在线,这次的索引请求才会被处理
         */
        String waitForActiveShards = request.param("wait_for_active_shards");
        if (waitForActiveShards != null) {
            indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
        }
        if (sOpType != null) {
            indexRequest.opType(sOpType);
        }

        // todo 15.返回一个function
        /**
         * todo
         *  调用nodeClient -> index方法
         *  参数1: indexRequest 上面构建出来的这个索取请求对象
         *  参数2: RestStatusToXContentListener 这个实例包装了 RestChannel对象,同这个listener可以将响应发送到 rest请求的 请求端
         */
        return channel ->
            client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing())));
    }

总结一下一共做了哪些事:

  1. 将RestRequest转换为IndexRequest
    1. 设置文档id
    2. 设置routing路由
    3. 设置pipline,做数据清洗
    4. 设置超时
    5. 设置刷盘策略
    6. 设置文档的seqNo,类似于学号
    7. 设置文档的primaryTerm,类似于班级(这些参数后面都有用)

我们来看一下最终的indexRequest:
image.png

  1. 创建一个function(nodeClient.index(indexRequest))

action.accept =>nodeClient.index

image.png
这里最终就会执行上面说的那个function
image.png

NodeClient.index

这里先会调用到父类的AbstractClient.index方法
image.png
这里会创建一个indexAction的实例,接下来会调用到execute方法
image.png
这里的几个参数也看一下:

  1. action:之前创建的indexAction

image.png

  1. request:还是之前的indexRequest

image.png

NodeClient.doExecute

image.png
接下来调用doExecute方法会调用到executeLocal

NodeClient.executeLocal

image.png
这里会调用transportAction,通过IndexAction来获取transportIndexAction(也就是最终找到了Index的RPC处理逻辑入口):
image.png

下一步进入TransportIndexAction处理逻辑

TransportIndexAction.execute

image.png
里面是空的,也就是会调用父类TransportAction.execute方法

TransportAction.execute

image.png
这里会调用到execute(task,request, actionListener)
image.png
这里最终会调用到TransportBulkAction

到这里总结一下:

  1. nodeClient 通过IndexAction PRC类型找出对应的PRC处理器(TransportIndexAction)
  2. TransportIndexAction 将请求 转发给 另外一个“PRC处理器”(TransportBulkAction)

TransportBulkAction.doExecute

image.png
到这里的时候,会发现请求从indexRequest已经变成bulkRequest了,已经帮你打好包了,我们看一下bulkRequest里面是什么?
image.png

TransportBulkAction.doInternalExecute

//todo  参数1:task
    // 参数2:bulkRequest 批量索引请求对象
    // 参数3:executorName 执行器名称
    // 参数4:releasingListener 会在执行 onResponse 或者 onFailure 方法之前 先执行 releasable#close 方法,去更新
    // IndexingPressure 对象内的字段..
    protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
        // todo 1. 开始时间戳
        final long startTime = relativeTime();
        // todo 2. 响应数组,大小为 “bulk中的indexRequest”的数量,也就是说 响应是一对一的
        final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
        // todo 3. pipeline 相关代码
        boolean hasIndexRequestsWithPipelines = false;
        final Metadata metadata = clusterService.state().getMetadata();
        final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
        for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
            IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
            if (indexRequest != null) {
                // Each index request needs to be evaluated, because this method also modifies the IndexRequest
                boolean indexRequestHasPipeline = IngestService.resolvePipelines(actionRequest, indexRequest, metadata);
                hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
            }

            if (actionRequest instanceof IndexRequest) {
                IndexRequest ir = (IndexRequest) actionRequest;
                ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
                if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
                    throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
                }
            }
        }

        if (hasIndexRequestsWithPipelines) {
            // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
            // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
            // this path is never taken.
            try {
                if (Assertions.ENABLED) {
                    final boolean arePipelinesResolved = bulkRequest.requests()
                        .stream()
                        .map(TransportBulkAction::getIndexWriteRequest)
                        .filter(Objects::nonNull)
                        .allMatch(IndexRequest::isPipelineResolved);
                    assert arePipelinesResolved : bulkRequest;
                }
                if (clusterService.localNode().isIngestNode()) {
                    processBulkIndexIngestRequest(task, bulkRequest, executorName, listener);
                } else {
                    ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
                }
            } catch (Exception e) {
                listener.onFailure(e);
            }
            return;
        }
        // todo pipline结束

        // Attempt to create all the indices that we're going to need during the bulk before we start.
        // Step 1: collect all the indices in the request
        // todo 4. 获取出bulk请求所有涉及到的索引信息
        //  key:索引名称  value:别名.. 暂时先不考虑 别名的情况.
        final Map<String, Boolean> indices = bulkRequest.requests.stream()
            // delete requests should not attempt to create the index (if the index does not
            // exists), unless an external versioning is used
            .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
                || request.versionType() == VersionType.EXTERNAL
                || request.versionType() == VersionType.EXTERNAL_GTE)
            .collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 || v2));

        // todo 5. 保存无法创建的索引
        // Step 2: filter the list of indices to find those that don't currently exist.
        final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
        // todo 6. 保存本地bulk请求 自动创建的那部分索引
        Set<String> autoCreateIndices = new HashSet<>();
        // todo 7. 集群状态对象
        ClusterState state = clusterService.state();
        // todo 8. 遍历bulk涉及的索引map
        for (Map.Entry<String, Boolean> indexAndFlag : indices.entrySet()) {
            final String index = indexAndFlag.getKey();
            // todo 这一步去判断是否需要去创建“index”的索引结构
            boolean shouldAutoCreate = indexNameExpressionResolver.hasIndexAbstraction(index, state) == false;
            // We should only auto create if we are not requiring it to be an alias
            if (shouldAutoCreate && (indexAndFlag.getValue() == false)) {
                // todo 将需要被创建 索引结构 的索引名称 保存下来..
                autoCreateIndices.add(index);
            }
        }

        // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
        // todo 如果索引结构都存在
        if (autoCreateIndices.isEmpty()) {
            executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
        } else {
            // todo 如果索引结构不存在
            // todo 计数器:初始值需要自动创建索引的数量
            final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
            // todo 遍历需要自动创建索引的结构的索引,去创建
            for (String index : autoCreateIndices) {
                // todo createIndex 底层会向Master发起 RPC 请求,让主节点去做创建索引结构的逻辑..
                //  成功或者失败 都会调用到 本机的  ActionListener 内部封装的逻辑...
                createIndex(index, bulkRequest.timeout(), minNodeVersion, new ActionListener<CreateIndexResponse>() {
                    @Override
                    public void onResponse(CreateIndexResponse result) {
                        if (counter.decrementAndGet() == 0) {
                            // todo 什么条件才会成立呢?所有的待创建索引结构,都创建了(成功或者失败),这种情况下会走这里..

                            //  根据executorName拿到一组线程资源,然后执行 事件...
                            threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(listener) {

                                @Override
                                protected void doRun() {
                                    // todo 9. 执行批量,比较重要
                                    executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
                                }
                            });
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        final Throwable cause = ExceptionsHelper.unwrapCause(e);
                        if (cause instanceof IndexNotFoundException) {
                            indicesThatCannotBeCreated.put(index, (IndexNotFoundException) cause);
                        }
                        else if ((cause instanceof ResourceAlreadyExistsException) == false) {
                            // fail all requests involving this index, if create didn't work
                            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                                DocWriteRequest<?> request = bulkRequest.requests.get(i);
                                if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
                                    bulkRequest.requests.set(i, null);
                                }
                            }
                        }
                        if (counter.decrementAndGet() == 0) {
                            final ActionListener<BulkResponse> wrappedListener = ActionListener.wrap(listener::onResponse, inner -> {
                                inner.addSuppressed(e);
                                listener.onFailure(inner);
                            });
                            threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(wrappedListener) {
                                @Override
                                protected void doRun() {
                                    executeBulk(task, bulkRequest, startTime, wrappedListener, responses, indicesThatCannotBeCreated);
                                }

                                @Override
                                public void onRejection(Exception rejectedException) {
                                    rejectedException.addSuppressed(e);
                                    super.onRejection(rejectedException);
                                }
                            });
                        }
                    }
                });
            }
        }
    }

总结一下这个方法做了哪些事:

  1. 遍历bulk涉及到的“索引结构名称”

image.png

  1. 找出不存在的“索引结构”,通过RPC ->Master 进行创建索引结构。

image.png

  1. 执行transportBulkAction.executeBulk方法(也就是批量方法)

这里有一个比较重要的对象:ClusterState(集群状态对象)
image.png

  • nodes:节点信息: 包含了集群中所有节点的信息。它是一个映射,键是节点的ID。

image.png

  • routing table: 包含了所有索引的分片和副本的分配情况,即哪些节点包含哪些分片。

image.png
创建了几个索引,里面就是几个routingTable值
其中my_index就是我在使用的索引,我们创建的时候设置为5,这里正好创建了5个indexShardRoutingTable(这是单个分片的路由信息,包含一个主分片和多个副本分片的具体分布,每一个分片是通过 ShardRouting对象表示)
每一个ShardRouting对象就表示单个分片中主分片、副本分片、分片id、分片所在的节点,分片的当前状态

  • Metadata: 这部分存储了关于集群中所有索引的元数据,包括索引名称、 设置、映射、别名等

image.png


### TransportBulkAction.executeBulk ![image.png](https://img-blog.csdnimg.cn/img_convert/5b611969fb9c3153f6179150acab5ef0.png)
这里创建了BulkOperation,并且会调用它的run方法, **它的run方法最终会跑到BulkOperation.doRun方法中**

BulkOperation.doRun

    @Override
        protected void doRun() {
            assert bulkRequest != null;

            // todo 1. 通过observer 集群状态,observer这个方法 在内部 通过 clusterService 拿到集群状态,并保存到它内部字段中..
            final ClusterState clusterState = observer.setAndGetObservedState();

            // todo 2. 通过集群状态 判断集群是否可提供正常服务..
            if (handleBlockExceptions(clusterState)) {
                return;
            }

            //todo 3 当做一个工具,用于获取出索引请求的真实的索引名称(因为ES支持别名...内部做了解析别名的逻辑..)
            // 参数1:集群状态
            // 参数2:索引名称解析器
            final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);

            // todo 4. 集群元数据信息..(集群有哪些索引,索引有哪些Shard,Shard都在哪里..)
            Metadata metadata = clusterState.metadata();

            // todo 5.循环进行分组 等逻辑
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
                //the request can only be null because we set it to null in the previous step, so it gets ignored
                // todo 获取出当前索引请求对象
                if (docWriteRequest == null) {
                    continue;
                }
                if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
                    continue;
                }
                // todo 处理 索引结构 不在线 等等情况,将这些情况的 写请求跳过,,并设置好 对应的response
                if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metadata)) {
                    continue;
                }
                // todo 通过“工具”获取出来 当前请求的索引去名称
                Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
                try {
                    // The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether
                    // an operation is allowed in index into a data stream, but this isn't done when resolve call is cached, so
                    // the validation needs to be performed here too.
                    // todo 通过集群状态 获取 元数据 信息 -> 通过元数据 再拿到 索引映射表 -> 通过映射表 拿到当前 indexRequest 的indexAbstraction
                    //      indexAbstraction:通过它可以拿到当前这个索引的 元数据信息。比如说 有哪些分片...
                    IndexAbstraction indexAbstraction = clusterState.getMetadata().getIndicesLookup().get(concreteIndex.getName());
                    if (indexAbstraction.getParentDataStream() != null &&
                        // avoid valid cases when directly indexing into a backing index
                        // (for example when directly indexing into .ds-logs-foobar-000001)
                        concreteIndex.getName().equals(docWriteRequest.index()) == false &&
                        docWriteRequest.opType() != DocWriteRequest.OpType.CREATE) {
                        throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
                    }

                    switch (docWriteRequest.opType()) {
                        //todo CREATE 和 INDEX 有什么区别呢?
                        // POST /CREATE/...
                        // POST /INDEX/....
                        // CREATE:它会考虑文档是否存在的情况,如果存在一个ID相同的文档的话,CREATE请求会拒绝本次索引操作
                        // INDEX 则会覆盖..
                        case CREATE:
                        case INDEX:
                            prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
                            prohibitCustomRoutingOnDataStream(docWriteRequest, metadata);
                            IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                            // todo 获取当前请求的索引元数据
                            final IndexMetadata indexMetadata = metadata.index(concreteIndex);
                            // todo 获取索引的mapping信息
                            MappingMetadata mappingMd = indexMetadata.mappingOrDefault();
                            // todo 获取版本信息
                            Version indexCreated = indexMetadata.getCreationVersion();
                            // todo 解析routing,暂时咱们认为 routing 就是 POST 时传递的routing即可。
                            //  它里面的逻辑 还是跟别名有关系
                            indexRequest.resolveRouting(metadata);
                            // todo 当你未指定文档ID的情况下,内部会给indexRequest生成一个文档ID
                            indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
                            break;
                        case UPDATE:
                            TransportUpdateAction.resolveAndValidateRouting(metadata, concreteIndex.getName(),
                                (UpdateRequest) docWriteRequest);
                            break;
                        case DELETE:
                            docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
                            // check if routing is required, if so, throw error if routing wasn't specified
                            if (docWriteRequest.routing() == null && metadata.routingRequired(concreteIndex.getName())) {
                                throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
                            }
                            break;
                        default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
                    }
                } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(),
                        docWriteRequest.id(), e);
                    BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
                    responses.set(i, bulkItemResponse);
                    // make sure the request gets never processed again
                    bulkRequest.requests.set(i, null);
                }
            }

            // todo 6.
            // first, go over all the requests and create a ShardId -> Operations mapping
            Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest<?> request = bulkRequest.requests.get(i);
                if (request == null) {
                    continue;
                }
                String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
                //todo 计算本次索引请求要写入的分片ID
                // 第一步拿到了一个 OperationRouting 对象,这个对象内部封装了 路由算法..
                // 参数1:clusterState 集群状态
                // 参数2:索引名称
                // 参数3:文档id
                // 参数4:routing信息
                ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
                    request.routing()).shardId();
                // todo 如果map中不存在shardId的k-v的情况,那么使用参数2 shard-> new arraylist 在map中插入一条 k-v
                List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
                // todo 将当前索引请求 加入到 分片的请求组 中,再次包装了“索取请求”,包装对象记录了 索引请求 在bulkRequest中的位置
                // todo 方便后续设置 响应结果
                shardRequests.add(new BulkItemRequest(i, request));
            }

            if (requestsByShard.isEmpty()) {
                // todo 如果分组之后,分组结果是空map的话,直接对端发送 响应
                listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
                    buildTookInMillis(startTimeNanos)));
                return;
            }

            // todo counter:计数器,初始值为 bulk 请求分组之后,分组数量,也就是涉及到的 分片数量
            final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
            // todo 本机的 nodeId值
            String nodeId = clusterService.localNode().getId();
            // todo 遍历bulk 按照shard 分组的结果
            for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
                // todo 当前分片id
                final ShardId shardId = entry.getKey();
                // todo 当前分片的请求列表
                final List<BulkItemRequest> requests = entry.getValue();
                // todo 创建 批量分片请求对象 ,内部封装了 打在该分片上的 索取请求
                BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
                        requests.toArray(new BulkItemRequest[requests.size()]));
                // todo 需要等待在线的分片数量 (默认1)
                bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
                // todo 超时限制
                bulkShardRequest.timeout(bulkRequest.timeout());
                // todo 路由结果基于的集群状态版本号
                bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
                if (task != null) {
                    bulkShardRequest.setParentTask(nodeId, task.getId());
                }
                // todo 参数1: 分片批量请求对象 参数二: 响应监听器
                shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
                    @Override
                    public void onResponse(BulkShardResponse bulkShardResponse) {
                        for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                            // we may have no response if item failed
                            if (bulkItemResponse.getResponse() != null) {
                                bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                            }
                            responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        // create failures for all relevant requests
                        for (BulkItemRequest request : requests) {
                            final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
                            DocWriteRequest<?> docWriteRequest = request.request();
                            responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
                                    new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    private void finishHim() {
                        listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
                            buildTookInMillis(startTimeNanos)));
                    }
                });
            }
            bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
        }

这段代码非常多,总结一下做了这些事:

  1. 遍历bulkRequest中的全部“DocWriteRequest”,对于未指定“_id”的写请求,这一步自动生成。
  2. 根据“id”进行路由算法,找到当前“DocWriteRequest”命中的“shard”,加入到Shard到请求分组中。

image.png

  1. 之后,根据Shard分组的请求,交给“TransportShardBulkAction”处理,其实这一步事交给“TransportShardBulkAction”的爷爷“TransportReplicaitonAction”来作为处理入口,从TransportBulkAction将数据 根据shardId分组,然后让调用TransportReplicationAction来完成分组 数据的插入、更新或者删除操作, 完成了很漂亮的解耦

image.png

另外说明一下为什么TransportBulkAction处理完成之后又给到TransportShardBulkAction?
TransportBulkAction批量请求中的各个操作(如索引、更新、删除操作)根据目标索引和分片进行分组。这是为了确保每个操作能够被路由到正确的分片上进行处理
TransportShardBulkAction:负责在特定分片上执行这些操作。它处理的是已经被TransportBulkAction分解和分派到具体分片的操作,对于每个分片,TransportShardBulkAction 执行实际的文档索引、更新或删除操作。这包括与Lucene索引交互、处理文档版本控制和执行任何必要的索引映射更新。

TransportReplicaitonAction.doExecute

image.png
image.png
这里又创建了一个ReroutePhase(从这个类也大概知道,它要重写路由,为什么要重新路由,因为当前节点不一定是主分片节点),然后调用了它的run方法,当然最终也是会执行到doRun方法

ReroutePhase.doRun

  @Override
        public void onFailure(Exception e) {
            finishWithUnexpectedFailure(e);
        }

        @Override
        protected void doRun() {
            setPhase(task, "routing");
            final ClusterState state = observer.setAndGetObservedState();
            final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
            if (blockException != null) {
                if (blockException.retryable()) {
                    logger.trace("cluster is blocked, scheduling a retry", blockException);
                    retry(blockException);
                } else {
                    finishAsFailed(blockException);
                }
            } else {
                // todo 1. 获取当前分片归属的索引的 索引元数据信息
                final IndexMetadata indexMetadata = state.metadata().index(request.shardId().getIndex());
                if (indexMetadata == null) {
                    // ensure that the cluster state on the node is at least as high as the node that decided that the index was there
                    // todo 2.什么时候会发生state.version() < request。routedBasedOnClusterVersion()
                    //  ReroutePhase调用点存在两个:
                    //  1. 本地TransportBulkAction 通过 TransportShardBulkAction.execute
                    //  2. 远程RPC发起 indices:data/write/bulk[s][p] 事件,最终也会在这里创建 ReroutePhase 去执行逻辑..
                    if (state.version() < request.routedBasedOnClusterVersion()) {
                        logger.trace("failed to find index [{}] for request [{}] despite sender thinking it would be here. " +
                                "Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
                            request.shardId().getIndex(), request, state.version(), request.routedBasedOnClusterVersion());
                        retry(new IndexNotFoundException("failed to find index as current cluster state with version [" + state.version() +
                            "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]",
                            request.shardId().getIndexName()));
                        return;
                    } else {
                        finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
                        return;
                    }
                }

                if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
                    finishAsFailed(new IndexClosedException(indexMetadata.getIndex()));
                    return;
                }

                if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
                    // if the wait for active shard count has not been set in the request,
                    // resolve it from the index settings
                    request.waitForActiveShards(indexMetadata.getWaitForActiveShards());
                }
                assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
                    "request waitForActiveShards must be set in resolveRequest";

                // todo 3. 获取“主分片”的路由信息
                final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
                if (primary == null || primary.active() == false) {
                    logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
                        + "cluster state version [{}]", request.shardId(), actionName, request, state.version());
                    retryBecauseUnavailable(request.shardId(), "primary shard is not active");
                    return;
                }
                if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
                    logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
                        + "cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
                    retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
                    return;
                }
                // todo 4. 获取主分片 所在节点信息
                final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
                if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
                    // todo 5.说明主分片 就是本地负责的 一个分片
                    performLocalAction(state, primary, node, indexMetadata);
                } else {
                    // todo 5. 说明主分片是有其他data node负责的一个分片,这里就需要走RPC转发请求了
                    performRemoteAction(state, primary, node);
                }
            }
        }

这段代码也是极其的长,总结一下做了哪些事:

  1. 通过集群状态读取PrimaryShard的路由信息,获取出primaryShard归属的Node节点信息。根据Node是否为当前节点来决定后续逻辑
  2. 如果是本机负责的分片,就执行performLocalAction
  3. 如果是其他机器负责的分片,就执行performRemoteAction

ReroutePhase.performLocalAction

image.png
一直往performAction里面跟,会发现执行RPC “indices:data/write/bulk [p]”,其实就是一次本地调用,rpcHandler为TransportReplicationAction.handlerPrimaryRequest(), 这个rpcHandler是在构造方法里面设置上的
image.png
在这个handlerPrimaryRequest里面会创建一个AsyncPrimaryAction:
image.png

ReroutePhase.performRemoteAction

image.png
这个performRemoteAction 会执行PRC“indices:data/write/bulk”,这是一次远程RPC调用,请求会发送到PrimaryShard归属节点。那这个PRC请求由哪个rpcHandler处理那?TransportReplicationAction.handleOperationRequest(),在这个方法会再次调用runReroutePhase逻辑,最终也是会走到performLocalAction😄

总结一下:

  1. 通过集群状态读取出PrimaryShard的路由信息,获取出PrimaryShard归属的Node节点信息。根据Node是否为当前节点来决定后续逻辑
  2. PrimaryShard为本机节点:执行 RPC “indices:data/write/bulk [p]”,其实是一次本地调用,rpcHandler为 TransportReplicationAction->handlerPrimaryRequest(…),在这个方法创建了一个叫做“AsyncPrimaryAction”的AbstractRunnable的实现对象
  3. PrimaryShard为其它节点:执行RPC “indices:data/write/bulk”,这是一次远程RPC调用,请求会发送到PrimaryShard归属节点。

AsyncPrimaryAction.doRun

 @Override
        protected void doRun() throws Exception {
            // todo 1. 获取出请求分片的id和对象
            final ShardId shardId = primaryRequest.getRequest().shardId();
            // todo 2. 获取本机中指定分片id的索引分片对象,这个对象是一个非常核心的对象,内部封装了 索引分配的 底层操作
            //  比如:写、查询、get、刷盘
            final IndexShard indexShard = getIndexShard(shardId);
            // todo 3.获取索引分配对象的分片路由信息
            final ShardRouting shardRouting = indexShard.routingEntry();
            // we may end up here if the cluster state used to route the primary is so stale that the underlying
            // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
            // the replica will take over and a replica will be assigned to the first node.
            if (shardRouting.primary() == false) {
                throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
            }
            final String actualAllocationId = shardRouting.allocationId().getId();
            // todo 如果本机指定的shardId的IndexShard它的allocationId不是 request中定义的 allocationId,这里直接抛出异常
            if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) {
                throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]",
                    primaryRequest.getTargetAllocationID(), actualAllocationId);
            }
            final long actualTerm = indexShard.getPendingPrimaryTerm();
            if (actualTerm != primaryRequest.getPrimaryTerm()) {
                // todo 如果分片实际的primaryTerm并不是请求的primaryTerm,可能在此期间 shardId 下的实例 发生过 主分片的选举
                throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]",
                    primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
            }

            // todo 4. 获取操作凭证
            //  参数一:indexShard 主分片对象
            //  参数二:request 获取的是 shardBulkRequest对象
            //  参数三:封装了一个监听器,获取完成凭证之后的处理逻辑
            acquirePrimaryOperationPermit(
                    indexShard,
                    primaryRequest.getRequest(),
                    ActionListener.wrap(
                            releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
                            e -> {
                                if (e instanceof ShardNotInPrimaryModeException) {
                                    onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
                                } else {
                                    onFailure(e);
                                }
                            }));
        }

// todo 参数 primaryShardReference封装了 主分片对象和 操作凭证释放器
        void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {
            try {
                // todo 1. 获取集群状态
                final ClusterState clusterState = clusterService.state();
                // todo 2. 通过集群状态最终获取到 索引元数据 信息
                final IndexMetadata indexMetadata = clusterState.metadata().getIndexSafe(primaryShardReference.routingEntry().index());

                final ClusterBlockException blockException = blockExceptions(clusterState, indexMetadata.getIndex().getName());
                if (blockException != null) {
                    logger.trace("cluster is blocked, action failed on primary", blockException);
                    throw blockException;
                }

                // todo 如果主分片在请求期间 被转移到其他节点负责了
                if (primaryShardReference.isRelocated()) {
                    // todo 关闭,释放凭证
                    primaryShardReference.close(); // release shard operation lock as soon as possible
                    setPhase(replicationTask, "primary_delegation");
                    // delegate primary phase to relocation target
                    // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
                    // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
                    // todo 获取最新的主分片路由信息
                    final ShardRouting primary = primaryShardReference.routingEntry();
                    assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
                    final Writeable.Reader<Response> reader = TransportReplicationAction.this::newResponseInstance;
                    // todo 获取转移到的节点
                    DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
                    // todo 转发请求
                    transportService.sendRequest(relocatingNode, transportPrimaryAction,
                        new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
                            primaryRequest.getPrimaryTerm()), transportOptions,
                        new ActionListenerResponseHandler<Response>(onCompletionListener, reader) {
                            @Override
                            public void handleResponse(Response response) {
                                setPhase(replicationTask, "finished");
                                super.handleResponse(response);
                            }

                            @Override
                            public void handleException(TransportException exp) {
                                setPhase(replicationTask, "finished");
                                super.handleException(exp);
                            }
                        });
                } else {
                    // todo 3.正常情况走这里:即主分片由当前节点负责
                    setPhase(replicationTask, "primary");
                    // todo 4. 创建监听器:“主分片写结果监听器” 调用时机:主分片写完成+ 副本分片也写完成的时候 才会调用这个监听器
                    final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
                        // todo onResponse流程
                        adaptResponse(response, primaryShardReference.indexShard);
                        // todo 默认:syncGlobalCheckpointAfterOperation 为true
                        if (syncGlobalCheckpointAfterOperation) {
                            try {
                                primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
                            } catch (final Exception e) {
                                // only log non-closed exceptions
                                if (ExceptionsHelper.unwrap(
                                    e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
                                    // intentionally swallow, a missed global checkpoint sync should not fail this operation
                                    logger.info(
                                        new ParameterizedMessage(
                                            "{} failed to execute post-operation global checkpoint sync",
                                            primaryShardReference.indexShard.shardId()), e);
                                }
                            }
                        }

                        primaryShardReference.close(); // release shard operation lock before responding to caller
                        setPhase(replicationTask, "finished");
                        onCompletionListener.onResponse(response);
                    }, e -> handleException(primaryShardReference, e));

                    // todo 6. 封装了很多参数,然后创建了ReplicationOperation,这个对象又去封装了 主分片写操作 + 副本写操作,当写操作都完成,去调用
                    /**
                     * todo
                     *  参数一:primaryRequest.getRequest()  =>  ShardBulkRequest 对象
                     *  参数二:primaryShardReference
                     *  参数三:写结果监听器
                     *  参数四:newReplicasProxy() => 拿到一个 ReplicasProxy 实例,该实例封装了 向 副本同步数据的 RPC 请求逻辑..
                     *  参数五:logger
                     *  参数六:threadPool 线程池(transportReplicationAction的资源..)
                     *  参数七:actionName 值:"indices:data/write/bulk[s]"
                     *  参数八:primaryRequest.getPrimaryTerm() => 主分片的primaryTerm
                     *  参数九:initialRetryBackoffBound 初始值50毫秒,当向副本所在Node发起RPC同步数据失败时,重试的时间间隔 初始值,
                     *  参数十:retryTimeout 向副本所在node发起rpc同步数据,重试的超时限制
                     *
                     */
                    new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
                        responseListener.map(result -> result.finalResponseIfSuccessful),
                        newReplicasProxy(), logger, threadPool, actionName, primaryRequest.getPrimaryTerm(), initialRetryBackoffBound,
                        retryTimeout)
                        .execute();
                }
            } catch (Exception e) {
                handleException(primaryShardReference, e);
            }
        }

这里还是总结一下:

  1. 先获取请求的分片id,通过分片id拿到“索取分片对象”:indexShard

image.png

  1. 获取索引分片对象的分片路由对象

image.png

  1. 获取操作凭证
  2. 先获取集群状态对象,然后通过**集群状态对象、分片路由对象 **来获取索引元数据

image.png

  1. 创建ReplicationOperation对象,执行execute方法

ReplicationOperation.execute

image.png
这里没什么核心的,只要关注primary.perform方法就行

PrimaryShardReference.�perform

image.png
这个一个抽象方法,最终被子类的TransportWriteAction实现

TransportWriteAction.shardOperationOnPrimary

image.png
同样,这个dispatchedShardOperationOnPrimary依旧是个抽象方法,最终会被TransportShardBulkAction实现,兜兜转转又回来了。。

TransportShardBulkAction.dispatchedShardOperationOnPrimary

image.png
接下来会调用到transportShardBulkAction.performOnPrimary方法

TransportShardBulkAciton.performOnPrimary

image.png

�TransportShardBulkAciton.executeBulkItemRequest

  static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
                                       MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
                                       ActionListener<Void> itemDoneListener) throws Exception {
        // todo 1. 获取当前子请求的操作类型:目前仅考虑 INDEX CREATE 这两种情况
        final DocWriteRequest.OpType opType = context.getCurrent().opType();

        final UpdateHelper.Result updateResult;
        if (opType == DocWriteRequest.OpType.UPDATE) {
            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
            try {
                updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
            } catch (Exception failure) {
                // we may fail translating a update to index or delete operation
                // we use index result to communicate failure while translating update request
                final Engine.Result result =
                    new Engine.IndexResult(failure, updateRequest.version());
                context.setRequestToExecute(updateRequest);
                context.markOperationAsExecuted(result);
                context.markAsCompleted(context.getExecutionResult());
                return true;
            }
            // execute translated update request
            switch (updateResult.getResponseResult()) {
                case CREATED:
                case UPDATED:
                    IndexRequest indexRequest = updateResult.action();
                    IndexMetadata metadata = context.getPrimary().indexSettings().getIndexMetadata();
                    MappingMetadata mappingMd = metadata.mappingOrDefault();
                    indexRequest.process(metadata.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
                    context.setRequestToExecute(indexRequest);
                    break;
                case DELETED:
                    context.setRequestToExecute(updateResult.action());
                    break;
                case NOOP:
                    context.markOperationAsNoOp(updateResult.action());
                    context.markAsCompleted(context.getExecutionResult());
                    return true;
                default:
                    throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
            }
        } else {
            // todo 2. 执行到这里,说明当前请求操作类型为:index create 或者delete中一个
            context.setRequestToExecute(context.getCurrent());
            updateResult = null;
        }

        assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state
        // todo 3. 主分片对象
        final IndexShard primary = context.getPrimary();
        // todo 4. 获取请求的version值,version是干嘛的那,es乐观锁
        final long version = context.getRequestToExecute().version();
        final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
        final Engine.Result result;
        if (isDelete) {
            final DeleteRequest request = context.getRequestToExecute();
            result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(),
                request.ifSeqNo(), request.ifPrimaryTerm());
        } else {
            // todo 从上下文中获取当前的 索取请求
            final IndexRequest request = context.getRequestToExecute();
            /**
             * todo
             *  参数一:version 版本
             *  参数二:versionType 类型(默认:INTERNAL)
             *  参数三:SourceToParse对象 (内部引擎基于该对象进行索引工作)
             *  参数四:seqNo request.ifSeqNo() 新的版本控制
             *  参数五:primaryTerm request.ifPrimaryTerm() 新的版本控制
             *  参数六:request.getAutoGeneratedTimestamp() 时间戳
             *  参数七:boolean request.isRetry() 是否为重试
             */
            result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
                    request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()),
                request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
        }

总结一下这里:

  1. 获取当前请求的操作类型
  2. 从上下文里面获取“主分片”对象, 因为后续要通过它完成写操作

image.png

  1. 封装SourceToParse

image.png
image.png

  1. 调用主分片的applyIndexOperationOnPrimary,进行底层写操作
  2. 根据不同的调用结果,进行处理

IndexShard.applyIndexOperationOnPrimary

image.png
继续往里面看:

private Engine.IndexResult applyIndexOperation(
    Engine engine, long seqNo, long opPrimaryTerm, long version,
    @Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm,
    long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
    SourceToParse sourceToParse) throws IOException {

    // 断言,确保操作的主术语不大于当前分片的主术语
    assert opPrimaryTerm <= getOperationPrimaryTerm()
            : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
    
    // 确保当前的操作源是允许写入的
    ensureWriteAllowed(origin);
    
    Engine.Index operation;
    try {
        // 解析文档类型
        final String resolvedType = mapperService.resolveDocumentType(sourceToParse.type());
        final SourceToParse sourceWithResolvedType;
        
        // 如果解析的文档类型与原始类型相同,使用原始的sourceToParse;否则创建一个新的sourceToParse实例
        if (resolvedType.equals(sourceToParse.type())) {
            sourceWithResolvedType = sourceToParse;
        } else {
            sourceWithResolvedType = new SourceToParse(sourceToParse.index(), resolvedType, sourceToParse.id(),
                sourceToParse.source(), sourceToParse.getXContentType(), sourceToParse.routing());
        }
        
        // 准备索引操作
        operation = prepareIndex(docMapper(resolvedType), sourceWithResolvedType,
            seqNo, opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, ifSeqNo, ifPrimaryTerm);
        
        // 检查并更新动态映射
        Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
        if (update != null) {
            return new Engine.IndexResult(update);
        }
    } catch (Exception e) {
        // 解析或映射更新期间的任何异常都视为文档级失败,
        // 不会导致关闭分片的副作用
        verifyNotClosed(e);
        return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo);
    }

    // 执行索引操作,并返回操作结果
    return index(engine, operation);
}

�继续往里面看index方法:

private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
    // 将活动状态设为真,可能用于表示开始了一个新的索引操作
    active.set(true);
    final Engine.IndexResult result;

    // 在实际索引之前,调用监听器的 preIndex 方法,可能用于执行一些预处理操作
    index = indexingOperationListeners.preIndex(shardId, index);
    try {
        // 如果启用了 Trace 日志级别,则记录详细的索引操作信息
        if (logger.isTraceEnabled()) {
            // 记录索引请求的详细信息,包括文档类型、ID、序列号等,注意这里没有将源数据转为字符串,以避免编码错误
            logger.trace("index [{}][{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}]",
                index.type(), index.id(), index.seqNo(), routingEntry().allocationId(), index.primaryTerm(), getOperationPrimaryTerm(),
                index.origin());
        }
        // 执行索引操作,将文档索引到 Elasticsearch
        result = engine.index(index);

        // 再次检查是否启用了 Trace 日志级别,如果是,则记录索引完成后的状态
        if (logger.isTraceEnabled()) {
            logger.trace("index-done [{}][{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}] " +
                    "result-seq# [{}] result-term [{}] failure [{}]",
                index.type(), index.id(), index.seqNo(), routingEntry().allocationId(), index.primaryTerm(), getOperationPrimaryTerm(),
                index.origin(), result.getSeqNo(), result.getTerm(), result.getFailure());
        }
    } catch (Exception e) {
        // 如果索引操作过程中抛出异常,记录异常信息
        if (logger.isTraceEnabled()) {
            logger.trace(new ParameterizedMessage(
                "index-fail [{}][{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}]",
                index.type(), index.id(), index.seqNo(), routingEntry().allocationId(), index.primaryTerm(), getOperationPrimaryTerm(),
                index.origin()
            ), e);
        }
        // 调用监听器的 postIndex 方法,可能用于处理异常后的清理工作
        indexingOperationListeners.postIndex(shardId, index, e);
        // 重新抛出异常,使得调用者能够处理或记录这个异常
        throw e;
    }
    // 如果索引操作成功,调用监听器的 postIndex 方法,可能用于执行一些后处理操作
    indexingOperationListeners.postIndex(shardId, index, result);
    // 返回索引结果
    return result;
}

�看这里的engine.index方法

InternalEngine

image.png
image.png
image.png
�最终调用到这里,到这里,是不是大彻大悟了😄,
indexWriter就是lucene的API了,想学Lucene的可以再看它的源码去

![image.png](https://cdn.nlark.com/yuque/0/2024/png/772987/1713785040929-dc113277-9f84-46dc-8fb1-343cdbfbac1e.png#averageHue=%23212226&clientId=u3c9595f3-33f7-4&from=paste&height=208&id=u4e6be745&originHeight=416&originWidth=2096&originalType=binary&ratio=2&rotation=0&s

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/564969.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】strstr函数刨析-----字符串查找

目录 一、strstr 函数介绍 ✨函数头文件&#xff1a; ✨函数原型&#xff1a; ✨函数解读 ✨功能演示 二、函数的原理以及模拟实现 ✨函数原理 ✨函数的模拟实现 三、strstr函数的注意事项 四、共勉 一、strstr 函数介绍 strstr函数是在一个字符串中查找另一个字符…

K8s: Ingress对象, 创建Ingress控制器, 创建Ingress资源并暴露服务

Ingress对象 1 &#xff09;概述 Ingress 是对集群中服务的外部访问进行管理的 API 对象&#xff0c;典型的访问方式是 HTTPIngress-nginx 本质是网关&#xff0c;当你请求 abc.com/service/a, Ingress 就把对应的地址转发给你&#xff0c;底层运行了一个 nginx但 K8s 为什么不…

GitOps 和 DevOps 有什么区别?

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab &#xff1a;https://gitlab.cn/install?channelcontent&utm_sourcecsdn 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署…

C语言:数据结构(单链表)

目录 1. 链表的概念及结构2. 实现单链表3. 链表的分类 1. 链表的概念及结构 概念&#xff1a;链表是一种物理存储结构上非连续、非顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表的指针链接次序实现的。 链表的结构跟火车车厢相似&#xff0c;淡季时车次的车厢会相应…

六、项目发布 -- 4. 电子书详情页API开发、电子书列表API开发

电子书详情页API的编写 同理如下app.get中路由、回调&#xff1b;回调中要连接数据库、接收前端传过来的值、到数据库中做查询&#xff0c;然后回调&#xff08;如果回调失败返回什么JSON&#xff0c;如果回调成功返回什么JSON&#xff09;&#xff1b;最后千万别忘记了关闭数…

mapbox控制3D模型旋转

贴个群号 WebGIS学习交流群461555818&#xff0c;欢迎大家 效果 原理与源码 获取角度&#xff0c;然后一直更改角度&#xff0c;角度到达180度后赋值成-180度&#xff0c;然后转到开始获取的角度的角度的时候就停止旋转 function rotateModel(layerID){let bearing map.get…

2024.4.21周报

目录 摘要 Abstract 文献阅读&#xff1a;Next Item Recommendation with Self-Attentive Metric Learning 问题及方法 论文贡献 方法论 序列感知的推荐系统 神经注意模型 模型&#xff1a;ATTREC 序列推荐 基于Self-Attention的用户短期兴趣建模 用户长期兴趣建模…

卷积神经网络CNN入门

卷积神经网络应用领域 因为卷积神经网络主要应用场景就是计算机视觉任务&#xff0c;因此有必要简单介绍一下CV领域发展情况&#xff1a; 可以发现&#xff0c;在 ImageNet 图像数据集中分析图像的错误率十年间已经被深度学习给降低到了比人类&#xff08;HuMan&#xff09;识…

【matlab 代码的python复现】 Matlab实现的滤波器设计实现与Python 的库函数相同实现Scipy

实现一个IIR滤波器的设计 背景 Matlab 设计的滤波器通常封装过于完整,虽然在DSP中能够实现更多功能的滤波器设计但是很难实现Python端口的实现。 我们以一段原始的生物电信号EEG信号进行处理。 EEG信号 1.信号获取 EEG信号通常通过头皮电极,经过多通道采样芯片采样,将获…

35K的鸿蒙音视频开发岗位面经分享~

一个月前&#xff0c;阿里云在官网音视频终端 SDK 栏目发布适配 HarmonyOS NEXT 的操作文档和 SDK&#xff0c;官宣 MediaBox 音视频终端 SDK 全面适配 HarmonyOS NEXT。 此外&#xff0c;阿里云播放器 SDK 也在华为开发者联盟官网鸿蒙生态伙伴 SDK 专区同步上线&#xff0c;面…

OpenTelemetry-1.介绍

目录 1.是什么 2.为什么使用 OpenTelemetry 3.数据类型 Tracing Metrics Logging Baggage 4.架构图 5.核心概念 6.相关开源项目 ​编辑 7.分布式追踪的起源 8.百花齐放的分布式追踪 Zipkin Skywalking Pinpoint Jaeger OpenCensus OpenTracing 9.Openteleme…

「杭州*康恩贝」4月26日PolarDB开源数据库沙龙,开启报名!

4月26日&#xff08;周五&#xff09;&#xff0c;PolarDB开源社区联合康恩贝将共同举办开源数据库技术沙龙&#xff01; 时间&#xff1a;4月26日13:30 地点&#xff1a;浙江省杭州市滨江区滨康路568号康恩贝中心2楼 活动亮点 浙江英诺珐医药有限公司信息经理 朱常青 分享《…

数据结构-二叉树-堆

一、物理结构和逻辑结构 在内存中的存储结构&#xff0c;逻辑结构为想象出来的存储结构。 二、完全二叉树的顺序存储结构 parent (child - 1)/2 leftchild 2*parent 1; rightchild 2*parent 2 上面的顺序结构只适合存储完全二叉树。如果存储&#xff0c;会浪费很多的空…

清华大学:序列推荐模型稳定性飙升,STDP框架惊艳登场

获取本文论文原文PDF&#xff0c;请公众号留言&#xff1a;论文解读 引言&#xff1a;在线平台推荐系统的挑战与机遇 在线平台已成为我们日常生活中不可或缺的一部分&#xff0c;它们提供了丰富多样的商品和服务。然而&#xff0c;如何为用户推荐感兴趣的项目仍然是一个挑战。…

对接浦发银行支付(八)-- 对账接口

一、背景 本文不是要讲述支付服务的对账模块具体怎么做&#xff0c;仅是介绍如何对接浦发银行的对账接口。 也就是说&#xff0c;本文限读取到对账文件的内容&#xff0c;不会进一步去讲述如何与支付平台进行对账。 如果要获取商户的对账单&#xff0c;需要遵循以下步骤&…

使用自购服务器部署RustDesk - 远程桌面服务

服务器官网&#xff1a;雨云 - 新一代云服务提供商 推荐购买宿迁主机&#xff0c;使用NAT网络不购买独立IP&#xff0c;国内主机独立IP价格很贵&#xff0c;这种方式虽然不能省略端口号&#xff0c;但是可以确保访问速度很快&#xff0c;NAT给的10个端口基本够用&#xff1b; …

探索RadSystems:低代码开发的新选择(二)

系列文章目录 探索RadSystems&#xff1a;低代码开发的新选择&#xff08;一&#xff09;&#x1f6aa; 文章目录 系列文章目录前言一、RadSystems Studio是什么&#xff1f;二、用户认证三、系统角色许可四、用户记录管理五、时间戳记录总结 前言 在数字化时代&#xff0c;低…

路由过滤,路由策略小实验

目录 一&#xff0c;实验拓扑&#xff1a; 二&#xff0c;实验要求&#xff1a; 三&#xff0c;实验思路&#xff1a; 四&#xff0c;实验过程&#xff1a; 1&#xff0c;IP配置&#xff1a; 2、R1 和R2 运行 RIPv2&#xff0c;R2&#xff0c;R3 和R4运行 oSPF&#xff0…

8款有效删除Android锁屏的手机解锁软件

为了保护重要数据&#xff0c;许多手机用户倾向于使用图案锁、密码、指纹甚至面部识别来锁定他们的设备。但有时&#xff0c;他们无法解锁手机&#xff0c;因为忘记了复杂的密码、多次重复错误的锁定图案、或者手机被恶意代码攻击等。 8款有效删除Android锁屏的手机解锁软件 那…

光伏无人机勘探技术应用分析

光伏无人机勘探与传统勘探想必&#xff0c;具有智能化作业、测控精度高、环境适应性强等明显优势&#xff1b;卫星勘探辅助其能更快速甚至实时完成测绘拼图&#xff1b;在进行勘察时&#xff0c;可根据需要自由更换机载设备&#xff1b;自动诗经建模使数据更直观&#xff0c;工…