算子:详细篇

目录

一、执行环境

        1.1 创建执行环境

        1.2 执行模式

二、源算子

        2.1 从集合中读取数据

        2.2 从文件读取数据

        2.3 从socket读取数据

        2.4 从kafka读取数据

 三、转换算子

        3.1 基本转换算子

            (1)映射(map)

            (2)过滤(filter)

            (3)扁平映射(flatMap)

        3.2 聚合转换算子(Aggregation)

            (1) 按键分区(keyBy)

            (2) 简单聚合(sum/min/max/minBy/maxBy)

            (3) 归约聚合(reduce)

        3.3 用户自定义函数

        3.4 物理分区算子

        1.自定义分区:

        2.随机分区:

四、输出算子

        4.1 连接到外部系统

        4.2 传输到文件

        4.3 传输到kafka

        4.4 传输到MySQL

       4.5 自定义Sink输出


一、执行环境

        1.1 创建执行环境

                我们要获取的执行环境,是StreamExecutionEnvironment类的对象,这是所有Flink程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。

               (1)getExecutionEnvironment

        最简单的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

                (2)createLocalEnvironment

        这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。

                (3)createRemoteEnvironment

        这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。 

                1.2 执行模式

                1)流执行模式(Streaming)

                默认情况下,程序使用的就是Streaming执行模式。

                 2)批执行模式(Batch)

                专门用于批处理的执行模式。

                 3) 自动模式(AutoMatic)

                在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

二、源算子

        

 一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。

        2.1 从集合中读取数据

fromCollection方法进行读取

java:

public class FlinkFromCollectionExample {  
  
    public static void main(String[] args) throws Exception {  
  
        // 创建流处理环境  
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
        // 从集合中读取数据  
        List<String> data = Arrays.asList("element1", "element2", "element3");  
        DataStream<String> stream = env.fromCollection(data);  
  
        // 使用 MapFunction 对每个元素进行处理,这里我们简单地将其转换为大写  
        DataStream<String> upperCaseStream = stream.map(new MapFunction<String, String>() {  
            @Override  
            public String map(String value) {  
                return value.toUpperCase();  
            }  
        });  
  
        // 打印结果到控制台  
        upperCaseStream.print();  
  
        // 执行作业  
        env.execute("Flink from collection example");  
    }  
}
/**这个示例中,我们首先创建了一个流处理环境,然后使用 fromCollection 方法从 Java 的 List 中读取数据。接下来,我们使用 map 操作对读取的数据进行处理(这里简单地将每个元素转换为大写)。最后,我们打印结果到控制台并执行作业。*/

scala:

object SimpleFlinkApp {  
  def main(args: Array[String]): Unit = {  
    // 创建执行环境  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
  
    // 创建数据源,这里我们使用集合作为数据源  
    val data = env.fromElements(1, 2, 3, 4, 5)  
  
    // 定义一个简单的转换操作  
    val transformedData = data.map(x => x * 2)  
  
    // 定义一个简单的打印操作  
    transformedData.print()  
  
    // 开始执行作业  
    env.execute("Flink Scala WordCount Example")  
  }  
}
/**这个示例展示了如何使用 Flink 的 DataStream API 从一个集合中读取数据,然后通过 map 转换操作将每个元素乘以2,最后使用 print 操作将结果打印到控制台。

注意:在实际应用中,你可能需要从外部系统(如数据库、消息队列等)读取数据,而不是从一个集合中读取。*/

        2.2 从文件读取数据

readTextFile方法进行读取

java:

