StructuredStreamingKafka实现实时计算

首先我们要打开linux上的zookeeper集群和kafka集群,然后使用java调用相关的API实现发送消息即生产者的功能,代码如下:

package com.bigdata;

import com.alibaba.fastjson2.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.*;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.*;

public class Simulator {

    // 教材ID
    private static String[] arr2 = {"TextBookID_1", "TextBookID_2"};
    // 年级ID
    private static String[] arr3 = {"GradeID_1", "GradeID_2", "GradeID_3", "GradeID_4", "GradeID_5", "GradeID_6"};
    // 科目ID
    private static String[] arr4 = {"SubjectID_1_Math", "SubjectID_2_Chinese", "SubjectID_3_English"};
    // 章节ID
    private static String[] arr5 = {"ChapterID_chapter_1", "ChapterID_chapter_2", "ChapterID_chapter_3"};

    static List<String> stuList = new ArrayList<String>();
    static {
        for (int i = 1; i <= 50; i++) {
            stuList.add("StudentID_" + i);
        }

    }

    static List<String> questionList = new ArrayList<String>();
    static {
        for (int i = 1; i <= 20; i++) {
            questionList.add("QuestionID_" + i);
        }

    }

    // 编写一个方法,调用一次就获取一个问答数据
    public static Answer getAnswer(){
        Random random = new Random();
        int stuIndex = random.nextInt(stuList.size());
        String studentId = stuList.get(stuIndex);
        int textBookIndex = random.nextInt(arr2.length);
        String textBookId = arr2[textBookIndex];

        String gradeID = arr3[random.nextInt(arr3.length)];

        String subjectID = arr4[random.nextInt(arr4.length)];
        String chapterID = arr5[random.nextInt(arr5.length)];

        String questionId = questionList.get(random.nextInt(questionList.size()));
        int score = random.nextInt(101);

        long ts = System.currentTimeMillis();
        Timestamp timestamp = new Timestamp(ts);
        Date date = new Date();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String answerTime = dateFormat.format(date);

        Answer answer = new Answer(studentId,textBookId,gradeID,subjectID,chapterID,questionId,score,answerTime,timestamp);
        return answer;
    }

    public static void main(String[] args) throws Exception{
        /*Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");*/

        //KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        File file = new File("D:\\2024培训代码\\kafka_project\\src\\main\\Data\\question_info.json");
        System.out.println(file.getAbsolutePath());
        FileWriter writer = new FileWriter(file);


        for (int i = 0; i < 2000; i++) {
            Answer answer = getAnswer();
            String answerJson = JSON.toJSONString(answer);
            System.out.println(answerJson);
            //Thread.sleep(1000);
            writer.write(answerJson+"\n");
            writer.flush();
            //ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("dhedu",answerJson);
            //kafkaProducer.send(producerRecord);
        }
        writer.close();
    }
}

注:数据为java代码随机生成的一些数据,可以选取自己文件夹里面的数据,另外端口号。主机名,以及topic名称以及生成数据后保存的文件路径需要换成自己的。其他代码均可cv亲测无误,而且代码比较固定可以自己整合成一个模板来写非常方便,另外kafka-producer相关代码已经被注释,需要释放开才可使用。运行后打开消费端消费所指定的topic,检查是否发送成功。

structuredStreaming&&kafka实时消费者端代码:

import os
# -*- coding: utf-8 -*-

import sys

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

"""
------------------------------------------
  Description : TODO:
  SourceFile : _02Demo
  Author  : BlackCat
  Date  : 2024/11/14
-------------------------------------------
"""
if __name__ == '__main__':
    spark = SparkSession.builder.master("local[2]").appName("streamingkafka").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    readDf = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "bigdata01:9092") \
        .option("subscribe", "topincc") \
        .load()

    # etlDf = readDf.selectExpr("CAST(value AS STRING)").filter(F.col("value").contains("success"))
    readDf.createOrReplaceTempView("temp_donghu")
    etlDf = spark.sql("""
            select cast(value as string) from temp_donghu where cast(value as string) like '%success%'
        """)

    # etlDf.writeStream \
    #    .format("console") \
    #    .outputMode("append") \
    #    .option("truncate", "false") \
    #    .start().awaitTermination()

    etlDf.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "bigdata01:9092") \
        .option("topic", "yuekao2") \
        .option("checkpointLocation", "./ckp") \
        .start().awaitTermination()

    spark.stop()

