First, start the ZooKeeper cluster and the Kafka cluster on Linux, then use Java to call the Kafka client API to implement the producer, i.e. the message-sending side. The code is as follows:
package com.bigdata;
import com.alibaba.fastjson2.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.*;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.*;
public class Simulator {
    // Textbook IDs
    private static String[] arr2 = {"TextBookID_1", "TextBookID_2"};
    // Grade IDs
    private static String[] arr3 = {"GradeID_1", "GradeID_2", "GradeID_3", "GradeID_4", "GradeID_5", "GradeID_6"};
    // Subject IDs
    private static String[] arr4 = {"SubjectID_1_Math", "SubjectID_2_Chinese", "SubjectID_3_English"};
    // Chapter IDs
    private static String[] arr5 = {"ChapterID_chapter_1", "ChapterID_chapter_2", "ChapterID_chapter_3"};

    // 50 student IDs
    static List<String> stuList = new ArrayList<String>();
    static {
        for (int i = 1; i <= 50; i++) {
            stuList.add("StudentID_" + i);
        }
    }

    // 20 question IDs
    static List<String> questionList = new ArrayList<String>();
    static {
        for (int i = 1; i <= 20; i++) {
            questionList.add("QuestionID_" + i);
        }
    }

    // Each call to this method generates one random answer record
    public static Answer getAnswer() {
        Random random = new Random();
        int stuIndex = random.nextInt(stuList.size());
        String studentId = stuList.get(stuIndex);
        int textBookIndex = random.nextInt(arr2.length);
        String textBookId = arr2[textBookIndex];
        String gradeID = arr3[random.nextInt(arr3.length)];
        String subjectID = arr4[random.nextInt(arr4.length)];
        String chapterID = arr5[random.nextInt(arr5.length)];
        String questionId = questionList.get(random.nextInt(questionList.size()));
        int score = random.nextInt(101);   // random score in [0, 100]
        long ts = System.currentTimeMillis();
        Timestamp timestamp = new Timestamp(ts);
        Date date = new Date();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String answerTime = dateFormat.format(date);
        Answer answer = new Answer(studentId, textBookId, gradeID, subjectID, chapterID, questionId, score, answerTime, timestamp);
        return answer;
    }

    public static void main(String[] args) throws Exception {
        /*Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");*/
        //KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        File file = new File("D:\\2024培训代码\\kafka_project\\src\\main\\Data\\question_info.json");
        System.out.println(file.getAbsolutePath());
        FileWriter writer = new FileWriter(file);
        for (int i = 0; i < 2000; i++) {
            Answer answer = getAnswer();
            String answerJson = JSON.toJSONString(answer);
            System.out.println(answerJson);
            //Thread.sleep(1000);
            writer.write(answerJson + "\n");
            writer.flush();
            //ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("dhedu", answerJson);
            //kafkaProducer.send(producerRecord);
        }
        writer.close();
    }
}
Note: the data is randomly generated by the Java code; you can also use data from your own folder instead. The port, hostname, topic name, and the output file path must be changed to your own. The rest of the code can be copied as-is (tested and working), and since it is fairly boilerplate it is convenient to keep it as a reusable template. Also note that the kafka-producer-related code is commented out and must be uncommented before use. After running, open a consumer on the specified topic to check whether the messages were sent successfully.
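The producer code references an Answer class that is not shown here. Below is a minimal sketch of what that POJO might look like; the field names and constructor order are inferred from the getAnswer() call and are assumptions, so the original class may differ:
package com.bigdata;

import java.sql.Timestamp;

// Hypothetical sketch of the Answer POJO used by Simulator; the real class may differ.
public class Answer {
    private String studentId;
    private String textBookId;
    private String gradeID;
    private String subjectID;
    private String chapterID;
    private String questionId;
    private int score;
    private String answerTime;
    private Timestamp timestamp;

    public Answer(String studentId, String textBookId, String gradeID, String subjectID,
                  String chapterID, String questionId, int score, String answerTime, Timestamp timestamp) {
        this.studentId = studentId;
        this.textBookId = textBookId;
        this.gradeID = gradeID;
        this.subjectID = subjectID;
        this.chapterID = chapterID;
        this.questionId = questionId;
        this.score = score;
        this.answerTime = answerTime;
        this.timestamp = timestamp;
    }

    // Getters so fastjson2 can serialize the fields into JSON
    public String getStudentId() { return studentId; }
    public String getTextBookId() { return textBookId; }
    public String getGradeID() { return gradeID; }
    public String getSubjectID() { return subjectID; }
    public String getChapterID() { return chapterID; }
    public String getQuestionId() { return questionId; }
    public int getScore() { return score; }
    public String getAnswerTime() { return answerTime; }
    public Timestamp getTimestamp() { return timestamp; }
}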
Structured Streaming & Kafka real-time consumer code:
# -*- coding: utf-8 -*-
import os
import sys

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

"""
-------------------------------------------
Description : TODO:
SourceFile  : _02Demo
Author      : BlackCat
Date        : 2024/11/14
-------------------------------------------
"""

if __name__ == '__main__':
    spark = SparkSession.builder.master("local[2]").appName("streamingkafka").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    # Read the source Kafka topic as a streaming DataFrame
    readDf = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "bigdata01:9092") \
        .option("subscribe", "topincc") \
        .load()

    # etlDf = readDf.selectExpr("CAST(value AS STRING)").filter(F.col("value").contains("success"))
    readDf.createOrReplaceTempView("temp_donghu")
    etlDf = spark.sql("""
        select cast(value as string) from temp_donghu where cast(value as string) like '%success%'
    """)

    # etlDf.writeStream \
    #     .format("console") \
    #     .outputMode("append") \
    #     .option("truncate", "false") \
    #     .start().awaitTermination()

    # Write the filtered records to the target Kafka topic
    etlDf.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "bigdata01:9092") \
        .option("topic", "yuekao2") \
        .option("checkpointLocation", "./ckp") \
        .start().awaitTermination()

    spark.stop()
Note: the port, hostname, topic names, and file paths must be changed to your own; the rest of the code can be copied as-is (tested and working). The os-based path-setting code has been omitted here; the job will fall back to the system defaults, so it is not required. The script can be run directly to consume the data sent by the producer side.
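For reference, the omitted os-based environment setup usually looks something like the sketch below, assuming the environment variables are not already configured system-wide. The paths are placeholders and must be replaced with your own installation locations; if your system defaults already work, this block can be left out as the note says:
import os

# Hypothetical example paths; replace with your own installation locations.
os.environ['JAVA_HOME'] = '/opt/installs/jdk'        # JDK used by Spark
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'    # Python interpreter for the Spark workers
os.environ['HADOOP_HOME'] = '/opt/installs/hadoop'   # Hadoop home (winutils on Windows)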