public class FileReadingExample {  
    public static void main(String[] args) throws Exception {  
        // 创建执行环境  
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
        // 从文件中读取数据  
        DataStream<String> text = env.readTextFile(new Path("file:///path/to/your/file"));  
  
        // 定义一个简单的转换操作,将每行文本转换为单词元组  
        DataStream<Tuple2<String, Integer>> counts = text  
            .flatMap(new Tokenizer())  
            .keyBy(0)  
            .sum(1);  
  
        // 打印结果到控制台  
        counts.print();  
  
        // 开始执行作业  
        env.execute("Flink Java File Reading Example");  
    }  
  
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {  
        @Override  
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {  
            // 按空格拆分每行文本,并将每个单词元组发送到下游操作符  
            String[] words = value.toLowerCase().split("\\W+");  
            for (String word : words) {  
                if (word.length() > 0) {  
                    out.collect(new Tuple2<>(word, 1));  
                }  
            }  
        }  
    }  
}
/**这个示例展示了如何使用 Flink 的 DataStream API 从文件中读取数据,并使用自定义的 Tokenizer 类将每行文本转换为单词元组。然后,使用 keyBy 和 sum 操作符对单词进行计数,并将结果打印到控制台。*/

scala:

object FileReadingExample {  
  def main(args: Array[String]): Unit = {  
    // 创建执行环境  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
  
    // 使用 TextInputFormat 从文件中读取数据  
    val text = env.readTextFile("path/to/your/file")  
  
    // 定义一个简单的转换操作  
    val transformedData = text.map(line => line.split(" ") match {  
      case Array(word, _*) => (word, 1)  
    })  
  
    // 定义一个简单的计数操作  
    val counts = transformedData.keyBy(_._1).sum(1)  
  
    // 定义一个简单的打印操作  
    counts.print()  
  
    // 开始执行作业  
    env.execute("Flink Scala File Reading Example")  
  }  
}
/**在这个示例中,我们首先创建了一个 StreamExecutionEnvironment 对象,然后使用 readTextFile 方法从指定的文件路径读取数据。读取的数据是一个 DataStream[String],然后我们通过 map 转换操作将每一行数据拆分成单词并计数,最后通过 keyBy 和 sum 操作计算每个单词的出现次数。最后,我们使用 print 操作将结果打印到控制台。*/

        2.3 从socket读取数据

socketTextStream方法进行读取  一般是用于测试

 java:

DataStream<String> stream = env.socketTextStream("localhost", 7777);

scala:

val strem=env.socketTextStream("localhost",777)

        2.4 从kafka读取数据

fromSource方法进行读取

java: 

public class SourceKafka {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("hadoop102:9092")
            .setTopics("topic_1")
            .setGroupId("iii")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema()) 
            .build();

        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        stream.print("Kafka");

        env.execute();
    }
}

scala:

object SourceKafka {
  def main(args: Array[String]): Unit = {
    val env=StreamExecutionEnvironment.getExecutionEnvironment
    val sourceKafka=KafkaSource.builder()
      .setTopics("topic_1")
      .setBootstrapServers("bigdata1:9092")
      .setGroupId("iii")
      .setStartingOffsets(OffsetsInitializer.latest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()
    env.setParallelism(1)
    val stream=env.fromSource(sourceKafka,WatermarkStrategy.noWatermarks(),"kafka_source")
    
    stream.print("kafka")
    env.execute()
  }
}

 三、转换算子

        数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。

        3.1 基本转换算子

            (1)映射(map)

        map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。

 java:

public class TransMap {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1, 1),
                new WaterSensor("sensor_2", 2, 2)
        );

        // 方式一:传入匿名类,实现MapFunction
        stream.map(new MapFunction<WaterSensor, String>() {
            @Override
            public String map(WaterSensor e) throws Exception {
                return e.id;
            }
        }).print();

        // 方式二:传入MapFunction的实现类
        // stream.map(new UserMap()).print();

        env.execute();
    }

    public static class UserMap implements MapFunction<WaterSensor, String> {
        @Override
        public String map(WaterSensor e) throws Exception {
            return e.id;
        }
    }
}

scala:

object SimpleMapExample {  
  def main(args: Array[String]): Unit = {  
    // 创建执行环境  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
  
    // 创建数据源  
    val text = env.fromElements("Hello, Flink", "Flink is powerful", "Stream processing at scale")  
  
    // 使用 map 操作转换数据  
    val mapped = text.map(word => word.toUpperCase)  
  
    // 打印结果到控制台  
    mapped.print()  
  
    // 执行任务  
    env.execute("Flink Scala Map Example")  
  }  
}
            (2)过滤(filter)

        filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。

java:

