新建project:
- pom文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Flink 1.17 demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test.wyh</groupId>
    <artifactId>Flink117_Test_01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- The Java sources contain non-ASCII comments; pin the encoding so the
             build does not depend on the platform default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Single place to change the Flink version shared by all Flink artifacts below. -->
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <!-- DataStream API. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Client classes used to execute the job. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink web UI / REST handlers. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
- Main类(先用批的方式测试)
package test01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Word-count demo that runs a DataStream program in {@code BATCH} runtime mode.
 *
 * <p>Reads {@code test_input/test_word.txt}, splits every line on single spaces,
 * counts the occurrences of each word and prints the result.
 */
public class EnvDemo {

    /**
     * Builds and executes the word-count job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Set the REST bind port explicitly (the default is 8081).
        conf.set(RestOptions.BIND_PORT, "8082");

        // Automatically detects whether it runs on a remote cluster or locally in the IDE.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Choose streaming or batch execution; streaming is the default.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.readTextFile("test_input/test_word.txt")
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            for (String word : value.split(" ")) {
                                // Blank lines and repeated/leading spaces yield empty tokens;
                                // skip them so "" is never counted as a word.
                                if (!word.isEmpty()) {
                                    out.collect(Tuple2.of(word, 1));
                                }
                            }
                        })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute();
    }
}
- 创建测试文件 test_input/test_word.txt(每行为若干以空格分隔的单词)
- 运行程序
-------------------------------------------------
- Main类(用流的方式测试)
package test01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Word-count demo that runs a DataStream program in {@code STREAMING} runtime mode.
 *
 * <p>Reads {@code test_input/test_word.txt}, splits every line on single spaces,
 * keeps a running count per word and prints the result.
 */
public class EnvDemo {

    /**
     * Builds and executes the word-count job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Set the REST bind port explicitly (the default is 8081).
        conf.set(RestOptions.BIND_PORT, "8082");

        // Automatically detects whether it runs on a remote cluster or locally in the IDE.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Choose streaming or batch execution; streaming is the default.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        env.readTextFile("test_input/test_word.txt")
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            for (String word : value.split(" ")) {
                                // Blank lines and repeated/leading spaces yield empty tokens;
                                // skip them so "" is never counted as a word.
                                if (!word.isEmpty()) {
                                    out.collect(Tuple2.of(word, 1));
                                }
                            }
                        })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute();
    }
}
- 运行程序