注:端口号,主机名,以及topic名称以及生成数据后保存的文件路径需要换成自己的,其他代码均可cv亲测无误。另外os端指定路径代码已省略,它会走系统默认,不用写即可。可直接运行消费生成者端发送的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/920504.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

OceanBase V4.x应用实践:如何排查表被锁问题

DBA在日常工作中常常会面临以下两种常见情况&#xff1a; 业务人员会提出问题&#xff1a;“表被锁了&#xff0c;导致业务受阻&#xff0c;请帮忙解决。” 业务人员还会反馈&#xff1a;“某个程序通常几秒内就能执行完毕&#xff0c;但现在却运行了好几分钟&#xff0c;不清楚…

同三维T80003JEHS 4K/60帧HDMI/SDI超高清H.265解码器

1路HDMI和1路SDI输出&#xff0c;1路3.5音频输入和1路3.5音频输出&#xff0c;1个USB2.0口1个USB3.0口&#xff0c;带1个RS232串口&#xff0c;2个网口&#xff0c;支持1路4K60或4路4K30或16路1080P或32路720P解码输出。4种画面分割显示模式。 产品简介&#xff1a; 同三维T80…

【更新中】《硬件架构的艺术》笔记(五):低功耗设计

介绍 能量以热量形式消耗&#xff0c;温度升高芯片失效率也会增加&#xff0c;增加散热片或风扇会增加整体重量和成本&#xff0c;在SoC级别对功耗进行控制就可以减少甚至可能消除掉这些开支&#xff0c;产品也更小更便宜更可靠。本章描述了减少动态功耗和静态功耗的各种技术。…

网络安全审计概述与分类

目录 网络安全审计概述等保五个级别对审计要求网络安全审计系统组成网络安全审计系统类型 网络安全审计概述 4A分别是认证、授权、账号、审计 网络安全审计是指对网络信息系统的安全相关活动信息进行获取、记录、存储分析和利用的工作。 网络安全审计的作用在于建立“事后”…

安宝特方案 | AR助力紧急救援,科技守卫生命每一刻!

在生死时速的紧急救援战场上&#xff0c;每一秒都至关重要&#xff01;随着科技的发展&#xff0c;增强现实&#xff08;AR&#xff09;技术正在逐步渗透到医疗健康领域&#xff0c;改变着传统的医疗服务模式。 安宝特AR远程协助解决方案&#xff0c;凭借其先进的技术支持和创新…

IDEA:2023版远程服务器debug

很简单&#xff0c;但是很多文档没有写清楚&#xff0c;wocao 一、首先新建一个远程jvm 二、配置 三、把上面的参数复制出来 -agentlib:jdwptransportdt_socket,servery,suspendn,address5005 四、然后把这串代码放到服务器中&#xff08;这里的0.0.0.0意思是所有IP都能访问&a…

Midjourney基础命令和提示词

1 基础命令 1.1 /imagine prompt 生成图片的核心命令&#xff0c;prompt 后输入描述。 /imagine prompt: A majestic dragon flying over a misty mountain, cinematic lighting, 4K resolution 高级提示 1.1.1 基本参数 图片比例 --ar 图片比例 混乱 Aspect Ratios --…

ElasticSearch7.x入门教程之索引概念和基础操作(三)

文章目录 前言一、索引基本概念二、索引基本使用elasticsearch-head插件Kibana使用 总结 前言 要想熟悉使用ES的索引&#xff0c;则必须理解索引相关的概念&#xff0c;尤其是在工作当中。 在此记录&#xff0c;方便开展工作。 一、索引基本概念 尽量以通俗的话语。 1、集群…

【SQL50】day 2

目录 1.每位经理的下属员工数量 2.员工的直属部门 3.判断三角形 4.上级经理已离职的公司员工 5.换座位 6.电影评分 7.修复表中的名字 8.患某种疾病的患者 9.删除重复的电子邮箱 1.每位经理的下属员工数量 # Write your MySQL query statement below #e1是经理&#xff0c;…

基于 RocketMQ 实现 AMQP 协议实践

