FlinkSql使用ES sink并指定主键,为什么数据还是会被覆盖?

FlinkSql使用ES sink并指定主键,为什么数据还是会被覆盖?

1. 问题描述

根据ES connector文档中的描述,创建ES表并指定主键后将采用upsert模式。
但是在实际的使用过程中却发现部分数据仍然存在被直接覆盖的问题。
举个例子,假如ES中本来的数据如下图中示例1所示,在FlinkSql中要写入的数据如示例2所示,按照upsert的理解执行结果应该如示例3所示,但实际结果为示例4所示,即对整个json文档进行了覆盖,而不是部分字段的更新。
在这里插入图片描述
实际发生问题的场景如下:

  • 情形一:在任务启动前es中已经存在数据,任务启动后,发现部分主键key对应的json文档直接被覆盖成任务中新写入的数据
  • 情形二:任务中存在A和B两个insert into语句往同一个es表中写入数据,A和B除主键key之外其他写入的字段不完全相同,任务启动后,发现部分主键key对应的数据只包含A或B的结果。
    上述两种情景的sql中均包含join、groupBy等操作。

2. 问题排查

针对上述问题,在网上查找一番后并未直接找到结果。大多数给出的答案正如官方文档所述,会执行upsert操作。
针对自己在实际使用过程中遇到的问题,尝试从源码中一探究竟。
接下来,首先需要找到源码的切入点,从切入点设置断点进行逐步分析。

2.1. 自定义sink原理

源码基于flink 1.14.4

根据官方文档中对自定义sink的描述,connector sink的的工作原理如下
在这里插入图片描述
元数据的内容由create table语句所定义,通过CatalogTable的实例表示,该实例表示Catalog中未解析的元数据,包含可以在create table语句中表示的所有特征,框架将其解析为ResolvedCatalogTable实例(一个经过验证的CatalogTable),然后再将其传递给DynamicTableSinkFactory,最终传入到DynamicTableSink,用于insert into语句的写入。

DynamicTableSinkFactory负责提供connector的具体实现逻辑。用于将CatalogTable的元数据转化为DynamicTableSink
其接口中仅有一个方法,返回DynamicTableSink实例。

public interface DynamicTableSinkFactory extends DynamicTableFactory {
    /**
     * 根据CatalogTable的元数据转化为DynamicTableSink,CatalogTable通过org.apache.flink.table.factories.DynamicTableFactory.Context中获取
     * 实现应该此方法中执行create table中with选项的验证
     */
    DynamicTableSink createDynamicTableSink(Context context);
}

DynamicTableSink负责将动态表sink到外部存储。
动态表的内容可以被视为一个changelog,所有的更改都会连续写出,直到changelog结束为止。给定的ChangelogMode指示sink在运行时接受的一组更改。
该接口的实例可以看作是最终生成用于写入实际数据的具体运行时实现的工场。
最后planner将调用getSinkRuntimeProvider方法来获取运行时实现的提供程序,即SinkRuntimeProvider的实例。

因此自定义sink的核心是通过DynamicTableSink#getSinkRuntimeProvider方法向planner提供一个SinkRuntimeProvider实例,该实例中包含了向外部存储写入数据的关键。

public interface DynamicTableSink {
    /**
     * 返回sink在运行时接受的一组更改
     * planner可以提出建议,但sink拥有最终决定权。如果planner不支持此模式,它将引发错误。
     */
    ChangelogMode getChangelogMode(ChangelogMode requestedMode);
    /**
     * 返回用于写入数据的运行时实现,即SinkRuntimeProvider的实例
     * 表运行时独立于provider接口,要求sink实现接受内部数据结构(RowData)
     */
    SinkRuntimeProvider getSinkRuntimeProvider(Context context);

    interface Context {
        boolean isBounded();
        <T> TypeInformation<T> createTypeInformation(DataType consumedDataType);
        DataStructureConverter createDataStructureConverter(DataType consumedDataType);
    }

    interface DataStructureConverter extends RuntimeConverter {
        @Nullable
        Object toExternal(@Nullable Object internalStructure);
    }

