Table of Contents
- DataSet Sources
- DataSet Transformation
- DataSet Sink
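- Serializers
- Example 1: Read a CSV file and write a CSV file
- Example 2: Read from StarRocks and write to StarRocks
- Example 3: Process with DataSet and Table SQL, then write to StarRocks
- Iterating over a `DataSet<Row>`
- Pitfalls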
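A DataSet program has three kinds of building blocks:
- Source: creates the initial data set, for example from a file or a Java collection.
- Transformation: turns one or more DataSets into a new DataSet.
- Sink: stores or returns the result of the computation.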
DataSet Sources
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read a text file from the local file system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// Read a text file from HDFS
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
// Read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").types(Integer.class, String.class, Double.class);
// Read only some fields of a CSV file (the mask "10010" keeps the 1st and 4th of five fields)
DataSet<Tuple2<String, Double>> csvSubset = env.readCsvFile("hdfs:///the/CSV/file").includeFields("10010").types(String.class, Double.class);
// Read a CSV file and map each line to a Java POJO
DataSet<Person> csvPersons = env.readCsvFile("hdfs:///the/CSV/file").pojoType(Person.class, "name", "age", "zipcode");
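// Read a Hadoop SequenceFile from the given path
// (HadoopInputs is provided by the flink-hadoop-compatibility module)
DataSet<Tuple2<IntWritable, Text>> tuples =
    env.createInput(HadoopInputs.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file"));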
// Create a DataSet from the given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
// Generate a sequence of numbers
DataSet<Long> numbers = env.generateSequence(1, 10000000);
// Read from a relational database over JDBC.
// JDBCInputFormat produces Row records, so the result is a DataSet<Row>.
DataSet<Row> dbData =
    env.createInput(JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
        .setDBUrl("jdbc:derby:memory:persons")
        .setQuery("select name, age from persons")
        .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
        .finish());
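The includeFields("10010") mask used above is read position by position: a 1 keeps the column and a 0 skips it, so "10010" keeps the first and fourth columns. The following plain-Java sketch only illustrates that selection rule; it is not Flink's implementation, and the names FieldMaskDemo and select are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class FieldMaskDemo {
    // Keep the columns whose mask position is '1'.
    // Columns beyond the end of the mask are dropped.
    static List<String> select(String mask, String csvLine) {
        String[] columns = csvLine.split(",", -1);
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < mask.length() && i < columns.length; i++) {
            if (mask.charAt(i) == '1') {
                kept.add(columns[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Five columns in, two columns out (a String-like and a Double-like field).
        System.out.println(select("10010", "alice,30,NY,1.75,x"));
    }
}
```

With this mask, the two surviving columns line up with the types(String.class, Double.class) call in the snippet above.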
DataSet Transformation
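See also:
Flink从入门到放弃(入门篇3)-DataSetAPI (Flink from Getting Started to Giving Up, Beginner Part 3: DataSet API)
Flink的DataSet基本算子总结 (A Summary of the Basic DataSet Operators in Flink)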
DataSet Sink
// text data
DataSet<String> textData = // [...]
// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");
// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");
// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);
// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");
// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");
// this writes values as strings using a user-defined TextFormatter object
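// the formatter's type parameter must match the element type of the DataSet (Tuple3 here)
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple3<String, Integer, Double>>() {
        public String format(Tuple3<String, Integer, Double> value) {
            return value.f1 + " - " + value.f0;
        }
    });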
Using a custom output format:
DataSet<Tuple3<String, Integer, Double>> myResult = // [...]
// write Tuple DataSet to a relational database
myResult.output(
// build and configure OutputFormat
JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("insert into persons (name, age, height) values (?,?,?)")
.finish()
);
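The number of ? placeholders in the query passed to setQuery has to match the number of fields in each record (three here: name, age, height). One way to keep the two in step is to generate the statement from the column list. The helper below is a plain-Java sketch, independent of Flink; InsertSql and build are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class InsertSql {
    // Build "insert into <table> (<c1>, <c2>, ...) values (?,?,...)"
    // with exactly one placeholder per column.
    static String build(String table, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String names = String.join(", ", columns);
        String marks = String.join(",", Collections.nCopies(columns.size(), "?"));
        return "insert into " + table + " (" + names + ") values (" + marks + ")";
    }

    public static void main(String[] args) {
        System.out.println(build("persons", Arrays.asList("name", "age", "height")));
    }
}
```

The returned string can be passed to setQuery as is; table and column names are concatenated without escaping, so they should come from trusted code rather than user input.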
Serializers
- Flink ships with serializers for standard types such as int, long, and String.
- Types that Flink's own serializers cannot handle can be handed to Avro or Kryo.
Usage:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Force Avro serialization: env.getConfig().enableForceAvro();
Force Kryo serialization: env.getConfig().enableForceKryo();
Register a custom Kryo serializer for a type: env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
Example 1: Read a CSV file and write a CSV file
Reference: (3)Flink学习- Table API & SQL编程 (Learning Flink, Part 3: Programming with the Table API & SQL)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<!-- Table & SQL API bridge for using the DataStream / DataSet API from Java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- Table program planner and runtime -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
public class SQLWordCount {
public static void main(String[] args) throws Exception {
// 1. Get the execution environment (ExecutionEnvironment is the entry point for batch jobs)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
// DataSet<WC> input = env.fromElements(
// WC.of("hello", 1),
// WC.of("hqs", 1),
// WC.of("world", 1),
// WC.of("hello", 1)
// );
// Register the data set
// tEnv.registerDataSet("WordCount", input, "word, frequency");
// 2. Load the source file into a DataSet
DataSet<Student> csv = env.readCsvFile("D:\\tmp\\data.csv").ignoreFirstLine().pojoType(Student.class, "name", "age");
// 3. Convert the DataSet to a Table and register it as "student"
Table students = bTableEnv.fromDataSet(csv);
bTableEnv.registerTable("student", students);
// 4. Query the registered table with SQL
Table result = bTableEnv.sqlQuery("select name,age from student");
result.printSchema();
DataSet<Student> dset = bTableEnv.toDataSet(result, Student.class);
// DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);
System.out.println("count-->" + dset.count());
dset.print();
// 5. Write the result to a CSV sink
CsvTableSink sink1 = new CsvTableSink("D:\\tmp\\result.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
String[] fieldNames = {"name", "age"};
TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
bTableEnv.registerTableSink("CsvOutPutTable", fieldNames, fieldTypes, sink1);
result.insertInto("CsvOutPutTable");
env.execute("SQL-Batch");
}
@Data
public static class Student {
private String name;
private int age;
}
}
Prepare the test file data.csv:
name,age
zhangsan,23
lisi,43
wangwu,12
After the program runs, the file D:\tmp\result.csv is generated.
Example 2: Read from StarRocks and write to StarRocks
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
public class SQLWordCount {
public static void main(String[] args) throws Exception {
TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
.setUsername("root").setPassword("")
.setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Option 1: keep the typed DataSource returned by createInput
DataSource<Row> s = env.createInput(jdbcInputFormat);
s.output(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
.setUsername("root").setPassword("")
.setQuery("insert into student values(?, ?)")
.finish()
);
// Option 2: treat the input as a plain DataSet<Row>
// DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
//
// dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
// .setDrivername("com.mysql.jdbc.Driver")
// .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
// .setUsername("root").setPassword("")
// .setQuery("insert into student values(?, ?)")
// .finish()
// );
env.execute("SQL-Batch");
}
}
Data preparation:
CREATE TABLE student (
name STRING,
age INT
) ENGINE=OLAP
DUPLICATE KEY(`name`)
DISTRIBUTED BY RANDOM
PROPERTIES (
"compression" = "LZ4",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
"replication_num" = "1"
);
insert into student values('zhangsan', 23);
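References:
flink 读取mysql源 JDBCInputFormat、自定义数据源 (Reading a MySQL source in Flink: JDBCInputFormat and custom data sources)
flink1.10中三种数据处理方式的连接器说明 (Connectors for the three data processing styles in Flink 1.10)
flink读写MySQL的两种方式 (Two ways to read and write MySQL from Flink)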
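Note: running java -cp flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.xiaoqiang.app.SQLWordCount
may fail with: Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer}
Cause and fix (see the article 报错:Flink Could not resolve substitution to a value: ${akka.stream.materializer}): several dependency jars each ship their own reference.conf, and a plain fat-jar build keeps only one of them, so the Akka configuration ends up incomplete. Building the jar with maven-shade-plugin and an AppendingTransformer for reference.conf merges the files instead: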
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>flink.KafkaDemo1</mainClass>
</transformer>-->
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Example 3: Process with DataSet and Table SQL, then write to StarRocks
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
public class SQLWordCount {
public static void main(String[] args) throws Exception {
TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
.setUsername("root").setPassword("")
.setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
dataSource.print();
Table students = bTableEnv.fromDataSet(dataSource);
bTableEnv.registerTable("student", students);
Table result = bTableEnv.sqlQuery("select name, age from (select f0 as name, f1 as age from student) group by name, age");
result.printSchema();
DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);
dset.output(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
.setUsername("root").setPassword("")
.setQuery("insert into student values(?, ?)")
.finish()
);
env.execute("SQL-Batch");
}
}
Iterating over a DataSet<Row>
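try {
    dataSet.map(new MapFunction<Row, String>() {
        @Override
        public String map(Row value) throws Exception {
            // Process each row here.
            // getField returns Object; these casts succeed only if the columns really hold Float values.
            float dateNum = (float) value.getField(2);
            float dateAge = (float) value.getField(3);
            // This string is produced (and printed) once per input row.
            return "row processed: " + dateNum + ", " + dateAge;
        }
    }).print();
} catch (Exception e) {
    throw new RuntimeException(e);
}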
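Row.getField returns an Object, so a direct (float) cast works only when the column really holds a Float; a Double or Integer column fails with a ClassCastException. Below is a minimal sketch of a more tolerant conversion, using a plain Object[] as a stand-in for a row. The class RowFields and the helper toFloat are hypothetical names, not part of Flink.

```java
final class RowFields {
    // Convert a field of unknown numeric type to float.
    // Any Number (Integer, Long, Float, Double, BigDecimal, ...) is accepted;
    // other values are parsed from their string form.
    static float toFloat(Object field) {
        if (field == null) {
            throw new IllegalArgumentException("field is null");
        }
        if (field instanceof Number) {
            return ((Number) field).floatValue();
        }
        return Float.parseFloat(field.toString().trim());
    }

    public static void main(String[] args) {
        // Stand-in for a row with a String, an Integer, a Double and a Float column.
        Object[] row = {"zhangsan", 23, 1.5d, 2.25f};
        System.out.println(toFloat(row[1]) + " " + toFloat(row[2]) + " " + toFloat(row[3]));
    }
}
```

Inside a MapFunction the same helper could be applied to value.getField(2) and value.getField(3) in place of the direct casts.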
If you need to turn each row into several output rows, use a FlatMapFunction:
rows.flatMap(new FlatMapFunction<Row, Row>() {
@Override
public void flatMap(Row value, Collector<Row> out) throws Exception {
// Transformation logic goes here.
// For example, emit the original row unchanged.
out.collect(value);
// To emit several rows per input row, call out.collect(...) again.
}
}).collect();
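Note that .collect() forces the computation to run and fetches the results back to the client. In production code you will usually want to write the results to a file or send them to an external system rather than only collecting them on the client.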
Pitfalls
Pitfall 1: Bang equal '!=' is not allowed under the current SQL conformance level
Fix: change != in the SQL to <>.
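When the SQL is generated or pasted from another system, the substitution can also be done in code. The sketch below replaces != with <> only outside single-quoted string literals. It is a hypothetical helper (SqlCompat.replaceBangEqual), not part of Flink, and it does not handle SQL comments or other quoting styles.

```java
final class SqlCompat {
    // Replace the != operator with <> everywhere except inside '...' literals.
    static String replaceBangEqual(String sql) {
        StringBuilder out = new StringBuilder(sql.length());
        boolean inLiteral = false;
        for (int i = 0; i < sql.length(); i++) {
            char c = sql.charAt(i);
            if (c == '\'') {
                // An escaped quote ('') toggles twice, so the state stays correct.
                inLiteral = !inLiteral;
                out.append(c);
            } else if (!inLiteral && c == '!' && i + 1 < sql.length() && sql.charAt(i + 1) == '=') {
                out.append("<>");
                i++; // skip the '=' that was just consumed
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(replaceBangEqual("select name from student where age != 23"));
    }
}
```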
Pitfall 2: java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
Explanation: when the last line, env.execute(), runs, no new data sink has been defined. In a Flink batch job the preceding line, result.print(), has already triggered execution and produced the output, so calling env.execute() again is redundant and raises the exception above.
Fix: remove the final env.execute(); call.