导语 在Apache CoC 2024 杭州站大会中&#xff0c;腾讯云高级工程师张乐为与会者带来了精彩的演讲。围绕《基于 RocketMQ 底座实现 AMQP 协议》的背景、目标、方案设计以及几个核心技术实现做了详细的阐述。 作者简介 张乐 腾讯高级工程师&#xff0c;负责腾讯云 RabbitMQ S…

python成绩分级 2024年6月python二级真题 青少年编程电子学会编程等级考试python二级真题解析

目录 python成绩分级 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序代码 四、程序说明 五、运行结果 六、考点分析 七、 推荐资料 1、蓝桥杯比赛 2、考级资料 3、其它资料 python成绩分级 2024年6月 python编程等级考试二级编程题 一、题目要求 …

我做了一份斯坦福CS229吴恩达机器学习笔记

吴恩达教授的机器学习课程&#xff0c;可以说是AI领域的一块金字招牌。这门在斯坦福大学开设的课程&#xff0c;历经十余年&#xff0c;依旧是机器学习入门的经典之作。 记得当年&#xff0c;这门课火爆到吴恩达教授不得不将其搬到线上&#xff0c;结果不仅在斯坦福&#xff0c…

ABAP开发学习——SNRO

SAP凭证号码的指定分为外部给号和内部给号。 Internal number range即内部给号,指系统根据预先维护好的号码范围&#xff08;只能是阿拉伯数字&#xff09;依序给号,给出已有数字的下一个编号。 External number range即外部给号,后台配置时指指定一个号码范围&#xff08;可以…

SpringBoot3整合Hutool-captcha实现图形验证码

文章目录 验证码需求分析:项目创建import方式的使用说明exclude方式定义接口:接口定义定义 CaptchaController前端代码在整合技术框架的时候,想找一个图形验证码相关的框架,看到很多验证码不在更新了或者是在中央仓库下载不下来,还需要多引入依赖。后面看到了Hutool **图形…

mysql-connector-java的jar包的下载方法汇总

方法一&#xff1a; 网址&#xff1a;http://mvnrepository.com/artifact/mysql/mysql-connector-java 1.进去后选择自己的版本&#xff1a; 2.然后再点击 3. 需要下载其他的jar包&#xff08;或者依赖&#xff09;都是在此网址中可以下载到的 3.1 3.2 3.3 3.4 方法二&#…

Spring Cloud Stream实现数据流处理

1.什么是Spring Cloud Stream&#xff1f; 我看很多回答都是“为了屏蔽消息队列的差异&#xff0c;使我们在使用消息队列的时候能够用统一的一套API&#xff0c;无需关心具体的消息队列实现”。 这样理解是有些不全面的&#xff0c;Spring Cloud Stream的核心是Stream&#xf…

i春秋-签到题

练习平台地址 竞赛中心 题目描述 题目内容 点击GUESS后会有辨识细菌的选择题 全部完成后会有弹窗提示 输入nickname后提示获得flag F12检查 元素中没有发现信息 检查后发现flag在控制台中 flag flag{663a5c95-3050-4c3a-bb6e-bc4f2fb6c32e} 注意事项 flag不一定要在元素中找&a…

无人机 PX4飞控 | CUAV 7-Nano 飞行控制器介绍与使用

无人机 PX4飞控 | CUAV 7-Nano 飞行控制器介绍与使用 7-Nano简介硬件参数接口定义模块连接供电部分遥控器电机 固件安装 7-Nano简介 7-Nano是一款针对小型化无人系统设备研发的微型自动驾驶仪。它由雷迅创新自主研发和生产&#xff0c;其创新性的采用叠层设计&#xff0c;在极…

怎么编译OpenWrt镜像?-基于Widora开发板

1.准备相应的环境&#xff0c;我使用的环境是VMware16ubuntu20.04&#xff0c;如图1所示安装编译所需的依赖包&#xff1b; sudo apt-get install build-essential asciidoc binutils bzip2 gawk gettext git libncurses5-dev libz-dev patch python3 python2.7 unzip zlib1g-…

优化表单交互:在 el-select 组件中嵌入表格显示选项

介绍了一种通过 el-select 插槽实现表格样式数据展示的方案&#xff0c;可更直观地辅助用户选择。支持列配置、行数据绑定及自定义搜索&#xff0c;简洁高效&#xff0c;适用于复杂选择场景。完整代码见GitHub 仓库。 背景 在进行业务开发选择订单时&#xff0c;如果单纯的根…