    /**
     * 提供用于写入数据的实际运行时实现
     * SinkProvider 是sink的核心接口。
     * flink-table-api-java-bridge 中的 SinkFunctionProvider 和 OutputFormatProvider 可用于向后兼容。     *
     */
    interface SinkRuntimeProvider {
        // marker interface
    }
}

了解了自定义sink的实现原理后,接下里看下es sink的具体实现。

2.2. ES sink实现

本文中使用的es connector版本为

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.14.4</version>
</dependency>

DynamicTableSinkFactoryDynamicTableSink在es sink中的实现分别为Elasticsearch6DynamicSinkFactoryElasticsearch6DynamicSink

Elasticsearch6DynamicSinkFactory的实现如下,获取到scheme后对schema和配置参数进行验证,最后返回Elasticsearch6DynamicSink实例。

public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // 从CatalogTable中获取schema
        TableSchema tableSchema = context.getCatalogTable().getSchema();
        // 验证主键
        ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);

        // 得到format
        final EncodingFormat<SerializationSchema<RowData>> format =
                helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);

        helper.validate();
        Configuration configuration = new Configuration();
        context.getCatalogTable().getOptions().forEach(configuration::setString);
        Elasticsearch6Configuration config =
                new Elasticsearch6Configuration(configuration, context.getClassLoader());
        // 验证参数
        validate(config, configuration);

        // 最终返回Elasticsearch6DynamicSink
        return new Elasticsearch6DynamicSink(
                format, config, TableSchemaUtils.getPhysicalSchema(tableSchema));
    }
}

Elasticsearch6DynamicSink的实现如下,getChangelogMode方法表示ES sink接受除UPDATE_BEFORE类型之外的其他更改。getSinkRuntimeProvider方法返回了SinkFunctionProvider实例,为SinkRuntimeProvider的子接口。
通过实现SinkFunctionProvider接口中的SinkFunction<RowData> createSinkFunction()方法,从而得到SinkFunction实例。

final class Elasticsearch6DynamicSink implements DynamicTableSink {
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
        for (RowKind kind : requestedMode.getContainedKinds()) {
            // 不接受update_before类型的更改
            if (kind != RowKind.UPDATE_BEFORE) {
                builder.addContainedKind(kind);
            }
        }
        return builder.build();
    }

    @Override
    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
        return () -> {
            SerializationSchema<RowData> format =
                    this.format.createRuntimeEncoder(context, schema.toRowDataType());
            // 负责生成真正和es交互的ActionRequest实例,如IndexRequest、UpdateRequest、DeleteRequest等
            final RowElasticsearchSinkFunction upsertFunction =
                    new RowElasticsearchSinkFunction(
                            IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
                            config.getDocumentType(),
                            format,
                            XContentType.JSON,
                            REQUEST_FACTORY,
                            KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));

            final ElasticsearchSink.Builder<RowData> builder =
                    builderProvider.createBuilder(config.getHosts(), upsertFunction);

            builder.setFailureHandler(config.getFailureHandler());
            builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
            builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
            builder.setBulkFlushInterval(config.getBulkFlushInterval());
            builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
            config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
            config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
            config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);

            // we must overwrite the default factory which is defined with a lambda because of a bug
            // in shading lambda serialization shading see FLINK-18006
            if (config.getUsername().isPresent()
                    && config.getPassword().isPresent()
                    && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
                    && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
                builder.setRestClientFactory(
                        new AuthRestClientFactory(
                                config.getPathPrefix().orElse(null),
                                config.getUsername().get(),
                                config.getPassword().get()));
            } else {
                builder.setRestClientFactory(
                        new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
            }
            // 最终是得到ElasticsearchSink
            final ElasticsearchSink<RowData> sink = builder.build();

            if (config.isDisableFlushOnCheckpoint()) {
                sink.disableFlushOnCheckpoint();
            }

            return sink;
        };
    }
}


public interface SinkFunctionProvider
        extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
    /** 返回SinkFunction的实例 */
    SinkFunction<RowData> createSinkFunction();
}

因此planner在ES sink的实现中最终得到了ElasticsearchSink的实例。该实例负责调用ES相应API与ES进行数据交互。

2.3. 实际与ES的交互

通过ElasticsearchSink的继承关系,可以发现该类直接继承ElasticsearchSinkBase,并最终继承了SinkFunction

