Hudi Flink SQL源码调试学习(1)

前言

本着学习hudi-flink源码的目的,利用之前总结的文章Hudi Flink SQL代码示例及本地调试中的代码进行调试,记录调试学习过程中主要的步骤及对应源码片段。

版本

  • Flink 1.15.4
  • Hudi 0.13.0

目标

在文章Hudi Flink SQL代码示例及本地调试中提到:我们发现Table API的入口和DataStream API的入口差不多,DataStream API的入口是在HoodiePipelinesinksource方法里,而这两个方法也是分别调用了HoodieTableFactorycreateDynamicTableSinkcreateDynamicTableSource。那么Table API的代码怎么一步一步走到createDynamicTableSinkcreateDynamicTableSource的呢?返回HoodieTableSink之后又是怎么写数据的?因为我发现Hudi写数据的主要逻辑入口好像是在HoodieTableSink.getSinkRuntimeProvider的方法体里,这些问题之前都没有搞清楚,所以这次的目标就是要搞清楚:1、Table API 的入口到createDynamicTableSink返回HoodieTableSink的主要代码步骤; 2、在哪里调用HoodieTableSink.getSinkRuntimeProvider的方法体进行后面的写Hudi逻辑的

相关类:

  • HoodiePipeline (DataStream API)
  • HoodieTableFactory
  • HoodieTableSink
  • DataStreamSinkProviderAdapter (函数式接口)
  • TableEnvironmentImpl
  • BatchPlanner
  • PlannerBase
  • FactoryUtil
  • BatchExecSink
  • CommonExecSink

DataStream API

其实上面的问题在DataStream API代码里很容易看出来,我们先看一下DataStream API写Hudi的代码,详细代码在文章:Flink Hudi DataStream API代码示例

DataStream<RowData> dataStream = env.fromElements(
        GenericRowData.of(1, StringData.fromString("hudi1"), 1.1, 1000L, StringData.fromString("2023-04-07")),
        GenericRowData.of(2, StringData.fromString("hudi2"), 2.2, 2000L, StringData.fromString("2023-04-08"))
);

HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
        .column("id int")
        .column("name string")
        .column("price double")
        .column("ts bigint")
        .column("dt string")
        .pk("id")
        .partition("dt")
        .options(options);

builder.sink(dataStream, false);

HoodiePipeline.Builder.sink

    public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
      TableDescriptor tableDescriptor = getTableDescriptor();
      return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
    }

HoodiePipeline.sink

  private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
    FactoryUtil.DefaultDynamicTableContext context = Utils.getTableContext(tablePath, catalogTable, Configuration.fromMap(catalogTable.getOptions()));
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
        .consumeDataStream(input);
  }

HoodiePipeline.sink就可以找到答案:
1、HoodieTableFactory.createDynamicTableSink 返回HoodieTableSink
2、HoodieTableSink.getSinkRuntimeProvider 返回DataStreamSinkProviderAdapter
3、DataStreamSinkProviderAdapter.consumeDataStream调用HoodieTableSink.getSinkRuntimeProvider中的方法体执行后面的写Hudi逻辑。这里的dataStream为我们最开始在程序里创建的DataStream<RowData>

HoodieTableSink.getSinkRuntimeProvider

getSinkRuntimeProvider返回DataStreamSinkProviderAdapter,其中Lambda 表达式dataStream -> {}DataStreamSinkProviderAdapter.consumeDataStream(dataStream)的具体实现

  @Override
  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return (DataStreamSinkProviderAdapter) dataStream -> {

      // setup configuration
      long ckpTimeout = dataStream.getExecutionEnvironment()
          .getCheckpointConfig().getCheckpointTimeout();
      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
      // set up default parallelism
      OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());

      RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();

      // bulk_insert mode
      final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
      if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
        return Pipelines.bulkInsert(conf, rowType, dataStream);
      }

      // Append mode
      if (OptionsResolver.isAppendMode(conf)) {
        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
        if (OptionsResolver.needsAsyncClustering(conf)) {
          return Pipelines.cluster(conf, rowType, pipeline);
        } else {
          return Pipelines.dummySink(pipeline);
        }
      }

      DataStream<Object> pipeline;
      // bootstrap
      final DataStream<HoodieRecord> hoodieRecordDataStream =
          Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
      // write pipeline
      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
      // compaction
      if (OptionsResolver.needsAsyncCompaction(conf)) {
        // use synchronous compaction for bounded source.
        if (context.isBounded()) {
          conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        }
        return Pipelines.compact(conf, pipeline);
      } else {
        return Pipelines.clean(conf, pipeline);
      }
    };
  }

