学习大数据DAY57 新的接口配置

作业

完成 API 接口和文件的接入, 并部署到生产调度平台, 每个任务最后至少
要有两条 不报错 的日志, 报错就驳回作业
作业不需要复制日志
API = Appliation Program Interface 应用程序接口 => JSON 的地址
客户需求: 把
https://zhiyun.pub:9099/site/c-class?page=1 所有数据定 时同步到 Hive 数仓
分析 分页数据
https://zhiyun.pub:9099/site/c-class?page=1
https://zhiyun.pub:9099/site/c-class?page=2
...
https://zhiyun.pub:9099/site/c-class?page=20
技术: Python + requests 请求库
需要的依赖包
pip install requests hdfs
c_org_busi.py:
#!/bin/python3
import requests
from hdfs import *
import os
# *客户需求: 把 https://zhiyun.pub:9099/site/c-class?page=1 所有
数据定时同步到 Hive 数仓
lines = []
page = 1
pages = 1
def get_data(page=1):
global pages
print(f"正在抽取第{page}页的数据")
url = f"https://zhiyun.pub:9099/site/c-class?page={page}"
r = requests.get(url)
data = r.json()
if data["status"] == 1:
page_data = data["data"]
# 更新页数
pages = data["pages"]
# print(f"pages: {pages}")
# print(page_data)
for item in page_data:
# {'id': '1', 'levels': '1', 'classcode': '01',
'classname': '中西成药', 'saletax': '0.00', 'createtime':
'1900-01-20 11:16:47', 'createuser': '1002', 'notes': 'null',
'stamp': '562664386'}
# 字典 => Hive 数据格式# A B C D ...
values = item.values()
# print(values)
# dict_values(['1', '1', '01', '中西成药', '0.00',
'1900-01-20 11:16:47', '1002', 'null', '562664386'])
# 把所有元素转换成字符串
str_list = []
for value in values:
str_list.append(f"{value}")
# print(str_list)
# ['1', '1', '01', '中西成药', '0.00', '1900-01-20
11:16:47', '1002', 'null', '562664386']
# 字符串的 join 方法, 把列表的所有元素拼接起来
line = "\t".join(str_list)
# print(line)
# 1
1
01
中西成
药
0.00
1900-01-20
11:16:47
1002
null
562664386
lines.append(line)
# print(lines)
def do_get_data():
global page, pages
while page <= pages:
# print(f"pages: {pages}")
get_data(page)
# print(f"pages: {pages}")
page = page + 1
# 写入到数据文件
with
open("/zhiyun/shihaihong/data/c_class.data","w",encoding="utf-
8") as f:
content = "\n".join(lines)
f.write(content)
print("文件写入成功")
# 上传到 HDFSdef upload_data_hdfs():
# 创建 HDFS 目录
client = Client("http://192.168.200.100:9870")
client.makedirs("/zhiyun/shihaihong/ods/c_class")
# 上传
# 注意再次上传会报已存在错误
client.upload("/zhiyun/shihaihong/ods/c_class",
"/zhiyun/shihaihong/data/c_class.data");
print("上传成功")
# Hive 建表
def craete_hive_table():
os.system('''
hive -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.c_class(
id int,
levels int,
classcode string,
classname string,
saletax decimal(10,2),
createtime timestamp,
createuser string,
notes string,
stamp int
) row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as textfile
location "/zhiyun/shihaihong/ods/c_class";
'
''')
# 验证数据
def check_data():
os.system('''
hive -e '
select count(1) from ods_shihaihong.c_class;
select * from ods_shihaihong.c_class limit 5;
'
''')# 爬取数据
do_get_data()
upload_data_hdfs()
craete_hive_table()
check_data()
运行后验证:
部署到调度平台:
执行前,要现在 hdfs 建立文件:
hadoop fs -mkdir -p /zhiyun/lijinquan/ods/c_class
把之前上传的数据清空,然后用任务调度再执行一遍。
执行一次,查看执行日志:
文件接入:
需求: 定时下载
https://zhiyun.pub:9099/设备清单.xlsx
, 然后上传到
HDFS 建表,更新
c_tools.py:
#!/bin/python3
import requests
from hdfs import *
import os
import pandas
# *客户需求: 定时下载 https://zhiyun.pub:9099/设备清单.xlsx , 然后
上传到 HDFS 建表,更新
def get_data():
print(f"正在抽取数据")
url = f"https://zhiyun.pub:9099/设备清单.xlsx"
r = requests.get(url)
if r.status_code == 200:
with open("/zhiyun/shihaihong/data/download_设备清
单.xlsx","wb") as f:
f.write(r.content)
df=pandas.read_excel("/zhiyun/shihaihong/data/downl
oad_设备清单.xlsx")df.to_csv("/zhiyun/shihaihong/data/download_设备清
单.csv",index=False,header=False)
print("文件下载成功")
f.close()
else:
print("文件下载失败")
# 上传到 HDFS
def upload_data_hdfs():
# 创建 HDFS 目录
client = Client("http://192.168.200.100:9870")
client.makedirs("/zhiyun/shihaihong/ods/c_tools")
# 上传
# 注意再次上传会报已存在错误
client.upload("/zhiyun/shihaihong/ods/c_tools",
"/zhiyun/shihaihong/data/download_设备清单.csv");
print("上传成功")
# Hive 建表
def create_hive_table():
os.system('''
hive -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.c_tools(
id int,
hospital string,
tool_name string,
manufacturer string,
produce_date string,
administrative_officer string,
picture string
) row format delimited fields terminated by ","
lines terminated by "\n"
stored as textfile
location "/zhiyun/shihaihong/ods/c_tools";
'
''')
# 验证数据
# def check_data():#
os.system('''
#
hive -e '
#
select count(1) from ods_shihaihong.c_class;
#
select * from ods_shihaihong.c_class limit 5;
# '
#
''')
# 爬取数据
get_data()
upload_data_hdfs()
create_hive_table()
# check_data()
运行结束后验证:
检查 HDFS 是否有上传文件:
检查 Hive 数据库中是否有此库:
任务定时调度:
登录老师的任务调度中心:
编辑 GLUE IDE
跟前面差不多,就是生产调度的 HDFS 路径需要注意修改
client = Client("http://cdh02:9870")
执行日志:
把上一个脚本的代码也写入生产调度中心:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/878677.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

nginx安装及vue项目部署

安装及简单配置 在usr/local下建好nginx文件夹&#xff0c;下载好nginx-1.26.2.tar.gz压缩文件.安装编译工具及库文件 yum -y install make zlib zlib-devel gcc-c libtool openssl openssl-devel pcre-devel gcc、gcc-c # 主要用来进行编译相关使用 openssl、ope…

大模型笔记03--快速体验dify

大模型笔记03--快速体验dify 介绍部署&测试部署 dify测试dify对接本地ollama大模型对接阿里云千问大模型在个人网站中嵌入dify智能客服 注意事项说明 介绍 Dify 是一款开源的大语言模型(LLM) 应用开发平台。它融合了后端即服务&#xff08;Backend as Service&#xff09;…

使用mlp算法对Digits数据集进行分类

程序功能 这个程序使用多层感知机&#xff08;MLP&#xff09;对 Digits 数据集进行分类。程序将数据集分为训练集和测试集&#xff0c;创建并训练一个具有两个隐藏层的 MLP 模型。训练完成后&#xff0c;模型对测试数据进行预测&#xff0c;并通过准确率、分类报告和混淆矩阵…

鸿蒙 ArkUI组件二

ArkUI组件&#xff08;续&#xff09; 文本组件 在HarmonyOS中&#xff0c;Text/Span组件是文本控件中的一个关键部分。Text控件可以用来显示文本内容&#xff0c;而Span只能作为Text组件的子组件显示文本内容。 Text/Span组件的用法非常简单和直观。我们可以通过Text组件来显…

Spring-IOC容器-ApplicationContext

IOC:Inversion of Control 控制反转&#xff0c;是一种设计原则&#xff0c;spring 中通过DI&#xff08;dependency Injection&#xff09;来具体实现。 比如原本对象的实例化&#xff0c;是通过程序主动New出来&#xff0c;IOC中的对象实例交给Spring框架来实例化&#xff0…

TDengine 与 SCADA 强强联合:提升工业数据管理的效率与精准

随着时序数据库&#xff08;Time Series Database&#xff09;的日益普及&#xff0c;越来越多的工业自动化控制&#xff08;工控&#xff09;人员开始认识到其强大能力。然而&#xff0c;时序数据库在传统实时数据库应用领域&#xff0c;特别是在过程监控层的推广仍面临挑战&a…

使用docker配置wordpress

docker的安装 配置docker yum源 sudo yum install -y yum-utils sudo yum-config-manager \ --add-repo \ http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo下载最新版本docker sudo yum install -y docker-ce docker-ce-cli containerd.io docker-buildx-…

门磁模块详解(防盗感应开关 STM32)

目录 一、介绍 二、程序设计 main.c文件 gate_guard.h文件 gate_guard.c文件 三、实验效果 四、资料获取 项目分享 一、介绍 MC-38常闭式门磁开关是作为IO开关输入数字信号的&#xff0c;原理是合在一起信号是导通的 , 配合有线主机使用 不能单独使用。适用于非铁质&a…

Linux——应用层自定义协议与序列化

目录 一应用层 1再谈 "协议" 2序列化与反序列化 3理解read,write,recv,send 4Udp vs Tcp 二网络版本计算器 三手写序列和反序列化 四进程间关系与守护进程 1进程组 1.1什么是进程组 1.2组长进程 2会话 2.1什么是会话 2.2会话下的前后台进程 3作业控…

08_Python数据类型_字典

Python的基础数据类型 数值类型&#xff1a;整数、浮点数、复数、布尔字符串容器类型&#xff1a;列表、元祖、字典、集合 字典 字典&#xff08;Dictionary&#xff09;是一种可变容器模型&#xff0c;它可以存储任意类型对象&#xff0c;其中每个对象都存储为一个键值对。…

C++ | Leetcode C++题解之第407题接雨水II

题目&#xff1a; 题解&#xff1a; class Solution { public:int trapRainWater(vector<vector<int>>& heightMap) {int m heightMap.size(), n heightMap[0].size();int maxHeight 0;int dirs[] {-1, 0, 1, 0, -1};for (int i 0; i < m; i) {maxHei…

python中的各类比较与计算

运算符 1.算数运算符2.关系运算符3.逻辑运算符4.关于短路求值5.赋值运算符1&#xff09;的使用链式赋值多元赋值 2)复合赋值运算符 6.位运算符7.成员运算符8.身份运算符 1.算数运算符 # 加 print(1 2) # 减 print(2 - 1) # 乘 print(1 * 2) # 余数 4%31余数为1 print(4 % 3…

【Redis】之Geo

概述 Geo就是Geolocation的简写形式&#xff0c;代表地理坐标。在Redis中&#xff0c;构造了能够存储地址坐标信息的一种数据结构&#xff0c;帮助我们根据经纬度来检索数据。 命令行操作方法 GEOADD 可以用来添加一个或者多个地理坐标。 GEODIST 返回一个key中两个成员之…

F12抓包11:UI自动化 - Recoder(记录器)

课程大纲 使用场景&#xff08;导入和导出&#xff09;: ① 测试的重复性工作&#xff0c;本浏览器录制并进行replay&#xff1b; ② 导入/导出录制脚本&#xff0c;移植后replay&#xff1b; ③ 导出给开发进行replay复现bug&#xff1b; ④ 进行前端性能分析。 1、录制脚…

微软数据库的SQL注入漏洞解析——Microsoft Access、SQLServer与SQL注入防御

说明:本文仅是用于学习分析自己搭建的SQL漏洞内容和原理,请勿用在非法途径上,违者后果自负,与笔者无关;本文开始前请认真详细学习《‌中华人民共和国网络安全法》‌及其相关法规内容【学法时习之丨网络安全在身边一图了解网络安全法_中央网络安全和信息化委员会办公室】 。…

pytorch快速入门(一)—— 基本工具及平台介绍

前言 该pytorch学习笔记应该配合b站小土堆的《pytorch深度学习快速入门教程》使用 环境配置&#xff1a;Anaconda Python编译器&#xff1a;pycharm、jupyter 两大法宝函数 dir&#xff08;&#xff09;&#xff1a;知道包中有什么东西&#xff08;函数 / 属性..…

llama网络结构及源码

目录 模型初始化 config lm_head transformer wte h rms_1/rms_2 attn c_attn c_proj 线性层mlp ln_f rope_cache mask_cache kv_caches tokenizer tokenizer初始化 tokennizer.encoder 位置编码和mask 确定最大文本长度 建立rope_cache 建立mask_cache …

信奥初赛解析:1.1-计算机概述

目录 前言 知识要点 一、发展史 二、计算机的分类 三、计算机的基本特征 四、计算机的应用 课堂练习 题目列表 定项选择题 不定项选择题 参考答案 定项选择题 不定项选择题 前言 从今天开始&#xff0c;我们要重点讲初赛内容&#xff0c; 预计讲半年&#xff0c;信…

Linux下编译Kratos

本文记录在Linux下编译Kratos的流程。 零、环境 操作系统Ubuntu 22.04.4 LTSVS Code1.92.1Git2.34.1GCC11.4.0CMake3.22.1Boost1.74.0oneAPI2024.2.1 一、依赖与代码 1.1 安装依赖 apt-get update apt-get install vim openssh-server openssh-client ssh \build-essential …

Oracle发邮件功能:设置的步骤与注意事项?

Oracle发邮件配置教程&#xff1f;如何实现Oracle发邮件功能&#xff1f; Oracle数据库作为企业级应用的核心&#xff0c;提供了内置的发邮件功能&#xff0c;使得数据库管理员和开发人员能够通过数据库直接发送邮件。AokSend将详细介绍如何设置Oracle发邮件功能。 Oracle发邮…