mysql-springboot netty-flink-kafka-spark(paimon)-minio

1、下载spark源码并编译

mkdir -p /home/bigdata && cd /home/bigdata

wget https://archive.apache.org/dist/spark/spark-3.4.3/spark-3.4.3.tgz

解压文件

tar -zxf spark-3.4.3.tgz 

cd spark-3.4.3

wget https://raw.githubusercontent.com/apache/incubator-celeborn/v0.4.0-incubating/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch
git apply Celeborn_Dynamic_Allocation_spark3_4.patch

源码构建编译

./dev/make-distribution.sh --name lukeyan --pip --tgz -Dhadoop.version=3.3.6 -Phive -Phive-thriftserver -Pkubernetes -Pvolcano
 

编译成功

构建完成的进行解压操作并添加相应的jar文件

解压编译的文件

tar -zxvf spark-3.4.3-bin-lukeyan.tgz 

cd spark-3.4.3-bin-lukeyan
 

添加jar文件

cd jars/
 

ls
wget  https://repo1.maven.org/maven2/org/apache/spark/spark-hadoop-cloud_2.12/3.4.3/spark-hadoop-cloud_2.12-3.4.3.jar
wget  https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-cloud-storage/3.3.6/hadoop-cloud-storage-3.3.6.jar
wget  https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar
wget https://maven.aliyun.com/repository/public/com/amazonaws/aws-java-sdk-bundle/1.12.367/aws-java-sdk-bundle-1.12.367.jar
# 添加 Paimon集成相关依赖
wget  https://repo1.maven.org/maven2/org/apache/paimon/paimon-spark-3.4/0.9.0/paimon-spark-3.4-0.9.0.jar
# 如果Kubernetes 的发行版使用的是 K3s 、RKE2等,还需要加入以下依赖
wget  https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-jdk18on/1.77/bcpkix-jdk18on-1.77.jar
wget  https://repo1.maven.org/maven2/org/bouncycastle/bcprov-jdk18on/1.77/bcprov-jdk18on-1.77.jar
cd ..
 

构建docker镜像

docker buildx build --load --platform linux/arm64 --tag spark-paimon-s3:3.4.3_2.12 .

查看镜像架构

docker inspect --format '{{.Architecture}}' azul/zulu-openjdk:17.0.9-17.46.19-jre

docker images
docker save -o jdk.tar azul/zulu-openjdk:17.0.9-17.46.19-jre
docker save -o flink.tar flink:1.19-scala_2.12-java17
docker pull --platform linux/arm64 azul/zulu-openjdk:17.0.9-17.46.19-jre
docker inspect --format '{{.Architecture}}' azul/zulu-openjdk:17.0.9-17.46.19-jre
docker buildx ls
 

x86上构建Arm镜像参考地址Centos7的x86上构建arm镜像docker_centos7 arm镜像-CSDN博客

将Dockerfile拷贝到当前目录下

FROM azul/zulu-openjdk:17.0.9-17.46.19-jre
ARG spark_uid=185

ENV HADOOP_CONF_DIR=/etc/hadoop/conf


# Before building the docker image, first build and make a Spark distribution following
# the instructions in https://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .

RUN set -ex && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps net-tools && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/examples && \
    mkdir -p /opt/spark/work-dir && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
    rm -rf /var/cache/apt/* && rm -rf /var/lib/apt/lists/*

COPY jars /opt/spark/jars
# Copy RELEASE file if exists
COPY RELEAS[E] /opt/spark/RELEASE
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY kubernetes/dockerfiles/spark/decom.sh /opt/
COPY examples /opt/spark/examples
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data


ENV SPARK_HOME /opt/spark

WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN chmod a+x /opt/decom.sh

ENTRYPOINT [ "/opt/entrypoint.sh" ]

# Specify the User that the actual main process will run as
USER ${spark_uid}

执行构建镜像的命令

docker buildx build --load --platform linux/arm64 --tag spark-paimon-s3:3.4.3_2.12 .

得到基础镜像spark-paimon-s3:3.4.3_2.12

参考地址ApachePaimon 实践系列1-环境准备 (qq.com)
 

2、编写程序 

KafkaSparkPaimonS3

使用spark读取消费kafka,将固定格式的数据保存到S3协议的对象存储上,

这里s3使用了Minio

程序代码

package com.example.cloud;

import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object KafkaSparkPaimonS3 {
  def main(args: Array[String]): Unit = {
    val kafkaConsumer: String = "kafka-service:9092"
    val kafkaTopic: String = "mysql-flink-cdc-kafka"
    val startingOffsets: String = "latest"
    val kafkaGroupId: String = "KafkaSparkPaimonS3Group"
    val failOnDataLoss: Boolean = false
    val maxOffsetsPerTrigger: Int = 3000
    val lakePath: String = "s3a://paimon/warehouse"
    val checkpointLocation: String = "s3a://spark/checkpoints"
    val s3endpoint: String = "http://minio:9000"
    val s3access: String = "uotAvnxXwcz90yNxWhq2"
    val s3secret: String = "MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v"
    val schema_base = StructType(List(
      StructField("before", StringType),
      StructField("after", StringType),
      StructField("source", MapType(StringType, StringType)),
      StructField("op", StringType),
      StructField("ts_ms", LongType),
      StructField("transaction", StringType)
    ))
    println("create spark session ..........................................................")
    val sparkConf = SparkSession.builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("sspark.sql.catalog.paimon.metastore", "filesystem")
      .config("spark.sql.catalog.paimon.warehouse", lakePath)
      .config("spark.sql.catalog.paimon.s3.endpoint", s3endpoint)
      .config("spark.sql.catalog.paimon.s3.access-key", s3access)
      .config("spark.sql.catalog.paimon.s3.secret-key", s3secret)
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.s3.path-style.access", "true")
      .config("spark.sql.extensions", "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
      .config("spark.sql.catalog.paimon.s3.path-style.access", "true")
      .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
      .config("spark.hadoop.fs.s3a.multipart.size", "104857600")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.access.key", s3access)
      .config("spark.hadoop.fs.s3a.secret.key", s3secret)
      .config("spark.hadoop.fs.s3a.endpoint", s3endpoint)
      .config("spark.hadoop.fs.s3a.connection.timeout", "200000")
    val sparkSession: SparkSession = sparkConf.getOrCreate()
    println("get spark DataStreamReader start  ..........................................................")
    val dsr: DataStreamReader = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaConsumer)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", startingOffsets)
      .option("failOnDataLoss", failOnDataLoss)
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .option("kafka.group.id", kafkaGroupId)
      .option("includeHeaders", "true")
    println("get spark DataStreamReader end  ..........................................................")
    val df: DataFrame = dsr.load()
    println("配置kafka消费流 spark DataFrame end  ..........................................................")
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._
    val frame: Dataset[Row] = df.select(from_json('value.cast("string"), schema_base) as "value").select($"value.*")
      .alias("data")
      .select(
        get_json_object($"data.after", "$.uuid").as("uuid"),
        get_json_object($"data.after", "$.product").as("product"),
        get_json_object($"data.after", "$.promotion").as("promotion"),
        get_json_object($"data.after", "$.value_added_service").as("value_added_service"),
        get_json_object($"data.after", "$.logistics").as("logistics"),
        get_json_object($"data.after", "$.weight").as("weight"),
        get_json_object($"data.after", "$.color").as("color"),
        get_json_object($"data.after", "$.version").as("version"),
        get_json_object($"data.after", "$.shop").as("shop"),
        get_json_object($"data.after", "$.evaluate").as("evaluate"),
        get_json_object($"data.after", "$.order_num").as("order_num"),
        get_json_object($"data.after", "$.rider").as("rider"),
        get_json_object($"data.after", "$.order_time").as("order_time"),
        get_json_object($"data.after", "$.create_time").as("create_time"),
        get_json_object($"data.after", "$.pay_price").as("pay_price"),
        get_json_object($"data.after", "$.pay_type").as("pay_type"),
        get_json_object($"data.after", "$.address").as("address")
      )
    println("get spark Dataset from kafka  ..........................................................")
    sparkSession.sql("USE paimon;")
    println("spark engine use paimon catalog ..........................................................")
    sparkSession.sql("create database m31094;")
    println("create my favourite database for u ..........................................................")
    val tablePath = "paimon.m31094.my_table"
    println("create table to store data  ..........................................................")
    sparkSession.sql("use m31094;")
    sparkSession.sql(
      s"""
          CREATE TABLE IF NOT EXISTS $tablePath (
              uuid STRING,
              product STRING,
              promotion STRING,
              value_added_service STRING,
              logistics STRING,
              weight STRING,
              color STRING,
              version STRING,
              shop STRING,
              evaluate STRING,
              order_num STRING,
              rider STRING,
              order_time STRING,
              create_time STRING,
              pay_price STRING,
              pay_type STRING,
              address STRING
          ) TBLPROPERTIES (
                'partitioned_by' = 'uuid'
            )
      """)
    println("将 DataFrame 写入 Paimon 表  ..........................................................")

    println("尽可能的详细打印数据吧哈哈哈哈 ..........................................................")

    val query: StreamingQuery = frame //是一个已经创建的 Dataset[Row],通常是从流数据源(如 Kafka、文件等)获得的数据。
      .writeStream //开始一个流式写入操作。
      .foreachBatch { (batchDF: Dataset[Row], batchId: Long) =>
        println(s"处理批量流的UID是 batch ID: $batchId")
        // 打印当前批次的数据
        println("莫醒醒..........................................................")
        batchDF.show(truncate = false) // 设置 truncate = false 以完整显示列内容
      }
      .format("paimon")
      //指定数据输出格式为 Paimon。
      .option("write.merge-schema", "true")
      //允许在写入时合并模式(schema),即动态更新表的模式以适应新数据。
      .option("write.merge-schema.explicit-cast", "true")
      //在合并模式时,明确转换数据类型,以确保兼容性和正确性。
      .outputMode("append")
      //指定输出模式为追加模式,表示只将新的数据行添加到目标表中,不会更新或删除已有的数据。
      .option("checkpointLocation", checkpointLocation)
      //设置检查点位置,这对于流处理非常重要,有助于在故障恢复时重新启动流处理任务。
      .start("s3a://paimon/warehouse/m31094.db/my_table") //启动流式查询并将数据写入指定的 S3 路径
    println("spark流通过paimon方式写入数据湖 ..........................................................")
    println("查看数据内容和结构  ..........................................................")
    println(df.schema) // 打印 Schema
    println("打印 Schema  ..........................................................")
    println("Stream processing started...")
    query.awaitTermination() //使当前线程等待,直到流查询结束。这意味着程序会持续运行,直到手动停止或出现错误。
    println("流处理已结束,程序终止。")
  }
}
 

 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.cloud</groupId>
    <artifactId>KafkaSparkPaimonS3</artifactId>
    <version>2.4.5</version>
    <name>KafkaSparkPaimonS3</name>
    <properties>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.4.1</spark.version>
        <paimon.version>0.9.0</paimon.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-bundle</artifactId>
            <version>1.12.367</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-text</artifactId>
            <version>1.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-spark-common</artifactId>
            <version>${paimon.version}</version>
        </dependency>
       <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-s3</artifactId>
            <version>${paimon.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-spark-3.4</artifactId>
            <version>${paimon.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-s3-impl</artifactId>
            <version>${paimon.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-text</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>audience-annotations</artifactId>
                    <groupId>org.apache.yetus</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
       <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-token-provider-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.20</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.example.cloud.KafkaSparkPaimonS3</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <excludeTransitive>false</excludeTransitive>
                            <stripVersion>false</stripVersion>
                            <includeScope>runtime</includeScope>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <encoding>UTF-8</encoding>
                            <outputDirectory>
                                ${project.build.directory}/config
                            </outputDirectory>
                            <resources>
                                <resource>
                                    <directory>src/main/resources/</directory>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                    <execution>
                        <id>copy-sh</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <encoding>UTF-8</encoding>
                            <outputDirectory>
                                ${project.build.directory}
                            </outputDirectory>
                            <resources>
                                <resource>
                                    <directory>bin/</directory>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
 

Dockerfile

FROM spark-paimon-s3:3.4.3_2.12
RUN mkdir -p /opt/spark/examples/jars
COPY target /opt/spark/examples/jars  

构建镜像的命令

docker buildx build --load --platform linux/arm64 --tag  spark-paimon-s3-app:3.4.3_2.12 --no-cache .
docker save -o spark-paimon-s3-app.tar spark-paimon-s3-app:3.4.3_2.12 

3、配置minio

minio.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: minio
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      containers:
        - name: minio
          image: minio/minio:latest
          imagePullPolicy: IfNotPresent
          args:
            - server
            - /data
          env:
            - name: MINIO_ROOT_USER
              value: "admin"
            - name: MINIO_ROOT_PASSWORD
              value: "密码"
          command:
            - /bin/sh
            - -c
            - minio server /data --console-address ":5000"
          ports:
            - name: api
              protocol: TCP
              containerPort: 9000
            - name: ui
              protocol: TCP
              containerPort: 5000
          volumeMounts:
            - name: minio-storage
              mountPath: /data
      volumes:
        - name: minio-storage
          persistentVolumeClaim:
            claimName: minio-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: minio
  namespace: default
spec:
  selector:
    app: minio
  type: NodePort
  ports:
    - name: api
      protocol: TCP
      port: 9000
      targetPort: 9000
    - name: ui
      protocol: TCP
      port: 5000
      targetPort: 5000

minio-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: minio-pvc  # PVC 的名称
  namespace: default
spec:
  accessModes:
    - ReadWriteMany  # 访问模式,此处为单节点读写
  resources:
    requests:
      storage: 100Gi  # 请求的存储容量大小
  storageClassName: nfs-client  # 存储类,根据需要选择 

4、运行程序

4.1、springboot -mysql产生原始数据

产生的MySQL原始数据

4.2 数据从MySQL到kafka

mysql->flink cdc->kafka

MysqlFlinkCdcToKafka

在k8s上提交flink任务

/home/d/flink/bin/flink run-application --target kubernetes-application -Dkubernetes.namespace=default -Dkubernetes.cluster-id=flink-cdc-mysql -Dkubernetes.container.image.ref=flinkcdctokafka:0.1-snapshot -Dkubernetes.container.image.pull-policy=IfNotPresent -Dkubernetes.service-account=default -Dkubernetes.rest-service.exposed.type=NodePort -Djobmanager.memory.process.size=2048mb -Dtaskmanager.memory.process.size=2024mb -Dtaskmanager.numberOfTaskSlots=1 -Dhigh-availability.type=kubernetes -Dhigh-availability.storageDir=s3a://flink-cdc/recovery -Dstate.checkpoints.dir=s3a://flink-cdc/flink_cp -Dstate.savepoints.dir=s3a://flink-cdc/flink_sp -Dstate.backend.incremental=true -Ds3.access-key=uotAvnxXwcz90yNxWhq2 -Ds3.secret-key=MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v -Ds3.path.style.access=true -Ds3.endpoint=http://minio:9000 -Duser.timezone=Asia/Shanghai -c "com.example.cloud.MysqlFlinkCdcToKafka" local:///opt/flink/usrlib/MysqlFlinkCdcToKafka-jar-with-dependencies.jar

通过flink cdc将MySQL的数据写入到kafka的指定topic 

4.3 kafka到minio

kafka-spark-minio

spark提交命令,提交spark任务到k8s集群中运行

/opt/streaming/spark-3.4.3-bin-hadoop3/bin/spark-submit --name KafkaSparkPaimonS3 --master spark://10.10.10.99:7077 --deploy-mode client --driver-cores 2 --driver-memory 4g --num-executors 2 --executor-cores 2 --executor-memory 4g --class com.example.cloud.KafkaSparkPaimonS3 --conf spark.driver.extraClassPath=/opt/streaming/spark-3.4.3-bin-hadoop3/jars --conf spark.executor.extraClassPath=/opt/streaming/spark-3.4.3-bin-hadoop3/jars --jars /opt/lib/kafka-clients-3.8.0.jar,/opt/lib/spark-sql-kafka-0-10_2.13-3.4.3.jar,/opt/lib/spark-token-provider-kafka-0-10_2.13-3.4.3.jar /opt/KafkaSparkPaimonS3-jar-with-dependencies.jar

本地spark运行,可以通过spark sql查询数据的情况

本地执行spark-sql

/opt/streaming/spark-3.4.3-bin-hadoop3/bin/spark-sql --jars /opt/lib/paimon-spark-3.4-0.9.0.jar --conf 'spark.sql.catalog.paimon.metastore=filesystem' --conf 'spark.sql.catalog.paimon.warehouse=s3a://paimon/warehouse' --conf 'spark.sql.catalog.paimon.s3.endpoint=http://10.10.10.99:31212' --conf 'spark.sql.catalog.paimon.s3.access-key=uotAvnxXwcz90yNxWhq2' --conf 'spark.sql.catalog.paimon.s3.secret-key=MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v' --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog' --conf 'spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions' --conf 'spark.sql.catalog.paimon.s3.path-style.access=true' --conf 'spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore' --conf 'spark.hadoop.fs.s3a.multipart.size=104857600' --conf 'spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem' --conf 'spark.hadoop.fs.s3a.access.key=uotAvnxXwcz90yNxWhq2' --conf 'spark.hadoop.fs.s3a.secret.key=MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v' --conf 'spark.hadoop.fs.s3a.endpoint=http://10.10.10.99:31212' --conf 'spark.hadoop.fs.s3a.connectiopaimonn.timeout=200000'

 执行上面的本地spark-sql,开启spark终端后

use paimon;

use databases;

5、运行效果

 6、minio上存储

flink数据同步

 k8s上部署的容器服务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/912897.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Spring】体系结构

Spring框架至今集成了多个模块&#xff0c;这些模块分布在数据访问/集成&#xff08;Data Access/Integration&#xff09;、Web层、面向切面的编程&#xff08;Aspect Oriented Programming&#xff0c;AOP&#xff09;模块、植入&#xff08;Instrumentation&#xff09;模块…

软件缺陷等级评定综述

1. 前言 正确评估软件缺陷等级&#xff0c;在项目的生命周期中有着重要的作用&#xff1a; 指导缺陷修复的优先级和资源分配 在软件开发和维护过程中&#xff0c;资源&#xff08;包括人力、时间和资金&#xff09;是有限的。通过明确缺陷的危险等级&#xff0c;可以帮助团队合…

Linux:vim命令总结及环境配置

文章目录 前言一、vim的基本概念二、vim模式命令解析1. 命令模式1&#xff09;命令模式到其他模式的转换&#xff1a;2&#xff09;光标定位&#xff1a;3&#xff09;其他命令&#xff1a; 2. 插入模式3. 底行模式4. 替换模式5. 视图模式6. 外部命令 三、vim环境的配置1. 环境…

Obsidian的Git插件设置配置全流程 -- 简单的电脑端多平台同步方案及常见问题

Obsidian的Git插件设置配置全流程 -- 简单的电脑端多平台同步方案及常见问题 参考文章引言1. git 介绍及安装2. git 本地配置及远程仓库链接3. obsidian 的 git 插件4. 常用的使用场景和对应的命令4.1. 本地仓库已推送到远端&#xff0c;如何在另一个电脑上第一次同步4.2 多端同…

【优选算法篇】微位至简,数之恢宏——解构 C++ 位运算中的理与美

文章目录 C 位运算详解&#xff1a;基础题解与思维分析前言第一章&#xff1a;位运算基础应用1.1 判断字符是否唯一&#xff08;easy&#xff09;解法&#xff08;位图的思想&#xff09;C 代码实现易错点提示时间复杂度和空间复杂度 1.2 丢失的数字&#xff08;easy&#xff0…

Redis(3):持久化

一、Redis高可用概述 在介绍Redis高可用之前&#xff0c;先说明一下在Redis的语境中高可用的含义。   我们知道&#xff0c;在web服务器中&#xff0c;高可用是指服务器可以正常访问的时间&#xff0c;衡量的标准是在多长时间内可以提供正常服务&#xff08;99.9%、99.99%、9…

高亚科技签约酸动力,助力研发管理数字化升级

近日&#xff0c;中国企业管理软件资深服务商高亚科技与广东酸动力生物科技有限公司&#xff08;以下简称“酸动力”&#xff09;正式签署合作协议。借助高亚科技的8Manage PM项目管理软件&#xff0c;酸动力将进一步优化项目过程跟踪与节点监控&#xff0c;提升研发成果的高效…

大模型领域最值得看的 9 本新书,找到了

在人工智能革命的浪潮中&#xff0c;程序员们正站在技术变革的最前沿。本书单精选了关于人工智能在各行业应用的最新著作&#xff0c;从医疗诊断到金融风控&#xff0c;从智能制造到智慧城市&#xff0c;全面展现AI如何重塑行业生态&#xff0c;推动社会进步。通过阅读这些书籍…

加入GitHub Spark需要申请

目录 加入GitHub Spark需要申请 GitHub Spark 一、产品定位与特点 二、核心组件与功能 三、支持的AI模型 四、应用场景与示例 五、未来展望 六、申请体验 加入GitHub Spark需要申请 GitHub Spark 是微软旗下GitHub在2024年10月30日的GitHub Universe大会上推出的一款革…

Rust移动开发:Rust在iOS端集成使用介绍

iOS调用Rust 上篇介绍了 Rust移动开发&#xff1a;Rust在Android端集成使用介绍, 这篇主要看下iOS上如何使用Rust&#xff0c;Rust可以给移动端开发提供跨平台&#xff0c;通用组件支持。 该篇适合对iOS、Rust了解&#xff0c;想知道如何整合调用和编译的&#xff0c;如果想要…

【月之暗面kimi-注册/登录安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞 …

2024 CSS保姆级教程四

CSS中的动画 CSS动画&#xff08;CSS Animations&#xff09;是为层叠样式表建议的允许可扩展标记语言&#xff08;XML&#xff09;元素使用CSS的动画的模块​ 即指元素从一种样式逐渐过渡为另一种样式的过程​ 常见的动画效果有很多&#xff0c;如平移、旋转、缩放等等&#…

[C++11] Lambda 表达式

lambda 表达式&#xff08;Lambda Expressions&#xff09;作为一种匿名函数&#xff0c;为开发者提供了简洁、灵活的函数定义方式。相比传统的函数指针和仿函数&#xff0c;lambda 表达式在简化代码结构、提升代码可读性和编程效率方面表现出色。 Lambda 表达式的基本语法 在…

AI4SCIENSE(鄂维南院士:再谈AI for Science)

鄂维南院士&#xff1a;再谈AI for Science_哔哩哔哩_bilibili 以往处理高维问题 量子力学&#xff1a;单变量乘积 统计学&#xff1a;旋转 AI4S 处理数据 蛋白质折叠&#xff1f; 不是纯粹的数据驱动 物理学等学科基本原理 例&#xff1a;分子动力学 数据模型 流程图 这…

接收nVisual中rabbitmq数据不成功问题排查

rabbitmq服务部署成功的情况下&#xff0c;消息对接不成功一般原因为消息发送失败&#xff0c;发送失败大多数可能为global_settings表配置错误。下面从两个方面解决消息对接不成功问题。 1.数据是否成功发送 检查global_settings表中rabbitmq发送消息配置信息是否正确 #MQS…

SpringBoot框架学习总结 及 整合 JDBC Mybatis-plus JPA Redis 我的学习笔记

SpringBoot框架学习总结 及 整合 JDBC Mybatis-plus JPA Redis 我的学习笔记 一、SpringBoot概述二、创建SpringBoot程序1. 使用maven方式创构建2. 使用Spring Initializr构建3. SpringBoot热部署4. SpringBoot的跨域处理 三、基础配置1.配置文件的作用2.配置文件格式2.yaml3.S…

【AIGC】如何通过ChatGPT轻松制作个性化GPTs应用

创建个性化的GPTs应用是一个涉及技术、设计和用户体验的过程。以下是详细步骤&#xff1a; ###1.确定应用目标和用户群体 在开始之前&#xff0c;你需要明确你的应用的目标和目标用户。这将帮助你在设计、开发和个性化方面做出相应的决策。例如&#xff0c;如果你的应用是为了…

strtok函数详解

strtok函数 strtok 函数是一个字符串分割函数&#xff0c;用于将字符串分割成一系列的标记。这个函数通过一组分隔符字符来确定标记的边界&#xff0c;每次调用都会返回字符串中的下一个标记&#xff0c;并且将原始字符串中的分隔符替换为空字符‘\0’&#xff0c;从而实际上是…

【Linux】Linux入门实操——vim、目录结构、远程登录、重启注销

一、Linux 概述 1. 应用领域 服务器领域 linux在服务器领域是最强的&#xff0c;因为它免费、开源、稳定。 嵌入式领域 它的内核最小可以达到几百KB, 可根据需求对软件剪裁&#xff0c;近些年在嵌入式领域得到了很大的应用。 主要应用&#xff1a;机顶盒、数字电视、网络…

系统管理与规划师

综合 工业化、信息化两化融合&#xff1a;战略、资源、经济、设备和技术的融合 诺兰6时期&#xff1a;&#xff08;初普控&#xff0c;整数成&#xff09;初始、普及、控制、整合、数据管理、成熟期&#xff1b;技术转型期介于控制和整合间 IT战略规划 IT战略制定&#xff1a;使…