DataStreamSinkProviderAdapter其实是一个函数式接口,它是一种只包含一个抽象方法的接口。Lambda 表达式可以被赋值给一个函数式接口,从而实现接口的实例化

public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {
  DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);

  @Override
  default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
    return consumeDataStream(dataStream);
  }
}

函数式接口和Lambda 表达式参考下面两篇文章:
https://it.sohu.com/a/682888110_100123073
https://blog.csdn.net/Speechless_/article/details/123746047

Table API

知道了 DataStream API 调用步骤后,来对比看一下 Table API 的大致调用步骤,调试代码入口。

tableEnv.executeSql(String.format("insert into %s values (1,'hudi',10,100,'2023-05-28')", tableName));

整体调用流程

1、tableEnv.executeSql->TableEnvironmentImpl.executeSql->executeInternal(Operation operation)->executeInternal(List<ModifyOperation> operations)->this.translate->(PlannerBase)this.planner.translate

2.1、PlannerBase.translate->PlannerBase.translateToRel->getTableSink(catalogSink.getContextResolvedTable, dynamicOptions)->FactoryUtil.createDynamicTableSink->HoodieTableFactory.createDynamicTableSink

2.2、PlannerBase.translate->(BatchPlanner)translateToPlan(execGraph)->(ExecNodeBase)node.translateToPlan->(BatchExecSink)translateToPlanInternal->(CommonExecSink)createSinkTransformation->(HoodieTableSink)getSinkRuntimeProvider->(CommonExecSink)applySinkProvider->provider.consumeDataStream

具体代码

TableEnvironmentImpl

(TableEnvironmentImpl)executeSql

    public TableResult executeSql(String statement) {
        List<Operation> operations = this.getParser().parse(statement);
        if (operations.size() != 1) {
            throw new TableException("Unsupported SQL query! executeSql() only accepts a single SQL statement of type CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONSCREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD MODULE, USE MODULES, SHOW [FULL] MODULES.");
        } else {
            // 关键步骤:executeInternal
            return this.executeInternal((Operation)operations.get(0));
        }
    }

