【头歌实训】PySpark Streaming 数据源

文章目录

  • 第1关:MySQL 数据源
    • 任务描述
    • 相关知识
      • PySpark JDBC 概述
      • PySpark JDBC
      • PySpark Streaming JDBC
    • 编程要求
    • 测试说明
    • 答案代码
  • 第2关:Kafka 数据源
    • 任务描述
    • 相关知识
      • Kafka 概述
      • Kafka 使用基础
      • PySpark Streaming Kafka
    • 编程要求
    • 测试说明
    • 答案代码

第1关:MySQL 数据源

任务描述

本关任务:读取套接字流数据,完成词频统计,将结果写入 Mysql 中。

相关知识

为了完成本关任务,你需要掌握:

  1. PySpark JDBC 概述;
  2. PySpark JDBC;
  3. PySpark Streaming JDBC。

PySpark JDBC 概述

在 PySpark 中支持通过 JDBC 的方式连接到其他数据库获取数据生成 DataFrame,当然也同样可以使用 Spark SQL 去读写数据库。除了 JDBC 数据源外,还支持 ParquetJSONHive 等数据源。

PySpark JDBC

在学习 PySpark Streaming JDBC 之前,我们先来了解一下在 PySpark 中如何使用 JDBC。

需求:

  • 读取 Mysql 中的数据;
  • 往 Mysql 中写入数据。

首先,打开右侧命令行窗口,等待连接后,进入 MySQL,任意创建一个库,在该库中任意创建一张表,任意写入一些数据。

# 启动 mysql 服务
service mysql start
# 进入 mysql
mysql -uroot -p123123
# 创建 test 库
create database if not exists test;
# 创建表
use test;
create table if not exists student(
id int,
name varchar(50),
class varchar(50));
# 数据写入
insert into  student values(1,"zhangsan","A");
insert into  student values(2,"lisi","B");
insert into  student values(3,"wangwu","C");

创建完成后,进入 python3 shell 界面。

python3

,

开始编写程序,第一步,先导入相关包

from findspark import init
init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

第二步,创建 Spark 对象

spark = SparkSession.builder.appName("read_mysql").master("local[*]").getOrCreate()

第三步,读取 Mysql 中的数据

dataFrame = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver")  .option("url", "jdbc:mysql://localhost:3306/test").option("dbtable", "student")  .option("user", "root").option("password", "123123").load()

第四步,输出读取的数据

# 注意,show() 方法默认只会显示前 20 行数据。
dataFrame.show()

输出结果如图所示: ,

第五步,将读取的数据以追加的方式写入库中

dataFrame.write.format("jdbc").option("driver", "com.mysql.jdbc.Driver")  .option("url", "jdbc:mysql://localhost:3306/test").option("dbtable", "student").option("user", "root").option("password", "123123").mode(saveMode="append").save()

进入 Mysql 中查看结果:

# 进入 Mysql
mysql -uroot -p123123
# 查询数据
select * from test.student;

,

PySpark Streaming JDBC

通过对 PySpark JDBC 的学习,我们了解了在 Python 中是如何使用 JDBC 的,现在来学习 PySpark Streaming JDBC 的连接方式。

需求:通过读取套接字流,进行词频统计,将数据写入 Mysql 中。

首先,打开右侧命令行窗口,等待连接后,进入 MySQL,创建 spark 库,在该库中创建 wordcount 表。

# 启动 mysql 服务
service mysql start
# 进入 mysql
mysql -uroot -p123123
# 创建 test 库
create database if not exists spark;
# 创建表
use spark;
create table if not exists wordcount(
word varchar(50),
count int);

创建完成后,进入主目录 /root,创建代码文件 mysql.py,对其进行编辑。

cd /root
vi mysql.py

开始编写程序,第一步,先导入相关包

from findspark import init
init()
import time
import pymysql
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

第二步,创建 Spark 环境与检查点

