Flink 学习三 Flink 流 process function API

Flink 学习三 Flink 流&process function API

1.Flink 多流操作

1.1.split 分流 (deprecated)

把一个数据流根据数据分成多个数据流 1.2 版本后移除

1.2.分流操作 (使用侧流输出)

public class _02_SplitStream {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {
            /**
             *
             * @param value 输出的数据
             * @param ctx A 上下文
             * @param out 主要流输出器
             * @throws Exception
             */
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, Integer>.Context ctx,
                                       Collector<Integer> out) throws Exception {
                if (value % 3 == 0) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%0",TypeInformation.of(Integer.class)) , value);
                }if (value % 3 == 1) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%1",TypeInformation.of(Integer.class)) , value);
                }
                //主流 ,数据
                out.collect(value);
            }
        });

        DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));
        DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));
        output1.print();

        env.execute();
    }
}

1.3.connect

connect 连接 DataStream ,DataStream ==> ConnectedStream

两个DataStream 连接成一个新的ConnectedStream ,虽然两个流连接在一起,但是两个流依然是相互独立的,这个方法的最大用处是: 两个流共享State 状态

两个流在内部还是各自处理各自的逻辑 比如 CoMapFunction 内的map1,map2 还是各自处理 streamSource,streamSource2;

数据类型可以不一致

public class _03_ConnectedStream {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource2 = env.fromElements(10, 20, 30, 40, 50);

        ConnectedStreams<Integer, Integer> connected = streamSource.connect(streamSource2);

        // 原来的 MapFunction ==>  CoMapFunction  ; flatMap ==> CoMapFunction
        SingleOutputStreamOperator<Object> mapped = connected.map(new CoMapFunction<Integer, Integer, Object>() {
            @Override
            public Object map1(Integer value) throws Exception {
                return value + 1;
            }

            @Override
            public Object map2(Integer value) throws Exception {
                return value * 10;
            }
        });

        mapped.print();

        env.execute();
    }
}

------------------------------------------------------------------
  --------------------         --------------------    
    streamSource         --->         map1  
  --------------------         --------------------

  --------------------         --------------------    
    streamSource2       --->          map2  
  --------------------         -------------------- 
------------------------------------------------------------------    

1.4.union

可以合并多个流,流数据类型必须一致,


public class _04_UnionStream {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource2 = env.fromElements(10, 20, 30, 40, 50,80,1110);
        DataStream<Integer> unioned = streamSource.union(streamSource2);
        SingleOutputStreamOperator<String> union = unioned.map(new MapFunction<Integer, String>() {
            @Override
            public String map(Integer value) throws Exception {
                return "union" + value;
            }
        });
        union.print();
        env.execute();
    }
}


------------------------------------------------------------------
  --------------------          
    streamSource               
  --------------------              --------------------
						=====>        map
  --------------------              --------------------    
    streamSource2               
  --------------------         
------------------------------------------------------------------    

1.5.coGroup

coGroup 本质上是join 算子的底层算子

有界流的思想去处理; 比如上说是时间窗口: 5S内数据分组匹配

        <左边流>.coGroup(<右边流>)
                .where(<KeySelector>)
                .equalTo(<KeySelector>)
                .window(<窗口>)
                .apply(<处理逻辑>)

在这里插入图片描述

数据组比如说是时间窗口是5或者是10s 为一批数据, 时间窗口内的数据完成后,根据 where,和 equalTo 选择的key 数据一致 来分组

public class _05_CoGroupStream {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Person> name_idCard = env.socketTextStream("192.168.141.131", 8888).map(x -> {
            Person person = new Person();
            person.setName(x.split(",")[0]);
            person.setIdCard(x.split(",")[1]);
            return person;
        }).returns(TypeInformation.of(Person.class)).name("==idCard==");
        //name_idCard.print();

        DataStream<Person> name_addr = env.socketTextStream("192.168.141.131", 7777).map(x -> {
            Person person = new Person();
            person.setName(x.split(",")[0]);
            person.setAddr(x.split(",")[1]);
            return person;
        }).returns(TypeInformation.of(Person.class)).name("==addr==");
        //name_addr.print();

