FlinkAPI开发之数据合流

案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048

概述

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。

联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)类似于SQL里面的union。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

在代码中,我们只要基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合了;得到的依然是一个DataStream:

stream1.union(stream2, stream3, ...)

案例:合并两个订单数据流

package com.zxl.flink;

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemoTest {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/7 为了效果更佳明显先分割成两条数据流
        DataStream<Orders> operator01 = ordersDataStreamSource.filter(orders -> orders.getOrder_amount() > 50);
        DataStream<Orders> operator02 = ordersDataStreamSource.filter(orders -> orders.getOrder_amount() < 50);
        // TODO: 2024/1/7 合流操作
        DataStream<Orders> stream = operator01.union(operator02);
        stream.print();
        environment.execute();
    }
}

在这里插入图片描述

连接(Connect)

流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect),这个流的最终效果和SQLunion类似,只不过可以对每个流和输出的流数据进行处理更加灵活。

在这里插入图片描述
代码实现:需要分为两步:首先基于一条DataStream调用.connect()方法,传入另外一条DataStream作为参数,将两条流连接起来,得到一个ConnectedStreams;然后再调用同处理方法得到DataStream。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法

package com.zxl.flink;

import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class DemoTest {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/6 支付数据
        DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());
        // TODO: 2024/1/7 连接两条流
        ConnectedStreams<Orders, Payments> connectedStreams = ordersDataStreamSource.connect(paymentsDataStreamSource);
        // TODO: 2024/1/7 对连接的两条流和输出的流进行处理

        // TODO: 2024/1/7 通过Map进行处理
        SingleOutputStreamOperator<String> mapStream = connectedStreams.map(new CoMapFunction<Orders, Payments, String>() {
            @Override
            public String map1(Orders orders) throws Exception {
                return orders.toString();
            }

            @Override
            public String map2(Payments payments) throws Exception {
                return payments.toString();
            }
        });
        // TODO: 2024/1/7 通过flatMap进行处理
        SingleOutputStreamOperator<String> flatMaStream = connectedStreams.flatMap(new CoFlatMapFunction<Orders, Payments, String>() {
            @Override
            public void flatMap1(Orders orders, Collector<String> collector) throws Exception {
                collector.collect(orders.toString());
            }

            @Override
            public void flatMap2(Payments payments, Collector<String> collector) throws Exception {
                collector.collect(payments.toString());
            }
        });
        // TODO: 2024/1/7 收集订单金额和支付金额大于50元的数据
        SingleOutputStreamOperator<String> processStream = connectedStreams.process(new CoProcessFunction<Orders, Payments, String>() {

            @Override
            public void processElement1(Orders orders, CoProcessFunction<Orders, Payments, String>.Context context, Collector<String> collector) throws Exception {
                if (orders.getOrder_amount() > 50) {
                    collector.collect(orders.toString());
                }
            }

            @Override
            public void processElement2(Payments payments, CoProcessFunction<Orders, Payments, String>.Context context, Collector<String> collector) throws Exception {
                if (payments.getPayment_amount() > 50) {
                    collector.collect(payments.toString());
                }
            }
        });
        flatMaStream.print();
        environment.execute();
    }
}

Map进行处理结果
在这里插入图片描述

flatMap进行处理结果
在这里插入图片描述
process进行处理的结果
在这里插入图片描述

窗口联结(Window Join)

Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
1)窗口联结的调用
窗口联结在代码中的实现,首先需要调用DataStream的.join()方法来合并两条流,得到一个JoinedStreams;接着通过.where()和.equalTo()方法指定两条流中联结的key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算。通用调用形式如下:

stream1.join(stream2)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)

上面代码中.where()的参数是键选择器(KeySelector),用来指定第一条流中的key;而.equalTo()传入的KeySelector则指定了第二条流中的key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。这里.window()传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply(),没有其他替代的方法。传入的JoinFunction也是一个函数类接口,使用时需要实现内部的.join()方法。这个方法有两个参数,分别表示两条流中成对匹配的数据。其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id; 这句SQL中where子句的表达,等价于inner join … on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。

package com.zxl.flink;

import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import scala.Tuple9;