sc = SparkContext(appName="mysql_streaming", master="local[*]")
ssc = StreamingContext(sc, 10)
# 设置套接字流信息
inputStream = ssc.socketTextStream("localhost", 7777)
# 设置检查点
ssc.checkpoint("/usr/local/spark")

第三步,对数据进行相关操作

# 累加器(状态更新)
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)
pairs = inputStream.flatMap(lambda x: x.split(" ")).filter(lambda x: x != "").map(lambda x: (x, 1))
wordCounts = pairs.updateStateByKey(updateFunction)
wordCounts.pprint(100)

第四步,写入 Mysql 处理

def dbfunc(records):
    db = pymysql.connect("localhost", "root", "123123", "spark")
    cursor = db.cursor()
    
    def doinsert(p):
        sql = "insert into wordcount(word,count) values ('%s', '%s')" % (str(p[0]), str(p[1]))
        try:
            cursor.execute(sql)
            db.commit()
        except:
            db.rollback()
    for item in records:
        doinsert(item)
def func(rdd):
    repartitionedRDD = rdd.repartition(3)
    repartitionedRDD.foreachPartition(dbfunc)
wordCounts.foreachRDD(func=func)

第五步,启动与停止

ssc.start()
time.sleep(30)
ssc.stop()

第六步,新增一个命令行窗口,启动数据流服务

nc -l -p 7777

第七步,返回代码文件窗口,运行程序

python3 /root/mysql.py

第八步,程序启动后,切换到数据流服务窗口,输入如下数据:

hello pyspark
hello pyspark streaming
hello jdbc

程序结束后,进入 Mysql 中查看结果:

# 进入 Mysql
mysql -uroot -p123123
# 查询数据
select distinct(word),count from spark.wordcount;

结果如图所示:

,

编程要求

打开右侧代码文件窗口,在 BeginEnd 区域补充代码,执行程序,读取套接字流数据,按空格进行分词,完成词频统计。在 Mysql 中创建 work 数据库,在该库中创建表 wordcount,添加字段 word(字符型),字段 count(整型),将词频统计结果写入该表中。

代码文件目录: /data/workspace/myshixun/project/step1/work.py

套接字流相关信息:

  • 地址:localhost
  • 端口:8888
  • 输入数据:

待程序启动后(5s),请在 60 秒内写入数据,如果需要调整时间,你可以通过修改代码文件中 time.sleep(60) 来指定时间。

When summer comes, people like to go to the beach and play in the seawater.
It is such a good way to drive away the hotness.
But it has been reported that many people drawn while they were swimming on the beach. 
The people who died were good at swimming, the reason they got killed was the invisible demon under the seawater. 
In the afternoon, there are some vortexes under the seawater, which people can’t see. 
When people go swimming, they will be absorbed by the vortexes, even though they are good at swimming, they can’t resist the strong power.
So when we go to play in the beach, we must take care.

输入内容后,注意按回车。

Mysql 信息:

  • 账号:root
  • 密码:123123
  • 地址:localhost
  • 端口:3306

请在程序运行完成后再进行评测,否则会影响最终结果。

测试说明

平台将对你编写的代码进行评测,如果与预期结果一致,则通关,否则测试失败。

答案代码

from findspark import init
init()
import time
import pymysql
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="mysql_streaming", master="local[*]")

ssc = StreamingContext(sc, 10)

# 设置检查点
ssc.checkpoint("/usr/local/work")

# 累加器(状态更新)
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)

# 设置套接字流
############### Begin ###############
inputStream = ssc.socketTextStream("localhost", 8888)

############### End ###############

pairs = inputStream.flatMap(lambda x: x.split(" ")).filter(lambda x: x != "").map(lambda word: (word, 1))

wordCounts = pairs.updateStateByKey(updateFunction)

wordCounts.pprint(100)

def dbfunc(records):

# 根据传入的 records 参数,完成数据写入 Mysql 操作