        DataStream<Person> dataStream = name_idCard.coGroup(name_addr)
                // 左边流的key
                .where(new KeySelector<Person, Object>() {
                    @Override
                    public Object getKey(Person value) throws Exception {
                        return value.getName();
                    }
                })
                // 右边流的key
                .equalTo(new KeySelector<Person, Object>() {
                    @Override
                    public Object getKey(Person value) throws Exception {
                        return value.getName();
                    }
                })
                //时间窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                //处理逻辑  左边 Person ,右边  Person ,输出 Person
                .apply(new CoGroupFunction<Person, Person, Person>() {
                    /**
                     * first 协调组第一个流个数据
                     * second 协调组第二个流数据
                     */
                    @Override
                    public void coGroup(Iterable<Person> first, Iterable<Person> second, Collector<Person> out) throws Exception {
                        //左连接实现
                        Iterator<Person> iterator = first.iterator();
                        while (iterator.hasNext()) {
                            Person next1 = iterator.next();
                            Iterator<Person> iterator1 = second.iterator();
                            Boolean noDataFlag = true;
                            while (iterator1.hasNext()) {
                                Person result = new Person(next1);
                                Person next = iterator1.next();
                                result.setAddr(next.getAddr());
                                out.collect(result);
                                noDataFlag = false;
                            }
                            if (noDataFlag) {
                                out.collect(next1);
                            }
                        }
                    }
                });

        dataStream.print();

        env.execute();
    }
}

1.6. join 关联操作

用于关联两个流,需要指定join 条件;需要在窗口中进行关联后的计算逻辑

join 使用coGroup 实现的

public class _06_JoinStream {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Perple 数据打平为Tuple  name,idCard,addr
        DataStream<Tuple3<String, String,String>> name_idCard = env.socketTextStream("192.168.141.131", 8888).map(x -> {
            return Tuple3.of(x.split(",")[0],x.split(",")[1],"");
        }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String,String>>() {
        })) ;

        DataStream<Tuple3<String, String,String>> name_addr = env.socketTextStream("192.168.141.131", 7777).map(x -> {
            return Tuple3.of(x.split(",")[0],"",x.split(",")[1]);
        }) .returns(TypeInformation.of(new TypeHint<Tuple3<String, String,String>>() {
        }));
        //name_addr.print();

        DataStream<Tuple3<String, String,String>> dataStream = name_idCard.join(name_addr)
                // 左边流的f0 字段
                .where(tp3->tp3.f0)
                // 右边流的f0 字段
                .equalTo(tp3->tp3.f0)
                //时间窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                //处理逻辑  左边 Person ,右边  Person ,输出 Person
                .apply(new JoinFunction<Tuple3<String, String,String>, Tuple3<String, String,String>, Tuple3<String, String,String>>() {
                    /**
                     * @param first 匹配到的数据  first input.
                     * @param second 匹配到的数据 second input.
                     * @return
                     * @throws Exception
                     */
                    @Override
                    public Tuple3 join(Tuple3 first, Tuple3 second) throws Exception {
                        return Tuple3.of(first.f0,first.f1,second.f2);
                    }
                });

        dataStream.print();
        env.execute();
    }
}

1.7.broadcast

   datastream1: 用户id|行为|操作数据                   datastream2: 用户id|用户name|用户phone   
windows time1 ---------------------------------- 	---------------------------------
				12  |click| xxdssd						12  |aa| 131	
				13  |click| dasd             			 13  |cc| 1331					
				14  |click| ad    						14  |dd| 1321	
windows time2 ---------------------------------- 	---------------------------------
				12  |click| sfs          															
				13  |click| sdfs       
				15  |click| ghf     					17  |dd| 1321											
windows time3 ----------------------------------  	---------------------------------
				14  |click| ghf   
				17  |click| ghf 												
       
       注: 左边流数据是基础数据,使用 join不合适 ,适合 broadcast
           broadcast 适用于关联字典表 
  
       主流算子		<<<----------------------------------	广播状态				
       
       

public class _07_BroadcastStream {

