大数据-玩转数据-Flink营销对账

一、说明

在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。

二、思路

对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理。

三、数据准备

订单数据从OrderLog.csv中读取,交易数据从ReceiptLog.csv中读取
JavaBean类的准备

四、代码

package com.lyh.flink06;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class Project_Order {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        SingleOutputStreamOperator<OrderEvent> orderEventString = env.readTextFile("input/OrderLog.csv")
                .map(line -> {
                    String[] data = line.split(",");
                    return new OrderEvent(
                            Long.valueOf(data[0]),
                            data[1],
                            data[2],
                            Long.valueOf(data[3])
                    );
                }).filter(log -> "pay".equals(log.getEventType()));

        SingleOutputStreamOperator<TxEvent> txEventString = env.readTextFile("input/ReceiptLog.csv")
                .map(line -> {
                    String[] data = line.split(",");
                    return new TxEvent(
                            data[0],
                            data[1],
                            Long.valueOf(data[2])
                    );
                });

        orderEventString.connect(txEventString)
                .keyBy(OrderEvent::getTxId,TxEvent::getTxId)
                .process(new KeyedCoProcessFunction<String, OrderEvent, TxEvent, String>() {
                    Map<String,OrderEvent> OrderEventmap = new HashMap<>();
                    Map<String,TxEvent> TxEventmap = new HashMap<>();
                    @Override
                    public void processElement1(OrderEvent value,
                                                Context ctx,
                                                Collector<String> out) throws Exception {
                        TxEvent txEvent = TxEventmap.get(ctx.getCurrentKey());
                        if (txEvent != null) {
                            out.collect("订单" + value.getOrderId() + "对账成功");
                        }else {
                            OrderEventmap.put(ctx.getCurrentKey(),value);
                        }

                    }

                    @Override
                    public void processElement2(TxEvent value,
                                                Context ctx,
                                                Collector<String> out) throws Exception {
                        OrderEvent orderEvent = OrderEventmap.get(ctx.getCurrentKey());
                        if (orderEvent != null) {
                           out.collect("订单" + orderEvent.getOrderId() + "对账成功");
                        }else {
                            TxEventmap.put(ctx.getCurrentKey(),value);
                        }
                    }
                }).print();
        env.execute();

    }
}

五、结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/86671.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一云多芯,智能化转型的下一个工程化挑战

进入2023年&#xff0c;产业数字化和智能化转型升级进入了大规模工程化落地阶段。根据中国信通院《中国数字经济发展研究报告&#xff08;2023&#xff09;》&#xff0c;数字经济已经占我国GDP比重达到41.5%&#xff0c;相当于第二产业占国民经济的比重。随着产业数字化和智能…

【数据结构】 LinkedList的模拟实现与使用

文章目录 &#x1f340;什么是LinkedList&#x1f334;LinkedList的模拟实现&#x1f6a9;创建双链表&#x1f6a9;头插法&#x1f6a9;尾插法&#x1f6a9;任意位置插入&#x1f6a9;查找关键字&#x1f6a9;链表长度&#x1f6a9;打印链表&#x1f6a9;删除第一次出现关键字为…

YOLOv5+deepsort实现目标追踪。(附有各种错误解决办法)

一、YOLOv5算法相关配置 🐸这里如果是自己只想跑一跑YOLOV5的话,可以参考本章节。只想跑通YOLOv5+deepsort的看官移步到下一章节。 1.1 yolov5下载 🐸yolov5源码在github下载地址上或者Gitee上面都有。需要注意的是由于yolov5的代码库作者一直在维护,所以下载的时候需…

【Unity小技巧】Unity探究自制对象池和官方内置对象池(ObjectPool)的使用

文章目录 前言不使用对象池使用官方内置对象池应用 自制对象池总结源码参考完结 前言 对象池&#xff08;Object Pool&#xff09;是一种软件设计模式&#xff0c;用于管理和重用已创建的对象。在对象池中&#xff0c;一组预先创建的对象被维护在一个池中&#xff0c;并在需要时…

OJ练习第152题——分割回文串 II

分割回文串 II 力扣链接&#xff1a;132. 分割回文串 II 题目描述 给你一个字符串 s&#xff0c;请你将 s 分割成一些子串&#xff0c;使每个子串都是回文。 返回符合要求的 最少分割次数 。 示例 Java代码 class Solution {public int minCut(String s) {int n s.leng…

听说你还不知道什么是python?本文将带你发掘python的魅力并让你爱上他

文章目录 前言什么是pythonpython的由来我们为什么要学习python帮助python学习的网站总结 前言 各位朋友们&#xff0c;大家好。龙叔我后台经常收到私信问什么是Python&#xff1f;有必要学习这门语言么&#xff1f;今天&#xff0c;将通过本文告知大家Python是什么&#xff1…

浅谈日常使用的 Docker 底层原理-三大底座

适合的读者&#xff0c;对Docker有过简单了解的朋友&#xff0c;想要进一步了解Docker容器的朋友。 前言 回想我这两年&#xff0c;一直都是在使用 Docker&#xff0c;看过的视频、拜读过的博客&#xff0c;大都是在介绍 Docker 的由来、使用、优点和发展趋势&#xff0c;但对…