public class DemoTest {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        // TODO: 2024/1/7 定义时间语义
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/6 支付数据
        DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());
        // TODO: 2024/1/7 配置订单数据水位线
        SingleOutputStreamOperator<Orders> ordersWater = ordersDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                // TODO: 2024/1/7 指定watermark生成:升序的watermark,没有等待时间
                .<Orders>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Orders>() {
                    @Override
                    public long extractTimestamp(Orders orders, long l) {
                        return orders.getOrder_date();
                    }
                })
        );
        // TODO: 2024/1/7 配置支付数据水位线
        SingleOutputStreamOperator<Payments> paymentsWater = paymentsDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                .<Payments>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Payments>() {
                    @Override
                    public long extractTimestamp(Payments payments, long l) {
                        return payments.getPayment_date();
                    }
                })
        );
        // TODO window join
        // 1. 落在同一个时间窗口范围内才能匹配
        // 2. 根据keyby的key,来进行匹配关联
        // 3. 只能拿到匹配上的数据,类似有固定时间范围的inner join
        DataStream<Tuple9> dataStream = ordersWater.join(paymentsWater)
                .where(orders -> orders.getOrder_id())
                .equalTo(payments -> payments.getOrder_id())
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<Orders, Payments, Tuple9>() {
                    @Override
                    public Tuple9 join(Orders orders, Payments payments) throws Exception {
                        Tuple9<Long, Long, Long, Integer, Integer, Long, Integer, Long, String> tuple9 = new Tuple9<>(orders.getOrder_id(), orders.getUser_id(), orders.getOrder_date(), orders.getOrder_amount(), orders.getProduct_id(), orders.getOrder_num(), payments.getPayment_amount(), payments.getPayment_date(), payments.getPayment_type());
                        return tuple9;
                    }
                });
        dataStream.print("join后的数据");
        environment.execute();
    }
}

在这里插入图片描述

间隔联结(Interval Join)

在有些场景下,我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧,于是窗口内就都没有匹配了;会话窗口虽然时间不固定,但也明显不适合这个场景。基于时间的窗口联结已经无能为力了。
为了应对这样的需求,Flink提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。

间隔联结的原理

间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
如下图所示,我们可以清楚地看到间隔联结的方式:
在这里插入图片描述

下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。
所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。

间隔联结的调用

间隔联结在代码中,是基于KeyedStream的联结(join)操作。DataStream在keyBy得到KeyedStream之后,可以调用.intervalJoin()来合并两条流,传入的参数同样是一个KeyedStream,两者的key类型应该一致;得到的是一个IntervalJoin类型。后续的操作同样是完全固定的:先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”ProcessJoinFunction。

通用调用形式

stream1
    .keyBy(<KeySelector>)
    .intervalJoin(stream2.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });

可以看到,抽象类ProcessJoinFunction就像是ProcessFunction和JoinFunction的结合,内部同样有一个抽象方法.processElement()。与其他处理函数不同的是,它多了一个参数,这自然是因为有来自两条流的数据。参数中left指的就是第一条流中的数据,right则是第二条流中与它匹配的数据。每当检测到一组匹配,就会调用这里的.processElement()方法,经处理转换之后输出结果。

间隔联结实例

案例需求:在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是支付的流。我们可以针对同一个订单,来做这样一个联结。也就是一个订单的和这个订单的支付数据进行一个联结查询。

代码实现:正常使用

package com.zxl.flink;

import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import scala.Tuple9;