	public static void main(String[] args) throws Exception {

		// 获取环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 数据打平为 用户id|行为|操作数据
		DataStream<Tuple3<String, String, String>> operationInfo = env.socketTextStream("192.168.141.131", 8888)
				.map(x -> {
					return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
				}).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));

		// 数据打平为 用户id|用户name|用户phone
		DataStream<Tuple3<String, String, String>> baseInfo = env.socketTextStream("192.168.141.131", 7777).map(x -> {
			return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
		}).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
		}));

        //状态描述
		MapStateDescriptor<String, Tuple3<String, String, String>> userBaseInfoStateDesc = new MapStateDescriptor<>(
				"user base info", TypeInformation.of(String.class),
				TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));
		// 基础信息 变成广播流
		BroadcastStream<Tuple3<String, String, String>> userBaseInfoBroadcast = baseInfo
				.broadcast(userBaseInfoStateDesc);

		// 关联行为流和广播流
		BroadcastConnectedStream<Tuple3<String, String, String>, Tuple3<String, String, String>> connected = operationInfo
				.connect(userBaseInfoBroadcast);

		SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> processed =
				// 连接后,处理的逻辑
				// connected 如果是keyedStream ===> 参数就是 KeyedBroadcastProcessFunction
				// connected 如果不是keyedStream ===> 参数就是 BroadcastProcessFunction
				connected.process(new BroadcastProcessFunction<Tuple3<String, String, String>, // 左流的数据
						Tuple3<String, String, String>, // 广播的类型
						Tuple5<String, String, String, String, String> // 返回数据类型
				>() {

					/**
					 * 此方法是处理主流方法 主流来一条处理一下
					 * 
					 * @throws Exception
					 */
					@Override
					public void processElement(Tuple3<String, String, String> value, // 左流 主流 数据
							BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.ReadOnlyContext ctx, // 上下文
							Collector<Tuple5<String, String, String, String, String>> out // 输出器
					) throws Exception {
						// 基础数据还没有 broadcastStateReadOnly
						// 和 processBroadcastElement 里面获取的 broadcastState 数据一致,只是是只读的
						// 数据是一致的
						ReadOnlyBroadcastState<String, Tuple3<String, String, String>> broadcastStateReadOnly = ctx
								.getBroadcastState(userBaseInfoStateDesc);
						if (broadcastStateReadOnly == null) {
							out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
						} else {
							Tuple3<String, String, String> baseInfo = broadcastStateReadOnly.get(value.f0);
							// 基础数据为空
							if (baseInfo == null) {
								out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
							} else {
								out.collect(Tuple5.of(value.f0, value.f1, value.f2, baseInfo.f1, baseInfo.f2));
							}
						}
					}

					/**
					 *
					 * 处理广播流数据:拿到数据后,存到状态里面
					 */
					@Override
					public void processBroadcastElement(Tuple3<String, String, String> value, // 广播流里面的一条数据
							BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.Context ctx, // 上下文
							Collector<Tuple5<String, String, String, String, String>> out // 输出器
					) throws Exception {
						// 上下文 里面获取状态
						BroadcastState<String, Tuple3<String, String, String>> broadcastState = ctx
								.getBroadcastState(userBaseInfoStateDesc);
                          //状态里面 以用户id 作为key , 基础信息为value
						broadcastState.put(value.f0, value);
					}
				});

		processed.print();

		env.execute();
	}
}

2.Flink 编程 process function

2.1 process function 简介

process function相对于前面的map , flatmap ,filter 的区别就是,对数据的处理有更大的自由度; 可以获取到数据的上下文,数据处理逻辑 ,如何控制返回等交给编写者;

在事件驱动的应用中,使用最频繁的api 就是process function

注: 在对不同的流的时候, process function 的类型也不一致

数据流的转换

在这里插入图片描述

不同的DataStream 的process 处理方法需要的参数类型有如下几种

2.2 ProcessFunction


public class _01_ProcessFunction {

	public static void main(String[] args) throws Exception {

		// 获取环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 数据打平为 用户id|行为|操作数据
		DataStreamSource<String> streamSource = env.fromElements("1,click,data1", "2,click1,data2", "10,flow,data1",
				"22,doubleclick,data22");
		DataStream<Tuple3<String, String, String>> operationInfo = streamSource.map(x -> {
			return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
		}).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
		}));

		// ProcessFunction
		SingleOutputStreamOperator<String> processed = operationInfo
				.process(new ProcessFunction<Tuple3<String, String, String>, String>() {
					// 处理元素
					@Override
					public void processElement(Tuple3<String, String, String> value,
							ProcessFunction<Tuple3<String, String, String>, String>.Context ctx, Collector<String> out)
							throws Exception {
						// 可以做主流输出
						out.collect(value.f0 + value.f1 + value.f2);
						// 可以做侧流输出
						ctx.output(new OutputTag<Tuple3<String, String, String>>("adasd",
								TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
								})), value);
					}

					// 其余 声明周期方法 ... 任务状态 ... 都可以获取
					@Override
					public void open(Configuration parameters) throws Exception {
						super.open(parameters);
					}
				});

		processed.print();

		env.execute();
	}

}