public class TransFilter {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
        );

        // 方式一:传入匿名类实现FilterFunction
        stream.filter(new FilterFunction<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor e) throws Exception {
                return e.id.equals("sensor_1");
            }
        }).print();

        // 方式二:传入FilterFunction实现类
        // stream.filter(new UserFilter()).print();
        
        env.execute();
    }
    public static class UserFilter implements FilterFunction<WaterSensor> {
        @Override
        public boolean filter(WaterSensor e) throws Exception {
            return e.id.equals("sensor_1");
        }
    }
}

scala:

object FilterExample {  
  def main(args: Array[String]): Unit = {  
    // 创建执行环境  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
  
    // 创建数据源  
    val text = env.fromElements("Alice", "Bob", "Charlie", "David", "Eve")  
  
    // 定义过滤条件  
    def filterNameLength(name: String): Boolean = name.length > 3  
  
    // 使用 filter 函数进行过滤  
    val filtered = text.filter(filterNameLength)  
  
    // 打印过滤后的结果  
    filtered.print()  
  
    // 执行任务  
    env.execute("Flink Filter Example")  
  }  
}
            (3)扁平映射(flatMap)

        将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

java:

public class TransFlatmap {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)

        );

        stream.flatMap(new MyFlatMap()).print();

        env.execute();
    }

    public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> {

        @Override
        public void flatMap(WaterSensor value, Collector<String> out) throws Exception {

            if (value.id.equals("sensor_1")) {
                out.collect(String.valueOf(value.vc));
            } else if (value.id.equals("sensor_2")) {
                out.collect(String.valueOf(value.ts));
                out.collect(String.valueOf(value.vc));
            }
        }
    }
} 

scala:

object FlatMapExample {  
  def main(args: Array[String]): Unit = {  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
    val text = env.fromElements("Hello, world!", "Flink flatMap example")  
  
    val flatMapped = text.flatMap { line =>  
      line.split("\\s+")  // 将每行文本按空格拆分成单词  
    }  
  
    flatMapped.print()  // 打印结果到控制台  
  
    env.execute("Flink flatMap example")  
  }  
}

        3.2 聚合转换算子(Aggregation)

              计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。

            (1) 按键分区(keyBy)

        keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。

java:

public class TransKeyBy {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1, 1),
                new WaterSensor("sensor_1", 2, 2),
                new WaterSensor("sensor_2", 2, 2),
                new WaterSensor("sensor_3", 3, 3)
        );

        // 方式一:使用Lambda表达式
        KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);

        // 方式二:使用匿名类实现KeySelector
        KeyedStream<WaterSensor, String> keyedStream1 = stream.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor e) throws Exception {
                return e.id;
            }
        });

        env.execute();
    }
}

scala:

object KeyByExample {  
  def main(args: Array[String]): Unit = {  
    // 创建执行环境  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
  
    // 创建数据流  
    val dataStream = env.fromElements(  
      ("apple", 3),  
      ("banana", 2),  
      ("orange", 5),  
      ("apple", 1),  
      ("banana", 3),  
      ("orange", 2)  
    )  
  
    // 使用 keyBy 对数据进行分组,这里按照水果名称进行分组  
    val keyedStream = dataStream.keyBy(0)  
  
    // 打印结果  
    keyedStream.print()  
  
    // 执行作业  
    env.execute("KeyBy Example")  
  }  
}
            (2) 简单聚合(sum/min/max/minBy/maxBy)
  1. sum():在输入流上,对指定的字段做叠加求和的操作。
  2. min():在输入流上,对指定的字段求最小值。
  3. max():在输入流上,对指定的字段求最大值。
  4. minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
  5. maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。

java:

public class SumMinMaxExample {  
    public static void main(String[] args) throws Exception {  
        // 创建流处理环境  
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
        DataStream<Tuple2<Integer, String>> stream = env.fromElements(  
            new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(3, "c"), new Tuple2<>(4, "d"), new Tuple2<>(5, "e")  
        ); // 输入数据流  
  
        // 求和  
        DataStream<Tuple2<Integer, String>> sum = stream.map(new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {  
            @Override  
            public Tuple2<Integer, String> map(Tuple2<Integer, String> value) throws Exception {  
                return new Tuple2<>(value.f0, value.f1, value.f0); // 将每个元素转换为三元组 (value, _, sum)  
            }  
        }); // 计算每个元素的和  
        sum.print(); // 打印结果  
  
        // 求最小值  
        DataStream<Tuple2<Integer, String>> min = stream.map(new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {  
            @Override  
            public Tuple2<Integer, String> map(Tuple2<Integer, String> value) throws Exception {  
                return new Tuple2<>(value.f0, value.f1, value.f0); // 将每个元素转换为三元组 (value, _, min)  
            }  
        }); // 计算每个元素的最小值  
        min.print(); // 打印结果  
  
        // 求最大值  
        DataStream<Tuple2<Integer, String>> max = stream.map(new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {  
            @Override  
            public Tuple2<Integer, String> map(Tuple2<Integer, String> value) throws Exception {  
                return new Tuple2<>(value.f0, value.f1, value.f0); // 将每个元素转换为三元组 (value, _, max)  
            }  
        }); // 计算每个元素的最大值  
        max.print(); // 打印结果  
  
        // 按某个字段求最小值(例如按第二个字段)  
        DataStream<Tuple2<String, Integer>> minBySecondField = stream.map(new MapFunction<Tuple2<Integer, String>, Tuple2<String, Integer>>() {  
            @Override  
            public Tuple2<String, Integer> map(Tuple2<Integer, String> value) throws Exception {  
                return new Tuple2<>(value.f1, value.f0); // 将每个元素转换为二元组 (_, value)  
            }  
        }); // 按第二个字段求最小值(由于是二元组,此处不需进一步处理)  
        minBySecondField.print(); // 打印结果  
  
        // 按某个字段求最大值(例如按第二个字段)  
        DataStream<Tuple2<String, Integer>> maxBySecondField = stream.map(new MapFunction<Tuple2<Integer, String>, Tuple2<String, Integer>>() {  
            @Override  
            public Tuple2<String, Integer> map(Tuple2<Integer, String> value) throws Exception {  
                return new Tuple2<>(value.f1, value.f0); // 将每个元素转换为二元组 (_, value)  
            }  
        }); // 按第二个字段求最大值(由于是二元组,此处不需

scala:

object SumMinMaxExample {  
  def main(args: Array[String]): Unit = {  
    // 创建流处理环境  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
    val stream = env.fromElements(  
      (1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")  
    ) // 输入数据流  
  
    // 求和  
    val sum = stream.map(x => (x._1, x._2, x._1)) // 将每个元素转换为三元组 (value, _, sum)  
    sum.print() // 打印结果  
  
    // 求最小值  
    val min = stream.map(x => (x._1, x._2, x._1)) // 将每个元素转换为三元组 (value, _, min)  
    min.print() // 打印结果  
  
    // 求最大值  
    val max = stream.map(x => (x._1, x._2, x._1)) // 将每个元素转换为三元组 (value, _, max)  
    max.print() // 打印结果  
  
    // 按某个字段求最小值(例如按第二个字段)  
    val minBySecondField = stream.map(x => (x._2, x._1)) // 将每个元素转换为二元组 (_, value)  
    minBySecondField.keyBy(0).minBy(1).print() // 按第二个字段求最小值并打印结果  
  
    // 按某个字段求最大值(例如按第二个字段)  
    val maxBySecondField = stream.map(x => (x._2, x._1)) // 将每个元素转换为二元组 (_, value)  
    maxBySecondField.keyBy(0).maxBy(1).print() // 按第二个字段求最大值并打印结果  
  
    // 启动流处理作业  
    env.execute("Sum, Min, Max and MinBy/MaxBy example")  
  }  
}
            (3) 归约聚合(reduce)

        reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。

        reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。

java:

public class ReduceExample {  
    public static void main(String[] args) throws Exception {  
        // 创建流处理环境  
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();  
  
        // 创建一个整数数据集  
        DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);  
  
        // 使用reduce()函数对数据集进行求和  
        DataSet<Integer> sum = numbers.reduce(new ReduceFunction<Integer>() {  
            @Override  
            public Integer reduce(Integer value1, Integer value2) throws Exception {  
                return value1 + value2;  
            }  
        });  
  
        // 打印结果  
        sum.print();  
  
        // 执行流处理作业  
        env.execute("Reduce example");  
    }  
}

scala:

object ReduceExample {  
  def main(args: Array[String]): Unit = {  
    // 创建流处理环境  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
    val stream = env.fromElements(1, 2, 3, 4, 5) // 输入数据流  
  
    // 求和  
    val sum = stream.reduce(_ + _) // 使用 reduce 操作进行求和  
    sum.print() // 打印结果  
  
    // 启动流处理作业  
    env.execute("Reduce example")  
  }  
}

        3.3 用户自定义函数

        3.4 物理分区算子

        1.自定义分区:

        使用用户定义的 Partitioner 为每个元素选择目标任务。

Java:

dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);


Scala

dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)