在这里插入图片描述
SinkFunction是实现自定义sink的根接口。每条写入sink的数据都将会调用该接口中的invoke方法。ElasticsearchSinkBase对此方法的实现如下,在其中通过调用ElasticsearchSinkFunction#process方法进行数据写入。

private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
@Override
public void invoke(T value, Context context) throws Exception {
    checkAsyncErrorsAndRequests();
    // 调用ElasticsearchSinkFunction的process方法
    elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
}

在ES sink中ElasticsearchSinkFunction的唯一实现是RowElasticsearchSinkFunction类,其关键代码如下。

class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
    private final RequestFactory requestFactory;
    @Override
    /**
     * 每条数据和ES实际交互的方法
     */
    public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
        // 根据每条数据的RowKind类型得到es api的UpdateRequest或DeleteRequest实例
        switch (element.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER:
                // insert和update_after时,进行更新
                processUpsert(element, indexer);
                break;
            case UPDATE_BEFORE:
            case DELETE:
                // update_before时,删除数据
                processDelete(element, indexer);
                break;
            default:
                throw new TableException("Unsupported message kind: " + element.getRowKind());
        }
    }

    // 对数据进行IndexRequest或UpdateRequest请求
    private void processUpsert(RowData row, RequestIndexer indexer) {
        final byte[] document = serializationSchema.serialize(row);
        final String key = createKey.apply(row);
        if (key != null) {
            // 指定主键时,得到UpdateRequest
            final UpdateRequest updateRequest =
                    requestFactory.createUpdateRequest(
                            indexGenerator.generate(row), docType, key, contentType, document);
            indexer.add(updateRequest);
        } else {
            // 未指定主键时,得到IndexRequest
            final IndexRequest indexRequest =
                    requestFactory.createIndexRequest(
                            indexGenerator.generate(row), docType, key, contentType, document);
            indexer.add(indexRequest);
        }
    }

    // 对数据进行DeleteRequest请求
    private void processDelete(RowData row, RequestIndexer indexer) {
        final String key = createKey.apply(row);
        final DeleteRequest deleteRequest =
                requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
        indexer.add(deleteRequest);
    }
}

其中requestFactory.createUpdateRequestrequestFactory.createIndexRequestrequestFactory.createDeleteRequest的方法分别如下

private static class Elasticsearch6RequestFactory implements RequestFactory {
    @Override
    public UpdateRequest createUpdateRequest(
            String index,
            String docType,
            String key,
            XContentType contentType,
            byte[] document) {
        return new UpdateRequest(index, docType, key)
                .doc(document, contentType)
                .upsert(document, contentType);
    }

    @Override
    public IndexRequest createIndexRequest(
            String index,
            String docType,
            String key,
            XContentType contentType,
            byte[] document) {
        return new IndexRequest(index, docType, key).source(document, contentType);
    }

    @Override
    public DeleteRequest createDeleteRequest(String index, String docType, String key) {
        return new DeleteRequest(index, docType, key);
    }
}

到此,我们已经找到了ES sink和ES进行交互的具体逻辑,对于changelog中每条数据根据其RowKind类型决定执行INSERT、UPDATE还是DELETE请求。并且根据是否指定主键,细分为INSERT或UPDATE。

  • 指定主键,只会进行UPDATE和DELETE操作。
  • 未指定主键,只进行INSERT操作。

综上所述,在指定主键的情况下,如果changelog中只有INSERT和UPDATE_AFTER类型的数据时,那么仅调用UpdateRequest API将数据写入ES,数据只会执行Upsert操作。但是针对实际结果中发生数据覆盖的情况,只可能是存在DELETE类型的数据,导致其执行了processDelete方法,先将数据删除,随着changelog中数据不断到来再次执行processUpsert方法。最终导致结果中只保留了最新写入的数据,即看到的数据覆盖的情况。

DELETE类型的数据是否会真正生成?什么时机生成?

3. 验证演示

验证使用的示例代码如下,kafka中为随机生成的账单id从1到5,时间递增的账单数据。SQL加工逻辑为汇总每个id、每小时的账单金额,将结果写入到使用账单id作为主键的es表中。

