Elasticsearch 8.9: Bulk Indexing Source Code Walkthrough

  • I. Handlers for the relevant APIs
  • II. RestBulkAction: assemble the BulkRequest and invoke TransportBulkAction
  • III. TransportBulkAction dispatches operations to the right data nodes
    • 1. Group operations by shard, then send each group to the node hosting that shard
      • (1) Compute which shard a document goes to
        • 1) A different IndexRouting object is returned depending on the index type
        • 2) A document without an id gets one auto-generated
        • 3) The routing object, id, and routing value determine the target shard
      • (2) Register a Task via taskManager and execute the action toward the data node
  • IV. The data node (TransportShardBulkAction) processes requests from the coordinating node
    • 1. Operate on the index shard hosted on this node
      • (1) Assemble the Engine.Index
      • (2) Write to Lucene first, then to the translog on success

The figure below comes from the article ElasticSearch——刷盘原理流程. This post walks through the source code of the path from a client's bulk request to the data landing in Lucene and the translog; it does not cover flushing data to disk, nor the data structures Lucene stores.

[Figure: the bulk write path, from the coordinating node to Lucene and the translog]

I. Handlers for the relevant APIs

ActionModule.java

//coordinating logic that fans bulk operations out to the right data nodes
 actions.register(BulkAction.INSTANCE, TransportBulkAction.class);
 //handling on a data node once it receives the operations dispatched to it
 actions.register(TransportShardBulkAction.TYPE, TransportShardBulkAction.class);
 //REST handler that accepts the client's bulk request
 registerHandler.accept(new RestBulkAction(settings));

II. RestBulkAction: assemble the BulkRequest and invoke TransportBulkAction

public class RestBulkAction extends BaseRestHandler {
 
    @Override
    public List<Route> routes() {
        return List.of(
            new Route(POST, "/_bulk"),
            new Route(PUT, "/_bulk"),
            new Route(POST, "/{index}/_bulk"),
            new Route(PUT, "/{index}/_bulk"),
            Route.builder(POST, "/{index}/{type}/_bulk").deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build(),
            Route.builder(PUT, "/{index}/{type}/_bulk").deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build()
        );
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
            request.param("type"); //consume and ignore the deprecated type parameter
        }
        BulkRequest bulkRequest = new BulkRequest();
        String defaultIndex = request.param("index");
        String defaultRouting = request.param("routing");
        FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
        String defaultPipeline = request.param("pipeline");
        String waitForActiveShards = request.param("wait_for_active_shards");
        if (waitForActiveShards != null) {
            bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
        }
        Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
        bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
        bulkRequest.setRefreshPolicy(request.param("refresh"));
        bulkRequest.add(
            request.requiredContent(),
            defaultIndex,
            defaultRouting,
            defaultFetchSourceContext,
            defaultPipeline,
            defaultRequireAlias,
            allowExplicitIndex,
            request.getXContentType(),
            request.getRestApiVersion()
        );
		
        return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));
    }
}
The client.bulk call lands in AbstractClient (which NodeClient extends):

  @Override
    public void bulk(final BulkRequest request, final ActionListener<BulkResponse> listener) {
        execute(BulkAction.INSTANCE, request, listener);
    }

BulkAction.INSTANCE is resolved through the actions registration shown at the top, so execution continues in TransportBulkAction.
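For reference, a minimal payload that this handler parses might look like the following (hypothetical index and documents; in the bulk NDJSON format each action line is followed by its source line, delete actions carry no source line, and the payload must end with a newline):

POST /_bulk
{ "index": { "_index": "logs-app", "_id": "1" } }
{ "message": "hello" }
{ "delete": { "_index": "logs-app", "_id": "2" } }

The bulkRequest.add(request.requiredContent(), ...) call above is what splits these lines into the individual DocWriteRequests that the rest of this post follows.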

III. TransportBulkAction dispatches operations to the right data nodes

public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {

 @Override
    protected void doExecute(Task task, org.elasticsearch.action.bulk.BulkRequest bulkRequest, ActionListener<org.elasticsearch.action.bulk.BulkResponse> listener) {
        /*
         * This is called on the transport thread, so we can quickly account for indexing memory
         * pressure, but we do not want to keep the transport thread busy. Once the indexing
         * pressure has been recorded, we fork to one of the write thread pools. We do this because
         * handling a bulk request can get expensive: when dispatching sub-requests to shards we may
         * have to compress them. LZ4 is very fast, but slow enough that it is best not done on the
         * transport thread, especially for large sub-requests. We could detect those cases and fork
         * only then, but that is complex to get right, and the overhead of forking is fairly low.
         */
        final int indexingOps = bulkRequest.numberOfActions();
        final long indexingBytes = bulkRequest.ramBytesUsed();
        final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
        final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
        final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
        final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
        //fork onto the write thread pool
        threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
            @Override
            protected void doRun() {
                doInternalExecute(task, bulkRequest, executorName, releasingListener);
            }
        });
    }

    protected void doInternalExecute(Task task, org.elasticsearch.action.bulk.BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
       //code omitted
        //Before starting, try to create all the indices needed during the bulk.
        // Step 1: collect all the indices in the request
        final Map<String, Boolean> indices = bulkRequest.requests.stream()  
            //  delete requests should not attempt to create an index (if it does not exist) unless external versioning is in use
            .filter(
                request -> request.opType() != DocWriteRequest.OpType.DELETE
                    || request.versionType() == VersionType.EXTERNAL
                    || request.versionType() == VersionType.EXTERNAL_GTE
            )
            .collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 || v2));

        // Step 2: filter the list down to indices that do not currently exist.
        final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
        Set<String> autoCreateIndices = new HashSet<>();
        ClusterState state = clusterService.state();
        for (Map.Entry<String, Boolean> indexAndFlag : indices.entrySet()) {
            final String index = indexAndFlag.getKey();
            boolean shouldAutoCreate = indexNameExpressionResolver.hasIndexAbstraction(index, state) == false;
            //only auto-create when the request does not require the target to be an alias
            if (shouldAutoCreate && (indexAndFlag.getValue() == false)) {
                autoCreateIndices.add(index);
            }
        }

        // Step 3: create the missing indices (if any), and start the bulk once all creations have returned
        if (autoCreateIndices.isEmpty()) {
            executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
        } else {     
			//code omitted
			 for (String index : autoCreateIndices) {
				//code omitted: loop over autoCreateIndices and create each index
			}
		}

	}
	 void executeBulk(
        Task task,
        BulkRequest bulkRequest,
        long startTimeNanos,
        ActionListener<BulkResponse> listener,
        String executorName,
        AtomicArray<BulkItemResponse> responses,
        Map<String, IndexNotFoundException> indicesThatCannotBeCreated
    ) {
        //create a BulkOperation and run it (invokes doRun below)
        new BulkOperation(task, bulkRequest, listener, executorName, responses, startTimeNanos, indicesThatCannotBeCreated).run();
    }

}
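For intuition, a quick hypothetical: a bulk containing an index op into logs-app, an update in logs-app, and a plain (internally versioned) delete from metrics-old collects indices = {logs-app=false} in Step 1, because the delete is filtered out. If logs-app does not exist yet and is not required to be an alias, Step 2 puts it into autoCreateIndices, and Step 3 creates it before executeBulk runs.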

1. Group operations by shard, then send each group to the node hosting that shard

 private final class BulkOperation extends ActionRunnable<BulkResponse> {
	@Override
        protected void doRun() {
           //code omitted
            Metadata metadata = clusterState.metadata();
        
            //group requests into a ShardId -> operations map
            Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
            //iterate over every operation in the bulk request
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
         		//code omitted
               
                IndexAbstraction ia = null;
                //data streams may only be targeted by CREATE operations
                boolean includeDataStreams = docWriteRequest.opType() == DocWriteRequest.OpType.CREATE;
                try {
                    //resolve the index abstraction for this request
                    ia = concreteIndices.resolveIfAbsent(docWriteRequest);
                    //get the concrete write index
                    final Index concreteIndex = docWriteRequest.getConcreteWriteIndex(ia, metadata);
                    //fail this item if the index is closed
                    if (addFailureIfIndexIsClosed(docWriteRequest, concreteIndex, i, metadata)) {
                        continue;
                    }
                    //get the index routing; for a plain index this returns an Unpartitioned instance
                    IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
                    //if the document carries no id, one is generated here
                    docWriteRequest.process(indexRouting);
                    //compute the shard id; IdAndRoutingOnly delegates to the Unpartitioned implementation
                    int shardId = docWriteRequest.route(indexRouting);
                    //computeIfAbsent creates the per-shard request list on first use
                    List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
                        new ShardId(concreteIndex, shardId),
                        shard -> new ArrayList<>()
                    );
                    //wrap the request as a BulkItemRequest and add it to that shard's list
                    shardRequests.add(new BulkItemRequest(i, docWriteRequest));
                } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException | ResourceNotFoundException e) {
                    String name = ia != null ? ia.getName() : docWriteRequest.index();
                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(name, docWriteRequest.id(), e);
                    BulkItemResponse bulkItemResponse = BulkItemResponse.failure(i, docWriteRequest.opType(), failure);
                    responses.set(i, bulkItemResponse);
                    // make sure the request gets never processed again
                    bulkRequest.requests.set(i, null);
                }
            }
            //nothing to send: respond immediately
            if (requestsByShard.isEmpty()) {
                listener.onResponse(
                    new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
                );
                return;
            }
            //dispatch one BulkShardRequest per shard id
            final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
            String nodeId = clusterService.localNode().getId();
            for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
                final ShardId shardId = entry.getKey();
                final List<BulkItemRequest> requests = entry.getValue();
                BulkShardRequest bulkShardRequest = new BulkShardRequest(
                    shardId,
                    bulkRequest.getRefreshPolicy(),
                    requests.toArray(new BulkItemRequest[requests.size()])
                );
                bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
                bulkShardRequest.timeout(bulkRequest.timeout());
                bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
                if (task != null) {
                    bulkShardRequest.setParentTask(nodeId, task.getId());
                }
                client.executeLocally(TransportShardBulkAction.TYPE, bulkShardRequest, new ActionListener<>() {
                	//handle the per-shard response on success
                    @Override
                    public void onResponse(BulkShardResponse bulkShardResponse) {
                        for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {    
                            if (bulkItemResponse.getResponse() != null) {
                                bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                            }
                            responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                        }
                      //once every shard request has completed, assemble the final response
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }
					//failure handling
                    @Override
                    public void onFailure(Exception e) {
                        // create failures for all relevant requests
                        for (BulkItemRequest request : requests) {
                            final String indexName = request.index();
                            DocWriteRequest<?> docWriteRequest = request.request();
                            BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e);
                            responses.set(request.id(), BulkItemResponse.failure(request.id(), docWriteRequest.opType(), failure));
                        }
                        //once every shard request has completed, assemble the final response
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    private void finishHim() {
                        listener.onResponse(
                            new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
                        );
                    }
                });
            }
            bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
        }
}

(1) Compute which shard a document goes to

The key lines in the code above are these:

 //get the index routing; for a plain index this returns an Unpartitioned instance
 IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
 //validate/process the document id: an empty id throws, a missing (null) id is auto-generated
 docWriteRequest.process(indexRouting);
 //compute the shard id; IdAndRoutingOnly delegates to the Unpartitioned implementation
 int shardId = docWriteRequest.route(indexRouting);

First, indexRouting is one of the implementations below (Partitioned, Unpartitioned, or ExtractFromSource).

1) A different IndexRouting object is returned depending on the index type
public static IndexRouting fromIndexMetadata(IndexMetadata metadata) {
        //indices with a routing_path (time-series style) extract routing values from the document source
        if (false == metadata.getRoutingPaths().isEmpty()) {
            return new ExtractFromSource(metadata);
        }
        //if the index metadata marks this as a routing-partitioned index,
        //return a Partitioned router
        if (metadata.isRoutingPartitionedIndex()) {
            return new Partitioned(metadata);
        }
        //otherwise return an Unpartitioned router
        return new Unpartitioned(metadata);
    }
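For context, a hypothetical index definition that would take the Partitioned branch looks like this; index.routing_partition_size must be greater than 1 and less than number_of_shards, and it requires the _routing field to be marked required in the mappings:

PUT my-partitioned-index
{
  "settings": {
    "index.number_of_shards": 6,
    "index.routing_partition_size": 2
  },
  "mappings": {
    "_routing": { "required": true }
  }
}

A plain index with neither routing_path nor routing_partition_size set falls through to Unpartitioned, which is the path traced below.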
2) A document without an id gets one auto-generated
 @Override
    public void process(IndexRouting indexRouting) {
        indexRouting.process(this);
    }

Both Partitioned and Unpartitioned extend IdAndRoutingOnly:

private abstract static class IdAndRoutingOnly extends IndexRouting {
 		@Override
        public void process(IndexRequest indexRequest) {
        	//an explicit id must not be an empty string, but it may be null; a null id is auto-generated below
            if ("".equals(indexRequest.id())) {
                throw new IllegalArgumentException("if _id is specified it must not be empty");
            }

            // generate id if not already provided
            if (indexRequest.id() == null) {
                indexRequest.autoGenerateId();
            }
        }
}
public void autoGenerateId() {
        assert id == null;
        assert autoGeneratedTimestamp == UNSET_AUTO_GENERATED_TIMESTAMP : "timestamp has already been generated!";
        assert ifSeqNo == UNASSIGNED_SEQ_NO;
        assert ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM;
        autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis());
        String uid = UUIDs.base64UUID();
        id(uid);
    }
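As a quick illustration (the printed value below is made up), the generated id comes from UUIDs.base64UUID(), a time-based UUID encoded as 20 URL-safe base64 characters, designed to be friendly to Lucene indexing:

String autoId = org.elasticsearch.common.UUIDs.base64UUID();
System.out.println(autoId); // e.g. something like "VZ5o3YsB7GKc0aF1bQ2x"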
3) The routing object, id, and routing value determine the target shard

The route entry point looks like this:

@Override
    public int route(IndexRouting indexRouting) {
        return indexRouting.indexShard(id, routing, contentType, source);
    }
    private abstract static class IdAndRoutingOnly extends IndexRouting {
	   protected abstract int shardId(String id, @Nullable String routing);
	    @Override
        public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
            if (id == null) {
                throw new IllegalStateException("id is required and should have been set by process");
            }
            checkRoutingRequired(id, routing);
            return shardId(id, routing);
        }
}

shardId has two implementations here: Partitioned and Unpartitioned.

 private static class Unpartitioned extends IdAndRoutingOnly {
        Unpartitioned(IndexMetadata metadata) {
            super(metadata);
        }
		//prefer the routing value; fall back to the id when routing is absent
        @Override
        protected int shardId(String id, @Nullable String routing) {
            return hashToShardId(effectiveRoutingToHash(routing == null ? id : routing));
        }

        @Override
        public void collectSearchShards(String routing, IntConsumer consumer) {
            consumer.accept(hashToShardId(effectiveRoutingToHash(routing)));
        }
    }

    private static class Partitioned extends IdAndRoutingOnly {
        private final int routingPartitionSize;

        Partitioned(IndexMetadata metadata) {
            super(metadata);
            this.routingPartitionSize = metadata.getRoutingPartitionSize();
        }
		//routing must not be null for a partitioned index
        @Override
        protected int shardId(String id, @Nullable String routing) {
            if (routing == null) {
                throw new IllegalArgumentException("A routing value is required for gets from a partitioned index");
            }
            int offset = Math.floorMod(effectiveRoutingToHash(id), routingPartitionSize);
            return hashToShardId(effectiveRoutingToHash(routing) + offset);
        }

        @Override
        public void collectSearchShards(String routing, IntConsumer consumer) {
            int hash = effectiveRoutingToHash(routing);
            for (int i = 0; i < routingPartitionSize; i++) {
                consumer.accept(hashToShardId(hash + i));
            }
        }
    }

Below we follow only the Unpartitioned path.

  /**
     * Convert a routing value into a hash.
     */
    private static int effectiveRoutingToHash(String effectiveRouting) {
        return Murmur3HashFunction.hash(effectiveRouting);
    }
/**
 * Hash function based on the Murmur3 algorithm, which is the default as of Elasticsearch 2.0.
 */
public final class Murmur3HashFunction {

    private Murmur3HashFunction() {
        // no instance
    }

    public static int hash(String routing) {
        final byte[] bytesToHash = new byte[routing.length() * 2];
        for (int i = 0; i < routing.length(); ++i) {
            final char c = routing.charAt(i);
            final byte b1 = (byte) c, b2 = (byte) (c >>> 8);
            assert ((b1 & 0xFF) | ((b2 & 0xFF) << 8)) == c; // no information loss
            bytesToHash[i * 2] = b1;
            bytesToHash[i * 2 + 1] = b2;
        }
        return hash(bytesToHash, 0, bytesToHash.length);
    }

    public static int hash(byte[] bytes, int offset, int length) {
        return StringHelper.murmurhash3_x86_32(bytes, offset, length, 0);
    }
}
 /**
     * Convert a hash generated from an {@code (id, routing)} pair into a
     * shard id.
     */
    protected final int hashToShardId(int hash) {
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

This is how a document's shard id is determined.
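To make the math concrete, here is a minimal, self-contained sketch of the Unpartitioned path. Assumptions: lucene-core is on the classpath, and the index is a hypothetical one with 5 shards that was never shrunk or split, so routingNumShards == 5 and routingFactor == 1; the class and variable names are mine, not ES's. It inlines the same UTF-16LE byte expansion Murmur3HashFunction uses and calls Lucene's StringHelper directly:

import org.apache.lucene.util.StringHelper;

public class ShardRoutingSketch {

    //same byte expansion as Murmur3HashFunction.hash(String) above
    static int hash(String effectiveRouting) {
        final byte[] bytesToHash = new byte[effectiveRouting.length() * 2];
        for (int i = 0; i < effectiveRouting.length(); ++i) {
            final char c = effectiveRouting.charAt(i);
            bytesToHash[i * 2] = (byte) c;
            bytesToHash[i * 2 + 1] = (byte) (c >>> 8);
        }
        return StringHelper.murmurhash3_x86_32(bytesToHash, 0, bytesToHash.length, 0);
    }

    public static void main(String[] args) {
        int routingNumShards = 5;
        int routingFactor = 1;
        //routing is null here, so the document id is the effective routing value
        String effectiveRouting = "my-doc-id";
        int shardId = Math.floorMod(hash(effectiveRouting), routingNumShards) / routingFactor;
        System.out.println("document routed to shard " + shardId); //deterministic: same id, same shard
    }
}

Because the hash is computed only from the id and routing, any coordinating node computes the same shard for the same document, with no lookup table involved.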

(2) Register a Task via taskManager and execute the action toward the data node

 client.executeLocally(TransportShardBulkAction.TYPE, bulkShardRequest, new ActionListener<>() ...)

executeLocally is implemented in NodeClient:

 public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
        ActionType<Response> action,
        Request request,
        ActionListener<Response> listener
    ) {
        return taskManager.registerAndExecute(
            "transport",
            transportAction(action),
            request,
            localConnection,
            new SafelyWrappedActionListener<>(listener)
        );
    }

We will not trace the transport plumbing any further; jump straight to the handler registered for TransportShardBulkAction.TYPE.

IV. The data node (TransportShardBulkAction) processes requests from the coordinating node

public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
 //handles operations for the shards hosted on this node; the request arrives from the coordinating node
    @Override
    protected void dispatchedShardOperationOnPrimary(
        BulkShardRequest request,
        IndexShard primary,
        ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
    ) {
        ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
        performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
            assert update != null;
            assert shardId != null;
            mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), update, mappingListener);
        }, mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() {
           //code omitted
            }
        }), listener, threadPool, executor(primary), postWriteRefresh, postWriteAction);
    }


}

Next, look directly at performOnPrimary:

  public static void performOnPrimary(
        org.elasticsearch.action.bulk.BulkShardRequest request,
        IndexShard primary,
        UpdateHelper updateHelper,
        LongSupplier nowInMillisSupplier,
        MappingUpdatePerformer mappingUpdater,
        Consumer<ActionListener<Void>> waitForMappingUpdate,
        ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener,
        ThreadPool threadPool,
        String executorName,
        @Nullable PostWriteRefresh postWriteRefresh,
        @Nullable Consumer<Runnable> postWriteAction
    ) {
        new ActionRunnable<>(listener) {

            private final Executor executor = threadPool.executor(executorName);

            private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);

            final long startBulkTime = System.nanoTime();

            @Override
            protected void doRun() throws Exception {
                //loop while there are operations left to execute
                while (context.hasMoreOperationsToExecute()) {
                    if (executeBulkItemRequest(
                        context,
                        updateHelper,
                        nowInMillisSupplier,
                        mappingUpdater,
                        waitForMappingUpdate,
                        ActionListener.wrap(v -> executor.execute(this), this::onRejection)
                    ) == false) {
                        //we are waiting for a mapping update on another thread; once it completes it will invoke this again, so break out here
                        return;
                    }
                    assert context.isInitial(); // either completed and moved to next or reset
                }
                primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); 
                finishRequest();
            }

            @Override
            public void onRejection(Exception e) {
               //code omitted
            }

            private void finishRequest() {
                //code omitted
            }
        }.run();
    }

1. Operate on the index shard hosted on this node

static boolean executeBulkItemRequest(
       BulkPrimaryExecutionContext context,
        UpdateHelper updateHelper,
        LongSupplier nowInMillisSupplier,
        MappingUpdatePerformer mappingUpdater,
        Consumer<ActionListener<Void>> waitForMappingUpdate,
        ActionListener<Void> itemDoneListener
    ) throws Exception {
            //fetch the IndexRequest, build a SourceToParse, and hand it to primary.applyIndexOperationOnPrimary
            final IndexRequest request = context.getRequestToExecute();
            final SourceToParse sourceToParse = new SourceToParse(
                request.id(),
                request.source(),
                request.getContentType(),
                request.routing(),
                request.getDynamicTemplates()
            );
            //apply the document to the shard and capture the result
            result = primary.applyIndexOperationOnPrimary(
                version,
                request.versionType(),
                sourceToParse,
                request.ifSeqNo(),
                request.ifPrimaryTerm(),
                request.getAutoGeneratedTimestamp(),
                request.isRetry()
            );
      
        //if the result says a mapping update is required, update the index mapping first
        if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
			//code omitted
        } 
        return true;
    }

(1) Assemble the Engine.Index

public Engine.IndexResult applyIndexOperationOnPrimary(
        long version,
        VersionType versionType,
        SourceToParse sourceToParse,
        long ifSeqNo,
        long ifPrimaryTerm,
        long autoGeneratedTimestamp,
        boolean isRetry
    ) throws IOException {
        assert versionType.validateVersionForWrites(version);
        //apply the index operation; this path also updates the translog
        return applyIndexOperation(
            getEngine(),
            UNASSIGNED_SEQ_NO,
            getOperationPrimaryTerm(),
            version,
            versionType,
            ifSeqNo,
            ifPrimaryTerm,
            autoGeneratedTimestamp,
            isRetry,
            Engine.Operation.Origin.PRIMARY,
            sourceToParse
        );
    }
private Engine.IndexResult applyIndexOperation(
        Engine engine,
        long seqNo,
        long opPrimaryTerm,
        long version,
        @Nullable VersionType versionType,
        long ifSeqNo,
        long ifPrimaryTerm,
        long autoGeneratedTimeStamp,
        boolean isRetry,
        Engine.Operation.Origin origin,
        SourceToParse sourceToParse
    ) throws IOException {
        assert opPrimaryTerm <= getOperationPrimaryTerm()
            : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
        ensureWriteAllowed(origin);
        Engine.Index operation;
        try {
            //build the Engine.Index operation
            operation = prepareIndex(
                mapperService,
                sourceToParse,
                seqNo,
                opPrimaryTerm,
                version,
                versionType,
                origin,
                autoGeneratedTimeStamp,
                isRetry,
                ifSeqNo,
                ifPrimaryTerm,
                getRelativeTimeInNanos()
            );
            Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
            if (update != null) {
                return new Engine.IndexResult(update, operation.parsedDoc().id());
            }
        } catch (Exception e) {
           //code omitted
        }

        return index(engine, operation);
    }
 private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
        active.set(true);
        final Engine.IndexResult result;
        final Engine.Index preIndex = indexingOperationListeners.preIndex(shardId, index);
        try {
           	//code omitted
            //InternalEngine.index writes the doc one at a time;
            //the Engine wraps Lucene and the translog behind read/write interfaces
            result = engine.index(preIndex);
           //code omitted
        } catch (Exception e) {
          	//code omitted
            indexingOperationListeners.postIndex(shardId, preIndex, e);
            throw e;
        }
        indexingOperationListeners.postIndex(shardId, preIndex, result);
        return result;
    }

Here engine.index resolves to InternalEngine.index:

(2) Write to Lucene first, then to the translog on success

@Override
    public IndexResult index(Index index) throws IOException {
        assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
        final boolean doThrottle = index.origin().isRecovery() == false;
        try (ReleasableLock releasableLock = readLock.acquire()) {
          
                    //on the primary, rebuild the Index operation with a freshly generated sequence number
                    if (index.origin() == Operation.Origin.PRIMARY) {
                        index = new Index(
                            index.uid(),
                            index.parsedDoc(),
                            generateSeqNoForOperationOnPrimary(index),
                            index.primaryTerm(),
                            index.version(),
                            index.versionType(),
                            index.origin(),
                            index.startTime(),
                            index.getAutoGeneratedIdTimestamp(),
                            index.isRetry(),
                            index.getIfSeqNo(),
                            index.getIfPrimaryTerm()
                        );

                        final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
                        if (toAppend == false) {
                            advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo());
                        }
                    } else {
                        //on replicas, just mark the sequence number as seen
                        markSeqNoAsSeen(index.seqNo());
                    }

                    if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                        //write the document into Lucene
                        indexResult = indexIntoLucene(index, plan);
                    } else {
                        indexResult = new IndexResult(
                            plan.versionForIndexing,
                            index.primaryTerm(),
                            index.seqNo(),
                            plan.currentNotFoundOrDeleted,
                            index.id()
                        );
                    }
                }
                if (index.origin().isFromTranslog() == false) {
                    final Translog.Location location;
                    //only if the Lucene write succeeded is the operation appended to the translog
                    if (indexResult.getResultType() == Result.Type.SUCCESS) {
                        location = translog.add(new Translog.Index(index, indexResult));
                    } 
                    //code omitted
                    indexResult.setTranslogLocation(location);
                }
               //code omitted
                indexResult.setTook(relativeTimeInNanosSupplier.getAsLong() - index.startTime());
                indexResult.freeze();
                return indexResult;
         
       
    }

Adding the document to Lucene:

import org.apache.lucene.index.IndexWriter;

 private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws IOException {
        try {
            if (plan.addStaleOpToLucene) { //add a stale operation
                addStaleDocs(index.docs(), indexWriter);
            } else if (plan.useLuceneUpdateDocument) { //update an existing document
                assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true);
                updateDocs(index.uid(), index.docs(), indexWriter);
            } else {
                // document does not exist, we can optimize for create, but double check if assertions are running
                assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
                addDocs(index.docs(), indexWriter);
            }
            return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted, index.id());
        } catch (Exception ex) {
          //code omitted
        }
    }
     private void addStaleDocs(final List<LuceneDocument> docs, final IndexWriter indexWriter) throws IOException {
        for (LuceneDocument doc : docs) {
            doc.add(softDeletesField); // soft-deleted every document before adding to Lucene
        }
        if (docs.size() > 1) {
            indexWriter.addDocuments(docs);
        } else {
            indexWriter.addDocument(docs.get(0));
        }
    }
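To strip away the ES plumbing, here is a standalone sketch of what indexIntoLucene ultimately amounts to: handing documents to a Lucene IndexWriter. This uses plain Lucene types with an in-memory directory and illustrative field names; it bypasses everything ES builds around IndexWriter:

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;

public class LuceneAddDocSketch {
    public static void main(String[] args) throws Exception {
        try (IndexWriter writer = new IndexWriter(
                new ByteBuffersDirectory(), new IndexWriterConfig(new StandardAnalyzer()))) {
            Document doc = new Document();
            doc.add(new StringField("_id", "my-doc-id", Field.Store.YES)); //exact-match id field
            doc.add(new TextField("message", "hello bulk", Field.Store.YES)); //analyzed body field
            writer.addDocument(doc);
            //ES does not commit per document: visibility is controlled by refresh,
            //and durability by the translog plus periodic flushes (Lucene commits).
        }
    }
}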

When writing to the translog, the operation is first wrapped in a new Translog.Index and then appended:

    public Location add(final Operation operation) throws IOException {
         final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
		try {
            writeOperationWithSize(out, operation);
            final BytesReference bytes = out.bytes();
            try (ReleasableLock ignored = readLock.acquire()) {
                ensureOpen();
               //code omitted
                return current.add(bytes, operation.seqNo());
            }
        }
}

    private ReleasableBytesStreamOutput buffer;

 /**
     * Adds the given bytes to the translog with the specified sequence number;
     * returns the location the bytes were written to.
     *
     * @param data  the bytes to write
     * @param seqNo the sequence number associated with the operation
     * @return the location the bytes were written to
     * @throws IOException if writing to the translog resulted in an I/O exception
     */
    public Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
        //if buffered bytes exceed the forceWriteThreshold, write out the buffered operations first
        long bufferedBytesBeforeAdd = this.bufferedBytes;
        if (bufferedBytesBeforeAdd >= forceWriteThreshold) {
            writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4);
        }

        final Translog.Location location;
        synchronized (this) {
            ensureOpen();
            //lazily create the buffer if needed
            if (buffer == null) {
                buffer = new ReleasableBytesStreamOutput(bigArrays);
            }
            //write the data into the buffer, then update minSeqNo and maxSeqNo
            assert bufferedBytes == buffer.size();
            final long offset = totalOffset;
            totalOffset += data.length();
            data.writeTo(buffer);

            assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
            assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;

            minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
            maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
            //record the seqNo as not yet fsynced and bump the operation counter
            nonFsyncedSequenceNumbers.add(seqNo);

            operationCounter++;

            assert assertNoSeqNumberConflict(seqNo, data);
            //build a Translog.Location from the generation, offset, and data length
            location = new Translog.Location(generation, offset, data.length());
            //notify the operation listener that an operation was added, and refresh bufferedBytes
            operationListener.operationAdded(data, seqNo, location);
            bufferedBytes = buffer.size();
        }

        return location;
    }
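As a made-up illustration: if the current generation is 7, totalOffset is 1024 before the add, and the serialized operation is 200 bytes, the returned Location is (generation=7, offset=1024, size=200), and the next operation starts at offset 1224. This triple is what indexResult.setTranslogLocation(location) recorded earlier; among other things, it lets a later fsync decide whether this particular operation is already durable.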

A brief introduction to the Translog (its class-level javadoc):

/**
 * A Translog is a per-index-shard component that records all non-committed index operations in a durable manner.
 * In Elasticsearch, each {@link org.elasticsearch.index.engine.InternalEngine} has one Translog instance.
 * Additionally, since Elasticsearch 2.0, the engine also records a {@link #TRANSLOG_UUID_KEY} with every commit to ensure
 * a strong association between the Lucene index and the transaction log files. This UUID is used to prevent accidental
 * recovery from a transaction log that belongs to a different engine.
 *
 * Each Translog has only one translog file open for writes at any time, referenced by a translog generation ID.
 * This ID is written to a {@code translog.ckp} file that is designed to fit in a single disk block, so that writes to it
 * are atomic. The checkpoint file is written on every fsync of the translog and records the number of operations written,
 * the current translog file generation, its fsynced offset in bytes, and other important statistics.
 *
 * When the current translog file reaches a certain size ({@link IndexSettings#INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING}),
 * or when a clear separation between old and new operations is needed (on a primary-term change), the current file is
 * reopened read-only and a new write-only file is created. Any non-current, read-only translog file always has a
 * {@code translog-{gen}.ckp} associated with it, which is an fsynced copy of its last {@code translog.ckp}, so that in
 * disaster recovery the last fsynced offset, number of operations, etc. are still preserved.
 **/
public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
}
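The behavior described above is tunable per index. As an illustration (the setting names are standard index settings; the values here are arbitrary):

PUT my-index
{
  "settings": {
    "index.translog.durability": "request",
    "index.translog.sync_interval": "5s",
    "index.translog.flush_threshold_size": "512mb"
  }
}

With durability set to request (the default), the translog is fsynced before a bulk request is acknowledged; with async it is fsynced every sync_interval instead.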