2.3 KeyedProcessFunction

public class _02_KeyedProcessFunction {

	public static void main(String[] args) throws Exception {

		// 获取环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 数据打平为 用户id|行为|操作数据
		DataStreamSource<String> streamSource = env.fromElements("1,click,data1", "2,click1,data2", "10,flow,data1",
				"22,doubleclick,data22", "2,doubleclick,data22");
		DataStream<Tuple3<String, String, String>> operationInfo = streamSource.map(x -> {
			return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
		}).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
		}));

		// keyedStream
		KeyedStream<Tuple3<String, String, String>, String> keyedStream = operationInfo.keyBy(tp3 -> tp3.f0);

		// ProcessFunction
		SingleOutputStreamOperator<String> processed = keyedStream
				.process(new ProcessFunction<Tuple3<String, String, String>, String>() {
					@Override
					public void processElement(Tuple3<String, String, String> value,
							ProcessFunction<Tuple3<String, String, String>, String>.Context ctx, Collector<String> out)
							throws Exception {
                        out.collect((value.f0 + value.f1 + value.f2).toUpperCase(Locale.ROOT));
					}
				});

        processed.print();

		env.execute();
	}

}

2.4 ProcessWindowFunction

2.5 ProcessAllWindowFunction

2.6 CoProcessFunction

2.7 ProcessJoinFunction

2.8 BroadcastProcessFunction

参考1.7

2.9 KeyedBroadcastProcessFunction

3.测试

package demo.sff.flink.exercise;

import demo.sff.flink.source.Person;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;

/**
 * 创建流 Stream 1: id | event | count 1,event1,3 2,event1,5 3,event1,4
 *
 * Stream 2: id | gender | city 1 , male ,beijin 2 ,female,shanghai
 *
 * 需求 : 1.Stream 1 按照 count字段展开为对应的个数 比如id=1 展开为3条 1,event1,随机1 1,event1,随机2
 * 1,event1,随机3 ,id=2 展开为5 条
 *
 * 2.Stream 1 关联上 Stream 2 数据
 *
 * 3.关联不上 测流 其余主流
 *
 * 4.主流,性别分组,取出最大随机数
 *
 * 5.主流写入mysql
 *
 * 6.测流写入parquet
 */