public class EsConnectorSQLTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = EnvConf.getEnvWithUI();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // 创建source表,交易表
        tEnv.executeSql("CREATE TABLE transactions (\n" +
                "    account_id  BIGINT,\n" +
                "    amount      BIGINT,\n" +
                "    transaction_time TIMESTAMP(3),\n" +
                "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic'     = 'transactions',\n" +
                "    'properties.bootstrap.servers' = '******',\n" +
                "    'scan.startup.mode' = 'latest-offset',\n" +
                "    'format'    = 'csv'\n" +
                ")");

        // 创建sink表,消费报告表
        tEnv.executeSql("CREATE TABLE spend_report (\n" +
                "    account_id BIGINT,\n" +
                "    log_ts     TIMESTAMP(3),\n" +
                "    amount     BIGINT\n," +
                "    PRIMARY KEY (account_id) NOT ENFORCED" +
                ") WITH (\n" +
                "  'connector' = 'elasticsearch-6',\n" +
                "  'hosts' = ******n" +
                "  'index' = '******',\n" +
                "  'document-type' = '******',\n" +
                "  'sink.bulk-flush.max-actions' = '1',\n" +
                "  'username' = '******',\n" +
                "  'password' = '******',\n" +
                "  'format'   = 'json'\n" +
                ")");

        // 从交易表中写数据到消费报告表中
        Table report = report(tEnv.from("transactions"));
        report.executeInsert("spend_report");
        // 通过如下代码观察select的执行结果数据,changelog
        // report.execute().print();
    }

    public static Table report(Table transactions) {
        /*
         * 将每个账号在小时内的金额进行汇总
         * */
        return transactions.select(
                        $("account_id"),
                        $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
                        $("amount"))
                .groupBy($("account_id"), $("log_ts"))
                .select(
                        $("account_id"),
                        $("log_ts"),
                        $("amount").sum().as("amount"));
    }
}

通过在RowElasticsearchSinkFunction#process方法中设置断点,可以观察到对于同一个key的确产生了-D类型的数据,导致删除了ES中数据,后续再执行upsert方法时,由于之前的数据已经被删除,所以更新后只剩下了新写入的数据。

通过观察示例代码main方法中report.execute().print()结果,对于各个id首次聚合结果类型为+I,当数据执行更新时,先生成-U类型结果,将原先的结果回撤,然后紧跟着生成+U类型的最新结果。

selece的结果中并不会产生-D类型的数据,但是实际到达RowElasticsearchSinkFunction#process方法时却包含了-D类型的数据,因此-D类型的数据并不是有select语句直接得到的。

+----+----------------------+-------------------------+----------------------+
| op |           account_id |                  log_ts |               amount |
+----+----------------------+-------------------------+----------------------+
| +I |                    5 | 2565-04-12 05:00:00.000 |                  461 |
| +I |                    1 | 2565-04-12 05:00:00.000 |                  517 |
| +I |                    2 | 2565-04-12 05:00:00.000 |                  630 |
| +I |                    3 | 2565-04-12 05:00:00.000 |                  640 |
| +I |                    4 | 2565-04-12 05:00:00.000 |                  356 |
| -U |                    5 | 2565-04-12 05:00:00.000 |                  461 |
| +U |                    5 | 2565-04-12 05:00:00.000 |                 1140 |
| -U |                    1 | 2565-04-12 05:00:00.000 |                  517 |
| +U |                    1 | 2565-04-12 05:00:00.000 |                  688 |
| +I |                    2 | 2565-04-12 06:00:00.000 |                   35 |
| +I |                    3 | 2565-04-12 06:00:00.000 |                  524 |
| +I |                    4 | 2565-04-12 06:00:00.000 |                  290 |
| +I |                    5 | 2565-04-12 06:00:00.000 |                  612 |
| +I |                    1 | 2565-04-12 06:00:00.000 |                   12 |
| -U |                    2 | 2565-04-12 06:00:00.000 |                   35 |
| +U |                    2 | 2565-04-12 06:00:00.000 |                  903 |
| -U |                    3 | 2565-04-12 06:00:00.000 |                  524 |
| +U |                    3 | 2565-04-12 06:00:00.000 |                 1353 |

3.1. SinkMateriallizer任务

