1、问题描述
学习SparkSql中,将spark中dataframe数据结构保存为jdbc的格式并提交到本地的mysql中,相关代码见文章末尾。
运行代码时报出相关配置文件错误,如下。
根据该报错,发现网络上多数解决方都是基于java开发的解决方案,尝试过多种jar配置途径,都没办法解决该问题。
后续发现时jar包有问题,通过参考其他文章思路,最终通过合适的jar包解决问题。
参考文章:pyspark连接mysql读取数据、写入数据(四种模式)、写入数据模式的调优_pyspark 执行mysql的语句读取数据 写入数据-CSDN博客
2、解决过程
首先产生该问题是没有配置合适的jar包,因此需要在本机的pyspark包中配置相关jar。
位置:E:\programfiles\anaconda\Lib\site-packages\pyspark\jars(这里根据各自的环境安排)
一般来说本地的mysql都是8.x版本的所以jar包一般用最新的即可。下载位置如下:
MySQL :: MySQL Connectors,选择jdbc格式。(需要提前注册oracle账号)
这里选择独立平台,之后在下载页面选择合适的压缩文件。
之后,在压缩包中选择该文件粘贴到之前的位置中去即可。
在这里插入图片描述
在此运行代码,发现此时将不再报错,查看数据库也可以发现数据表已经成功保存到本地mysql.
代码
# -*- coding: UTF-8 -*-
"""=================================================
@Project -> File :PySpark -> 15_dataframe_jdbc
@IDE :PyCharm
@Author :Strive Yang
@Date :2024-07-12 17:05
@Email : yangzy927@qq.com
@Desc :将sparksql中的dataframe数据以jdbc的形式保存到本地数据库中
=================================================="""
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
# 0、构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName('test'). \
master('local[*]'). \
getOrCreate()
sc = spark.sparkContext
# 1、读取数据集u.data
schema = StructType().add('user_id', StringType(), nullable=True). \
add('movie_id', IntegerType(), nullable=True). \
add('rank', IntegerType(), nullable=True). \
add('ts', StringType(), nullable=True)
df = spark.read.format('csv'). \
option('sep', '\t'). \
option('header', False). \
option('encoding', 'utf8'). \
schema(schema=schema). \
load('文件路径')
# 将数据写入jdbc中,写入到本地的mysql中
df.write.mode('overwrite').\
format('jdbc').\
option('url','jdbc:mysql://localhost:3306/数据库名?useSSL=false&useUnicode=true').\
option('dbtable','表名').\
option('user','账户').\
option('password','密码').\
save()