executeInternal(Operation operation)

    public TableResultInternal executeInternal(Operation operation) {
        if (operation instanceof ModifyOperation) {
            // 关键步骤:executeInternal
            return this.executeInternal(Collections.singletonList((ModifyOperation)operation));
        } else if (operation instanceof StatementSetOperation) {
            return this.executeInternal(((StatementSetOperation)operation).getOperations());

executeInternal(List<ModifyOperation> operations)

    public TableResultInternal executeInternal(List<ModifyOperation> operations) {
        // 关键步骤:translate
        List<Transformation<?>> transformations = this.translate(operations);
        List<String> sinkIdentifierNames = this.extractSinkIdentifierNames(operations);
        TableResultInternal result = this.executeInternal(transformations, sinkIdentifierNames);
        if ((Boolean)this.tableConfig.get(TableConfigOptions.TABLE_DML_SYNC)) {
            try {
                result.await();
            } catch (ExecutionException | InterruptedException var6) {
                result.getJobClient().ifPresent(JobClient::cancel);
                throw new TableException("Fail to wait execution finish.", var6);
            }
        }

        return result;
    }

translate
这里的planner为BatchPlanner,因为我们设置了batch模式EnvironmentSettings.inBatchMode()

    protected List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
        // 这里的planner为BatchPlanner,因为我们设置了batch模式EnvironmentSettings.inBatchMode()
        // 关键步骤:PlannerBase.translate
        return this.planner.translate(modifyOperations);
    }

BatchPlanner

(BatchPlanner的父类)PlannerBase.translate

  override def translate(
      modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
    beforeTranslation()
    if (modifyOperations.isEmpty) {
      return List.empty[Transformation[_]]
    }
    // 关键步骤:translateToRel
    val relNodes = modifyOperations.map(translateToRel)
    val optimizedRelNodes = optimize(relNodes)
    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
    // 关键步骤:translateToPlan
    val transformations = translateToPlan(execGraph)
    afterTranslation()
    transformations
  }

PlannerBase.translateToRel

  private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
    val dataTypeFactory = catalogManager.getDataTypeFactory
    modifyOperation match {
      case s: UnregisteredSinkModifyOperation[_] =>
        val input = getRelBuilder.queryOperation(s.getChild).build()
        val sinkSchema = s.getSink.getTableSchema
        // validate query schema and sink schema, and apply cast if possible
        val query = validateSchemaAndApplyImplicitCast(
          input,
          catalogManager.getSchemaResolver.resolve(sinkSchema.toSchema),
          null,
          dataTypeFactory,
          getTypeFactory)
        LogicalLegacySink.create(
          query,
          s.getSink,
          "UnregisteredSink",
          ConnectorCatalogTable.sink(s.getSink, !isStreamingMode))

      case collectModifyOperation: CollectModifyOperation =>
        val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
        DynamicSinkUtils.convertCollectToRel(
          getRelBuilder,
          input,
          collectModifyOperation,
          getTableConfig,
          getFlinkContext.getClassLoader
        )

      case catalogSink: SinkModifyOperation =>
        val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
        val dynamicOptions = catalogSink.getDynamicOptions
        // 关键步骤:getTableSink
        getTableSink(catalogSink.getContextResolvedTable, dynamicOptions).map {
          case (table, sink: TableSink[_]) =>
            // Legacy tables can't be anonymous
            val identifier = catalogSink.getContextResolvedTable.getIdentifier
            // check the logical field type and physical field type are compatible
            val queryLogicalType = FlinkTypeFactory.toLogicalRowType(input.getRowType)
            // validate logical schema and physical schema are compatible
            validateLogicalPhysicalTypesCompatible(table, sink, queryLogicalType)
            // validate TableSink
            validateTableSink(catalogSink, identifier, sink, table.getPartitionKeys)
            // validate query schema and sink schema, and apply cast if possible
            val query = validateSchemaAndApplyImplicitCast(
              input,
              table.getResolvedSchema,
              identifier.asSummaryString,
              dataTypeFactory,
              getTypeFactory)
            val hints = new util.ArrayList[RelHint]
            if (!dynamicOptions.isEmpty) {
              hints.add(RelHint.builder("OPTIONS").hintOptions(dynamicOptions).build)
            }
            LogicalLegacySink.create(
              query,
              hints,
              sink,
              identifier.toString,
              table,
              catalogSink.getStaticPartitions.toMap)

          case (table, sink: DynamicTableSink) =>
            DynamicSinkUtils.convertSinkToRel(getRelBuilder, input, catalogSink, sink)
        } match {
          case Some(sinkRel) => sinkRel
          case None =>
            throw new TableException(
              s"Sink '${catalogSink.getContextResolvedTable}' does not exists")
        }

PlannerBase.getTableSink

  private def getTableSink(
      contextResolvedTable: ContextResolvedTable,
      dynamicOptions: JMap[String, String]): Option[(ResolvedCatalogTable, Any)] = {
    contextResolvedTable.getTable[CatalogBaseTable] match {
      case connectorTable: ConnectorCatalogTable[_, _] =>
        val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
        toScala(connectorTable.getTableSink) match {
          case Some(sink) => Some(resolvedTable, sink)
          case None => None
        }

      case regularTable: CatalogTable =>
        val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
        ...

        if (
          !contextResolvedTable.isAnonymous &&
          TableFactoryUtil.isLegacyConnectorOptions(
            catalogManager.getCatalog(objectIdentifier.getCatalogName).orElse(null),
            tableConfig,
            isStreamingMode,
            objectIdentifier,
            resolvedTable.getOrigin,
            isTemporary
          )
        ) {
         ...
        } else {
          ...
          // 关键步骤:FactoryUtil.createDynamicTableSink
          val tableSink = FactoryUtil.createDynamicTableSink(
            factory,
            objectIdentifier,
            tableToFind,
            Collections.emptyMap(),
            getTableConfig,
            getFlinkContext.getClassLoader,
            isTemporary)
          Option(resolvedTable, tableSink)
        }

      case _ => None
    }

FactoryUtil.createDynamicTableSink

根据’connector’=‘hudi’ 找到factory为org.apache.hudi.table.HoodieTableFactory,接着调用HoodieTableFactory.createDynamicTableSink

    public static DynamicTableSink createDynamicTableSink(
            @Nullable DynamicTableSinkFactory preferredFactory,
            ObjectIdentifier objectIdentifier,
            ResolvedCatalogTable catalogTable,
            Map<String, String> enrichmentOptions,
            ReadableConfig configuration,
            ClassLoader classLoader,
            boolean isTemporary) {
        final DefaultDynamicTableContext context =
                new DefaultDynamicTableContext(
                        objectIdentifier,
                        catalogTable,
                        enrichmentOptions,
                        configuration,
                        classLoader,
                        isTemporary);

        try {
            // 'connector'='hudi' 
            // org.apache.hudi.table.HoodieTableFactory
            final DynamicTableSinkFactory factory =
                    preferredFactory != null
                            ? preferredFactory
                            : discoverTableFactory(DynamicTableSinkFactory.class, context);
            // 关键步骤:HoodieTableFactory.createDynamicTableSink
            return factory.createDynamicTableSink(context);
        } catch (Throwable t) {
            throw new ValidationException(
                    String.format(
                            "Unable to create a sink for writing table '%s'.\n\n"
                                    + "Table options are:\n\n"
                                    + "%s",
                            objectIdentifier.asSummaryString(),
                            catalogTable.getOptions().entrySet().stream()
                                    .map(e -> stringifyOption(e.getKey(), e.getValue()))
                                    .sorted()
                                    .collect(Collectors.joining("\n"))),
                    t);
        }
    }

HoodieTableFactory.createDynamicTableSink

第一个问题解决

  public DynamicTableSink createDynamicTableSink(Context context) {
    Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
    checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
        "Option [path] should not be empty.");
    setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
    sanityCheck(conf, schema);
    setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
    // 关键步骤:HoodieTableSink
    return new HoodieTableSink(conf, schema);
  }

BatchExecSink

回到方法PlannerBase.translate,它会在后面调用translateToPlanexecGraph.getRootNodes返回的内容为BatchExecSink (想知道为啥是BatchExecSink,可以看PlannerBase.translate中调用的translateToExecNodeGraph方法),
BatchExecSinkBatchExecNode的子类,所以会执行node.translateToPlan

PlannerBase.translateToPlan

  override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
    beforeTranslation()
    val planner = createDummyPlanner()

    val transformations = execGraph.getRootNodes.map {
      // BatchExecSink
      // 关键步骤:ExecNodeBase.translateToPlan
      case node: BatchExecNode[_] => node.translateToPlan(planner)
      case _ =>
        throw new TableException(
          "Cannot generate BoundedStream due to an invalid logical plan. " +
            "This is a bug and should not happen. Please file an issue.")
    }
    afterTranslation()
    transformations
  }

BatchExecSink

public class BatchExecSink extends CommonExecSink implements BatchExecNode<Object> {
    ...
public abstract class CommonExecSink extends ExecNodeBase<Object>
        implements MultipleTransformationTranslator<Object> {
    ...
    

ExecNodeBase.translateToPlan

    public final Transformation<T> translateToPlan(Planner planner) {
        if (transformation == null) {
            transformation =
                    // 关键步骤:BatchExecSink.translateToPlanInternal
                    translateToPlanInternal(
                            (PlannerBase) planner,
                            ExecNodeConfig.of(
                                    ((PlannerBase) planner).getTableConfig(),
                                    persistedConfig,
                                    isCompiled));
            if (this instanceof SingleTransformationTranslator) {
                if (inputsContainSingleton()) {
                    transformation.setParallelism(1);
                    transformation.setMaxParallelism(1);
                }
            }
        }
        return transformation;
    }

BatchExecSink.translateToPlanInternal

    protected Transformation<Object> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner);
        // org.apache.hudi.table.HoodieTableSink        
        final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner.getFlinkContext());
        // 关键步骤:CommonExecSink.createSinkTransformation
        return createSinkTransformation(
                planner.getExecEnv(), config, inputTransform, tableSink, -1, false);
    }