public class Test1 {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);

		// 创建流 后面可以使用socket 替换 stream2 先写入广播 不然关联不上
		DataStreamSource<String> stream1 = env.fromElements("1,event1,3", "2,event1,5", "3,event3,4");
		DataStreamSource<String> stream2 = env.fromElements("1,male,beijin", " 2,female,shanghai");

		DataStream<Tuple3<String, String, String>> streamOperator1 = stream1
				.map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
				.returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));
		DataStream<Tuple3<String, String, String>> streamOperator2 = stream2
				.map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
				.returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));

		// 需求1
		DataStream<Tuple3<String, String, String>> mapDataStream = streamOperator1
				.flatMap(new FlatMapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>>() {
					@Override
					public void flatMap(Tuple3<String, String, String> value,
							Collector<Tuple3<String, String, String>> out) throws Exception {
						Integer integer = Integer.valueOf(value.f2);
						for (Integer i = 0; i < integer; i++) {
							int r = new Random().nextInt(100);
							out.collect(Tuple3.of(value.f0, value.f1, r + ""));
						}
					}
				}).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));

		// mapDataStream.print();

		// 需求2 stream2 数据广播
		MapStateDescriptor<String, Tuple3<String, String, String>> descriptor = new MapStateDescriptor<String, Tuple3<String, String, String>>(
				"userinfo", TypeInformation.of(String.class),
				TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));
		BroadcastStream<Tuple3<String, String, String>> tuple3BroadcastStream = streamOperator2.broadcast(descriptor);

		BroadcastConnectedStream<Tuple3<String, String, String>, Tuple3<String, String, String>> tuple3BroadcastConnectedStream = mapDataStream
				.connect(tuple3BroadcastStream);

		SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> processed = tuple3BroadcastConnectedStream
				.process(
						new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>() {

							@Override
							public void processElement(Tuple3<String, String, String> value,
									BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.ReadOnlyContext ctx,
									Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
								ReadOnlyBroadcastState<String, Tuple3<String, String, String>> broadcastState = ctx
										.getBroadcastState(descriptor);
								// 需求3.关联不上 测流 其余主流
								if (broadcastState == null) {
									// out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
									ctx.output(new OutputTag<String>("nojoin", TypeInformation.of(String.class)),
											value.f0 + value.f1 + value.f2);
								} else {
									Tuple3<String, String, String> stringTuple3 = broadcastState.get(value.f0);
									if (stringTuple3 == null) {
										// out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
										ctx.output(new OutputTag<String>("nojoin", TypeInformation.of(String.class)),
												value.f0 + value.f1 + value.f2);
									} else {
										out.collect(Tuple5.of(value.f0, value.f1, value.f2, stringTuple3.f1,
												stringTuple3.f2));
									}
								}
							}

							@Override
							public void processBroadcastElement(Tuple3<String, String, String> value,
									BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.Context ctx,
									Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
								BroadcastState<String, Tuple3<String, String, String>> broadcastState = ctx
										.getBroadcastState(descriptor);
								broadcastState.put(value.f0, value);

							}
						})
				.returns(TypeInformation.of(new TypeHint<Tuple5<String, String, String, String, String>>() {
				}));
		// 主流
		processed.print();
		// 测流
		DataStream<String> sideOutput = processed
				.getSideOutput(new OutputTag<String>("nojoin", TypeInformation.of(String.class)));

		// sideOutput.print();

		// 需求4 主流,性别分组,取出最大随机数
		SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> streamOperator = processed
				.keyBy(x -> x.f3)
				.map(new MapFunction<Tuple5<String, String, String, String, String>, Tuple5<String, String, Integer, String, String>>() {
					@Override
					public Tuple5<String, String, Integer, String, String> map(
							Tuple5<String, String, String, String, String> value) throws Exception {
						return Tuple5.of(value.f0, value.f1, Integer.valueOf(value.f2), value.f3, value.f4);
					}
				}).returns(TypeInformation.of(new TypeHint<Tuple5<String, String, Integer, String, String>>() {
				}));
		SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> maxBy = streamOperator
				.keyBy(tp5 -> tp5.f3).maxBy(2);
		maxBy.print();

		// 5.主流写入mysql  未验证 待测试
		String sql = " insert into testa values (?,?,?,?,?) on duplicate key a=?,b=?,c=?,d=?,e=?  ";
		SinkFunction<Tuple5<String, String, Integer, String, String>> jdbcSink = JdbcSink.sink(sql,
				new JdbcStatementBuilder<Tuple5<String, String, Integer, String, String>>() {
					@Override
					public void accept(PreparedStatement preparedStatement,
							Tuple5<String, String, Integer, String, String> tuple5) throws SQLException {
						preparedStatement.setString(0, tuple5.f0);
						preparedStatement.setString(1, tuple5.f1);
						preparedStatement.setInt(2, tuple5.f2);
						preparedStatement.setString(3, tuple5.f3);
						preparedStatement.setString(4, tuple5.f4);
						preparedStatement.setString(5, tuple5.f0);
						preparedStatement.setString(6, tuple5.f1);
						preparedStatement.setInt(7, tuple5.f2);
						preparedStatement.setString(8, tuple5.f3);
						preparedStatement.setString(9, tuple5.f4);
					}
				}, JdbcExecutionOptions.builder().withBatchSize(2) // 两条数据一批插入
						.withMaxRetries(3) // 失败插入重试次数
						.build(),
				new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withPassword("root") // jdbc 连接信息
						.withUsername("root")// jdbc 连接信息
						.withUrl("jdbc:mysql://192.168.141.131:3306/flinkdemo").build());
		streamOperator.addSink(jdbcSink);

		// 6.测流写入parquet  未验证 待测试
		ParquetWriterFactory<String> writerFactory = ParquetAvroWriters.forReflectRecord(String.class);
		FileSink<String> build = FileSink.forBulkFormat(new Path("d:/sink"), writerFactory)
				.withBucketAssigner(new DateTimeBucketAssigner<String>()) // 文件分桶策略
				.withBucketCheckInterval(5)// 文件夹异步线程创建和检测周期
				.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("flinkdemo") // 文件前缀
						.withPartSuffix(".txt") // 文件后缀
						.build())// 文件的输出格式对象
				.build();

		sideOutput.sinkTo(build);

		env.execute();

	}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/31531.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uniapp设置滚动条滚动到指定位置