public class DemoTest {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        // TODO: 2024/1/7 定义时间语义
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/6 支付数据
        DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());
        // TODO: 2024/1/7 配置订单数据水位线
        SingleOutputStreamOperator<Orders> ordersWater = ordersDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                // TODO: 2024/1/7 指定watermark生成:升序的watermark,没有等待时间
                .<Orders>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Orders>() {
                    @Override
                    public long extractTimestamp(Orders orders, long l) {
                        return orders.getOrder_date();
                    }
                })
        );
        // TODO: 2024/1/7 配置支付数据水位线
        SingleOutputStreamOperator<Payments> paymentsWater = paymentsDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                .<Payments>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Payments>() {
                    @Override
                    public long extractTimestamp(Payments payments, long l) {
                        return payments.getPayment_date();
                    }
                })
        );
        // TODO interval join
        //1. 分别做keyby,key其实就是关联条件
        KeyedStream<Orders, Long> ordersKeyedStream = ordersWater.keyBy(orders -> orders.getOrder_id());
        KeyedStream<Payments, Long> paymentsKeyedStream = paymentsWater.keyBy(payments -> payments.getOrder_id());
        //2. 调用 interval join
        SingleOutputStreamOperator<Tuple9> process = ordersKeyedStream.intervalJoin(paymentsKeyedStream)
                .between(Time.seconds(-10), Time.seconds(10))
                .process(new ProcessJoinFunction<Orders, Payments, Tuple9>() {
                    /**
                     * 两条流的数据匹配上,才会调用这个方法
                     * @param left  ks1的数据
                     * @param right ks2的数据
                     * @param ctx   上下文
                     * @param out   采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Orders orders, Payments payments, ProcessJoinFunction<Orders, Payments, Tuple9>.Context context, Collector<Tuple9> collector) throws Exception {
                        Tuple9<Long, Long, Long, Integer, Integer, Long, Integer, Long, String> tuple9 = new Tuple9<>(orders.getOrder_id(), orders.getUser_id(), orders.getOrder_date(), orders.getOrder_amount(), orders.getProduct_id(), orders.getOrder_num(), payments.getPayment_amount(), payments.getPayment_date(), payments.getPayment_type());
                        collector.collect(tuple9);
                    }
                });
        process.print();
        environment.execute();
    }
}

在这里插入图片描述

代码实现:处理迟到数据

package com.zxl.flink;

import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import scala.Tuple9;

import java.time.Duration;


public class DemoTest {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        // TODO: 2024/1/7 定义时间语义
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/6 支付数据
        DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());
        // TODO: 2024/1/7 配置订单数据水位线
        SingleOutputStreamOperator<Orders> ordersWater = ordersDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                // TODO: 2024/1/7 指定watermark生成:升序的watermark,没有等待时间
                .<Orders>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<Orders>() {
                    @Override
                    public long extractTimestamp(Orders orders, long l) {
                        return orders.getOrder_date() * 1000;
                    }
                })
        );
        // TODO: 2024/1/7 配置支付数据水位线
        SingleOutputStreamOperator<Payments> paymentsWater = paymentsDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                .<Payments>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<Payments>() {
                    @Override
                    public long extractTimestamp(Payments payments, long l) {
                        return payments.getPayment_date() * 1000;
                    }
                })
        );

        // TODO interval join
        //配置测输出流标签
        OutputTag<Orders> orders_tag = new OutputTag<Orders>("orders", Types.POJO(Orders.class));
        OutputTag<Payments> payments_tag = new OutputTag<Payments>("payments", Types.POJO(Payments.class));
        //1. 分别做keyby,key其实就是关联条件
        KeyedStream<Orders, Long> ordersKeyedStream = ordersWater.keyBy(orders -> orders.getOrder_id());
        KeyedStream<Payments, Long> paymentsKeyedStream = paymentsWater.keyBy(payments -> payments.getOrder_id());
        //2. 调用 interval join
        SingleOutputStreamOperator<Tuple9> process = ordersKeyedStream.intervalJoin(paymentsKeyedStream)
                .between(Time.seconds(-2), Time.seconds(2))
                .sideOutputLeftLateData(orders_tag)
                .sideOutputRightLateData(payments_tag)
                .process(new ProcessJoinFunction<Orders, Payments, Tuple9>() {
                    /**
                     * 两条流的数据匹配上,才会调用这个方法
                     * @param left  ks1的数据
                     * @param right ks2的数据
                     * @param ctx   上下文
                     * @param out   采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Orders orders, Payments payments, ProcessJoinFunction<Orders, Payments, Tuple9>.Context context, Collector<Tuple9> collector) throws Exception {
                        Tuple9<Long, Long, Long, Integer, Integer, Long, Integer, Long, String> tuple9 = new Tuple9<>(orders.getOrder_id(), orders.getUser_id(), orders.getOrder_date(), orders.getOrder_amount(), orders.getProduct_id(), orders.getOrder_num(), payments.getPayment_amount(), payments.getPayment_date(), payments.getPayment_type());
                        collector.collect(tuple9);
                    }
                });
        ordersKeyedStream.print();
        paymentsKeyedStream.print();
        process.print("主流");
        process.getSideOutput(orders_tag).printToErr("orders迟到的数据");
        process.getSideOutput(payments_tag).printToErr("payments迟到的数据");
        environment.execute();
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/308973.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JMeter 批量接口测试

一、背景 最近在进行某中台的接口测试准备&#xff0c;发现接口数量非常多&#xff0c;有6、70个&#xff0c;而且每个接口都有大量的参数并且需要进行各种参数验证来测试接口是否能够正确返回响应值。想了几种方案后&#xff0c;决定尝试使用JMeter的csv读取来实现批量的接口…

【Docker项目实战】使用Docker部署nullboard任务管理工具

【Docker项目实战】使用Docker部署nullboard任务管理工具 一、nullboard介绍1.1 nullboard简介1.2 任务看板工具介绍 二、本地环境介绍2.1 本地环境规划2.2 本次实践介绍2.3 注意事项 三、本地环境检查3.1 检查Docker服务状态3.2 检查Docker版本3.3 检查docker compose 版本 四…

export default 和exprot

1.默认导入和默认导出 语法: export default {需要输出的内容} 接收: import 成员变量的名字 from 文件夹的路径 案例&#xff1a; a.mjs文件夹下默认导出 export default{a:10,b:20,show(){console.log(123);} } 在b.mjs文件中用成员变量进行接收 import AA from &q…

【昕宝爸爸定制】如何将集合变成线程安全的?

如何将集合变成线程安全的? ✅典型解析&#x1f7e2;拓展知识仓☑️Java中都有哪些线程安全的集合&#xff1f;&#x1f7e0;线程安全集合类的优缺点是什么&#x1f7e1;如何选择合适的线程安全集合类☑️如何解决线程安全集合类并发冲突问题✔️乐观锁实现方式 (具体步骤)。✅…

城堡世界源码

随着数字技术的飞速发展和人们对于娱乐需求的不断提升&#xff0c;城堡世界源码开发逐渐成为了新的热门话题。城堡世界是一个集潮流、艺术、科技于一体的数字娱乐新领域&#xff0c;通过将虚拟现实、增强现实等技术融入传统玩具设计中&#xff0c;为玩家们带来了全新的互动体验…

建站为什么需要服务器?(Web服务器与计算机对比)

​  在部署网站时&#xff0c;底层基础设施在确保最佳性能、可靠性和可扩展性方面发挥着至关重要的作用。虽然大多数人都熟悉个人计算机 (PC) 作为日常工作和个人任务的设备&#xff0c;但 PC 和 Web 服务器之间存在显著差异。在这篇文章中&#xff0c;我们将讨论这些差异是什…

拼多多API的未来:无限可能性和创新空间

拼多多&#xff0c;作为中国电商市场的巨头之一&#xff0c;自成立以来一直保持着高速的发展态势。其API的开放为开发者提供了无限的可能性和创新空间&#xff0c;使得更多的商业逻辑和功能得以实现。本文将深入探讨拼多多API的未来发展&#xff0c;以及它所具备的无限可能性和…

Python基础学习(一)

Python基础语法学习记录 输出 将结果或内容呈现给用户 print("休对故人思故国&#xff0c;且将新火试新茶&#xff0c;诗酒趁年华") # 输出不换行&#xff0c;并且可以指定以什么字符结尾 print("青山依旧在",end ",") print("几度夕阳红…

2024-01-03 无重叠区间

435. 无重叠区间 思路&#xff1a;和最少数量引爆气球的箭的思路基本都是一致了&#xff01;贪心就是比较左边的值是否大于下一个右边的值 class Solution:def eraseOverlapIntervals(self, points: List[List[int]]) -> int:points.sort(keylambda x: (x[0], x[1]))# 比较…

入驻抖店的费用是多少?最新具体费用详情!

我是电商珠珠 抖店的入驻费用是新手比较关心的问题&#xff0c;网上的说法不一&#xff0c;有说开店要几w的&#xff0c;还有的说不要钱的&#xff0c;什么说法都有。 搞得想要开店的人&#xff0c;心有点慌&#xff0c;害怕超出自己的预算。 接下来我就跟大家详细讲一下&am…

Java中异常处理-详解

异常&#xff08;Exception&#xff09; JVM 默认处理方案 把异常的名称&#xff0c;异常的原因&#xff0c;及异常出错的位置等信息输出在控制台程序停止执行 异常类型 编译时异常必须显示处理&#xff0c;否则程序会发生错误&#xff0c;无法通过编译运行时异常无需显示处理…

数据泄密零容忍:揭秘迅软科技文件加密系统的保密奥秘!

企事业单位内部的数据机密性至关重要&#xff0c;但机密数据往往以电子文档形式存储&#xff0c;并借助多样化的传播手段&#xff0c;导致文件泄密事件频发。无论是员工误操作导致的终端泄密&#xff0c;还是黑客入侵窃取机密数据&#xff0c;都可能导致重要文件被非法获取&…

使用echarts制作柱状图、折线图,并且下方带表格

实现效果: 调试地址: https://echarts.apache.org/examples/zh/editor.html?cline-simple 源码: option { title: { left: center, top: 0, text: 2022-05月 制造产量 达成情况(单位: 吨) (图1)\n\n集团目标产量: 106,675吨 集团实际产量: 2,636吨, text…

Springboot+vue的工作流程管理系统(有报告),Javaee项目,springboot vue前后端分离项目

演示视频&#xff1a; Springbootvue的工作流程管理系统(有报告)&#xff0c;Javaee项目&#xff0c;springboot vue前后端分离项目 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的前后端分离的工作流程管理系统&#xff0c;采用M&#xff08;model&#xff09;V&am…

SENet实现遥感影像场景分类

今天我们分享SENet实现遥感影像场景分类。 数据集 本次实验我们使用的是NWPU-RESISC45 Dataset。NWPU Dataset 是一个遥感影像数据集&#xff0c;其中 NWPU-RESISC45 Dataset 是由西北工业大学创建的遥感图像场景分类可用基准&#xff0c;该数据集包含像素大小为 256*256 共计 …

1. seaborn-可视化统计关系

统计分析是了解数据集中的变量如何相互关联以及这些关系如何依赖于其他变量的过程。可视化是此过程的核心组件&#xff0c;这是因为当数据被恰当地可视化时&#xff0c;人的视觉系统可以看到指示关系的趋势和模式。 这里介绍三个seaborn函数。我们最常用的是relplot()。这是一…

golang实现加密解密文档

golang实现加密解密文档 package mainimport ("bytes""crypto/aes""crypto/cipher""crypto/rand""encoding/base64""flag""fmt""io""io/ioutil" )func main() {encodePtr : flag.…

阿赵UE学习笔记——8、贴图导入设置

阿赵UE学习笔记目录 大家好&#xff0c;我是阿赵。   继续学习虚幻引擎的用法&#xff0c;这次来说一下贴图的导入设置。   在内容浏览器里面可以看到纹理类型的资源&#xff0c;就是贴图了&#xff0c;鼠标悬浮在上面可以看到这个纹理贴图的信息&#xff1a; 双击纹理贴图…

Vue3技术解析(小册子)

随着 Vue 3 正式版本的发布&#xff0c;未来 Vue 3 将会成为前端的主流框架&#xff0c;这个毋庸置疑。Vue 3 在使用方面会兼容部分 Vue 2.x 的特性&#xff0c;比如 options API。 所以&#xff0c;究竟是要先学习 Vue 2 打好基础&#xff0c;还是直接学习 Vue 3 呢&#xff…

基于书生·浦语大模型应用开发范式介绍

文章目录 大模型应用开发范式LangChain简介构建向量数据库搭建知识库助手RAG方案优化建议 大模型应用开发范式 通用大模型的优势&#xff1a; 强大的语言理解、指令跟随、语言生成的能力可以理解用户自然语言的指令具有强大的知识储备和一定的逻辑推理能力。 通用大模型局限…