CommonExecSink.createSinkTransformation

这里的tableSink为HoodieTableSink,会调用HoodieTableSink的getSinkRuntimeProvider方法返回runtimeProvider(没有执行里面的方法体)


    protected Transformation<Object> createSinkTransformation(
            StreamExecutionEnvironment streamExecEnv,
            ExecNodeConfig config,
            Transformation<RowData> inputTransform,
            // 这里的tableSink为HoodieTableSink
            DynamicTableSink tableSink,
            int rowtimeFieldIndex,
            boolean upsertMaterialize) {
        final ResolvedSchema schema = tableSinkSpec.getContextResolvedTable().getResolvedSchema();
        final SinkRuntimeProvider runtimeProvider =
               // 关键步骤:HoodieTableSink.getSinkRuntimeProvider
                tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));
        final RowType physicalRowType = getPhysicalRowType(schema);
        final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema);
        final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider);
        final int inputParallelism = inputTransform.getParallelism();
        final boolean inputInsertOnly = inputChangelogMode.containsOnly(RowKind.INSERT);
        final boolean hasPk = primaryKeys.length > 0;

        ...

        return (Transformation<Object>)
                // 关键步骤:CommonExecSink.applySinkProvider
                applySinkProvider(
                        sinkTransform,
                        streamExecEnv,
                        runtimeProvider,
                        rowtimeFieldIndex,
                        sinkParallelism,
                        config);
    }