2.随机分区:

将元素随机地均匀划分到分区。

Java:

dataStream.shuffle();


Scala:

dataStream.shuffle()

四、输出算子

        4.1 连接到外部系统

        Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

stream.addSink(new SinkFunction(…));

        addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

        Flink1.12开始,同样重构了Sink架构,

stream.sinkTo(…)

        当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。

        4.2 传输到文件

        FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

  1. 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  2. 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

Java:

public class FileSinkExample {  
    public static void main(String[] args) throws Exception {  
        // 创建流执行环境  
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
        // 创建数据源数据流,这里简单起见,我们直接创建一个包含字符串的流  
        DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Streaming");  
  
        // 使用RichSinkFunction创建一个将数据写入文件的Sink  
        text.addSink(new RichSinkFunction<String>() {  
            private FileWriter fileWriter;  
            private BufferedWriter bufferedWriter;  
  
            @Override  
            public void open(Configuration parameters) throws IOException {  
                super.open(parameters);  
                fileWriter = new FileWriter("output.txt", true); // 第二个参数表示是否追加到文件末尾  
                bufferedWriter = new BufferedWriter(fileWriter);  
            }  
  
            @Override  
            public void invoke(String value, RuntimeContext runtimeContext) throws IOException {  
                bufferedWriter.write(value); // 将元素写入文件  
                bufferedWriter.newLine(); // 换行  
            }  
  
            @Override  
            public void close() throws IOException {  
                super.close();  
                bufferedWriter.close(); // 关闭写入器  
                fileWriter.close(); // 关闭文件写入器  
            }  
        });  
  
        // 执行任务  
        env.execute("File Sink Example");  
    }  
}

scala:

object FileSinkExample {  
  def main(args: Array[String]): Unit = {  
    // 创建执行环境  
    val env = ExecutionEnvironment.getExecutionEnvironment  
  
    // 创建数据源  
    val data = env.fromElements("Hello, Flink!", "Goodbye, Flink!")  
  
    // 创建文件输出流  
    val output = data.writeAsText("/path/to/output/file")  
  
    // 执行任务  
    env.execute("File Sink Example")  
  }  
}

        4.3 传输到kafka

addSink 方法实现

java:


public class ProducerKafkaFlink {
    public static void main(String[] args) throws Exception {
 
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        //从kafka读取数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test-consumer-group");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("tuzisir", new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);
        stream.print();
 
        //将结果写到kafka
        stream.addSink(new FlinkKafkaProducer<>(
                "localhost:9092",
                "student-write",
                new SimpleStringSchema()
        )).name("flink-connectors-kafka");
 
        env.execute("write to kafka");
 
    }
}

scala:

object KafkaExample {  
  def main(args: Array[String]): Unit = {  
    // 创建流处理环境  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
  
    // 创建数据源  
    val input = env.fromElements("Hello", "World")  
  
    // 定义Kafka生产者配置  
    val kafkaProps = new Properties()  
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092")  
    kafkaProps.setProperty("group.id", "test")  
  
    // 创建Kafka序列化器  
    val schema = new KafkaSerializationSchema[String](new SimpleStringSchema()) {  
      override def serialize(element: String, partitioner: Int): Array[Byte] = {  
        element.getBytes(StandardCharsets.UTF_8)  
      }  
    }  
  
    // 创建Kafka生产者并输出数据到Kafka主题  
    val kafkaProducer = new FlinkKafkaProducer[String]("my-topic", schema, kafkaProps)  
    input.addSink(kafkaProducer)  
  
    // 执行流处理任务  
    env.execute("Flink Kafka Example")  
  }  
}

        4.4 传输到MySQL