回到示例代码中,数据的加工逻辑是对数据进行Group by后的聚合处理,查看其作业DAG,可以发现有3个任务节点,分别是KafkaSource->GroupAggregate->SinkMateriallizer,前两个任务节点的作用容易理解,第三个名为SinkMaterializer节点除了最后将数据写入到ES外,还做了什么操作?算子名和算子中的upsertMaterialize=[true]表示什么含义?
在这里插入图片描述
根据算子名称并结合断点分析,数据会先进入到GroupAggFunction#processElement方法中执行聚合计算,并将聚合结果继续发送给了下游
SinkUpsertMaterializer#processElement方法中,聚合结果最终在此方法中生成供上述RowElasticsearchSinkFunction#process方法中写入到ES的数据,接下来重点了解下SinkUpsertMaterializer#processElement的具体实现。

!!! note “”
SinkUpsertMaterializer类的作用是为了解决changelog中乱序流造成的结果不正确问题。

该维护了一个List<RowData>的状态,该状态中保存着changelog中相同key的结果数据,并且根据已保存到状态中的数据进行乱序处理。

public class SinkUpsertMaterializer extends TableStreamOperator<RowData>
        implements OneInputStreamOperator<RowData, RowData> {

    // Buffer of emitted insertions on which deletions will be applied first.
    // RowKind可能是+I或+U,并且在应用删除时将被忽略
    private transient ValueState<List<RowData>> state;

    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        final RowData row = element.getValue();
        List<RowData> values = state.value();
        if (values == null) {
            values = new ArrayList<>(2);
        }

        switch (row.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER:
                row.setRowKind(values.isEmpty() ? INSERT : UPDATE_AFTER);
                values.add(row);
                collector.collect(row);
                break;

            case UPDATE_BEFORE:
            case DELETE:
                final int lastIndex = values.size() - 1;
                final int index = removeFirst(values, row);
                if (index == -1) {
                    LOG.info(STATE_CLEARED_WARN_MSG);
                    return;
                }
                if (values.isEmpty()) {
                    // 此处会将元素的RowKind重新赋值为-D
                    row.setRowKind(DELETE);
                    collector.collect(row);
                } else if (index == lastIndex) {
                    // Last row has been removed, update to the second last one
                    final RowData latestRow = values.get(values.size() - 1);
                    latestRow.setRowKind(UPDATE_AFTER);
                    collector.collect(latestRow);
                }
                break;
        }

        if (values.isEmpty()) {
            state.clear();
        } else {
            state.update(values);
        }
    }

    private int removeFirst(List<RowData> values, RowData remove) {
        final Iterator<RowData> iterator = values.iterator();
        int i = 0;
        while (iterator.hasNext()) {
            final RowData row = iterator.next();
            // 忽略RowKind类型,对RowData中其他数据进行比较
            remove.setRowKind(row.getRowKind());
            if (equaliser.equals(row, remove)) {
                iterator.remove();
                return i;
            }
            i++;
        }
        return -1;
    }
}

processElement方法大致处理逻辑如下流程图所示。
在这里插入图片描述
举个例子,

情形一:假如source表中有数据如左侧所示,经过示例sql处理后,输出了右侧的changelog结果
在这里插入图片描述

changelog
+----+------------+---------------+--------+
| op | account_id |      log_hour | amount |
+----+------------+---------------+--------+
| +I |          3 | 2565-04-12 05 |    640 |
| +I |          3 | 2565-04-12 06 |    524 |
| -U |          3 | 2565-04-12 06 |    524 |
| +U |          3 | 2565-04-12 06 |   1353 |

此种情况下changelog数据经过processElement方法后输出结果为:

  1. 当第一条数据到达后,数据中RowKind=+I,状态为空,输出RowKind=+I的结果到下游;
  2. 当第二条数据到达后,数据中RowKind=+I,状态不为空,输出RowKind=+U的结果到下游,这时状态中保存了changlog中第一条和第二条数据内容;
  3. 当第三条数据到达后,数据中RowKind=-U,状态集合中最大index=1(lastindex),依次比较集合中元素和当前数据内容是否相同,发现集合中第二条数据和当前数据一致,因此返回index=1,并将从集合中移除index=1的元素。此时集合中仅剩amount=640的一个元素;
    接下来满足lastindex=index的条件,因此获取集合中最后一条数据,即amount=640的那条数据,输出Rowkind= +U的结果到下游;
  4. 当第四条数据到达后,数据中RowKind=+U,状态不为空,输出RowKind=+U的结果到下游。

