Project
I. Hudi + Spark + Kafka (Scala)
For configuration, see [1. Scala configuration] in the appendix.
For dependencies, see [1. Hudi + Spark + Kafka dependencies] in the appendix.
1-1 Build the SparkSession object
def main(args: Array[String]): Unit = {
//1. Build the SparkSession object
val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)
//2. Consume data from Kafka in real time
val kafkaStreamDF: DataFrame = readFromKafka(spark, "order-topic")
//3. Extract fields and convert data types
val streamDF: DataFrame = process(kafkaStreamDF)
//4. Save the data to the Hudi table: MOR (merge on read)
saveToHudi(streamDF)
//5. Once the streaming query has started, wait for termination
spark.streams.active.foreach(query => println(s"Query: ${query.name} is Running ............."))
spark.streams.awaitAnyTermination()
}
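The SparkUtils.createSparkSession helper used above is not shown in this document. A minimal sketch of what it could look like, assuming local mode and Kryo serialization (both are assumptions, not settings confirmed by the original project):
import org.apache.spark.sql.SparkSession
object SparkUtils {
// Build a SparkSession named after the caller's class.
// local[2] and the Kryo serializer are illustrative defaults, not confirmed settings.
def createSparkSession(clazz: Class[_]): SparkSession = {
SparkSession.builder()
.appName(clazz.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
}
}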
1-2 Read data from Kafka / a CSV file
/**
* Consume data in real time from the specified Kafka topic
*
* @param spark
* @param topicName
* @return
*/
def readFromKafka(spark: SparkSession, topicName: String): DataFrame = {
spark.readStream
.format("kafka") //指定Kafka
.option("kafka.bootstrap.servers", "node1.itcast.cn:9099") //指定Kafka的服务IP和端口
.option("subscribe", topicName) //订阅Kafka的topic的名称
.option("startingOffsets", "latest") //从最新消费
.option("maxOffsetsPerTrigger", 100000) //每次最多处理10万条数据
.option("failOnDataLoss", value = false) //如果数据丢失是否失败
.load()
}
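For reference, the Kafka source always returns a fixed schema in which key and value are binary columns; that is why process() below casts them to STRING before parsing the JSON payload. A quick way to check (the commented output follows the documented schema of the Spark Kafka source):
readFromKafka(spark, "order-topic").printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)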
/**
* Read a CSV-format text file and wrap the data in a DataFrame
*/
def readCsvFile(spark: SparkSession, path: String): DataFrame = {
spark.read
// the field separator is a tab character
.option("sep", "\\t")
// the first line of the file contains the column names
.option("header", "true")
// infer column data types from the values
.option("inferSchema", "true")
// path of the file to load
.csv(path)
}
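A short usage sketch for readCsvFile; the path below is a placeholder for illustration, not a path from the original project:
val csvDF: DataFrame = readCsvFile(spark, "/datas/didi/order_data.csv") // hypothetical path
csvDF.printSchema()
csvDF.show(10, truncate = false)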
1-3 Transform (ETL) and store into the Hudi table
/**
* Transform the data consumed from Kafka: extract all field values and convert them to String so they can be saved to the Hudi table
* @param streamDF
* @return
*/
def process(streamDF: DataFrame): DataFrame = {
streamDF
//select fields
.selectExpr(
"CAST(key AS STRING) order_id",
"CAST(value AS STRING) AS message",
"topic","partition","offset","timestamp"
)
//parse the JSON message and extract field values
.withColumn("user_id",get_json_object(col("message"),"$.userId"))
.withColumn("order_time",get_json_object(col("message"),"$.orderTime"))
.withColumn("ip",get_json_object(col("message"),"$.ip"))
.withColumn("order_money",get_json_object(col("message"),"$.orderMoney"))
.withColumn("order_status",get_json_object(col("message"),"$.orderStatus"))
//drop the message column
.drop(col("message"))
//convert the order time string to a timestamp, used as the merge (precombine) field of the Hudi table
.withColumn("ts",to_timestamp(col("order_time"),"yyyy-MM-dd HH:mm:ss.SSS"))
//extract the partition date (yyyy-MM-dd) from the order time
.withColumn("day",substring(col("order_time"),0,10))
}
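The field names used above imply a JSON payload roughly like the one below. A small local test of process, meant to be run inside main after the SparkSession has been created; the sample values are illustrative, not records from the real topic:
// hypothetical order message matching the fields extracted by get_json_object above
val sampleMessage = """{"userId":"u-1001","orderTime":"2021-11-22 10:34:34.136","ip":"116.22.30.1","orderMoney":99.9,"orderStatus":0}"""
// build a tiny DataFrame with the same columns the Kafka source provides
import spark.implicits._
val testDF: DataFrame = Seq(
("20211122103434136000001", sampleMessage, "order-topic", 0, 0L, new java.sql.Timestamp(System.currentTimeMillis()))
).toDF("key", "value", "topic", "partition", "offset", "timestamp")
process(testDF).show(truncate = false)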
/**
* Save the streaming DataFrame to a Hudi table; the table type can be COW or MOR
*/
def saveToHudi(streamDF: DataFrame): Unit = {
streamDF.writeStream
.outputMode(OutputMode.Append())
.queryName("query-hudi-streaming")
// save each micro-batch of data
.foreachBatch((batchDF: Dataset[Row], batchId: Long) => {
println(s"============== BatchId: ${batchId} start ==============")
writeHudiMor(batchDF) // TODO: table type is MOR
})
.option("checkpointLocation", "/datas/hudi-spark/struct-ckpt-1001")
.start()
}
/**
* Save the DataFrame to a Hudi table; table type: MOR (merge on read)
*/
def writeHudiMor(dataframe: DataFrame): Unit = {
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
dataframe.write
.format("hudi")
.mode(SaveMode.Append)
// table name
.option(TBL_NAME.key, "tbl_hudi_order")
// table type
.option(TABLE_TYPE.key(), "MERGE_ON_READ")
// record key (primary key) field of each record
.option(RECORDKEY_FIELD_NAME.key(), "order_id")
// time-based field used to precombine (merge) records
.option(PRECOMBINE_FIELD_NAME.key(), "ts")
// partition field name
.option(PARTITIONPATH_FIELD_NAME.key(), "day")
// whether partition directories follow Hive-style naming
.option(HIVE_STYLE_PARTITIONING_ENABLE.key(), "true")
// shuffle parallelism for insert and upsert operations
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// storage path of the table data
.save("/hudi-warehouse/tbl_hudi_order")
}
1-4 Load Hudi table data with Spark SQL and analyze it
/**
* Load data from the Hudi table at the given storage path
*/
def readFromHudi(spark: SparkSession, path: String): DataFrame = {
// a. load data from the given path into a DataFrame
val didiDF: DataFrame = spark.read.format("hudi").load(path)
// b. select the fields needed for analysis
didiDF
.select(
"order_id", "product_id", "type", "traffic_type",
"pre_total_fee", "start_dest_distance", "departure_time"
)
}
/**
* Order-type statistics by the product_id field
*/
def reportProduct(dataframe: DataFrame): Unit = {
val reportDF: DataFrame = dataframe.groupBy("product_id").count()
// map the product_id code to a readable order-type name
val to_name = udf(
(product_id: Int) => {
product_id match {
case 1 => "滴滴专车"
case 2 => "滴滴企业专车"
case 3 => "滴滴快车"
case 4 => "滴滴企业快车"
case _ => "其他" // default case so the UDF does not throw a MatchError on unexpected codes
}
}
)
val resultDF: DataFrame = reportDF.select(
to_name(col("product_id")).as("order_type"), //
col("count").as("total") //
)
resultDF.printSchema();
resultDF.show(10, truncate = false);
}
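A minimal driver wiring readFromHudi and reportProduct together; the table path is an assumption for illustration, not taken from the original project:
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)
// hypothetical Hudi table path; point it at the actual table directory
val hudiDF: DataFrame = readFromHudi(spark, "/hudi-warehouse/tbl_didi_order")
hudiDF.persist()
reportProduct(hudiDF)
hudiDF.unpersist()
spark.stop()
}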
II. Hudi + Flink + Kafka (Java)
For dependencies, see [2. Hudi + Flink + Kafka dependencies] in the appendix.
2-1 Consume data from Kafka
Step 1, obtaining the table execution environment, needs no further explanation.
Step 2 creates the input table: it specifies the Kafka broker address and port, the topic, and other options; data is read from there.
Step 3 transforms the data into the format required by the Hudi table (adding the two mandatory fields: the merge field ts and the partition field partition_day).
package cn.itcast.hudi;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkSQLKafkaDemo {
public static void main(String[] args) {
//1. Obtain the table execution environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode() //streaming mode
.build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
//2. Create the input table that consumes data from Kafka
tableEnvironment.executeSql(
"CREATE TABLE order_kafka_source (\n" +
" orderId STRING,\n" +
" userId STRING,\n" +
" orderTime STRING,\n" +
" ip STRING,\n" +
" orderMoney DOUBLE,\n" +
" orderStatus INT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'order-topic',\n" +
" 'properties.bootstrap.servers' = 'node1.itcast.cn:9099',\n" +
" 'properties.group.id' = 'gid-1001',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")"
);
//3. Transform the data: either SQL or the Table API can be used
Table table = tableEnvironment.from("order_kafka_source")
//add a column: the Hudi merge field, "orderId":"20211122103434136000001" -> 20211122103434136
.addColumns(
$("orderId").substring(0, 17).as("ts")
)
//add a column: the Hudi partition field, "orderTime":"2021-11-22 10:34:34.136" -> 2021-11-22
.addColumns(
$("orderTime").substring(0, 10).as("partition_day")
);
tableEnvironment.createTemporaryView("view_order",table);
//4. Query the view and print the result to the console (this demo has no sink table)
tableEnvironment.executeSql("select * from view_order").print();
}
}
2-2 Write the data into a Hudi table
Step 4 creates the output table: it specifies the Hudi table path (a local path, HDFS, etc.), the table type, the merge field, the partition field, and so on; data is written there.
Step 5 inserts the data into the output Hudi table.
package cn.itcast.hudi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
* Based on the Flink SQL connectors: consume data from the Kafka topic in real time, transform it, and write it to the Hudi table in real time
*/
public class FlinkSQLHudiDemo {
public static void main(String[] args) {
//1. Obtain the table execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000); //Flink checkpointing must be enabled because data is written to the Hudi table incrementally
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode() //streaming mode
.build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env,settings);
//2. Create the input table that consumes data from Kafka
tableEnvironment.executeSql(
"CREATE TABLE order_kafka_source (\n" +
" orderId STRING,\n" +
" userId STRING,\n" +
" orderTime STRING,\n" +
" ip STRING,\n" +
" orderMoney DOUBLE,\n" +
" orderStatus INT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'order-topic',\n" +
" 'properties.bootstrap.servers' = 'node1.itcast.cn:9099',\n" +
" 'properties.group.id' = 'gid-1001',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")"
);
//3. Transform the data: either SQL or the Table API can be used
Table table = tableEnvironment.from("order_kafka_source")
//add a column: the Hudi merge field, "orderId":"20211122103434136000001" -> 20211122103434136
.addColumns(
$("orderId").substring(0, 17).as("ts")
)
//add a column: the Hudi partition field, "orderTime":"2021-11-22 10:34:34.136" -> 2021-11-22
.addColumns(
$("orderTime").substring(0, 10).as("partition_day")
);
tableEnvironment.createTemporaryView("view_order", table);
//4. Create the output table that writes data into the Hudi table
tableEnvironment.executeSql(
"CREATE TABLE order_hudi_sink (\n" +
" orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
" userId STRING,\n" +
" orderTime STRING,\n" +
" ip STRING,\n" +
" orderMoney DOUBLE,\n" +
" orderStatus INT,\n" +
" ts STRING,\n" +
" partition_day STRING\n" +
")\n" +
"PARTITIONED BY (partition_day) \n" +
"WITH (\n" +
" 'connector' = 'hudi',\n" +
" 'path' = 'file:///D:/flink_hudi_order',\n" +
" 'table.type' = 'MERGE_ON_READ',\n" +
" 'write.operation' = 'upsert',\n" +
" 'hoodie.datasource.write.recordkey.field' = 'orderId',\n" +
" 'write.precombine.field' = 'ts',\n" +
" 'write.tasks'= '1'\n" +
")"
);
// 5. Insert the data into the output table via a sub-query (note: the column order must match)
tableEnvironment.executeSql(
"INSERT INTO order_hudi_sink\n" +
"SELECT\n" +
" orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day\n" +
"FROM view_order"
);
}
}
2-3 Load data from the Hudi table
Simply create an input table backed by the Hudi table and query it.
package cn.itcast.hudi;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
/**
* Based on the Flink SQL connector: load data from the Hudi table and query it with SQL
*/
public class FlinkSQLReadDemo {
public static void main(String[] args) {
//1. Obtain the table execution environment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
//2. Create the input table backed by the Hudi table
tableEnvironment.executeSql(
"CREATE TABLE order_hudi(\n" +
" orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
" userId STRING,\n" +
" orderTime STRING,\n" +
" ip STRING,\n" +
" orderMoney DOUBLE,\n" +
" orderStatus INT,\n" +
" ts STRING,\n" +
" partition_day STRING\n" +
")\n" +
"PARTITIONED BY (partition_day)\n" +
"WITH (\n" +
" 'connector' = 'hudi',\n" +
" 'path' = 'file:///D:/flink_hudi_order',\n" +
" 'table.type' = 'MERGE_ON_READ',\n" +
" 'read.streaming.enabled' = 'true',\n" +
" 'read.streaming.check-interval' = '4'\n" +
")"
);
//3. Execute the query and read the Hudi data as a stream
tableEnvironment.executeSql(
"SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts ,partition_day FROM order_hudi"
).print();
}
}
Appendix: Dependencies
1. Hudi + Spark + Kafka dependencies
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<properties>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.0.0</spark.version>
<hadoop.version>2.7.3</hadoop.version>
<hudi.version>0.9.0</hudi.version>
</properties>
<dependencies>
<!-- Scala language dependency -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark Core dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Structured Streaming + Kafka dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop Client dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- hudi-spark3 -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark3-bundle_2.12</artifactId>
<version>${hudi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL and Hive integration dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.12</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<!-- Maven compiler plugins -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
2. Hudi + Flink + Kafka dependencies
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</repository>
<repository>
<id>central_maven</id>
<name>central maven</name>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.12.2</flink.version>
<hadoop.version>2.7.3</hadoop.version>
<mysql.version>8.0.16</mysql.version>
</properties>
<dependencies>
<!-- Flink Client -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table API & SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<!-- MySQL/FastJson/lombok -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<!-- slf4j and log4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<!-- compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- jar packaging plugin (includes all dependencies) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- <mainClass>com.itcast.flink.batch.FlinkBatchWordCount</mainClass> -->
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Appendix: Errors
1. Runtime error
[Error message]
Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
[Cause]
When running on Windows, the Windows support binaries (hadoop2.7-common-bin, i.e. winutils) must be installed.
URL: https://gitcode.net/mirrors/cdarlint/winutils?utm_source=csdn_github_accelerator
Download the package for the required Hadoop version, configure the HADOOP_HOME and Path environment variables, then restart IDEA and the error no longer occurs.
cd hudi/server/hadoop
./bin/hadoop checknative
2. Runtime error
[Error message]
NoSuchFieldError: INSTANCE
[Cause]
The httpclient and httpcore versions used in the code are newer, while the versions shipped with Hadoop are older (< 4.3).
[Fix]
Replace the httpclient and httpcore jars under $HADOOP_HOME/share/hadoop/common/lib and $HADOOP_HOME/share/hadoop/tools/lib/ with newer versions (> 4.3):
cd /home/zhangheng/hudi/server/hadoop/share/hadoop/common/lib
rm httpclient-4.2.5.jar
rm httpcore-4.2.5.jar
cd /home/zhangheng/hudi/server/hadoop/share/hadoop/tools/lib
rm httpclient-4.2.5.jar
rm httpcore-4.2.5.jar
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpclient-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/common/lib
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpcore-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/common/lib
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpclient-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/tools/lib
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpcore-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/tools/lib
3. Runtime warning
[Warning]
WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
[Cause]
The Spark version was too new: v3.0.0 was chosen at first, which did not work well; switching to v2.4.6 resolved the warning.
[Fix]
Official download: https://archive.apache.org/dist/spark/spark-2.4.6/
Download spark-2.4.6-bin-hadoop2.7.tgz, install it, and configure the environment variables.
Appendix: Configuration
1. Scala configuration
1. Install Scala on Windows: https://www.scala-lang.org/
After installation, configure the SCALA_HOME and Path environment variables.
Run scala -version to check that the installation succeeded.
2. Install the Scala plugin in IDEA: search for "scala" under Plugins and install it directly.
After restarting, go to File -> Project Structure, open Global Libraries at the bottom left, click the + button in the middle, choose Scala SDK (the last entry), select the installed Scala version, and click OK.
2. Virtual machine configuration in IDEA
Tools -> Deployment -> Browse Remote Host
Configure the SSH configuration, Root path, and Web server URL of your virtual machine.