############### Begin ###############
	# 连接 MySQL 数据库
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='123123',
        database='work',
        port=3306,
    )

    with connection.cursor() as cursor:
        # 根据传入的 records 参数,完成数据写入 Mysql 操作
        for record in records:
            word, count = record
            cursor.execute('INSERT INTO wordcount (word, count) VALUES (%s, %s)', (word, count))

    connection.commit()
    connection.close()

############### End ###############

# 分区设置
def func(rdd):
    repartitionedRDD = rdd.repartition(3)
    repartitionedRDD.foreachPartition(dbfunc)

wordCounts.foreachRDD(func=func)

ssc.start()

time.sleep(60)

ssc.stop() 

打开一个命令行窗口

# 启动 mysql 服务
service mysql start
# 进入 mysql
mysql -uroot -p123123
# 创建 test 库
create database if not exists work;
# 创建表
use work;
create table if not exists wordcount(
    word varchar(50),
    count int
);
# 退出 mysql
exit
# 创建检查点目录
mkdir -p /usr/local/work/
nc -l -p 8888

再打开一个窗口

chmod 777 /data/workspace/myshixun/project/step1/work.py
python3 /data/workspace/myshixun/project/step1/work.py # 现在开始运行代码文件,请在 60 秒内创建文件并写入下面数据

回到第一个窗口,把下面数据粘贴上去再打一个回车

When summer comes, people like to go to the beach and play in the seawater.
It is such a good way to drive away the hotness.
But it has been reported that many people drawn while they were swimming on the beach. 
The people who died were good at swimming, the reason they got killed was the invisible demon under the seawater. 
In the afternoon, there are some vortexes under the seawater, which people can’t see. 
When people go swimming, they will be absorbed by the vortexes, even though they are good at swimming, they can’t resist the strong power.
So when we go to play in the beach, we must take care.

第2关:Kafka 数据源

任务描述

本关任务:读取 Kafka 生产的数据,完成输出。

相关知识

为了完成本关任务,你需要掌握:

  1. Kafka 概述;
  2. Kafka 使用基础;
  3. PySpark Streaming Kafka。

Kafka 概述

Kafka 就是一个分布式的用于消息存储的发布订阅模式的消息队列。一般用于大数据的流式处理中。 具有高水平扩展性、高容错性、访问速度快、分布式等特性,主要应用场景是日志收集系统和消息系统。但是随着 Kafka 的快速发展,也被应用于高性能数据管道、数据集成、流分析等。

img

Kafka 使用基础

在学习 Pyspark streaming Kafka 之前,我们先来学习一下 Kafka 的使用基础。

首先,打开右侧命令行窗口,等待连接后,启动 Kafka 服务

# kafka 依赖 zookeeper,所以需要先启动 zookeeper 服务
cd /opt/zookeeper
bin/zkServer.sh start conf/zoo.cfg
# 启动 Kafka 服务
cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties

检查服务是否启动成功,输入 jps 后,出现如下所示,表示启动成功: ,

创建 topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic first 

这个 topicfirst2181zookeeper 默认的端口号,partitiontopic 里面的分区数,replication-factor 是备份的数量,在 kafka 集群中使用,这里单机版就不用备份了。

查看当前服务器中的所有 topic

bin/kafka-topics.sh --zookeeper localhost:2181 --list 

,

创建 producer 生产者

在 xxx 节点发送消息。

bin/kafka-console-producer.sh --broker-list xxx:9092 --topic first

创建 consumer 消费者

在 xxx 节点接收消息。

bin/kafka-console-consumer.sh --zookeeper xxx:2181 --from-beginning --topic first

删除 topic

bin/kafka-topics.sh --zookeeper master:2181 --delete --topic first

PySpark Streaming Kafka

通过对 Kafka 基础使用的学习,现在来通过一个案例学习在 PySpark Streaming 中如何连接 Kafka。

需求:消费 Kafka 生产的数据,完成输出。

首先,打开右侧命令行窗口,等待连接后,启动 Kafka 服务。

# 启动 zookeeper 服务
cd /opt/zookeeper
bin/zkServer.sh start conf/zoo.cfg
# 启动 Kafka 服务
cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties

创建 topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test

新增一个命令行窗口,等待连接后,在 /root 目录下创建 test.py 程序文件

cd /root
touch test.py

编辑文件 test.py,编写程序,第一步,导入相关文件包。

from pyspark.sql import SparkSession

第二步,创建 Spark 环境

spark=SparkSession.builder.appName("kafka_stream").master("local[*]").getOrCreate()

第三步,创建 Kafka 数据流

在 pyspark 中,我们通过 KafkaUtils.createStream() 方法创建 Kafka 数据流,但该方法在 Spark 3.0 中以及弃用,现在采用 spark.readStream.format("kafka") 方法来创建 Kafka 数据流。

df = spark \
    .readStream \
    .format("kafka") \
    # 绑定 Kafka 生产地址
    .option("kafka.bootstrap.servers", "localhost:9092") \
    # 订阅 topic
    .option("subscribe", "test") \
    # 设置偏移量(最新)
    .option("startingOffsets","latest") \
    .load()

第四步,收集数据

table = df.selectExpr("CAST(value AS STRING)")

第五步,输出到屏幕,启动程序

table.writeStream \
    # 指定监听间隔时间
    .trigger(processingTime='5 seconds') \
    # 输出方式
    .outputMode("append") \
    # 不将内容进行清空
    .option("truncate", "false")\
    .format("console") \
    .start() \
    # 60 秒后停止程序
    .awaitTermination(timeout=60)

编写完程序后,保存退出,切换到 Kafka 服务的命令行窗口,创建生产者。

cd /opt/kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

第六步,返回编写程序文件的命令行窗口,运行程序

spark-submit --master local[*] --driver-class-path /opt/kafka/libs/kafka-clients-2.8.0.jar --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2 --jars /opt/spark/jars/spark-streaming-kafka-0-10-assembly_2,12-3,0,2.jar --py-files test.zip test.py 

注意,运行程序前需要先压缩程序文件,压缩命令语法如下:

zip  压缩后文件名.zip  原文件名

第七步,程序启动后,切换到 Kafka 数据流服务窗口,输入如下数据:

hello kafka
hello pyspark streaming
I love big data

结果如图所示:

,

编程要求

打开右侧代码文件窗口,在 BeginEnd 区域补充代码。 在 Kafka 中创建一个 topic,作为一个生产者,完善程序,读取 Kafka 流数据并以 append 方式输出。通过 spark-submit 的方式运行代码文件,将输出信息保存到 /data/workspace/myshixun/project/step2/result.txt 结果文件中。

代码文件目录: /data/workspace/myshixun/project/step2/work.py

Kafka 相关信息:

  • Kafka 主目录:/opt/kafka
  • Zookeeper 主目录:/opt/zookeeper
  • Zookeeper 地址:localhost:2181

Kafka 输入内容:

程序启动后(15s左右),请在 60 秒内写入数据,如果需要调整时间,你可以通过修改代码文件中 .awaitTermination(timeout=60)timeout 指定时间。

Hello world!
Hello python!
Hello spark!
Hello Kafka!
I love bigdata.

提交命令:

注意压缩文件。

spark-submit --master local[*] --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2 --py-files xxx.zip xxx.py > /data/workspace/myshixun/project/step2/result.txt

请等待程序运行完成后进行评测,否则会影响最终结果。

测试说明

平台将对你编写的代码进行评测,如果与预期结果一致,则通关,否则测试失败。

答案代码

from pyspark.sql import SparkSession
  
spark = SparkSession.builder.appName("kafka_stream").master("local[*]").getOrCreate()

############### Begin ###############

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .option("startingOffsets","latest") \
    .load()


table = df.selectExpr("CAST(value AS STRING) as message")

table.writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("append") \
    .option("truncate", "false")\
    .format("console") \
    .start() \
    .awaitTermination(timeout=60) 

############### Begin ###############


进入右侧命令行窗口