情形二:假如source表中有数据如左侧所示,经过示例sql处理后,输出了右侧的changelog结果

在这里插入图片描述

changelog
+----+------------+---------------+--------+
| op | account_id |      log_hour | amount |
+----+------------+---------------+--------+
| +I |          3 | 2565-04-12 06 |    524 |
| -U |          3 | 2565-04-12 06 |    524 |
| +U |          3 | 2565-04-12 06 |   1353 |

此种情况下changelog数据经过processElement方法后输出结果为:

  1. 当第一条数据到达后,数据中RowKind=+I,状态为空,输出RowKind=+I的结果到下游;
  2. 当第二条数据到达后,数据中RowKind=-U,集合中最大index=0(lastindex),依次比较集合中元素和当前数据内容是否相同,发现集合中第一条数据和当前数据一致,因此返回index=0,并将从集合中移除index=0的元素。此时集合为空。
    接下来满足了集合为空的条件,输出Rowkind=-D的结果到下游;
  3. 当第三条数据到达后,数据中RowKind=+U,状态为空,输出RowKind=+I的结果到下游。

4. 总结

导致写入到ES的数据从结果上看没有执行upsert而是执行了insert的根本原因是中间过程中执行了delete操作,将es中原始数据删除,然后继续写入了新数据。如果在delete执行后未继续写入新数据前停止任务,观察到的结果过将会是ES中数据条数减少。
而导致从ES中删除数据的原因是因为经过SinkUpsertMaterializer类处理后,将部分RowKind=-U的数据转化成了RowKind=-D类型的数据,从而触发ES sink执行DeleteRequest请求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/593869.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

NumPy库与PyTorch库的异同点

目录 1.单位的创建和操作 1.创建 2.形状变换 2.数学和统计操作 1.矩阵乘法 2.广播 3.统计计算 3.GPU支持 4.在深度学习中的作用 5.应用范围 NumPy库为数组服务&#xff0c;PyTorch库为张量服务&#xff0c;这是最本质的区别。 1.单位的创建和操作 1.创建 NumPy:使…

【busybox记录】【shell指令】md5sum

目录 内容来源&#xff1a; 【GUN】【md5sum】指令介绍 【busybox】【md5sum】指令介绍 【linux】【md5sum】指令介绍 使用示例&#xff1a; 128位MD5 - 默认输出 128位MD5 - 将每个文件当做二进制处理 128位MD5 - 从文件中读取MD5值并做检查 128位MD5 - 创建一个BSD风…

浅谈OpenCV 粗略计算工件轮廓面积和外接圆直径(Emgu.CV)

前言 最近领导在做库房工具管理这块的功能&#xff0c;希望能集成OpenCV 粗略的计算出工具的长度&#xff0c;以方便用户再归还工具的时候&#xff0c;提示用户该放在那种尺寸的盒子里面&#xff0c;这便是这篇文章的由来。 我们的系统是基于.net开发的&#xff0c;所以采用的是…

项目管理-项目采购管理1/2

项目管理&#xff1a;每天进步一点点~ 活到老&#xff0c;学到老 ヾ(◍∇◍)&#xff89;&#xff9e; 何时学习都不晚&#xff0c;加油 1.项目采购管理-主要内容 项目采购管理过程--重点&#xff1a; ①ITTO 输入&#xff0c;输出工具和技术。 ②问题和解决方案。 ③论文…

【白话机器学习系列】白话特征向量

白话特征向量 一个方阵 A A A 与列向量 v v v 的乘积会生成一个新的列向量。这个新向量通常与原向量有着不同的方向&#xff0c;矩阵在这里代表一个线性变换。然而&#xff0c;某些向量会保持其原始方向。我们称这种向量为矩阵 A A A 的特征向量&#xff08;eigenvector&…

python数据分析——业务指标分析

业务指标分析 前言一、业务指标分析的定义二、业务问题构建问题构建的要求 三、业务问题的识别在识别问题的阶段对于企业内部收益者的补充&#xff1a; 四、竞争者分析标题竞争者分析的内容&#xff1a;标题竞争者分析目的&#xff1a;案例&#xff1a; 黑莓公司为什么会消亡&a…