CommonExecSink.applySinkProvider

先通过new DataStream<>(env, sinkTransformation)生成dataStream,接着通过执行provider.consumeDataStream调用HoodieTableSink.getSinkRuntimeProvider中的方法体,这里的provider为HoodieTableSink.getSinkRuntimeProvider返回的DataStreamSinkProviderAdapter

    private Transformation<?> applySinkProvider(
            Transformation<RowData> inputTransform,
            StreamExecutionEnvironment env,
            SinkRuntimeProvider runtimeProvider,
            int rowtimeFieldIndex,
            int sinkParallelism,
            ExecNodeConfig config) {
        TransformationMetadata sinkMeta = createTransformationMeta(SINK_TRANSFORMATION, config);
        if (runtimeProvider instanceof DataStreamSinkProvider) {
            Transformation<RowData> sinkTransformation =
                    applyRowtimeTransformation(
                            inputTransform, rowtimeFieldIndex, sinkParallelism, config);
            // 生成dataStream
            final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation);
            final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
            // 关键步骤:provider.consumeDataStream
            return provider.consumeDataStream(createProviderContext(config), dataStream)
                    .getTransformation();
        } else if (runtimeProvider instanceof TransformationSinkProvider) {
            ...

provider.consumeDataStream(已经在上面的类DataStreamSinkProviderAdapter提过)

它会调用HoodieTableSink.getSinkRuntimeProvider中的方法体(Lambda 表达式)执行后面的写hudi逻辑
第二个问题解决

  default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
    return consumeDataStream(dataStream);
  }

总结

