Today we dig into Elasticsearch internals:
Creating an index
Writing a document
Entry point: BaseRestHandler
BaseRestHandler is the entry point for REST requests; think of it as the controller in Spring MVC.
prepareRequest is an abstract method that is overridden by the various Rest*Action classes. Since we are indexing a document here, execution lands in RestIndexAction.prepareRequest.
RestIndexAction.prepareRequest
This is the method that ultimately handles a document-write request.
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    // todo 1. the index operation request
    IndexRequest indexRequest;
    // todo 2. pull the {type} out of the REST request; nowadays this is almost always _doc
    final String type = request.param("type");
    if (type != null && type.equals(MapperService.SINGLE_MAPPING_NAME) == false) {
        deprecationLogger.deprecate(DeprecationCategory.TYPES, "index_with_types", TYPES_DEPRECATION_MESSAGE);
        indexRequest = new IndexRequest(request.param("index"), type, request.param("id"));
    } else {
        // todo 3. create the index request object; the argument is the index name
        indexRequest = new IndexRequest(request.param("index"));
        // todo 4. the document id
        indexRequest.id(request.param("id"));
    }
    // todo 5. routing for this index operation; optional by default, but once specified, the shard lookup uses this routing value in its calculation
    indexRequest.routing(request.param("routing"));
    // todo 6. ES ships an ingest mechanism, similar to ETL data cleansing
    /**
     * todo
     * If the index request specifies a pipeline, the server first runs the data through a "cleansing" step:
     * the request enters the pipeline, which contains a chain of "filters" that massage your doc. For example,
     * if the doc carries no timestamp, a processor can add one at this point.
     * It can do far more than that; see the official docs for the details.
     * On the server side pipelines are stored separately, each under its own name.
     */
    indexRequest.setPipeline(request.param("pipeline"));
    // todo 7. set the request body
    /**
     * todo
     * arg 1: a reference to the binary content, through which the request body's bytes can be read
     * arg 2: the content type of those bytes, usually json
     */
    indexRequest.source(request.requiredContent(), request.getXContentType());
    // todo 8. timeout, 1 minute by default
    indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
    // todo 9. refresh policy
    /**
     * todo
     * Rough write path: the request first goes into the ES buffer, then into the translog. By default the
     * buffer is refreshed once per second. What does a refresh do? It hands the buffered indexRequest data
     * to Lucene's IndexWriter, which writes it into Lucene's segment buffer (os cache).
     * refreshPolicy: this field lets the request decide whether to force a refresh.
     * When does the buffer refresh? 1. forced 2. full (500mb) 3. once per second
     */
    indexRequest.setRefreshPolicy(request.param("refresh"));
    // todo 10. these two parameters are deprecated; ES now has newer fields for controlling "concurrent modification"
    indexRequest.version(RestActions.parseVersion(request));
    indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
    // todo 11. like a student number: every document gets one of these, unique to it
    indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));
    // todo 12. like a class number: an ES shard has one PrimaryShard and several ReplicationShards, and the PrimaryTerm increments whenever the primary changes
    indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));
    indexRequest.setRequireAlias(request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, indexRequest.isRequireAlias()));
    // todo 13. operation type
    String sOpType = request.param("op_type");
    // todo 14. how many shards must be alive before this index request may be written
    /**
     * todo
     * By default one live shard is enough, i.e. the PrimaryShard must be online.
     * If you set it to 2, one PrimaryShard plus one ReplicationShard must be online before the request is processed.
     */
    String waitForActiveShards = request.param("wait_for_active_shards");
    if (waitForActiveShards != null) {
        indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
    }
    if (sOpType != null) {
        indexRequest.opType(sOpType);
    }
    // todo 15. return a function
    /**
     * todo
     * It calls nodeClient -> index:
     * arg 1: indexRequest, the index request object built above
     * arg 2: RestStatusToXContentListener, which wraps the RestChannel; through this listener the response is sent back to the REST caller
     */
    return channel ->
        client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing())));
}
To sum up what this method did:
- converted the RestRequest into an IndexRequest
- set the document id
- set the routing
- set the pipeline, for data cleansing
- set the timeout
- set the refresh policy
- set the document's seqNo, which works like a student number
- set the document's primaryTerm, which works like a class number (all of these parameters matter later); a client-side sketch follows for reference
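Here is the same request assembled directly in Java: a sketch that simply chains the setters prepareRequest called above (the index name, id and field values are made up):

// hypothetical values; every setter below appears in the prepareRequest walked through above
IndexRequest indexRequest = new IndexRequest("my_index")
    .id("1")                                            // document id
    .routing("user-1")                                  // optional custom routing
    .setPipeline("my-pipeline")                         // optional ingest pipeline
    .source("{\"field\":\"value\"}", XContentType.JSON)
    .timeout(IndexRequest.DEFAULT_TIMEOUT)              // 1 minute by default
    .setIfSeqNo(7)                                      // optimistic concurrency: write only if the doc
    .setIfPrimaryTerm(1);                               // still carries this seqNo / primaryTerm
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
indexRequest.waitForActiveShards(ActiveShardCount.ONE); // the default: primary online is enough
client.index(indexRequest, listener);                   // what the returned consumer ends up calling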
Here is the finished indexRequest:
- prepareRequest returned a function that calls nodeClient.index(indexRequest)
action.accept => nodeClient.index
This is the point where the function described above finally executes.
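A tiny sketch of that hand-off. RestChannelConsumer is declared in BaseRestHandler as a CheckedConsumer<RestChannel, Exception>, so nothing runs until the framework supplies the channel:

// what prepareRequest returns (taken apart from the code above)
RestChannelConsumer consumer = channel ->
    client.index(indexRequest,
        new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing())));

// later, inside BaseRestHandler.handleRequest (simplified), the lambda finally fires:
consumer.accept(channel);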
NodeClient.index
This first calls into the parent class method AbstractClient.index.
There an IndexAction instance is created, and execute is called next.
The parameters are worth a quick look:
- action: the IndexAction instance just created
- request: still the same indexRequest
NodeClient.doExecute
doExecute then delegates to executeLocal.
NodeClient.executeLocal
executeLocal calls transportAction, which resolves IndexAction to TransportIndexAction; in other words, we have found the RPC entry point that handles Index:
The next stop is TransportIndexAction.
TransportIndexAction.execute
It is empty, so the parent class method TransportAction.execute runs.
TransportAction.execute
This calls execute(task, request, actionListener),
which ultimately lands in TransportBulkAction.
A quick recap:
- nodeClient uses the IndexAction RPC type to look up the matching RPC handler (TransportIndexAction)
- TransportIndexAction forwards the request to another "RPC handler", TransportBulkAction; the dispatch pattern is sketched below
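The lookup itself is nothing more than a map from action type to transport handler. A simplified sketch of the pattern (condensed; not the literal 7.x code, which adds task management around it):

// NodeClient holds a registry filled at node startup:
//   Map<ActionType<?>, TransportAction<?, ?>> actions
@SuppressWarnings("unchecked")
<Request extends ActionRequest, Response extends ActionResponse>
Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {
    // IndexAction.INSTANCE -> TransportIndexAction, BulkAction.INSTANCE -> TransportBulkAction, ...
    TransportAction<Request, Response> transportAction = (TransportAction<Request, Response>) actions.get(action);
    return transportAction.execute(request, listener);  // lands in TransportAction.execute
}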
TransportBulkAction.doExecute
By the time we arrive here, the request has already been repackaged from an indexRequest into a bulkRequest. Let's see what is inside the bulkRequest.
TransportBulkAction.doInternalExecute
// todo arg 1: task
//      arg 2: bulkRequest, the batched index request object
//      arg 3: executorName, the name of the executor to run on
//      arg 4: the listener; the releasing wrapper around it calls releasable#close before
//             onResponse or onFailure, to update the fields inside the IndexingPressure object..
protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
    // todo 1. start timestamp
    final long startTime = relativeTime();
    // todo 2. response array, sized to the number of indexRequests in the bulk, i.e. responses map one-to-one
    final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
    // todo 3. pipeline-related code
    boolean hasIndexRequestsWithPipelines = false;
    final Metadata metadata = clusterService.state().getMetadata();
    final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
    for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
        IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
        if (indexRequest != null) {
            // Each index request needs to be evaluated, because this method also modifies the IndexRequest
            boolean indexRequestHasPipeline = IngestService.resolvePipelines(actionRequest, indexRequest, metadata);
            hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
        }
        if (actionRequest instanceof IndexRequest) {
            IndexRequest ir = (IndexRequest) actionRequest;
            ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
            if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
                throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
            }
        }
    }
    if (hasIndexRequestsWithPipelines) {
        // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
        // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
        // this path is never taken.
        try {
            if (Assertions.ENABLED) {
                final boolean arePipelinesResolved = bulkRequest.requests()
                    .stream()
                    .map(TransportBulkAction::getIndexWriteRequest)
                    .filter(Objects::nonNull)
                    .allMatch(IndexRequest::isPipelineResolved);
                assert arePipelinesResolved : bulkRequest;
            }
            if (clusterService.localNode().isIngestNode()) {
                processBulkIndexIngestRequest(task, bulkRequest, executorName, listener);
            } else {
                ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
            }
        } catch (Exception e) {
            listener.onFailure(e);
        }
        return;
    }
    // todo end of pipeline handling
    // Attempt to create all the indices that we're going to need during the bulk before we start.
    // Step 1: collect all the indices in the request
    // todo 4. collect every index this bulk request touches
    //         key: index name, value: requireAlias flag.. aliases are ignored for now.
    final Map<String, Boolean> indices = bulkRequest.requests.stream()
        // delete requests should not attempt to create the index (if the index does not
        // exists), unless an external versioning is used
        .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
            || request.versionType() == VersionType.EXTERNAL
            || request.versionType() == VersionType.EXTERNAL_GTE)
        .collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 || v2));
    // todo 5. holds the indices that cannot be created
    // Step 2: filter the list of indices to find those that don't currently exist.
    final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
    // todo 6. holds the indices this bulk request will auto-create
    Set<String> autoCreateIndices = new HashSet<>();
    // todo 7. the cluster state object
    ClusterState state = clusterService.state();
    // todo 8. iterate over the index map of this bulk
    for (Map.Entry<String, Boolean> indexAndFlag : indices.entrySet()) {
        final String index = indexAndFlag.getKey();
        // todo this step decides whether the "index" structure needs to be created
        boolean shouldAutoCreate = indexNameExpressionResolver.hasIndexAbstraction(index, state) == false;
        // We should only auto create if we are not requiring it to be an alias
        if (shouldAutoCreate && (indexAndFlag.getValue() == false)) {
            // todo remember the index names whose structure must be created..
            autoCreateIndices.add(index);
        }
    }
    // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
    // todo if every index structure already exists
    if (autoCreateIndices.isEmpty()) {
        executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
    } else {
        // todo some index structures are missing
        // todo counter: starts at the number of indices that need auto-creation
        final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
        // todo iterate over the indices whose structure must be created, and create them
        for (String index : autoCreateIndices) {
            // todo createIndex fires an RPC to the Master, which performs the actual index creation..
            //      success or failure both invoke the logic wrapped in the local ActionListener below...
            createIndex(index, bulkRequest.timeout(), minNodeVersion, new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(CreateIndexResponse result) {
                    if (counter.decrementAndGet() == 0) {
                        // todo when does this hold? once every pending index creation has finished (successfully or not)..
                        //      then grab a set of threads by executorName and run the task...
                        threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(listener) {
                            @Override
                            protected void doRun() {
                                // todo 9. execute the bulk, the important part
                                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
                            }
                        });
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    final Throwable cause = ExceptionsHelper.unwrapCause(e);
                    if (cause instanceof IndexNotFoundException) {
                        indicesThatCannotBeCreated.put(index, (IndexNotFoundException) cause);
                    }
                    else if ((cause instanceof ResourceAlreadyExistsException) == false) {
                        // fail all requests involving this index, if create didn't work
                        for (int i = 0; i < bulkRequest.requests.size(); i++) {
                            DocWriteRequest<?> request = bulkRequest.requests.get(i);
                            if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
                                bulkRequest.requests.set(i, null);
                            }
                        }
                    }
                    if (counter.decrementAndGet() == 0) {
                        final ActionListener<BulkResponse> wrappedListener = ActionListener.wrap(listener::onResponse, inner -> {
                            inner.addSuppressed(e);
                            listener.onFailure(inner);
                        });
                        threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(wrappedListener) {
                            @Override
                            protected void doRun() {
                                executeBulk(task, bulkRequest, startTime, wrappedListener, responses, indicesThatCannotBeCreated);
                            }

                            @Override
                            public void onRejection(Exception rejectedException) {
                                rejectedException.addSuppressed(e);
                                super.onRejection(rejectedException);
                            }
                        });
                    }
                }
            });
        }
    }
}
To recap, this method:
- iterated over the "index structure names" the bulk request touches
- found the "index structures" that do not yet exist, and created them via an RPC to the Master
- ran transportBulkAction.executeBulk, the actual bulk execution
One object here deserves a closer look: ClusterState, the cluster state object.
- nodes: information about every node in the cluster, a map keyed by node id.
- routing table: the allocation of every index's shards and replicas, i.e. which nodes hold which shards.
There is one routingTable entry per index created.
my_index is the index I am using here; I created it with 5 shards, and sure enough there are 5 IndexShardRoutingTable entries (each is the routing information of a single shard id, covering the concrete placement of one primary and its replica shards; each copy is represented by a ShardRouting object).
Each ShardRouting describes one copy of a shard: whether it is the primary or a replica, its shard id, the node it lives on, and its current state.
- Metadata: the metadata of every index in the cluster, including index names, settings, mappings, aliases, and so on.
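A short sketch of navigating those three parts with the 7.x accessors used throughout this code path (my_index is the hypothetical 5-shard index from above; clusterService is available wherever this code runs):

ClusterState state = clusterService.state();
DiscoveryNodes nodes = state.nodes();                                 // every node, keyed by node id
IndexMetadata indexMetadata = state.metadata().index("my_index");     // settings, mappings, aliases
IndexRoutingTable routingTable = state.routingTable().index("my_index");
for (IndexShardRoutingTable shardTable : routingTable) {              // 5 entries for 5 primaries
    ShardRouting primary = shardTable.primaryShard();                 // plus shardTable.replicaShards()
    System.out.println(primary.shardId() + " -> node " + primary.currentNodeId()
        + ", state " + primary.state());
}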
### TransportBulkAction.executeBulk ![image.png](https://img-blog.csdnimg.cn/img_convert/5b611969fb9c3153f6179150acab5ef0.png)
This is where a BulkOperation is created and its run method called; **run ultimately ends up in BulkOperation.doRun**.
BulkOperation.doRun
@Override
protected void doRun() {
    assert bulkRequest != null;
    // todo 1. observe the cluster state; internally observer grabs the state from clusterService and caches it in a field..
    final ClusterState clusterState = observer.setAndGetObservedState();
    // todo 2. use the cluster state to check whether the cluster can currently serve requests..
    if (handleBlockExceptions(clusterState)) {
        return;
    }
    // todo 3. a helper for resolving the real index name of each index request (ES supports aliases, and the alias resolution lives in here..)
    //         arg 1: cluster state
    //         arg 2: index name resolver
    final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
    // todo 4. cluster metadata.. (which indices exist, which shards they have, where the shards live..)
    Metadata metadata = clusterState.metadata();
    // todo 5. loop: grouping and related logic
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
        //the request can only be null because we set it to null in the previous step, so it gets ignored
        // todo fetch the current index request object
        if (docWriteRequest == null) {
            continue;
        }
        if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
            continue;
        }
        // todo handle cases such as the index structure being unavailable: skip those write requests and set their responses
        if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metadata)) {
            continue;
        }
        // todo use the "helper" to resolve the concrete index name of the current request
        Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
        try {
            // The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether
            // an operation is allowed in index into a data stream, but this isn't done when resolve call is cached, so
            // the validation needs to be performed here too.
            // todo cluster state -> metadata -> indices lookup -> the indexAbstraction of the current indexRequest
            //      indexAbstraction: exposes this index's metadata, e.g. which shards it has...
            IndexAbstraction indexAbstraction = clusterState.getMetadata().getIndicesLookup().get(concreteIndex.getName());
            if (indexAbstraction.getParentDataStream() != null &&
                // avoid valid cases when directly indexing into a backing index
                // (for example when directly indexing into .ds-logs-foobar-000001)
                concreteIndex.getName().equals(docWriteRequest.index()) == false &&
                docWriteRequest.opType() != DocWriteRequest.OpType.CREATE) {
                throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
            }
            switch (docWriteRequest.opType()) {
                // todo what is the difference between CREATE and INDEX (op_type=create vs the default)?
                //      CREATE cares whether the document already exists: if a doc with the same id exists, CREATE rejects the operation
                //      INDEX simply overwrites..
                case CREATE:
                case INDEX:
                    prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
                    prohibitCustomRoutingOnDataStream(docWriteRequest, metadata);
                    IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                    // todo fetch the index metadata of the current request
                    final IndexMetadata indexMetadata = metadata.index(concreteIndex);
                    // todo fetch the index's mapping
                    MappingMetadata mappingMd = indexMetadata.mappingOrDefault();
                    // todo fetch the version info
                    Version indexCreated = indexMetadata.getCreationVersion();
                    // todo resolve the routing; for now just treat it as the routing passed with the POST.
                    //      the logic inside is, again, alias-related
                    indexRequest.resolveRouting(metadata);
                    // todo if no document id was specified, this generates one for the indexRequest
                    indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
                    break;
                case UPDATE:
                    TransportUpdateAction.resolveAndValidateRouting(metadata, concreteIndex.getName(),
                        (UpdateRequest) docWriteRequest);
                    break;
                case DELETE:
                    docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
                    // check if routing is required, if so, throw error if routing wasn't specified
                    if (docWriteRequest.routing() == null && metadata.routingRequired(concreteIndex.getName())) {
                        throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
                    }
                    break;
                default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
            }
        } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
            BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(),
                docWriteRequest.id(), e);
            BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
            responses.set(i, bulkItemResponse);
            // make sure the request gets never processed again
            bulkRequest.requests.set(i, null);
        }
    }
    // todo 6.
    // first, go over all the requests and create a ShardId -> Operations mapping
    Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest<?> request = bulkRequest.requests.get(i);
        if (request == null) {
            continue;
        }
        String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
        // todo compute the shard id this index request will be written to
        //      the first step fetches an OperationRouting object, which encapsulates the routing algorithm..
        //      arg 1: clusterState, the cluster state
        //      arg 2: the index name
        //      arg 3: the document id
        //      arg 4: the routing value
        ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
            request.routing()).shardId();
        // todo if the map has no entry for this shardId yet, insert shardId -> new ArrayList
        List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
        // todo add the current index request to its shard's group; the request is wrapped once more, and the wrapper
        // todo records the request's position inside the bulkRequest, so the response can be slotted back later
        shardRequests.add(new BulkItemRequest(i, request));
    }
    if (requestsByShard.isEmpty()) {
        // todo if grouping produced an empty map, respond to the caller right away
        listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
            buildTookInMillis(startTimeNanos)));
        return;
    }
    // todo counter: starts at the number of groups after splitting the bulk by shard, i.e. the number of shards involved
    final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
    // todo the local nodeId
    String nodeId = clusterService.localNode().getId();
    // todo iterate over the per-shard groups of the bulk
    for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
        // todo the current shard id
        final ShardId shardId = entry.getKey();
        // todo the requests targeting this shard
        final List<BulkItemRequest> requests = entry.getValue();
        // todo build the bulk shard request object, wrapping the index requests that landed on this shard
        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
            requests.toArray(new BulkItemRequest[requests.size()]));
        // todo number of active shards to wait for (default 1)
        bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
        // todo the timeout
        bulkShardRequest.timeout(bulkRequest.timeout());
        // todo the cluster state version the routing was based on
        bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
        if (task != null) {
            bulkShardRequest.setParentTask(nodeId, task.getId());
        }
        // todo arg 1: the bulk shard request; arg 2: the response listener
        shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
            @Override
            public void onResponse(BulkShardResponse bulkShardResponse) {
                for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                    // we may have no response if item failed
                    if (bulkItemResponse.getResponse() != null) {
                        bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                    }
                    responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                }
                if (counter.decrementAndGet() == 0) {
                    finishHim();
                }
            }

            @Override
            public void onFailure(Exception e) {
                // create failures for all relevant requests
                for (BulkItemRequest request : requests) {
                    final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
                    DocWriteRequest<?> docWriteRequest = request.request();
                    responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
                        new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
                }
                if (counter.decrementAndGet() == 0) {
                    finishHim();
                }
            }

            private void finishHim() {
                listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
                    buildTookInMillis(startTimeNanos)));
            }
        });
    }
    bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
}
That was a lot of code; in short it:
- iterates over every "DocWriteRequest" in the bulkRequest and, for write requests without an "_id", auto-generates one
- runs the routing algorithm on the "id" to find the "shard" each "DocWriteRequest" hits, and adds the request to that shard's group (the routing itself is sketched right after this list)
- then hands the per-shard groups over to "TransportShardBulkAction"; strictly speaking the entry point here is TransportShardBulkAction's grandparent, "TransportReplicationAction". TransportBulkAction groups the data by shardId, and TransportReplicationAction then performs the insert, update or delete for each group: a rather elegant piece of decoupling
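The routing in step 2 boils down to a hash of the id (or of the explicit routing value) modulo the number of primary shards. A sketch of the essence, ignoring routing_partition_size and the routing_num_shards/routingFactor indirection OperationRouting adds to support shrink and split (numberOfPrimaryShards stands in for the index's primary shard count):

// same document + same routing always land on the same shard
String effectiveRouting = request.routing() != null ? request.routing() : request.id();
int hash = Murmur3HashFunction.hash(effectiveRouting);     // org.elasticsearch.cluster.routing
int shardId = Math.floorMod(hash, numberOfPrimaryShards);  // e.g. 5 for my_index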
One more note on why TransportBulkAction hands off to TransportShardBulkAction at all:
TransportBulkAction: groups the individual operations of a bulk request (index, update, delete) by target index and shard, so that every operation gets routed to the correct shard for processing.
TransportShardBulkAction: executes those operations on a specific shard. It works on operations that TransportBulkAction has already split up and dispatched to concrete shards; for each shard it performs the actual document indexing, update or delete, which includes interacting with the Lucene index, handling document versioning, and applying any necessary index-mapping updates.
TransportReplicationAction.doExecute
This creates yet another object, a ReroutePhase (the name hints at what it does: it may re-route the request, because the current node is not necessarily the node holding the primary shard), and calls its run method, which as usual ends up in doRun.
ReroutePhase.doRun
@Override
public void onFailure(Exception e) {
    finishWithUnexpectedFailure(e);
}

@Override
protected void doRun() {
    setPhase(task, "routing");
    final ClusterState state = observer.setAndGetObservedState();
    final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
    if (blockException != null) {
        if (blockException.retryable()) {
            logger.trace("cluster is blocked, scheduling a retry", blockException);
            retry(blockException);
        } else {
            finishAsFailed(blockException);
        }
    } else {
        // todo 1. fetch the index metadata of the index this shard belongs to
        final IndexMetadata indexMetadata = state.metadata().index(request.shardId().getIndex());
        if (indexMetadata == null) {
            // ensure that the cluster state on the node is at least as high as the node that decided that the index was there
            // todo 2. when can state.version() < request.routedBasedOnClusterVersion() happen?
            //         ReroutePhase is created from two call sites:
            //         1. locally, when TransportBulkAction goes through TransportShardBulkAction.execute
            //         2. a remote RPC for the indices:data/write/bulk[s][p] action, which also ends up creating a ReroutePhase here..
            if (state.version() < request.routedBasedOnClusterVersion()) {
                logger.trace("failed to find index [{}] for request [{}] despite sender thinking it would be here. " +
                    "Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
                    request.shardId().getIndex(), request, state.version(), request.routedBasedOnClusterVersion());
                retry(new IndexNotFoundException("failed to find index as current cluster state with version [" + state.version() +
                    "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]",
                    request.shardId().getIndexName()));
                return;
            } else {
                finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
                return;
            }
        }
        if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
            finishAsFailed(new IndexClosedException(indexMetadata.getIndex()));
            return;
        }
        if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
            // if the wait for active shard count has not been set in the request,
            // resolve it from the index settings
            request.waitForActiveShards(indexMetadata.getWaitForActiveShards());
        }
        assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
            "request waitForActiveShards must be set in resolveRequest";
        // todo 3. fetch the "primary shard" routing info
        final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
        if (primary == null || primary.active() == false) {
            logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
                + "cluster state version [{}]", request.shardId(), actionName, request, state.version());
            retryBecauseUnavailable(request.shardId(), "primary shard is not active");
            return;
        }
        if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
            logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
                + "cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
            retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
            return;
        }
        // todo 4. fetch the node that hosts the primary shard
        final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
        if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
            // todo 5. the primary shard is a shard this very node is responsible for
            performLocalAction(state, primary, node, indexMetadata);
        } else {
            // todo 5. the primary shard is owned by another data node, so the request must be forwarded over RPC
            performRemoteAction(state, primary, node);
        }
    }
}
This stretch of code is also extremely long; in summary it:
- reads the PrimaryShard's routing info from the cluster state and finds the Node that owns the primary; what happens next depends on whether that Node is the current one
- if the shard is owned by this machine, it runs performLocalAction
- if the shard is owned by another machine, it runs performRemoteAction
ReroutePhase.performLocalAction
Keep following performAction and you will find it executes the RPC "indices:data/write/bulk[s][p]". This is really a local call; the rpcHandler is TransportReplicationAction.handlePrimaryRequest(), and this rpcHandler was registered in the constructor.
Inside handlePrimaryRequest an AsyncPrimaryAction is created:
ReroutePhase.performRemoteAction
performRemoteAction executes the RPC "indices:data/write/bulk[s]". This is a genuinely remote RPC call: the request is sent to the node that owns the PrimaryShard. Which rpcHandler picks it up there? TransportReplicationAction.handleOperationRequest(), which calls runReroutePhase again, so eventually it also ends up in performLocalAction 😄
To summarize:
- read the PrimaryShard's routing info from the cluster state and find the Node that owns the primary; the next step depends on whether that Node is the current one
- PrimaryShard owned by the local node: execute the RPC "indices:data/write/bulk[s][p]", which is really a local call; the rpcHandler is TransportReplicationAction.handlePrimaryRequest(...), which creates an AbstractRunnable implementation called "AsyncPrimaryAction"
- PrimaryShard owned by another node: execute the RPC "indices:data/write/bulk[s]", a remote call sent to the node that owns the PrimaryShard (the registration of both handlers is sketched below)
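Both handlers are wired up in TransportReplicationAction's constructor. A condensed sketch of that registration (the real 7.x constructor passes a few extra flags):

// "indices:data/write/bulk[s]": what a remote node sends; it re-runs ReroutePhase on the receiving node
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME,
    requestReader, this::handleOperationRequest);
// "indices:data/write/bulk[s][p]": the primary phase; its handler builds the AsyncPrimaryAction below
transportService.registerRequestHandler(transportPrimaryAction, executor,
    in -> new ConcreteShardRequest<>(requestReader, in), this::handlePrimaryRequest);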
AsyncPrimaryAction.doRun
@Override
protected void doRun() throws Exception {
    // todo 1. get the shard id of the request
    final ShardId shardId = primaryRequest.getRequest().shardId();
    // todo 2. get the local IndexShard object for that shard id; this is a very core object that wraps
    //         the low-level shard operations: write, search, get, flush, ...
    final IndexShard indexShard = getIndexShard(shardId);
    // todo 3. get the shard routing entry of the index shard object
    final ShardRouting shardRouting = indexShard.routingEntry();
    // we may end up here if the cluster state used to route the primary is so stale that the underlying
    // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
    // the replica will take over and a replica will be assigned to the first node.
    if (shardRouting.primary() == false) {
        throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
    }
    final String actualAllocationId = shardRouting.allocationId().getId();
    // todo if the allocationId of the local IndexShard for this shardId does not match the allocationId in the request, throw right here
    if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) {
        throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]",
            primaryRequest.getTargetAllocationID(), actualAllocationId);
    }
    final long actualTerm = indexShard.getPendingPrimaryTerm();
    if (actualTerm != primaryRequest.getPrimaryTerm()) {
        // todo if the shard's actual primaryTerm is not the request's primaryTerm, a primary election
        //      may have taken place for this shardId in the meantime
        throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]",
            primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
    }
    // todo 4. acquire the operation permit
    //         arg 1: indexShard, the primary shard object
    //         arg 2: the shardBulkRequest object
    //         arg 3: a listener wrapping what to do once the permit has been acquired
    acquirePrimaryOperationPermit(
        indexShard,
        primaryRequest.getRequest(),
        ActionListener.wrap(
            releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
            e -> {
                if (e instanceof ShardNotInPrimaryModeException) {
                    onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
                } else {
                    onFailure(e);
                }
            }));
}
// todo the primaryShardReference argument wraps the primary shard object and the permit releasable
void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {
    try {
        // todo 1. get the cluster state
        final ClusterState clusterState = clusterService.state();
        // todo 2. use the cluster state to finally fetch the index metadata
        final IndexMetadata indexMetadata = clusterState.metadata().getIndexSafe(primaryShardReference.routingEntry().index());
        final ClusterBlockException blockException = blockExceptions(clusterState, indexMetadata.getIndex().getName());
        if (blockException != null) {
            logger.trace("cluster is blocked, action failed on primary", blockException);
            throw blockException;
        }
        // todo the primary shard was handed over to another node while the request was in flight
        if (primaryShardReference.isRelocated()) {
            // todo close, releasing the permit
            primaryShardReference.close(); // release shard operation lock as soon as possible
            setPhase(replicationTask, "primary_delegation");
            // delegate primary phase to relocation target
            // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
            // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
            // todo get the latest primary shard routing
            final ShardRouting primary = primaryShardReference.routingEntry();
            assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
            final Writeable.Reader<Response> reader = TransportReplicationAction.this::newResponseInstance;
            // todo get the node the shard relocated to
            DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
            // todo forward the request
            transportService.sendRequest(relocatingNode, transportPrimaryAction,
                new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
                    primaryRequest.getPrimaryTerm()), transportOptions,
                new ActionListenerResponseHandler<Response>(onCompletionListener, reader) {
                    @Override
                    public void handleResponse(Response response) {
                        setPhase(replicationTask, "finished");
                        super.handleResponse(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        setPhase(replicationTask, "finished");
                        super.handleException(exp);
                    }
                });
        } else {
            // todo 3. the normal path: the primary shard is owned by the current node
            setPhase(replicationTask, "primary");
            // todo 4. create the "primary write result" listener; it is invoked only once the primary write
            //         and the replica writes have all completed
            final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
                // todo the onResponse path
                adaptResponse(response, primaryShardReference.indexShard);
                // todo syncGlobalCheckpointAfterOperation defaults to true
                if (syncGlobalCheckpointAfterOperation) {
                    try {
                        primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
                    } catch (final Exception e) {
                        // only log non-closed exceptions
                        if (ExceptionsHelper.unwrap(
                            e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
                            // intentionally swallow, a missed global checkpoint sync should not fail this operation
                            logger.info(
                                new ParameterizedMessage(
                                    "{} failed to execute post-operation global checkpoint sync",
                                    primaryShardReference.indexShard.shardId()), e);
                        }
                    }
                }
                primaryShardReference.close(); // release shard operation lock before responding to caller
                setPhase(replicationTask, "finished");
                onCompletionListener.onResponse(response);
            }, e -> handleException(primaryShardReference, e));
            // todo 6. bundle up a lot of parameters and create the ReplicationOperation, which in turn wraps
            //         the primary write plus the replica writes and fires the listener once both have completed
            /**
             * todo
             * arg 1: primaryRequest.getRequest() => the ShardBulkRequest object
             * arg 2: primaryShardReference
             * arg 3: the write-result listener
             * arg 4: newReplicasProxy() => a ReplicasProxy instance wrapping the RPC logic that syncs data to the replicas..
             * arg 5: logger
             * arg 6: threadPool, the thread pool (transportReplicationAction's resources..)
             * arg 7: actionName, here "indices:data/write/bulk[s]"
             * arg 8: primaryRequest.getPrimaryTerm() => the primary shard's primaryTerm
             * arg 9: initialRetryBackoffBound, initially 50ms; the starting retry interval when an RPC syncing
             *        data to a replica's Node fails
             * arg 10: retryTimeout, the retry timeout for RPCs syncing data to replica nodes
             */
            new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
                responseListener.map(result -> result.finalResponseIfSuccessful),
                newReplicasProxy(), logger, threadPool, actionName, primaryRequest.getPrimaryTerm(), initialRetryBackoffBound,
                retryTimeout)
                .execute();
        }
    } catch (Exception e) {
        handleException(primaryShardReference, e);
    }
}
Once more, a summary:
- get the shard id of the request, and use it to fetch the "index shard object", indexShard
- get the shard routing entry of that index shard object
- acquire the operation permit (sketched after this list)
- fetch the cluster state object, and through the **cluster state object and the shard routing entry** obtain the index metadata
- create a ReplicationOperation object and call its execute method
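The permit in step 3 follows a simple acquire-then-release shape. A minimal sketch of the pattern, simplified from the acquirePrimaryOperationPermit call and the ActionListener.wrap shown above:

// the write proceeds only once a permit is granted; closing the Releasable frees the permit
indexShard.acquirePrimaryOperationPermit(ActionListener.wrap(
    releasable -> {
        try (Releasable ignored = releasable) {
            // perform the primary-shard write while holding the permit
        }
    },
    e -> { /* e.g. ShardNotInPrimaryModeException: retry on primary */ }
), ThreadPool.Names.WRITE, primaryRequest.getRequest());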
ReplicationOperation.execute
Nothing essential here; just pay attention to the primary.perform call.
PrimaryShardReference.perform
This is an abstract method, ultimately implemented by the subclass TransportWriteAction.
TransportWriteAction.shardOperationOnPrimary
Likewise, dispatchedShardOperationOnPrimary is still an abstract method, ultimately implemented by TransportShardBulkAction. After all the detours we are back where we started..
TransportShardBulkAction.dispatchedShardOperationOnPrimary
Next this calls the transportShardBulkAction.performOnPrimary method.
TransportShardBulkAction.performOnPrimary
TransportShardBulkAction.executeBulkItemRequest
static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
                                      MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
                                      ActionListener<Void> itemDoneListener) throws Exception {
    // todo 1. get the operation type of the current sub-request; for now we only consider INDEX and CREATE
    final DocWriteRequest.OpType opType = context.getCurrent().opType();

    final UpdateHelper.Result updateResult;
    if (opType == DocWriteRequest.OpType.UPDATE) {
        final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
        try {
            updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
        } catch (Exception failure) {
            // we may fail translating a update to index or delete operation
            // we use index result to communicate failure while translating update request
            final Engine.Result result =
                new Engine.IndexResult(failure, updateRequest.version());
            context.setRequestToExecute(updateRequest);
            context.markOperationAsExecuted(result);
            context.markAsCompleted(context.getExecutionResult());
            return true;
        }
        // execute translated update request
        switch (updateResult.getResponseResult()) {
            case CREATED:
            case UPDATED:
                IndexRequest indexRequest = updateResult.action();
                IndexMetadata metadata = context.getPrimary().indexSettings().getIndexMetadata();
                MappingMetadata mappingMd = metadata.mappingOrDefault();
                indexRequest.process(metadata.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
                context.setRequestToExecute(indexRequest);
                break;
            case DELETED:
                context.setRequestToExecute(updateResult.action());
                break;
            case NOOP:
                context.markOperationAsNoOp(updateResult.action());
                context.markAsCompleted(context.getExecutionResult());
                return true;
            default:
                throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
        }
    } else {
        // todo 2. reaching here means the current op type is one of index, create or delete
        context.setRequestToExecute(context.getCurrent());
        updateResult = null;
    }

    assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state
    // todo 3. the primary shard object
    final IndexShard primary = context.getPrimary();
    // todo 4. get the request's version value; what is version for? ES optimistic locking
    final long version = context.getRequestToExecute().version();
    final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
    final Engine.Result result;
    if (isDelete) {
        final DeleteRequest request = context.getRequestToExecute();
        result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(),
            request.ifSeqNo(), request.ifPrimaryTerm());
    } else {
        // todo pull the current index request out of the context
        final IndexRequest request = context.getRequestToExecute();
        /**
         * todo
         * arg 1: version, the version
         * arg 2: versionType, the version type (default: INTERNAL)
         * arg 3: a SourceToParse object (the internal engine indexes based on it)
         * arg 4: request.ifSeqNo(), the newer concurrency control
         * arg 5: request.ifPrimaryTerm(), the newer concurrency control
         * arg 6: request.getAutoGeneratedTimestamp(), the timestamp
         * arg 7: request.isRetry(), whether this is a retry
         */
        result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
            request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()),
            request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
    }
    // ... handling of `result` continues in the real source (omitted here) ...
}
To sum this part up:
- get the operation type of the current request
- fetch the "primary shard" object from the context, since the subsequent write goes through it
- wrap the payload in a SourceToParse
- call the primary's applyIndexOperationOnPrimary to perform the low-level write
- handle the different outcomes of that call
IndexShard.applyIndexOperationOnPrimary
Digging one level deeper:
private Engine.IndexResult applyIndexOperation(
        Engine engine, long seqNo, long opPrimaryTerm, long version,
        @Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm,
        long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
        SourceToParse sourceToParse) throws IOException {
    // assert that the operation's primary term is not greater than the shard's current primary term
    assert opPrimaryTerm <= getOperationPrimaryTerm()
        : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
    // make sure writes from this operation's origin are allowed
    ensureWriteAllowed(origin);
    Engine.Index operation;
    try {
        // resolve the document type
        final String resolvedType = mapperService.resolveDocumentType(sourceToParse.type());
        final SourceToParse sourceWithResolvedType;
        // if the resolved type equals the original one, reuse sourceToParse; otherwise build a new SourceToParse instance
        if (resolvedType.equals(sourceToParse.type())) {
            sourceWithResolvedType = sourceToParse;
        } else {
            sourceWithResolvedType = new SourceToParse(sourceToParse.index(), resolvedType, sourceToParse.id(),
                sourceToParse.source(), sourceToParse.getXContentType(), sourceToParse.routing());
        }
        // prepare the index operation
        operation = prepareIndex(docMapper(resolvedType), sourceWithResolvedType,
            seqNo, opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, ifSeqNo, ifPrimaryTerm);
        // check for, and surface, dynamic mapping updates
        Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
        if (update != null) {
            return new Engine.IndexResult(update);
        }
    } catch (Exception e) {
        // any exception during parsing or mapping update is treated as a document-level failure,
        // and must not have the side effect of closing the shard
        verifyNotClosed(e);
        return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo);
    }
    // run the index operation and return its result
    return index(engine, operation);
}
Going one level deeper again, the index method:
private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
    // mark the shard active, signalling that a new indexing operation has started
    active.set(true);
    final Engine.IndexResult result;
    // before the actual indexing, invoke the listeners' preIndex hook for any pre-processing
    index = indexingOperationListeners.preIndex(shardId, index);
    try {
        // with Trace logging enabled, record the details of the operation:
        // type, id, seqNo, etc.; note the source is not stringified here, to avoid encoding errors
        if (logger.isTraceEnabled()) {
            logger.trace("index [{}][{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}]",
                index.type(), index.id(), index.seqNo(), routingEntry().allocationId(), index.primaryTerm(), getOperationPrimaryTerm(),
                index.origin());
        }
        // perform the indexing, handing the document to the engine
        result = engine.index(index);
        // with Trace enabled, also log the state after indexing completes
        if (logger.isTraceEnabled()) {
            logger.trace("index-done [{}][{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}] " +
                "result-seq# [{}] result-term [{}] failure [{}]",
                index.type(), index.id(), index.seqNo(), routingEntry().allocationId(), index.primaryTerm(), getOperationPrimaryTerm(),
                index.origin(), result.getSeqNo(), result.getTerm(), result.getFailure());
        }
    } catch (Exception e) {
        // if the indexing threw, log the failure
        if (logger.isTraceEnabled()) {
            logger.trace(new ParameterizedMessage(
                "index-fail [{}][{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}]",
                index.type(), index.id(), index.seqNo(), routingEntry().allocationId(), index.primaryTerm(), getOperationPrimaryTerm(),
                index.origin()
            ), e);
        }
        // invoke the listeners' postIndex hook for failure cleanup
        indexingOperationListeners.postIndex(shardId, index, e);
        // rethrow so the caller can handle or record the exception
        throw e;
    }
    // on success, invoke the listeners' postIndex hook for post-processing
    indexingOperationListeners.postIndex(shardId, index, result);
    // return the indexing result
    return result;
}
Now look at the engine.index call.
InternalEngine
This is where it all finally lands. Moment of enlightenment 😄:
indexWriter is Lucene's API; if you want to learn Lucene, its source code is the next stop.
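To close the loop, here is what that last hop looks like if you drive Lucene directly: a minimal, self-contained IndexWriter example (plain Lucene API, independent of ES):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;

import java.nio.file.Paths;

public class LuceneWriteDemo {
    public static void main(String[] args) throws Exception {
        try (FSDirectory dir = FSDirectory.open(Paths.get("/tmp/lucene-demo"));
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
            Document doc = new Document();
            doc.add(new StringField("_id", "1", Field.Store.YES));           // exact-match id field
            doc.add(new TextField("body", "hello lucene", Field.Store.YES)); // analyzed text field
            writer.addDocument(doc); // the call that engine.index ultimately drives, many layers up
            writer.commit();         // fsync segments; ES steers this via refresh/flush instead
        }
    }
}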