# kafka 依赖 zookeeper,所以需要先启动 zookeeper 服务
cd /opt/zookeeper
bin/zkServer.sh start conf/zoo.cfg
# 启动 Kafka 服务
cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties
# 创建 topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test
# 创建 producer 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

再打开一个命令行窗口

cd /data/workspace/myshixun/project/step2/
zip work.zip work.py
spark-submit --master local[*] --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2 --py-files work.zip work.py > result.txt

回到前一个命令行窗口,在程序启动 15s 左右时间后再填入下面数据,并且在 60s 内完成写入

Hello world!
Hello python!
Hello spark!
Hello Kafka!
I love bigdata.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/274235.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

亿赛通电子文档安全管理系统 linkfilterservice 未授权漏洞

产品简介 亿赛通电子文档安全管理系统,(简称:CDG)是一款电子文档安全加密软件,该系统利用驱动层透明加密技术,通过对电子文档的加密保护,防止内部员工泄密和外部人员非法窃取企业核心重要数据资…

在Spring Cloud中使用Ribbon完成一个简单的负载均衡demo

Spring Cloud系列断更了有一段时间了,这段时间最近都在忙着项目上的事,天天修复bug以及调整需求,反正各种操劳,了解业务需求,然后开发相关功能,很久都没碰Spring Cloud系列的相关文章了,最近回头…

ASP.Net实现新闻添加查询(三层架构,含照片)

