FlinkSql使用ES sink并指定主键,为什么数据还是会被覆盖?
1. 问题描述
根据ES connector文档中的描述,创建ES表并指定主键后将采用upsert模式。
但是在实际的使用过程中却发现部分数据仍然存在被直接覆盖的问题。
举个例子,假如ES中本来的数据如下图中示例1所示,在FlinkSql中要写入的数据如示例2所示,按照upsert的理解执行结果应该如示例3所示,但实际结果为示例4所示,即对整个json文档进行了覆盖,而不是部分字段的更新。
实际发生问题的场景如下:
- 情形一:在任务启动前es中已经存在数据,任务启动后,发现部分主键key对应的json文档直接被覆盖成任务中新写入的数据
- 情形二:任务中存在A和B两个insert into语句往同一个es表中写入数据,A和B除主键key之外其他写入的字段不完全相同,任务启动后,发现部分主键key对应的数据只包含A或B的结果。
上述两种情景的sql中均包含join、groupBy等操作。
2. 问题排查
针对上述问题,在网上查找一番后并未直接找到结果。大多数给出的答案正如官方文档所述,会执行upsert操作。
针对自己在实际使用过程中遇到的问题,尝试从源码中一探究竟。
接下来,首先需要找到源码的切入点,从切入点设置断点进行逐步分析。
2.1. 自定义sink原理
源码基于flink 1.14.4
根据官方文档中对自定义sink的描述,connector sink的的工作原理如下
元数据的内容由create table语句所定义,通过CatalogTable
的实例表示,该实例表示Catalog
中未解析的元数据,包含可以在create table语句中表示的所有特征,框架将其解析为ResolvedCatalogTable
实例(一个经过验证的CatalogTable),然后再将其传递给DynamicTableSinkFactory
,最终传入到DynamicTableSink
,用于insert into语句的写入。
DynamicTableSinkFactory
负责提供connector的具体实现逻辑。用于将CatalogTable
的元数据转化为DynamicTableSink
。
其接口中仅有一个方法,返回DynamicTableSink
实例。
public interface DynamicTableSinkFactory extends DynamicTableFactory {
/**
* 根据CatalogTable的元数据转化为DynamicTableSink,CatalogTable通过org.apache.flink.table.factories.DynamicTableFactory.Context中获取
* 实现应该此方法中执行create table中with选项的验证
*/
DynamicTableSink createDynamicTableSink(Context context);
}
DynamicTableSink
负责将动态表sink到外部存储。
动态表的内容可以被视为一个changelog,所有的更改都会连续写出,直到changelog结束为止。给定的ChangelogMode
指示sink在运行时接受的一组更改。
该接口的实例可以看作是最终生成用于写入实际数据的具体运行时实现的工场。
最后planner将调用getSinkRuntimeProvider
方法来获取运行时实现的提供程序,即SinkRuntimeProvider
的实例。
因此自定义sink的核心是通过DynamicTableSink#getSinkRuntimeProvider
方法向planner提供一个SinkRuntimeProvider
实例,该实例中包含了向外部存储写入数据的关键。
public interface DynamicTableSink {
/**
* 返回sink在运行时接受的一组更改
* planner可以提出建议,但sink拥有最终决定权。如果planner不支持此模式,它将引发错误。
*/
ChangelogMode getChangelogMode(ChangelogMode requestedMode);
/**
* 返回用于写入数据的运行时实现,即SinkRuntimeProvider的实例
* 表运行时独立于provider接口,要求sink实现接受内部数据结构(RowData)
*/
SinkRuntimeProvider getSinkRuntimeProvider(Context context);
interface Context {
boolean isBounded();
<T> TypeInformation<T> createTypeInformation(DataType consumedDataType);
DataStructureConverter createDataStructureConverter(DataType consumedDataType);
}
interface DataStructureConverter extends RuntimeConverter {
@Nullable
Object toExternal(@Nullable Object internalStructure);
}
/**
* 提供用于写入数据的实际运行时实现
* SinkProvider 是sink的核心接口。
* flink-table-api-java-bridge 中的 SinkFunctionProvider 和 OutputFormatProvider 可用于向后兼容。 *
*/
interface SinkRuntimeProvider {
// marker interface
}
}
了解了自定义sink的实现原理后,接下里看下es sink的具体实现。
2.2. ES sink实现
本文中使用的es connector版本为
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.14.4</version>
</dependency>
DynamicTableSinkFactory
和DynamicTableSink
在es sink中的实现分别为Elasticsearch6DynamicSinkFactory
,Elasticsearch6DynamicSink
。
Elasticsearch6DynamicSinkFactory
的实现如下,获取到scheme后对schema和配置参数进行验证,最后返回Elasticsearch6DynamicSink
实例。
public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
// 从CatalogTable中获取schema
TableSchema tableSchema = context.getCatalogTable().getSchema();
// 验证主键
ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
// 得到format
final EncodingFormat<SerializationSchema<RowData>> format =
helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
helper.validate();
Configuration configuration = new Configuration();
context.getCatalogTable().getOptions().forEach(configuration::setString);
Elasticsearch6Configuration config =
new Elasticsearch6Configuration(configuration, context.getClassLoader());
// 验证参数
validate(config, configuration);
// 最终返回Elasticsearch6DynamicSink
return new Elasticsearch6DynamicSink(
format, config, TableSchemaUtils.getPhysicalSchema(tableSchema));
}
}
Elasticsearch6DynamicSink
的实现如下,getChangelogMode
方法表示ES sink接受除UPDATE_BEFORE类型之外的其他更改。getSinkRuntimeProvider
方法返回了SinkFunctionProvider
实例,为SinkRuntimeProvider
的子接口。
通过实现SinkFunctionProvider
接口中的SinkFunction<RowData> createSinkFunction()
方法,从而得到SinkFunction
实例。
final class Elasticsearch6DynamicSink implements DynamicTableSink {
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
for (RowKind kind : requestedMode.getContainedKinds()) {
// 不接受update_before类型的更改
if (kind != RowKind.UPDATE_BEFORE) {
builder.addContainedKind(kind);
}
}
return builder.build();
}
@Override
public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
return () -> {
SerializationSchema<RowData> format =
this.format.createRuntimeEncoder(context, schema.toRowDataType());
// 负责生成真正和es交互的ActionRequest实例,如IndexRequest、UpdateRequest、DeleteRequest等
final RowElasticsearchSinkFunction upsertFunction =
new RowElasticsearchSinkFunction(
IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
config.getDocumentType(),
format,
XContentType.JSON,
REQUEST_FACTORY,
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
final ElasticsearchSink.Builder<RowData> builder =
builderProvider.createBuilder(config.getHosts(), upsertFunction);
builder.setFailureHandler(config.getFailureHandler());
builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
builder.setBulkFlushInterval(config.getBulkFlushInterval());
builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
// we must overwrite the default factory which is defined with a lambda because of a bug
// in shading lambda serialization shading see FLINK-18006
if (config.getUsername().isPresent()
&& config.getPassword().isPresent()
&& !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
&& !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
builder.setRestClientFactory(
new AuthRestClientFactory(
config.getPathPrefix().orElse(null),
config.getUsername().get(),
config.getPassword().get()));
} else {
builder.setRestClientFactory(
new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
}
// 最终是得到ElasticsearchSink
final ElasticsearchSink<RowData> sink = builder.build();
if (config.isDisableFlushOnCheckpoint()) {
sink.disableFlushOnCheckpoint();
}
return sink;
};
}
}
public interface SinkFunctionProvider
extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
/** 返回SinkFunction的实例 */
SinkFunction<RowData> createSinkFunction();
}
因此planner在ES sink的实现中最终得到了ElasticsearchSink
的实例。该实例负责调用ES相应API与ES进行数据交互。
2.3. 实际与ES的交互
通过ElasticsearchSink
的继承关系,可以发现该类直接继承ElasticsearchSinkBase
,并最终继承了SinkFunction
。
SinkFunction
是实现自定义sink的根接口。每条写入sink的数据都将会调用该接口中的invoke
方法。ElasticsearchSinkBase
对此方法的实现如下,在其中通过调用ElasticsearchSinkFunction#process
方法进行数据写入。
private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
@Override
public void invoke(T value, Context context) throws Exception {
checkAsyncErrorsAndRequests();
// 调用ElasticsearchSinkFunction的process方法
elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
}
在ES sink中ElasticsearchSinkFunction
的唯一实现是RowElasticsearchSinkFunction
类,其关键代码如下。
class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
private final RequestFactory requestFactory;
@Override
/**
* 每条数据和ES实际交互的方法
*/
public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
// 根据每条数据的RowKind类型得到es api的UpdateRequest或DeleteRequest实例
switch (element.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
// insert和update_after时,进行更新
processUpsert(element, indexer);
break;
case UPDATE_BEFORE:
case DELETE:
// update_before时,删除数据
processDelete(element, indexer);
break;
default:
throw new TableException("Unsupported message kind: " + element.getRowKind());
}
}
// 对数据进行IndexRequest或UpdateRequest请求
private void processUpsert(RowData row, RequestIndexer indexer) {
final byte[] document = serializationSchema.serialize(row);
final String key = createKey.apply(row);
if (key != null) {
// 指定主键时,得到UpdateRequest
final UpdateRequest updateRequest =
requestFactory.createUpdateRequest(
indexGenerator.generate(row), docType, key, contentType, document);
indexer.add(updateRequest);
} else {
// 未指定主键时,得到IndexRequest
final IndexRequest indexRequest =
requestFactory.createIndexRequest(
indexGenerator.generate(row), docType, key, contentType, document);
indexer.add(indexRequest);
}
}
// 对数据进行DeleteRequest请求
private void processDelete(RowData row, RequestIndexer indexer) {
final String key = createKey.apply(row);
final DeleteRequest deleteRequest =
requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
indexer.add(deleteRequest);
}
}
其中requestFactory.createUpdateRequest
、requestFactory.createIndexRequest
、requestFactory.createDeleteRequest
的方法分别如下
private static class Elasticsearch6RequestFactory implements RequestFactory {
@Override
public UpdateRequest createUpdateRequest(
String index,
String docType,
String key,
XContentType contentType,
byte[] document) {
return new UpdateRequest(index, docType, key)
.doc(document, contentType)
.upsert(document, contentType);
}
@Override
public IndexRequest createIndexRequest(
String index,
String docType,
String key,
XContentType contentType,
byte[] document) {
return new IndexRequest(index, docType, key).source(document, contentType);
}
@Override
public DeleteRequest createDeleteRequest(String index, String docType, String key) {
return new DeleteRequest(index, docType, key);
}
}
到此,我们已经找到了ES sink和ES进行交互的具体逻辑,对于changelog中每条数据根据其RowKind
类型决定执行INSERT、UPDATE还是DELETE请求。并且根据是否指定主键,细分为INSERT或UPDATE。
- 指定主键,只会进行UPDATE和DELETE操作。
- 未指定主键,只进行INSERT操作。
综上所述,在指定主键的情况下,如果changelog中只有INSERT和UPDATE_AFTER类型的数据时,那么仅调用UpdateRequest API将数据写入ES,数据只会执行Upsert操作。但是针对实际结果中发生数据覆盖的情况,只可能是存在DELETE类型的数据,导致其执行了processDelete方法,先将数据删除,随着changelog中数据不断到来再次执行processUpsert方法。最终导致结果中只保留了最新写入的数据,即看到的数据覆盖的情况。
DELETE类型的数据是否会真正生成?什么时机生成?
3. 验证演示
验证使用的示例代码如下,kafka中为随机生成的账单id从1到5,时间递增的账单数据。SQL加工逻辑为汇总每个id、每小时的账单金额,将结果写入到使用账单id作为主键的es表中。
public class EsConnectorSQLTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = EnvConf.getEnvWithUI();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// 创建source表,交易表
tEnv.executeSql("CREATE TABLE transactions (\n" +
" account_id BIGINT,\n" +
" amount BIGINT,\n" +
" transaction_time TIMESTAMP(3),\n" +
" WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'transactions',\n" +
" 'properties.bootstrap.servers' = '******',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'csv'\n" +
")");
// 创建sink表,消费报告表
tEnv.executeSql("CREATE TABLE spend_report (\n" +
" account_id BIGINT,\n" +
" log_ts TIMESTAMP(3),\n" +
" amount BIGINT\n," +
" PRIMARY KEY (account_id) NOT ENFORCED" +
") WITH (\n" +
" 'connector' = 'elasticsearch-6',\n" +
" 'hosts' = ******n" +
" 'index' = '******',\n" +
" 'document-type' = '******',\n" +
" 'sink.bulk-flush.max-actions' = '1',\n" +
" 'username' = '******',\n" +
" 'password' = '******',\n" +
" 'format' = 'json'\n" +
")");
// 从交易表中写数据到消费报告表中
Table report = report(tEnv.from("transactions"));
report.executeInsert("spend_report");
// 通过如下代码观察select的执行结果数据,changelog
// report.execute().print();
}
public static Table report(Table transactions) {
/*
* 将每个账号在小时内的金额进行汇总
* */
return transactions.select(
$("account_id"),
$("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
$("amount"))
.groupBy($("account_id"), $("log_ts"))
.select(
$("account_id"),
$("log_ts"),
$("amount").sum().as("amount"));
}
}
通过在RowElasticsearchSinkFunction#process
方法中设置断点,可以观察到对于同一个key的确产生了-D
类型的数据,导致删除了ES中数据,后续再执行upsert方法时,由于之前的数据已经被删除,所以更新后只剩下了新写入的数据。
通过观察示例代码main方法中report.execute().print()
结果,对于各个id首次聚合结果类型为+I
,当数据执行更新时,先生成-U
类型结果,将原先的结果回撤,然后紧跟着生成+U
类型的最新结果。
selece的结果中并不会产生-D
类型的数据,但是实际到达RowElasticsearchSinkFunction#process
方法时却包含了-D
类型的数据,因此-D
类型的数据并不是有select语句直接得到的。
+----+----------------------+-------------------------+----------------------+
| op | account_id | log_ts | amount |
+----+----------------------+-------------------------+----------------------+
| +I | 5 | 2565-04-12 05:00:00.000 | 461 |
| +I | 1 | 2565-04-12 05:00:00.000 | 517 |
| +I | 2 | 2565-04-12 05:00:00.000 | 630 |
| +I | 3 | 2565-04-12 05:00:00.000 | 640 |
| +I | 4 | 2565-04-12 05:00:00.000 | 356 |
| -U | 5 | 2565-04-12 05:00:00.000 | 461 |
| +U | 5 | 2565-04-12 05:00:00.000 | 1140 |
| -U | 1 | 2565-04-12 05:00:00.000 | 517 |
| +U | 1 | 2565-04-12 05:00:00.000 | 688 |
| +I | 2 | 2565-04-12 06:00:00.000 | 35 |
| +I | 3 | 2565-04-12 06:00:00.000 | 524 |
| +I | 4 | 2565-04-12 06:00:00.000 | 290 |
| +I | 5 | 2565-04-12 06:00:00.000 | 612 |
| +I | 1 | 2565-04-12 06:00:00.000 | 12 |
| -U | 2 | 2565-04-12 06:00:00.000 | 35 |
| +U | 2 | 2565-04-12 06:00:00.000 | 903 |
| -U | 3 | 2565-04-12 06:00:00.000 | 524 |
| +U | 3 | 2565-04-12 06:00:00.000 | 1353 |
3.1. SinkMateriallizer任务
回到示例代码中,数据的加工逻辑是对数据进行Group by后的聚合处理,查看其作业DAG,可以发现有3个任务节点,分别是KafkaSource->GroupAggregate->SinkMateriallizer,前两个任务节点的作用容易理解,第三个名为SinkMaterializer节点除了最后将数据写入到ES外,还做了什么操作?算子名和算子中的upsertMaterialize=[true]
表示什么含义?
根据算子名称并结合断点分析,数据会先进入到GroupAggFunction#processElement
方法中执行聚合计算,并将聚合结果继续发送给了下游
SinkUpsertMaterializer#processElement
方法中,聚合结果最终在此方法中生成供上述RowElasticsearchSinkFunction#process
方法中写入到ES的数据,接下来重点了解下SinkUpsertMaterializer#processElement
的具体实现。
!!! note “”
SinkUpsertMaterializer类的作用是为了解决changelog中乱序流造成的结果不正确问题。
该维护了一个List<RowData>
的状态,该状态中保存着changelog中相同key的结果数据,并且根据已保存到状态中的数据进行乱序处理。
public class SinkUpsertMaterializer extends TableStreamOperator<RowData>
implements OneInputStreamOperator<RowData, RowData> {
// Buffer of emitted insertions on which deletions will be applied first.
// RowKind可能是+I或+U,并且在应用删除时将被忽略
private transient ValueState<List<RowData>> state;
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
final RowData row = element.getValue();
List<RowData> values = state.value();
if (values == null) {
values = new ArrayList<>(2);
}
switch (row.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
row.setRowKind(values.isEmpty() ? INSERT : UPDATE_AFTER);
values.add(row);
collector.collect(row);
break;
case UPDATE_BEFORE:
case DELETE:
final int lastIndex = values.size() - 1;
final int index = removeFirst(values, row);
if (index == -1) {
LOG.info(STATE_CLEARED_WARN_MSG);
return;
}
if (values.isEmpty()) {
// 此处会将元素的RowKind重新赋值为-D
row.setRowKind(DELETE);
collector.collect(row);
} else if (index == lastIndex) {
// Last row has been removed, update to the second last one
final RowData latestRow = values.get(values.size() - 1);
latestRow.setRowKind(UPDATE_AFTER);
collector.collect(latestRow);
}
break;
}
if (values.isEmpty()) {
state.clear();
} else {
state.update(values);
}
}
private int removeFirst(List<RowData> values, RowData remove) {
final Iterator<RowData> iterator = values.iterator();
int i = 0;
while (iterator.hasNext()) {
final RowData row = iterator.next();
// 忽略RowKind类型,对RowData中其他数据进行比较
remove.setRowKind(row.getRowKind());
if (equaliser.equals(row, remove)) {
iterator.remove();
return i;
}
i++;
}
return -1;
}
}
processElement方法大致处理逻辑如下流程图所示。
举个例子,
情形一:假如source表中有数据如左侧所示,经过示例sql处理后,输出了右侧的changelog结果
changelog
+----+------------+---------------+--------+
| op | account_id | log_hour | amount |
+----+------------+---------------+--------+
| +I | 3 | 2565-04-12 05 | 640 |
| +I | 3 | 2565-04-12 06 | 524 |
| -U | 3 | 2565-04-12 06 | 524 |
| +U | 3 | 2565-04-12 06 | 1353 |
此种情况下changelog数据经过processElement方法后输出结果为:
- 当第一条数据到达后,数据中RowKind=+I,状态为空,输出RowKind=+I的结果到下游;
- 当第二条数据到达后,数据中RowKind=+I,状态不为空,输出RowKind=+U的结果到下游,这时状态中保存了changlog中第一条和第二条数据内容;
- 当第三条数据到达后,数据中RowKind=-U,状态集合中最大index=1(lastindex),依次比较集合中元素和当前数据内容是否相同,发现集合中第二条数据和当前数据一致,因此返回index=1,并将从集合中移除index=1的元素。此时集合中仅剩amount=640的一个元素;
接下来满足lastindex=index的条件,因此获取集合中最后一条数据,即amount=640的那条数据,输出Rowkind= +U的结果到下游; - 当第四条数据到达后,数据中RowKind=+U,状态不为空,输出RowKind=+U的结果到下游。
情形二:假如source表中有数据如左侧所示,经过示例sql处理后,输出了右侧的changelog结果
changelog
+----+------------+---------------+--------+
| op | account_id | log_hour | amount |
+----+------------+---------------+--------+
| +I | 3 | 2565-04-12 06 | 524 |
| -U | 3 | 2565-04-12 06 | 524 |
| +U | 3 | 2565-04-12 06 | 1353 |
此种情况下changelog数据经过processElement方法后输出结果为:
- 当第一条数据到达后,数据中RowKind=+I,状态为空,输出RowKind=+I的结果到下游;
- 当第二条数据到达后,数据中RowKind=-U,集合中最大index=0(lastindex),依次比较集合中元素和当前数据内容是否相同,发现集合中第一条数据和当前数据一致,因此返回index=0,并将从集合中移除index=0的元素。此时集合为空。
接下来满足了集合为空的条件,输出Rowkind=-D的结果到下游; - 当第三条数据到达后,数据中RowKind=+U,状态为空,输出RowKind=+I的结果到下游。
4. 总结
导致写入到ES的数据从结果上看没有执行upsert而是执行了insert的根本原因是中间过程中执行了delete操作,将es中原始数据删除,然后继续写入了新数据。如果在delete执行后未继续写入新数据前停止任务,观察到的结果过将会是ES中数据条数减少。
而导致从ES中删除数据的原因是因为经过SinkUpsertMaterializer类处理后,将部分RowKind=-U
的数据转化成了RowKind=-D
类型的数据,从而触发ES sink执行DeleteRequest
请求。