项目成品
支持 | |
---|---|
自动挂单 | ✅完成 |
监控收款云音箱 | ✅完成 |
卷烟盘点 | ✅完成 |
补单 | ✅完成 |
自动入库 | ✅完成 |
监控微信支付 | ✅完成 |
自动提交会员信息 | ✅完成 |
用到的技术栈:Python+MQTT5.0+PHP
当顾客扫了三合一二维码且支付完成时,监控收到新的订单,将数据发送给订阅了YF的客户端,客户端通过收到的金额进行记录,并通过接口计算出与该金额最相符的香烟价格,并返回香烟条码,客户端拿到条码完成键盘输入,点击挂单完成支付
顾客扫码支付后回调自动操作
成品不通用,监控工具暂时只适合德宏地区,收款工具只适用于云南,如需使用需要配置相同分辨率电脑以及更换当地收款云音响域名
目录
项目成品
开发背景
配置后端服务及接口
搭建MQTT服务
Python基本连接实例
导入 Paho MQTT客户端
设置 MQTT Broker 连接参数
编写 MQTT 连接函数
发布消息
订阅消息
完整代码
消息发布代码
消息订阅代码
监控回调
Api接口
开发自动化操作程序
连接服务
自动挂单
补单
卷烟盘点
开发背景
朋友家的烟店,为了能够更快升级档位,公司规定:需要定期 盘点、会员扫码、挂单下单等操作,当然这只是所有企业的理想状态下,现实生活中需要考虑到很多因素,例如:终端无人操作、顾客不愿意提供个人信息积分、盘点费时等等,于是从开发到落地使用,经历了6个月多,至今才打算发文记录下,最开始没有考虑到监听收款音箱这个方案,我们商店使用的收款音箱是中国农业银行的,认为他不支持第三方接口,后面咨询了下确实不支持
原先使用的方案是:通过一个机器(单片机/旧手机)监听语音:农行收款**元,后面发现效果不理想,识别到的语音不完整,第三方语音识别接口成本太高,打算放弃了,搁置了1个月左右
后面觉得都弄了一半了,就差个监控回调,实在划不来,就因为这个导致项目半自动化,实在可惜,打算抓包看下能不能实现token保活,每经过一段时间触发一次,期望不是很高,因为这是银行的产品,我认为安全系数应该要高,结果实现了,亲测保活可以使用半年(简直太离谱了)
配置后端服务及接口
搭建MQTT服务
MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。监控和操作程序要能通信需要保持双向连接,且MQTT具有低延迟、低功耗的特点,emqx免费版提供的MQTT服务已经足够使用了,所以本次使用Emqx的服务。
如果你不会搭建MQTT,推荐看我的文章《Esp8266-01s、51单片机实现连接MQTT踩坑:附加烧录安信可固件+宝塔搭建MQTT服务器 全套攻略》
Python基本连接实例
文档《MQTT 客户端库 & SDKs》
————Python版本:3.7+ ————
导入 Paho MQTT客户端
from paho.mqtt import client as mqtt_client
设置 MQTT Broker 连接参数
设置 MQTT Broker 连接地址,端口以及 topic,同时我们调用 Python random.randint
函数随机生成 MQTT 客户端 id。
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
编写 MQTT 连接函数
编写连接回调函数 on_connect
,该函数将在客户端连接后被调用,在该函数中可以依据 rc
来判断客户端是否连接成功。通常同时我们将创建一个 MQTT 客户端,该客户端将连接到 broker.emqx.io
。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
发布消息
首先定义一个 while 循环语句,在循环中我们将设置每秒调用 MQTT 客户端 publish
函数向 /python/mqtt
主题发送消息。
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
订阅消息
编写消息回调函数 on_message
,该函数将在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
完整代码
消息发布代码
# python 3.6
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
消息订阅代码
# python3.6
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
监控回调
由于涉及到账户资金安全,本次代码仅作参考:
# python 3.6
import random
import time
from datetime import datetime
import json5
from paho.mqtt import client as mqtt_client
import requests
def get_order(token, merchantid, client):
headers = {
'Host': 'mscs.abchina.com',
'Connection': 'keep-alive',
****
}
params = {
'merchantId': str(merchantid),
'datatime': '2',
'employeeid': 'all',
'tradechannel': '',
'pageno': '1',
'pageview': '13',
'tradestatus': 'SUCCESS',
'qrFlag': '0',
}
response = requests.get('https://****.com/app***oot/transd***ail/t***ail/add***ket', params=params,headers=headers)
data = response.json()
# print(response.text)
if 'transDetail' in data and data['transDetail']:
day_trans_num = data['day_trans_num']
merchant_name = data['merchant_name']
day_trans_amount = data['day_trans_amount']
print(f"======{merchant_name}=======")
print(f"今日收款流水:{day_trans_amount}")
print(f"今日收钱笔数:{day_trans_num}")
for transaction in data['transDetail']:
print(f"商户号: {transaction['employee_name']}-{transaction['trade_date']}")
print(f"收款金额: {transaction['cash_amount']}")
print(f"支付渠道: {transaction['trade_channel']}")
print(f"交易状态: {transaction['trade_status']}")
print("---")
if day_trans_num:
url = "http://*********/select.php"
response = requests.get(url)
if response.status_code == 200:
print(response.text)
new_data = response.text
else:
print(f"请求失败,状态码: {response.status_code}")
new_data = ''
if int(day_trans_num) > int(new_data):
order_data = {
'user_id': data['transDetail'][0]['user_id'],
'trade_date': data['transDetail'][0]['trade_date'],
'trade_channel': data['transDetail'][0]['trade_channel'],
'cash_amount': data['transDetail'][0]['cash_amount'],
'trade_status': data['transDetail'][0]['trade_status'],
'merchant': data['transDetail'][0]['employee_name'],
'total_price': day_trans_amount,
'total_num': day_trans_num
}
json_payload = json5.dumps(order_data)
print(json_payload)
# result = client.publish(topic, data['transDetail'][0]['cash_amount'])
result = client.publish(topic,json_payload)
url = "http://***/updata.php?newDayTransNum=" + day_trans_num
response = requests.get(url)
else:
url = "http://***/updata.php?newDayTransNum=" + day_trans_num
response = requests.get(url)
else:
print("No transaction details found.")
print("---300s后继续查询保持token存活----")
broker = 'mqtt.***.club'
port = 1883
topic = "YF"
client_id = f'listen-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
token = 'e******9k'
merchantid = '924***40'
while True:
now = datetime.now()
current_time = now.strftime("%H:%M")
if "08:00" <= current_time <= "23:00":
sleep_duration = 18
else:
sleep_duration = 600
time.sleep(1)
get_order(token, merchantid, client)
print(f"Sleeping for {sleep_duration} seconds...")
time.sleep(sleep_duration)
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
通过进程守护启动
数据库设计
新建listen监控当天的订单笔数和收款余额,用来判断通知是否是新的一笔订单
字段名 | |
id | 自增 |
day_trans_num | 当日订单笔数 |
新建shop数据表,用于存放香烟的数据
字段名 | 说明 |
id | |
name | 商品名 |
num | 数量0为没有库存不进行计算 |
price | 录入价格 |
smonke | 香烟条形码 |
Api接口
数据操作接口
select.php
<?php
// 引入数据库连接配置
require_once('db_config.php');
// 查询数据
$sql = "SELECT day_trans_num FROM listen WHERE id = 1";
$result = $conn->query($sql);
if ($result->num_rows > 0) {
// 输出数据
$row = $result->fetch_assoc();
echo $row["day_trans_num"];
} else {
echo '';
}
// 关闭数据库连接
$conn->close();
?>
Update.php
<?php
// 引入数据库连接配置
require_once('db_config.php');
// 获取POST请求的数据
$newDayTransNum = $_GET['newDayTransNum'];
// 更新数据
$sql = "UPDATE listen SET day_trans_num = $newDayTransNum WHERE id = '1'";
if ($conn->query($sql) === TRUE) {
echo "记录更新成功";
} else {
echo "Error: " . $sql . "<br>" . $conn->error;
}
// 关闭数据库连接
$conn->close();
?>
get_shopcode.php
<?php
// 连接到数据库
$servername = "localhost";
$username = "YF";
$password = "j7j*****tYr";
$dbname = "yf";
$conn = new mysqli($servername, $username, $password, $dbname);
// 检查连接
if ($conn->connect_error) {
die("连接失败: " . $conn->connect_error);
}
$price=$_GET['price'];
// 从外部传递进来的金额
$amount = $price; // 例如:30.5
// SQL 查询,找出价格小于等于传入金额的记录,并按价格降序排列
$sql = "SELECT smonke,price,name FROM shop WHERE num > 0 AND price <= $amount ORDER BY price DESC LIMIT 1";
$result = $conn->query($sql);
// 检查是否有结果
if ($result->num_rows > 0) {
// 输出商品编码
while($row = $result->fetch_assoc()) {
die(
json_encode(
array(
'code' => 200,
'msg' => '查询成功',
'name' => $row["name"],
'price' => $row["price"],
'smonkecode' => $row["smonke"]
),480)
);
}
} else {
echo "";
die(
json_encode(
array(
'code' => 100,
'msg' => '没有找到符合条件的商品',
'name' => $row["name"],
'price' => $row["price"],
'smonkecode' => $row["smonke"]
),480)
);
}
$conn->close();
?>
// SQL 查询,找出价格小于等于传入金额的记录,并按价格降序排列
$sql = "SELECT smonke,price,name FROM shop WHERE num > 0 AND price <= $amount ORDER BY price DESC LIMIT 1";
查询num不为0且用户付款价格大于烟价
开发自动化操作程序
连接服务
将emqx的实例代码稍做整改,完成基本的通讯能力后,我们需要进行自定义,规定room为YF,以及格式化 json
{
user_id: "oHFX**********U4",
trade_date: "20240309",
trade_channel: "wx",
cash_amount: "30",
trade_status: "SUCCESS"
}
键名 | 说明 |
---|---|
user_id | 用户id |
trade_date | 订单号 |
trade_channel | 交易方式 |
cash_amount | 交易金额 |
trade_status | 交易状态 |
对应的接收端只需要订阅到YF频道即可
def mqtt_init(self):
print(self)
# client = mqtt.Client()
client = mqtt.Client(userdata=self) # 将self作为userdata传递
client.on_connect = on_connect
client.on_message = on_message
client.connect("4*****25", 1883, 60)
client.loop_forever()
自动挂单
当服务端检测到一笔新的订单时,通过MQTT发送json到客户端自动化程序,客户端收款工具进行上传金额,服务器计算得出付款金额与价格最相近的香烟条码,并返回烟码,客户端通过python的autogui操作进行操作(例如点击事件、选中事件等等),例如:由于烟草公司要求挂单之前需要添加会员信息,于是我们新增一个自定义事件add_user()
# 收到消息的回调函数
def on_message(client, userdata, msg):
self = userdata
print(msg.topic + " " + str(msg.payload))
try:
data=json5.loads(msg.payload)
# 新增收到回调数据显示给工具
self.Text3Var.set(data['cash_amount'])# 收款金额
self.Text4Var.set(data['trade_channel']) # 支付渠道
self.Text5Var.set(data['trade_status']) # 交易状态
self.Text6Var.set(data['total_price']) # 总收款金额
self.Text7Var.set(data['total_num']) # 总笔数
self.Label11.config(text=data['merchant']) # 商户
#
# print(data['user_id'])
# b'{user_id: "oHFX_jtTmL1GALI0PdYu-QU3UfU4", trade_date: "20240309", trade_channel: "wx", cash_amount: "15", trade_status: "SUCCESS"}'
try:
amount = float(data['cash_amount']) # 尝试转换为浮点数
# 定义接口URL
url = 'https://********/yf/get_shopcode.php'
# 定义请求参数
params = {'price': amount}
# 发送GET请求
response = requests.get(url, params=params)
# 检查请求是否成功
if response.status_code == 200:
# 解析响应内容为JSON格式
data = response.json()
# 打印返回的数据
print("code:", data['code'])
print("msg:", data['msg'])
print("name:", data['name'])
print("price:", data['price'])
print("smonkecode:", data['smonkecode'])
print("--【正在进行商品添加】--")
# 添加前结束点一次轮询开关
error_click()
# 加入随机会员信息
add_user()
# 添加前删除原商品
delete_shop()
print('删除可能存在表盘的缓存')
# 添加商品
smonke = str(data['smonkecode'])
add_shop(smonke)
# 微信动态二维码结算
submit_()
else:
print("没有符合的数据")
# 打印状态码和错误信息
print("Error:", response.status_code)
# selected_index = select_shop(amount, price)
# print(selected_index)
# if selected_index is None:
# print("小于三元没烟了")
# else:
# print("--【正在进行商品添加】--")
# # 添加前结束点一次轮询开关
# error_click()
#
# # 加入随机会员信息
# add_user()
#
# # 添加前删除原商品
# delete_shop()
# print('delete')
# # 添加商品
# add_shop(price_code[selected_index])
# # 微信动态二维码结算
# submit_()
except ValueError: # 如果转换失败,则打印错误信息
print("接收到的消息不是一个有效的数字")
except json.JSONDecodeError:
print("error")
补单
gui界面编写按钮,点击触发
def Command3_Cmd(self, event=None):
text3_value = self.Text3Var.get()
amount = float(text3_value) # 尝试转换为浮点数
budan(amount)
def budan(cash_amount):
amount = float(cash_amount) # 尝试转换为浮点数
url = 'https://*****/yf/get_shopcode.php'
# 定义请求参数
params = {'price': amount}
# 发送GET请求
response = requests.get(url, params=params)
# 检查请求是否成功
if response.status_code == 200:
# 解析响应内容为JSON格式
data = response.json()
# 打印返回的数据
print("code:", data['code'])
print("msg:", data['msg'])
print("name:", data['name'])
print("price:", data['price'])
print("smonkecode:", data['smonkecode'])
#补单操作
print("--【正在进行商品添加】--")
# 添加前结束点一次轮询开关
error_click()
# 加入随机会员信息
add_user()
# 添加前删除原商品
delete_shop()
print('delete')
# 添加商品
smonke = str(data['smonkecode'])
add_shop(smonke)
# 微信动态二维码结算
submit_()
#补单操作
else:
print("没有符合的数据")
# 打印状态码和错误信息
print("Error:", response.status_code)
# selected_index = select_shop(amount, price)
# print(selected_index)
# if selected_index is None:
# print("小于三元没烟了")
# else:
# print("--【正在进行商品添加】--")
# # 添加前结束点一次轮询开关
# error_click()
#
# # 加入随机会员信息
# add_user()
#
# # 添加前删除原商品
# delete_shop()
# print('delete')
# # 添加商品
# add_shop(price_code[selected_index])
# # 微信动态二维码结算
# submit_()
卷烟盘点
香烟定期入库后,需要进行盘点,需要将入库的香烟信息记录下来,人工操作过于麻烦,需要事先记录入库香烟条码,手动录入每一个香烟入库,自动化操作,可以节约人力时间成本
最后
程序在使用时会遇到一些异常问题,例如:当前页面非最大化、模块遮挡等问题,建议使用像素点进行识别,例如本项目通过像素点判断多个点的颜色值如果是桌面的蓝色,三个点都是蓝色则在桌面,具体的逻辑方法开发者可以自己完成
🚀Python爬虫项目实战系列文章
⭐⭐欢迎订阅⭐⭐
【Python爬虫项目实战一】获取Chatgpt3.5免费接口文末付代码(过Authorization认证)
【Python爬虫项目实战二】Chatgpt还原验证算法-解密某宝伪知网数据接口
⭐⭐欢迎订阅⭐⭐