目录 演示功能: 点击启动生成页面 点击搜索模糊查询 点击添加跳转新界面 ​编辑 点击Button添加 步骤: 1、建文件 ​编辑 2、添加引用关系 3、根据数据库中的列写Models下的XueshengModels类 4、DAL下的DBHelper(对数据库进行操作…

Win10电脑蓝牙默认音量100的设置教程

在Win10电脑操作过程中,用户想设置连接蓝牙后音量默认是100,但不知道具体的设置操作步骤。这时候用户需要打开Win10系统上的注册表,点击修改注册表来完成这一设置,下面就是Win10电脑蓝牙默认音量100的设置教程介绍,帮助…

【网络安全 | 指纹识别工具】WhatWeb使用详析

前言 WhatWeb 是一款用于识别 Web 应用程序和 Web 服务器的开源工具。它可以识别网站使用的编程语言、Web 框架、Web 服务器软件、Web 应用程序等信息,从而帮助安全测试人员快速了解目标网站的技术特征,发现可能存在的漏洞。 本文将对 WhatWeb 的使用方法…

使用Rust发送邮件

SMTP协议与MIME协议 SMTP(简单邮件传输协议,Simple Mail Transfer Protocol)是一种用于发送和接收电子邮件的互联网标准通信协议。它定义了电子邮件服务器如何相互发送、接收和中继邮件。SMTP 通常用于发送邮件,而邮件的接收通常由 POP&#…

在wps里导入Mathtype、改变字体

1 在wps里导入Mathtype 开发工具--加载项 2. 在“模板和加载项”窗口中再点击“添加” 3.找到mathtype安装路径下面的“OfficeSupprot”,这时会看到有“32”和“64”两个文件夹,分别对应WPS软件的系统(任务管理器可以直接查看wps版本&#x…

three.js实现3D汽车展厅效果展示

项目搭建 本案例还是借助框架书写three项目,借用vite构建工具搭建vue项目,搭建完成之后,用编辑器打开该项目,在终端执行 npm i 安装一下依赖,安装完成之后终端在安装 npm i three 即可。 因为我搭建的是vue3项目&…

理解 Go Mod Init

初始化Go模块和管理依赖的全面指南 go mod init 是Go编程语言(通常称为Golang)中用于初始化新Go模块的命令。在Go中,一个模块是一组相关的Go包,它们作为一个单元一起进行版本控制。通常,在项目目录的根目录下使用 go m…

NLP论文阅读记录 - 01 | 2021 神经抽象摘要方法及摘要事实一致性综述

文章目录 前言0、论文摘要一、Introduction二.背景2.1自动总结任务2.2 数据集DUC-2004Gigaword [Graff et al., 2003, Napoles et al., 2012]CNN/DailyMail [Nallapati 等人,2016]XSum [Narayan 等人,2018] 2.3 摘要系统的评估2.3.1 Rouge [Lin, 2004] 三…

用编程解决习题【计算机图像处理】

用编程解决习题【计算机图像处理】 前言版权第三章 03采样量化与像素间关系作业编程 第六章 06图像的直方图变换作业编程 第七章 07图像的噪声抑制作业编程 第十章 10二值图像的分析作业编程 最后 前言 2023-12-27 21:11:27 以下内容源自《【计算机图像处理】》 仅供学习交流…

CSS3用户界面弹性盒子

CSS3用户界面 resize 该CSS3属性用于定义元素是否应该调整大小,如果需要调整大小,那么以哪个轴进行调整。 语法:resize:both | horizontal | none | vertical 注意:该属性仅应用在overflow值而不是visible的元素上。通常而言&am…

《国货之光》-粗粮八宝粉

国潮正当时,好物当自强。赋能国货品牌,打造行业爆品。今天为大家带来的国货好物是老磨坊纯杂粮八宝粉 小时候,经常听妈妈说,要多吃杂粮,对身体好。然后每天不重样的给我做杂粮饭菜。不知道大家有没有和我相同的经历。…

Windows搭建FTP服务器教学以及计算机端口介绍

目录 一. FTP服务器介绍 FTP服务器是什么意思? 二.Windows Service 2012 搭建FTP服务器 1.开启防火墙 2.创建组 ​编辑3.创建用户 4.用户绑定组 5.安装ftp服务器 ​编辑6.配置ftp服务器 7.配置ftp文件夹的权限 8.连接测试 三.计算机端口介绍 什么是网络…

单集群400TB,OceanBase稳定支撑快手核心业务场景

一款日均超过千万人访问的短视频 App 快手,面对高并发流量如何及时有效地处理用户请求?通过在后端配置多套 MySQL 集群来支撑高流量访问,以解决大数据量存储和性能问题,这种传统的 MySQL 分库分表方案有何问题?快手对分…

c语言结构体数组

使用结构体变量建立数据类型后,我们就可以利用这个数据类型创建数组,就比如创建整形数组就可以写为 int arr[ ],那么创建一个自己建立的数据类型的数组就可以写为: 声明的结构体类型名 数组名。 例如我们声明一个结构体类型struct my后&am…

技术扫盲:如何优雅的使用 java -jar

java -jar xxx.jar java -jar 是一个用于在命令行界面中执行 Java 可执行 JAR 文件的命令。它的语法如下&#xff1a; java -jar <JAR 文件路径> [参数]其中&#xff1a; java 是 Java 运行时环境的可执行文件。-jar 是一个选项&#xff0c;表示要执行的文件是一个 JA…

文件IO

文章目录 文章目录 前言 一 . 文件 文件路径 文件类型 Java中操作文件 File 概述 属性 构造方法 方法 createNewFile mkdir 二 . 文件内容的读写 - IO InputStream 概述 FileInputStream 概述 利用 Scanner 进行字符读取 OutputStream 概述 PrintWriter封装O…

【教学类-42-03】20231225 X-Y 之间加法题判断题3.0(确保错误题有绝对错误的答案)

背景需求&#xff1a; 根据需求&#xff0c;0-5以内的判断是21题正确&#xff0c;21题错误&#xff0c;但由于错误答案是随机数抽取&#xff0c;有可能恰好是正确的&#xff0c;所以会出现每套题目的正确数和错误数不一样的情况 优化思路一&#xff1a; 设置如果错误答案与正…

14 UVM sequencer

sequencer是在sequence和driver之间建立连接的中介。最终&#xff0c;它将transactions或sequence items传递给driver&#xff0c;以便将其驱动到DUT。 1. uvm_sequencer class hierarchy uvm_sequencer class declaration: class uvm_sequencer #( type REQ uvm_sequence_i…