QT学习笔记-开发环境编译Qt MySql数据库驱动与交叉编译Qt MySql数据库驱动

QT学习笔记-开发环境编译Qt MySql数据库驱动与交叉编译Qt MySql数据库驱动 0、背景1、基本环境2、开发环境编译Qt MySql数据库驱动2.1 依赖说明2.2 MySQL驱动编译过程 3、交叉编译Qt MySql数据库驱动3.1 依赖说明3.3.1 如何在交叉编译服务器上找到mysql.h及相关头文件3.3.2 如果…

【PHP】基础语法变量常量

文章目录 PHP简介前置知识了解静态网站的特点动态网站特点 PHP基础语法代码标记注释语句分隔(结束)符变量变量的基本概念变量的使用变量命名规则预定义变量可变变量变量传值内存分区 常量基本概念常量定义形式命名规则使用形式系统常量魔术常量 PHP简介 PHP定义&#xff1a;一…

【服务器】Strace显示后台进程输出

今天有小朋友遇到一个问题 她想把2331509和2854637这两个进程调到前台来&#xff0c;以便于在当前shell查看这两个python进程的实时输出 我第一反应是用jobs -l然后fg &#xff08;参考这里&#xff09; 但是发现jobs -l根本没有输出&#xff1a; 原因是jobs看的是当前ses…

Oracle Database12c数据库官网下载和安装教程

文章目录 下载安装Oracle自带的客户端工具使用 下载 进入oracle官网 点击下载连接之后右上角会有一个下载 我们只需要数据库本体就够了 运行这个下载器 等待下好之后即可 出现 Complete 之后代表下载成功&#xff0c;然后我们解压即可 安装 双击 双击setup.exe 根据…

NLP | 基于LLMs的文本分类任务

比赛链接&#xff1a;讯飞开放平台 来源&#xff1a;DataWhale AI夏令营3&#xff08;NLP&#xff09; Roberta-base&#xff08;BERT的改进&#xff09; ①Roberta在预训练的阶段中没有对下一句话进行预测&#xff08;NSP&#xff09; ②采用了动态掩码 ③使用字符级和词级…

引领行业高质量发展|云畅科技参编《低代码开发平台创新发展路线图(2023)》

8月8日-9日&#xff0c;中国电子技术标准化研究院于北京顺利召开《低代码开发平台创新发展路线图&#xff08;2023&#xff09;》封闭编制会。云畅科技、浪潮、百度、广域铭岛等来自低代码开发平台解决方案供应商、用户方、科研院所等近30家相关单位的40余位专家参与了现场编制…

android studio gradle build running慢 卡住不动 失败 原因与解决方式

快速导航 分析原因解决办法 分析原因 主要原因是 gradle 构建时无法从网络获取需要的包或库。 解决办法 将国外库替换为阿里云镜像库。 例如 google 对应的库是 maven { url ‘https://maven.aliyun.com/repository/google’ }

基于决策树(Decision Tree)的乳腺癌诊断

决策树(DecisionTree)学习是以实例为基础的归纳学习算法。算法从--组无序、无规则的事例中推理出决策树表示形式的分类规则,决策树也能表示为多个If-Then规则。一般在决策树中采用“自顶向下、分而治之”的递归方式,将搜索空间分为若千个互不相交的子集,在决策树的内部节点(非叶…

容灾双活方案,异地容灾备份与双活

数据信息的安全性和完整性面临着硬件问题、病毒入侵、自然灾害等各种威胁。为了应对这些威胁&#xff0c;公司需要采取有效的数据保护措施&#xff0c;其中特别重要的是外部容灾备份和双活技术。  让我们来看看其他地方的容灾备份。这是一种可以将数据复制到避免初始区域的设…

IO day 7

1、使用消息队列完成两个进程间相互通信 msgsnd #include <myhead.h>typedef struct {long msgtype;char data[1024]; }Msg_ds;#define SIZE sizeof(Msg_ds)-sizeof(long)int main(int argc, const char *argv[]) {//创建key值key_t key;if((key ftok("/",k…

物通博联嵌入式数据采集网关采集传感器的数据上传到云端

在当今的物联网&#xff08;IoT&#xff09;时代&#xff0c;各种传感器广泛应用于各种工业领域。传感器数据采集是实现自动化生产的基础&#xff0c;可以为企业决策提供科学的数据支持&#xff0c;通过各类智能传感器采集传输终端&#xff0c;将采集的传感器数据实时传输到设备…

如何将应用程序发布到 App Store

憧憬blog主页 在强者的眼中&#xff0c;没有最好&#xff0c;只有更好。我们是移动开发领域的优质创作者&#xff0c;同时也是阿里云专家博主。 ✨ 关注我们的主页&#xff0c;探索iOS开发的无限可能&#xff01; &#x1f525;我们与您分享最新的技术洞察和实战经验&#xff0…

电工-学习电工有哪些好处

学习电工有哪些好处&#xff1f;在哪学习电工&#xff1f; 学习电工有哪些好处&#xff1f;在哪学习电工&#xff1f;学习电工可以做什么&#xff1f;优势有哪些&#xff1f; 学习电工可以做什么&#xff1f;学习电工有哪些好处&#xff1f; 就业去向&#xff1a;可在企业单位…