dynamic_cast 静态转换

dynamic_cast 静态转换 const_cast 常量转换 重新解释转换(reinterpret_cast) 最不安全

RocketMq详解:一、RocketMQ 介绍及基本概念

文章目录 前言1.RocketMQ简介2.RocketMQ 特点3.核心特性4.应用场景5.RocketMQ 优势6.RocketMQ 四大核心组件6.1 NameServer1.NameServer作用2.NameServer被设计为无状态的原因3.和NameServer和Zookeeper的区别4.NameServer的高可用保障 6.2 Broker1.Broker部署方式2.高可用与负…

ssm105基于JAVAEE技术校园车辆管理系统+jsp

校园车辆管理系统设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本校园车辆管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短…

泰克示波器电流探头如何抓浪涌电流波形?

泰克示波器是一种常见的电子测量仪器&#xff0c;广泛应用于电子工程、通信工程、医疗设备等领域。它的主要功能是实时显示电信号的波形&#xff0c;从而帮助工程师和技术人员分析和调试电路。而在一些特定的应用场景中&#xff0c;例如电源、电机、电器设备等&#xff0c;我们…

模型全参数训练和LoRA微调所需显存的分析

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

TypeScript 基础学习笔记:泛型 <T> vs 断言 as

TypeScript 基础学习笔记&#xff1a;泛型 <T> vs 断言 as &#x1f525; 引言 &#x1f44b; TypeScript (TS) 以其静态类型的魔力&#xff0c;让我们的代码更加健壮、易读且易于维护。今天&#xff0c;我们将深入探讨两个核心概念——泛型&#xff08;Generics&#x…

【华为】AC三层旁挂直接转发

【华为】AC三层旁挂直接转发 实验需求实验拓扑配置AC和AP二层通信ACLSW1LSW2AP2获取到的管理地址AP3获取到的管理地址 AP上线配置WLAN业务ACLSW1&#xff08;作DHCP地址池&#xff09;业务成功下发 访问公网&#xff08;NAT&#xff09;LSW1AR1 配置文档ACLSW1LSW2AR1ISP 实验需…

杭电acm1013 Digital Roots 数字根 Java解法 高精度

Problem - 1013 (hdu.edu.cn) 高精度算术模拟 开long没过想到开bI 开bl一次过 import java.math.BigInteger; import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner sc new Scanner(System.in);BigInteger i;while (!(i sc.nextB…

Docker新建容器 修改运行容器端口

目录 一、修改容器的映射端口 二、解决方案 三、方案 一、修改容器的映射端口 项目需求修改容器的映射端口 二、解决方案 停止需要修改的容器 修改hostconfig.json文件 重启docker 服务 启动修改容器 三、方案 目前正在运行的容器 宿主机的3000 端口 映射 容器…

【Python项目】基于时间序列的【大气污染预测系统】

技术简介&#xff1a;使用Python技术、B/S架构、MYSQL数据库等实现。 系统简介&#xff1a;本系统的主要使用角色为普通用户和管理员用户&#xff0c;两者的功能几乎是一致的&#xff0c;但管理员用户比普通用户多了用户管理的功能&#xff0c;可以对系统内的用户进行管理。普通…

Java IO编程必备:FilterInputStream类的原理与实现

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。运营社区&#xff1a;C站/掘金/腾讯云&#xff1b;欢迎大家常来逛逛 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一…

如何构建进攻性的网络安全防护策略

进攻性安全&#xff08;Offensive security&#xff09;是指一系列主动安全策略&#xff0c;这些策略与恶意行为者在现实世界的攻击中使用的策略相同&#xff0c;区别在于其目的是加强而非损害网络安全。常见的进攻性安全方法包括红队、渗透测试和漏洞评估。 进攻性安全行动通…

C++入门 ——类和对象(二)

this指针 this指针的引出 我们先来定义一个日期类 Date class Date { public:void Init(int year, int month, int day){_year year;_month month;_day day;}void Print(){cout <<_year<< "-" <<_month << "-"<< _da…

基于springboot+vue+Mysql的自习室预订系统

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…