场景&#xff1a;左侧菜单栏&#xff0c;每次切换时&#xff0c;需要右侧商品展示区保持滚动条及页面在最顶部 1.利用scroll-view 中scroll-top属性实现 1.1设置scrollToTop属性为0 data() {return {// 保证每次切换&#xff0c;滚动条位置都在最顶部scrollToTop: 0,}; } 1.…

ansible的剧本(playbook)

一、playbooks 概述以及实例操作 1、playbooks 的组成 playbooks 本身由以下各部分组成 &#xff08;1&#xff09;Tasks&#xff1a;任务&#xff0c;即通过 task 调用 ansible 的模板将多个操作组织在一个 playbook 中运行 &#xff08;2&#xff09;Variables&#xff1…

iOS 单元测试之常用框架 OCMock 详解 | 京东云技术团队

一、单元测试 1.1 单元测试的必要性 测试驱动开发并不是一个很新鲜的概念了。在日常开发中&#xff0c;很多时候需要测试&#xff0c;但是这种输出是必须在点击一系列按钮之后才能在屏幕上显示出来的东西。测试的时候&#xff0c;往往是用模拟器一次一次的从头开始启动 app&a…

团体程序设计天梯赛-练习集L2篇④

&#x1f680;欢迎来到本文&#x1f680; &#x1f349;个人简介&#xff1a;Hello大家好呀&#xff0c;我是陈童学&#xff0c;一个与你一样正在慢慢前行的普通人。 &#x1f3c0;个人主页&#xff1a;陈童学哦CSDN &#x1f4a1;所属专栏&#xff1a;PTA &#x1f381;希望各…