本文主要简单记录了自己调试 Hudi Flink SQL 源码的过程,并没有对源码进行深入的分析(自己水平也不够)。主要目的是为了弄清楚从Table API 的入口到createDynamicTableSink返回HoodieTableSink的主要代码步骤以及在哪里调用HoodieTableSink.getSinkRuntimeProvider的方法体以进行后面的写Hudi逻辑,这样便于后面对Hudi源码的分析和学习。

本文新学习知识点:函数式接口以及对应的 Lambda 表达式的实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/53800.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【LLM系列之指令微调】长话短说大模型指令微调的“Prompt”

1 指令微调数据集形式“花样”太多 大家有没有分析过 prompt对模型训练或者推理的影响&#xff1f;之前推理的时候&#xff0c;发现不加训练的时候prompt&#xff0c;直接输入模型性能会变差的&#xff0c;这个倒是可以理解。假如不加prompt直接训练&#xff0c;是不是测试的时…

基于高通QCC5171的对讲机音频数据传输系统设计

一 研发资料准备 二 设计方法 蓝牙连接与配对&#xff1a;使用QCC5171的蓝牙功能&#xff0c;实现设备之间的蓝牙连接和配对。确保设备能够相互识别并建立起稳定的蓝牙连接。 音频采集与处理&#xff1a;将麦克风采集到的音频数据通过QCC5171的ADC&#xff08;模数转换器&…

linux系统编程重点复习--线程同步

目录 复习目标&#xff1a; 1 互斥锁 1.1互斥锁的使用步骤 1.2 练习 1.3 死锁 2 读写锁 3 条件变量 4 信号量 复习目标&#xff1a; 熟练掌握互斥量的使用说出什么叫死锁以及解决方案熟练掌握读写锁的使用熟练掌握条件变量的使用理解条件变量实现的生产消费者模型理解…

python简单的病毒编程代码,如何用python写一个病毒

大家好&#xff0c;本文将围绕python简单的病毒编程代码展开说明&#xff0c;如何用python做恶搞病毒是一个很多人都想弄明白的事情&#xff0c;想搞清楚如何用python写一个病毒需要先了解以下几个事情。 1、Python能不能写病毒 国家计算机病毒应急处理中心通过对互联网的监测…

算法leetcode|64. 最小路径和(rust重拳出击)

文章目录 64. 最小路径和&#xff1a;样例 1&#xff1a;样例 2&#xff1a;提示&#xff1a; 分析&#xff1a;题解&#xff1a;rust&#xff1a;go&#xff1a;c&#xff1a;python&#xff1a;java&#xff1a; 64. 最小路径和&#xff1a; 给定一个包含非负整数的 m x n 网…

windows 安装 mongodb 数据库

软件下载 访问官方的下载地址&#xff1a; https://www.mongodb.com/try/download/community &#xff0c;然后选择对应的版本进行下载 下载好了之后双击进行安装 软件安装 1、点击 next 点击下一步 2、勾选接受协议&#xff0c;点击 next 3、第三页有两个选项&#x…

redisson分布式锁学习

什么是分布式锁? 当有多个线程并发访问同一共享数据时,如果多个线程同时都去修改这个共享数据,且修改操作不是原子操作,就很有可能出现线程安全问题&#xff0c;而产生线程安全问题的根本原因是缺乏对共享数据访问的同步和互斥。 为了解决这个问题&#xff0c;通常我们的做法…

P2P网络NAT穿透原理(打洞方案)

1.关于NAT NAT技术&#xff08;Network Address Translation&#xff0c;网络地址转换&#xff09;是一种把内部网络&#xff08;简称为内网&#xff09;私有IP地址转换为外部网络&#xff08;简称为外网&#xff09;公共IP地址的技术&#xff0c;它使得一定范围内的多台主机只…

SpringBoot超级详解

1.父工程的父工程 在父工程的父工程中的核心依赖&#xff0c;专门用来版本管理的 版本管理。 2.父工程 资源过滤问题&#xff0c;都帮解决了&#xff0c;什么配置文件&#xff0c;都已经配置好了&#xff0c;资源过滤问题是帮助&#xff0c;过滤解决让静态资源文件能够过滤到…