toAppendStream 方法实现

java:

public class FlinkToMySQLExample {  
    public static void main(String[] args) throws Exception {  
        // 设置执行环境  
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);  
        tableEnv.enableCatalogs(); // 启用 catalogs 支持  
        tableEnv.useCatalog("kafka"); // 使用名为 "kafka" 的 catalog  
        tableEnv.getCatalog("kafka").get().open(); // 打开 catalog 连接  
        tableEnv.executeSql("CREATE TABLE kafka_table (name STRING, age INT) WITH (...)"); // 创建 Kafka 表并指定连接参数(这里需要指定 Kafka 的连接参数)  
        tableEnv.executeSql("CREATE TABLE mysql_table (name STRING, age INT) WITH ('connector' = 'mysql', 'hostname' = 'localhost', 'database-name' = 'mydb', 'username' = 'root', 'password' = 'password')"); // 创建 MySQL 表并指定连接参数(这里需要指定 MySQL 的连接参数)  
        // 读取 Kafka 中的数据并插入到 MySQL 中  
        Table kafkaTable = tableEnv.sqlQuery("SELECT * FROM kafka_table"); // 从 Kafka 表中选择数据  
        tableEnv.toAppendStream(kafkaTable, Row.class).map(row -> row).addSink(Sinks.jdbc("INSERT INTO mysql_table VALUES (?, ?)", "name, age", new JdbcAppendStreamSinkFunction<>(new JdbcConnectionOptions("jdbc:mysql://localhost:3306/mydb", "root", "password")))); // 将数据插入到 MySQL 表中  
        env.execute(); // 执行 Flink 作业  
    }  
}

scala:

object WriteToMySQL {  
  def main(args: Array[String]): Unit = {  
    // 设置执行环境  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
    val tEnv = StreamTableEnvironment.create(env)  
  
    // 创建输入数据流  
    val inputStream = env.fromElements("John", "Anna", "Peter", "Linda")  
      // 使用简单的字符串格式化器  
      .map(new MapFunction[String, Row]() {  
        override def map(value: String): Row = {  
          val row = new Row(1)  
          row.setField(0, value)  
          row  
        }  
      })  
      // 注册为表进行查询操作  
      val table = tEnv.fromDataStream(inputStream, $"name")  
    tEnv.toAppendStream[Row](table, $"name") // 将表转换为流并输出名字字段,流中的每条记录都是一个名字。  
      // 写入 MySQL 数据库,此处以 localhost:3306/dbname 为例,请根据实际情况修改。  
      // 注意:MySQL JDBC URL 的格式为 jdbc:mysql://hostname:port/databaseName?characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&allowMultiQueries=true&useUnicode=true&autoReconnect=true&serverTimezone=UTC&allowMultiQueries=true&useUnicode=true&autoReconnect=true&allowPublicKeyRetrieval=true&allowMultiQueries=true&useUnicode=true&autoReconnect=true。其中hostname:port/databaseName为你实际的MySQL地址和数据库名。此处的例子只是为了演示。  
      // 注意:在生产环境中,需要配置好合适的异常处理和重试机制。本示例中未包含。  
      // 注意:此处的代码示例是简化的,只包含基本的写入操作,并未包含所有可能的错误处理和优化。在生产环境中,需要更全面的错误处理和优化策略。此处的代码仅供参考。  
      // 注意:在生产环境中,需要配置好合适的序列化和反序列化机制。本示例中未包含。  
      // 注意:在使用 JDBC 连接器时,需要考虑连接池的使用和资源的管理。本示例中未包含。  
      // 注意:在使用 JDBC 连接器时,需要考虑 SQL 注入攻击的风险。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含

4.5 自定义Sink输出

        如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

        在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

        这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/351043.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络分层和网络原理之UDP和TCP

温故而知新 目录 网络分层 应用层 http协议 传输层 介绍 UDP协议 TCP协议 网络层 数据链路层 物理层 网络分层 一. 应用层 应用程序 现成的应用层协议有超文本协议http(不仅仅有文本&#xff09;. http协议 http://t.csdnimg.cn/e0e8khttp://t.csdnimg.cn/e0e8k 自定义应…

云手机哪一款好用?

随着海外市场的不断发展&#xff0c;云手机市场也呈现蓬勃的态势&#xff0c;众多云设备软件纷纷涌现。企业在选择云手机软件时&#xff0c;如何找到性能卓越的软件成为一项关键任务。在众多选择中&#xff0c;OgPhone云手机凭借其卓越的性能和独特功能脱颖而出。以下是OgPhone…

音频格式之AAC:(3)AAC编解码原理详解

系列文章目录 音频格式的介绍文章系列&#xff1a; 音频编解码格式介绍(1) ADPCM&#xff1a;adpcm编解码原理及其代码实现 音频编解码格式介绍(2) MP3 &#xff1a;音频格式之MP3&#xff1a;(1)MP3封装格式简介 音频编解码格式介绍(2) MP3 &#xff1a;音频格式之MP3&#x…

一文详解C++拷贝构造函数

文章目录 引入一、什么是拷贝构造函数&#xff1f;二、什么情况下使用拷贝构造函数&#xff1f;三、使用拷贝构造函数需要注意什么&#xff1f;四、深拷贝和浅拷贝浅拷贝深拷贝 引入 在现实生活中&#xff0c;可能存在一个与你一样的自己&#xff0c;我们称其为双胞胎。 相当…

5|领域建模实践(上):怎样既准确又深刻地理解业务知识?

上节课咱们完成了事件风暴&#xff0c;梳理了系统的行为需求。但你可能也发现了&#xff0c;其实还有些微妙的业务概念还没有澄清&#xff0c;这就要靠领域建模来完成了。 建立领域模型是 DDD 的核心。要建好领域建模&#xff0c;需要理论和实践相结合。由于我们的模型有一定的…

CSC签证费报销的相关规定及要求-主要国家签证费报销凭据

国家留学基金委&#xff08;CSC&#xff09;派出流程很多是在留学服务机构办理&#xff0c;即北京教育部留学服务中心及教育部出国人员上海集训部&#xff0c;其中含签证费报销。本篇知识人网小编以上海集训部为例&#xff0c;详细解读一下签证费报销的相关规定及要求&#xff…

sql 行转列 日周月 图表统计

目录 目录 需求 准备 月 分析 按月分组 行转列 错误版本 正确版本 日 分析 行转列 周 分析 按周分组 行转列 本年 需求 页面有三个按钮 日周月&#xff0c;统计一周中每天(日)&#xff0c;一月中每周(周)&#xff0c;一年中每月(月)&#xff0c;设备台数 点…

Linux中断 -- 中断路由、优先级、数据和标识

目录 1.中断路由 2.中断优先级 3.中断平衡 4.Linux内核中重要的数据结构 5.中断标识 承前文&#xff0c;本文从中断路由、优先级、数据结构和标识意义等方面对Linux内核中断进行一步的解析。 1.中断路由 Aset affinity flow GIC文中有提到SPI类型中断的路由控制器寄存器为…

Leetcode—114. 二叉树展开为链表【中等】

2023每日刷题&#xff08;九十八&#xff09; Leetcode—114. 二叉树展开为链表 Morris-like算法思想 可以发现展开的顺序其实就是二叉树的先序遍历。算法和 94 题中序遍历的 Morris 算法有些神似&#xff0c;我们需要两步完成这道题。 将左子树插入到右子树的地方将原来的右…

Java - OpenSSL与国密OpenSSL

文章目录 一、定义 OpenSSL&#xff1a;OpenSSL是一个开放源代码的SSL/TLS协议实现&#xff0c;也是一个功能丰富的加密库&#xff0c;提供了各种主要的加密算法、常用的密钥和证书封装管理功能以及SSL协议。它被广泛应用于Web服务器、电子邮件服务器、VPN等网络应用中&#x…

线性表--栈

1.什么是栈&#xff1f; 栈是一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除 操作的一端称为栈顶&#xff0c;另一端称为栈底。栈中的数据元素遵守后进先出的原则。 压栈&#xff1a;栈的插入操作叫做进栈/压栈/入栈&#xff…

YOLOv5改进 | Conv篇 | 在线重参数化卷积OREPA助力二次创新(提高推理速度 + FPS)

一、本文介绍 本文给大家带来的改进机制是一种重参数化的卷积模块OREPA,这种重参数化模块非常适合用于二次创新,我们可以将其替换网络中的其它卷积模块可以不影响推理速度的同时让模型学习到更多的特征。OREPA是通过在线卷积重参数化(Online Convolutional Re-parameteriza…

TensorFlow2实战-系列教程3:猫狗识别1

&#x1f9e1;&#x1f49b;&#x1f49a;TensorFlow2实战-系列教程 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在Jupyter Notebook中进行 本篇文章配套的代码资源已经上传 1、项目介绍 基本流程&#xff1a; 数据预处理&#xff1a;图像数据处理&#xff0c…

Spring 的执行流程以及 Bean 的作用域和生命周期

文章目录 Bean 的作用域更改作用域的方式singletonprototype Spring 执行流程Bean 的生命周期 Bean 的作用域 Spring 容器在初始化⼀个 Bean 的实例时&#xff0c;同时会指定该实例的作用域。Bean 有6种作用域 singleton&#xff1a;单例作用域prototype&#xff1a;原型作用域…

Hadoop-MapReduce-MRAppMaster启动篇

一、源码下载 下面是hadoop官方源码下载地址&#xff0c;我下载的是hadoop-3.2.4&#xff0c;那就一起来看下吧 Index of /dist/hadoop/core 二、上下文 在上一篇<Hadoop-MapReduce-源码跟读-客户端篇>中已经将到&#xff1a;作业提交到ResourceManager&#xff0c;那…

首发:2024全球DAO组织发展研究

作者&#xff0c;张群&#xff08;专注DAO及区块链应用研究&#xff0c;赛联区块链教育首席讲师&#xff0c;工信部赛迪特邀资深专家&#xff0c;CSDN认证业界专家&#xff0c;微软认证专家&#xff0c;多家企业区块链产品顾问&#xff09; DAO&#xff08;去中心化自治组织&am…

adb测试冷启动和热启动 Permission Denial解决

先清理日志 adb shell logcat -c 打开手机模拟器中的去哪儿网&#xff0c;然后日志找到包名和MainActivity adb shell logcat |grep Main com.Qunar/com.mqunar.atom.alexhome.ui.activity.MainActivity 把手机模拟器的去哪儿的进程给杀掉 执行 命令 adb shell am start -W…

TensorFlow2实战-系列教程1:回归问题预测

&#x1f9e1;&#x1f49b;&#x1f49a;TensorFlow2实战-系列教程 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在Jupyter Notebook中进行 本篇文章配套的代码资源已经上传 1、环境测试 import tensorflow as tf import numpy as np tf.__version__打印结果 ‘…

深入理解Redis:如何设置缓存数据的过期时间及其背后的机制

目录 Redis 给缓存数据设置过期时间 Redis是如何判断数据是否过期的呢&#xff1f; 过期的数据的删除策略 Redis 内存淘汰机制 Redis 给缓存数据设置过期时间 一般情况下&#xff0c;我们设置保存的缓存数据的时候都会设置一个过期时间。为什么呢&#xff1f; 因为内存是有…

4小时精通MyBatisPlus框架

目录 1.介绍 2.快速入门 2.1.环境准备 2.2.快速开始 2.2.1引入依赖 2.2.2.定义Mapper ​编辑 2.2.3.测试 2.3.常见注解 ​编辑 2.3.1.TableName 2.3.2.TableId 2.3.3.TableField 2.4.常见配置 3.核心功能 3.1.条件构造器 3.1.1.QueryWrapper 3.1.2.UpdateWra…