模糊聚类在负荷实测建模中的应用(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

VSCode使用CodeWhisperer(AI编程)

安装AWS Toolkit插件&#xff0c;点击侧边插件搜索并安装 2.点击aws ->CodeWhisperer ->Start 3.在下拉菜单中点击Use a personal email to sign up and sign in with AWS Builder ID 4.点Copy Code and Proceed&#xff0c;这会自动复制一些东西 5. 收到提示打开外部网…

【跑实验05】利用CLIP中的图像编码器,如何遍历文件夹中的图像,将图像文件改为28*28的尺寸,然后输出到excel中的每一列,最后一列全都标记为0

文章目录 一、初步实现二、警告信息的解决三、Excel的限制四、尝试解决 一、初步实现 要遍历文件夹中的图像并将其尺寸调整为28x28&#xff0c;并将结果输出到Excel中&#xff0c;可以按照以下步骤进行操作&#xff1a; 首先&#xff0c;确保您已经安装了Pandas库&#xff0c…

“插入排序:小数据量排序的王者“

文章目录 &#x1f50d;什么是插入排序&#xff1f;&#x1f511;插入排序的优缺点&#x1f680;实现插入排序 &#x1f50d;什么是插入排序&#xff1f; 插入排序是一种简单的排序算法&#xff0c;它的基本思想是&#xff1a;将待排序的元素&#xff0c;从第二个元素开始&…

Adobe Creative Cloud 摄影计划 - 当图像与想象力相遇。 PS+LRc套餐 国际版 1年订阅/398

这里重点介绍国际版摄影计划套餐详情&#xff1a; 国际版包括&#xff1a;Photoshop、Lightroom Classic、Photoshop Express、Lightroom Mobile、Lightroom、云服务。中国版包括&#xff1a;Photoshop、Lightroom Classic、Photoshop Express、Lightroom Mobile 桌面应用程序…

力扣高频SQL50题(基础版)——第十天

力扣高频SQL50题(基础版)——第十天 1 只出现过一次的最大数字 1.1 题目内容 1.1.1 基本题目信息 1.1.2 示例输入输出1 1.1.3 示例输入输出2 1.2 示例sql语句 # 查不到时的结果自然就为Null SELECT MAX(t.num) num FROM (SELECT numFROM MyNumbersGROUP By numHAVING count…

2023考研一战上岸 电子科技大学 860软件工程 经验分享

目录 1. 前言&#xff1a;考研&#xff0c;心态最重要&#xff01; 2. 初试各科复习经验 (1) 数学一 (2) 英语一 (3) 专业课 (4) 政治 (5) 四门课时间划分 3. 复试流程和备考建议 (1) 复试流程 (2) 备考建议 4. 结语 首先&#xff0c;先简要做一个自我介绍&#xff…

【开源与项目实战:开源实战】82 | 开源实战三(中):剖析Google Guava中用到的几种设计模式

上一节课&#xff0c;我们通过 Google Guava 这样一个优秀的开源类库&#xff0c;讲解了如何在业务开发中&#xff0c;发现跟业务无关、可以复用的通用功能模块&#xff0c;并将它们从业务代码中抽离出来&#xff0c;设计开发成独立的类库、框架或功能组件。 今天&#xff0c;…

网络安全学术顶会——CCS '22 议题清单、摘要与总结(上)

注意&#xff1a;本文由GPT4与Claude联合生成。 按语&#xff1a;ChatGPT在计算机领域的翻译质量还是欠缺一些&#xff0c;翻译出来的中文有的不够自然&#xff0c;经常完全按照英文的表达方式来&#xff0c;导致中文特别长&#xff0c;很绕。GPT4的翻译效果相对ChatGPT效果要好…

内网安全:内网渗透.(拿到内网主机最高权限 vulntarget 靶场 A)

内网安全&#xff1a;内网渗透.&#xff08;拿到内网主机最高权限&#xff09; 内网穿透又被称为NAT穿透&#xff0c;内网端口映射外网&#xff0c;在处于使用了NAT设备的私有TCP/IP网络中的主机之间建立连接的问题。通过映射端口&#xff0c;让外网的电脑找到处于内网的电脑。…

TensorFlow2进行CIFAR-10数据集动物识别,保存模型并且进行外部下载图片测试

首先&#xff0c;你已经安装好anaconda3、创建好环境、下载好TensorFlow2模块并且下载好jupyter了&#xff0c;那么我们就直接打开jupyter开始进行CIFAR10数据集的训练。 第一步&#xff1a;下载CIFAR10数据集 下载网址&#xff1a;http://www.cs.toronto.edu/~kriz/cifar-10…

【网络协议详解】——IPv4(学习笔记)

目录 &#x1f552; 1. IPv4地址概述&#x1f552; 2. 分类编址&#x1f552; 3. 划分子网&#x1f558; 3.1 概述&#x1f558; 3.2 如何实现&#x1f558; 3.3 无分类编址&#x1f558; 3.4 应用规划&#x1f564; 3.4.1 定长的子网掩码FLSM&#xff08;Fixed Length Subnet …

第4章 网络层

1‌、下列关于路由算法描述错误的是&#xff08; &#xff09; A. 链路状态算法是一种全局路由算法&#xff0c;每个路由器需要维护全局状态信息B. OSPF 是一种域内路由协议&#xff0c;核心是基于 Dijkstra 最低费用路径算法C. RIP 是一种域内路由算法&#xff0c;核心是基…

MUR8060PT-ASEMI快恢复二极管MUR8060PT

编辑-Z MUR8060PT在TO-247封装里采用的2个芯片&#xff0c;其尺寸都是140MIL&#xff0c;是一款高耐压大电流快恢复二极管。MUR8060PT的浪涌电流Ifsm为600A&#xff0c;漏电流(Ir)为10uA&#xff0c;其工作时耐温度范围为-55~150摄氏度。MUR8060PT采用抗冲击硅芯片材质&#x…

Maven编译常见问题收集

1、父pom里面有引入lombok依赖&#xff0c;为什么子pom有用到lombok&#xff0c;依然识别不到呢 这是因为父pom引入依赖的时候&#xff0c;把 <dependency></dependency>依赖标签&#xff0c;最外层包 在了<dependencyManagement></dependencyManagemen…

【spring】spring是什么?详解它的特点与模块

作者&#xff1a;Insist-- 个人主页&#xff1a;insist--个人主页 作者会持续更新网络知识和python基础知识&#xff0c;期待你的关注 目录 一、spring介绍 二、spring的特点&#xff08;七点&#xff09; 1、简化开发 2、AOP的支持 3、声明式事务的支持 4、方便测试 5、…