别再分库分表了,来试试它吧

什么是NewSQL传统SQL的问题 升级服务器硬件数据分片NoSQL 的问题 优点缺点NewSQL 特性NewSQL 的主要特性三种SQL的对比TiDB怎么来的TiDB社区版和企业版TIDB核心特性 水平弹性扩展分布式事务支持金融级高可用实时 HTAP云原生的分布式数据库高度兼容 MySQLOLTP&OLAP&#xff…

openssl/bn.h: No such file or directory

报错截图 解决方法 ubuntu apt install libssl-dev -y centos yum install openssl-devel -y

第六章 支持向量机

文章目录 支持向量机间隔和支持向量对偶问题问题推导SMO 核函数实验 支持向量机 ⽀持向量机&#xff08;Support Vector Machines&#xff0c;SVM&#xff09; 优点&#xff1a;泛化错误率低&#xff0c;计算开销不⼤&#xff0c;结果易解释。缺点&#xff1a;对参数调节和核…

Python 教程之标准库概览

概要 Python 标准库非常庞大&#xff0c;所提供的组件涉及范围十分广泛&#xff0c;使用标准库我们可以让您轻松地完成各种任务。 以下是一些 Python3 标准库中的模块&#xff1a; 「os 模块」 os 模块提供了许多与操作系统交互的函数&#xff0c;例如创建、移动和删除文件和…

CLIP-GCD: Simple Language Guided Generalized Category Discovery(论文翻译)

CLIP-GCD: Simple Language Guided Generalized Category Discovery 摘要1 介绍2 相关工作2.1 NCD2.2 无监督聚类2.3 自监督和多模态预训练 3 方法3.1 GCD 问题设置3.2 我们的方法3.2.1 使用CLIP 在GCD 4 实验4.1 模型架构细节4.2 数据集和评估4.3 和最先进水平比较4.4 分析4.5…

Linux下 Docker容器引擎基础(1)

简述&#xff1a; Docker的容器技术可以在一台主机上轻松为任何应用创建一个轻量级的、可移植的、自给自足的容器。通过这种容器打包应用程序&#xff0c;意味着简化了重新部署、调试这些琐碎的重复工作&#xff0c;极大的提高了工作效率。例如&#xff1a;项目从腾讯云迁移阿…

尚硅谷大数据项目《在线教育之采集系统》笔记002

视频地址&#xff1a;尚硅谷大数据项目《在线教育之采集系统》_哔哩哔哩_bilibili 目录 P032 P033 P033 P034 P035 P036 P032 P033 # 1、定义组件&#xff0c;为各组件命名 a1.sources r1 a1.channels c1 a1.sinks - k1# 2、配置sources&#xff0c;描述source a1.sour…

ALLEGRO之Route菜单

本文主要介绍了ALLEGRO的Route菜单。 &#xff08;1&#xff09;Connect&#xff1a;走线&#xff1b; &#xff08;2&#xff09;Slide&#xff1a;推挤&#xff1b; &#xff08;3&#xff09;Timing Vision&#xff1a;等长设计时使用&#xff1f;暂不清楚&#xff1b; &…

oracle,获取每日24*60,所有分钟数

前言&#xff1a; 为规范用户的时间录入&#xff0c;因此我们采用下拉的方式&#xff0c;让用户选择需要的时间&#xff0c;因此我们需要将一天24小时的时间拆分为类似00:00,00:01...23:00,23:01,23:59。因此我们需要生成24*601440行的下拉复选值。具体效果如下图所示。 思路 1…

C语言字串函数、内存函数介绍以及模拟实现

目录 前言 本期内容介绍&#xff1a; 一、字符串函数 strlen介绍 strlen 模拟实现&#xff08;三种方式&#xff09; 方法一&#xff1a;计数器法 方法二&#xff1a;递归法&#xff08;不创建临时变量法&#xff09; 方法三&#xff1a;指针-指针 strcpy介绍 strcpy模…