说明:该篇博客是博主一字一码编写的,实属不易,请尊重原创,谢谢大家!
首先,今年比较忙没有多余时间去实操创作分享文章给大家,那就给大家分享下博主在实际工作中的一点点内容吧,就当交流交流~
需求
叙述
目前公司有个跨平台大项目正在内测中,是基于QT框架研发的客户端应用程序
客户端程序的更新不像web端程序只需要清理缓存(存在js更新时)刷新即可更新至最新代码,就需要服务端维护升级批次->客户端检测更新->拉取升级列表下载批次文件->替换程序目录下的文件(数据库增量升级以及脚本文件)
当程序代码打包至公司内网升级目录下,每次都需要去通知维护人,维护人则需要在升级平台维护及开放程序版本批次,整个流程如下:
1、登录进入升级平台
2、选择项目
2.1 Windows64
2.2 Windows32
2.3 统信aarch64
2.4 统信amd64
2.5 统信arm64
2.6 银河麒麟loongarch64
2.7 银河麒麟arm64
2.8 银河麒麟amd64
2.9 中标麒麟arm64
2.10 中标麒麟amd64
2.11 MacX86_64
2.12 MacArm64
3、新建批次
4、导入本地文件
5、上传到下载服务器
6、升级说明
7、产品版本号和用户显示版本号配置
8、开放批次
8.1 内测开放
8.2 正式开放
PS:由于是公司还未上线的项目,所以不能细致透露
痛点1:每次需要研发经理通知(存在忘记通知或延迟通知)
痛点2:手动维护繁琐枯燥,批次版本信息容易维护错误
痛点3:忘记或延迟通知相应人员进行测试
解决
叙述
与研发经理进行约定,每次程序打包生成到指定的共享目录,编写程序进行10s检测目录下是否有批次版本升级文件产生,如果有则进行记录并自动化进行维护升级批次,开放批次后并下发通知消息到指定QQ群
解决痛点1:
通过last_state.cfg
配置文件存储上一次(或第一次)目录的状态,每个目录记录其最后一次修改时间,启动项目或项目运行期间以此时间进行判断是否目录有更新
# 要检查的目录列表
directories_to_check = [r"N:\windows\内测\32", r"N:\windows\内测\64", r"N:\windows\公测\32", r"N:\windows\公测\64", r"N:\uos\公测\aarch64"]
last_state = {} # 上一次的目录状态字典
upgrade_flag = None
# 从配置中读取上一次的目录状态
def read_last_state():
global upgrade_flag
if os.path.exists('last_state.cfg'):
with open('last_state.cfg', 'r') as f:
for line in f:
path, last_time = line.strip().split('|')
last_state[path] = float(last_time)
upgrade_flag = True
else:
upgrade_flag = False
# 更新上一次的目录状态到配置
def update_last_state():
global upgrade_flag
upgrade_flag = True
with open('last_state.cfg', 'w') as f:
for path, last_time in last_state.items():
f.write(f'{path}|{last_time}\n')
# 循环判断多个目录下是否有新文件产生,并输出文件名和目录
def watch_dirs():
while True:
for directory_path in directories_to_check:
for filename in os.listdir(directory_path):
path = os.path.join(directory_path, filename)
mtime = os.path.getmtime(path)
if path not in last_state:
new_directory_flag = True
else:
cfg_mtime = float(last_state[path])
if mtime > cfg_mtime:
new_directory_flag = True
else:
new_directory_flag = False
if new_directory_flag:
n_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
n_filename = "{%s}" % filename
f_path = r"{}\{}".format(directory_path, filename)
if upgrade_flag:
print(f'{n_time} 发现新目录在 {directory_path} 中:{n_filename}')
# 增量升级复制文件
cp_file(f_path, filename)
# 调用升级
auto_update(f_path, filename)
# 升级完成后获取最新的目录修改时间并赋值给对应path
mtime = os.path.getmtime(path)
last_state[path] = mtime # 更新目录状态
update_last_state() # 更新目录状态到配置
time.sleep(10) # 每10秒检查一次
解决痛点2:
登录升级平台,1选择项目——2新增升级批次——3导入文件——4上传至下载服务器——5版本配置——6升级说明——7开放批次,使用selenium框架进行自动化处理
def auto_update(f_path, batch):
len_file = len(os.listdir(f_path))
if len_file > 0:
new_f_path = f_path.replace("\\", "_")
file_path_list = new_f_path.split("_")
os_name = file_path_list[1]
env = file_path_list[2]
frame_num = file_path_list[3]
upgrade_str = random.choice(str_lst)
if os_name == 'uos':
os_name = "统信"
os_type = os_name+frame_num
# print(os_type)
options = webdriver.EdgeOptions()
options.add_experimental_option('detach', True) # 不自动关闭浏览器
driver = webdriver.Edge(options=options) # 引入edge驱动
driver.maximize_window()
driver.get("http://xxxx")
# 登录平台
driver.find_element(by=By.XPATH, value='//*[@id="input_box"]/input').send_keys('yourname')
driver.find_element(by=By.XPATH, value='//*[@id="login_box"]/div[2]/input').send_keys('yourpwd')
driver.find_element(by=By.XPATH, value='//*[@id="login_box"]/button').click()
time.sleep(1)
# todo:切换项目名称
driver.find_element(by=By.XPATH, value='//*[@id="u20"]').click()
time.sleep(1)
ul = driver.find_element(by=By.XPATH, value='/html/body/div[5]/div/div/div/ul')
all_project_list = ul.find_elements(by=By.XPATH, value='li')
# print(all_project_list, type(all_project_list)) # 计算有多少个li
index = 0
pro_num = len(all_project_list)
for p_name in all_project_list:
index+=1
# print(index, p_name.text)
if os_type in p_name.text.lower():
x_path = '/html/body/div[5]/div/div/div/ul/li[{}]/span/a'.format(index)
driver.find_element(by=By.XPATH, value=x_path).click()
time.sleep(1)
# list[-1].text # 用列表标识符取最后一个li
# todo:新增升级批次
driver.find_element(by=By.CSS_SELECTOR, value='#u17_div > div > span > svg').click()
time.sleep(0.5)
y = batch.split(".")[2][:2]
m = batch.split(".")[2][2:]
u_m = batch.split(".")[2][2:].replace("0", "")
d = int(batch.split(".")[-1][:2]) - 10
if len(str(d)) == 1:
u_d = "0{}".format(d)
else:
u_d = d
new_batch_day = "20{}.{}.{}".format(y, m, u_d)
u_new_batch_day = "20{}.{}.{}".format(y, u_m, d)
# print(batch, to_day, new_batch_day, m)
u_batch = batch[-3:]
if to_day != new_batch_day:
b_xpath = '/html/body/div[last()]/div/div[2]/div/div[2]/div[2]/div/input'
driver.find_element(by=By.XPATH, value=b_xpath).clear()
time.sleep(0.5)
driver.find_element(by=By.XPATH, value=b_xpath).send_keys(u_new_batch_day)
u_batch_day = "{}.{}".format(new_batch_day, u_batch)
else:
u_batch_day = "{}.{}".format(to_day, u_batch)
n_xpath = '/html/body/div[last()]/div/div[2]/div/div[2]/div[3]/button[2]/span'
driver.find_element(by=By.XPATH, value=n_xpath).click()
time.sleep(1)
# todo:切换升级说明
driver.find_element(by=By.XPATH, value='//*[@id="u27_div"]/ul/li[3]/span/div/span').click()
time.sleep(0.5)
driver.find_element(by=By.XPATH, value='//*[@id="u27_div"]/div[3]/textarea').send_keys(upgrade_str)
time.sleep(0.5)
# todo:切换版本配置
driver.find_element(by=By.XPATH, value='//*[@id="u27_div"]/ul/li[2]/span/div/span').click()
time.sleep(0.5)
driver.find_element(by=By.XPATH, value='//*[@id="u27_div"]/div[2]/div[1]/input').send_keys(batch)
time.sleep(0.5)
driver.find_element(by=By.XPATH, value='//*[@id="u27_div"]/div[2]/div[2]/input').send_keys(u_batch_day)
time.sleep(0.5)
# todo:切换文件配置
driver.find_element(by=By.XPATH, value='//*[@id="u27_div"]/ul/li[1]/span/div/span').click()
# todo 导入文件
driver.find_element(by=By.XPATH, value='//*[@id="u27_div"]/div[1]/div/div[1]/button[1]/span[2]').click()
time.sleep(0.5)
dr_xpath = '/html/body/div[last()]/div/div[2]/div/div[2]/div[2]/div/span/div[1]/span/div/button/span[2]'
driver.find_element(by=By.XPATH, value=dr_xpath).click()
time.sleep(1)
# todo 上传文件
# 按shift以及松shift键
win32api.keybd_event(16, 0, 0, 0)
win32api.keybd_event(16, 0, win32con.KEYEVENTF_KEYUP, 0)
time.sleep(0.5)
autoit.send(f_path)
time.sleep(1)
win32api.keybd_event(13, 0, 0, 0)
win32api.keybd_event(13, 0, win32con.KEYEVENTF_KEYUP, 0)
win32api.keybd_event(13, 0, 0, 0)
win32api.keybd_event(13, 0, win32con.KEYEVENTF_KEYUP, 0)
time.sleep(1)
# todo 计算打开弹窗居中的坐标,并移动至此
c_x, c_y = autoit.win_get_client_size("打开")
m_x = int(int(center_x) - (int(c_x) / 2))
m_y = int(int(center_y) - (int(c_y) / 2))
autoit.win_move("打开", m_x, m_y)
time.sleep(1)
autoit.mouse_click("left", int(center_x), int(center_y))
time.sleep(0.5)
win32api.keybd_event(17, 0, 0, 0)
win32api.keybd_event(65, 0, 0, 0)
win32api.keybd_event(65, 0, win32con.KEYEVENTF_KEYUP, 0)
win32api.keybd_event(17, 0, win32con.KEYEVENTF_KEYUP, 0)
time.sleep(0.5)
win32api.keybd_event(13, 0, 0, 0)
win32api.keybd_event(13, 0, win32con.KEYEVENTF_KEYUP, 0)
time.sleep(1)
qr_xpath = "/html/body/div[last()]/div/div[2]/div/div[2]/div[3]/button[2]/span"
driver.find_element(by=By.XPATH, value=qr_xpath).click()
len_file = len(os.listdir(f_path))
print("{}目录文件个数为:{}个".format(batch, len_file))
if 0 < len_file <= 20:
print("开始导入文件,等待10秒....")
time.sleep(10)
else:
print("开始导入文件,等待30秒....")
time.sleep(30)
driver.find_element(by=By.XPATH, value='//*[@id="table"]/div/div[1]/table/thead/tr[1]/th[1]/div/label/span/span').click()
time.sleep(0.5)
driver.find_element(by=By.XPATH, value='//*[@id="u27_div"]/div[1]/div/div[1]/button[2]/span[2]').click()
time.sleep(0.5)
driver.find_element(by=By.XPATH, value='//*[@id="tablefwq"]/div/div[1]/table/thead/tr[1]/th[1]/label/span/span').click()
time.sleep(0.5)
driver.find_element(by=By.CSS_SELECTOR, value='body > div:last-child > div > div.ant-modal-wrap > div > div.ant-modal-content > div.ant-modal-footer > button.ant-btn.ant-btn-primary > span').click()
if 0 < len_file <= 20:
print("开始上传文件至下载服务器,等待15秒....")
time.sleep(15)
else:
print("开始上传文件至下载服务器,等待45秒....")
time.sleep(45)
# todo:下拉滚动底部,开放版本
driver.find_element(by=By.XPATH, value='//*[@id="u26_div"]/div[2]').click()
time.sleep(0.2)
win32api.keybd_event(35, 0, 0, 0)
win32api.keybd_event(35, 0, win32con.KEYEVENTF_KEYUP, 0)
time.sleep(0.2)
driver.find_element(by=By.XPATH, value='//*/li[last()]/span/div/div/div/span[1]').click()
time.sleep(0.5)
driver.switch_to.default_content()
time.sleep(0.5)
driver.quit()
else:
print("警告!目录:{} 下的升级文件为空,将不进行升级调用!".format(f_path))
解决痛点3:
维护升级批次成功后,将向指定QQ群发送自定义消息,这里需要借助go-cqhttp框架,下载解压后,按一下步骤进行配置即可
- Step1:下载后解压
go-cqhttp_windows_amd64.zip
,点击运行exe
- Step2:运行成功后,会生成
go-cqhttp.bat
文件,再运行这个批处理文件,出现如下窗口
- Step3:选择
0
,回车;编辑生成配置文件config.yml
,切记只填写qq号(密码不填写,选择扫码登录,这样更安全且不会出现错误)
- Step4:编辑打开目录下产生的
device.json
文件,修改其中protocol
的值为2
,否则一直登陆失败
配置完成后,拿出你的手机打开QQ进行扫码登录,登录后控制台日志出现警告不用管
接下来就是编写一个def
方法来实现与go-cqhttp
框架的交互,其实原理就是监听本地5700 socket
消息,就跟以前飞书发消息回复消息是一样原理
def send_msg(resp_dict):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ip = '127.0.0.1'
client.connect((ip, 5700))
msg_type = resp_dict['msg_type'] # 回复类型(群聊/私聊)
number = resp_dict['number'] # 回复账号(群号/好友号)
msg = resp_dict['msg'] # 要回复的消息
# 将字符中的特殊字符进行url编码
msg = msg.replace(" ", "%20")
msg = msg.replace("\n", "%0a")
if msg_type == 'group':
payload = "GET /send_group_msg?group_id=" + number + "&message=" + msg + " HTTP/1.1\r\nHost:" + ip + ":5700\r\nConnection: close\r\n\r\n"
elif msg_type == 'private':
payload = "GET /send_private_msg?user_id=" + number + "&message=" + msg + " HTTP/1.1\r\nHost:" + ip + ":5700\r\nConnection: close\r\n\r\n"
new_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("{} 发送{}".format(new_time, payload))
client.send(payload.encode("utf-8"))
client.close()
最后在auto_update
方法中调用send_msg
方法即可
if env == "内测":
driver.find_element(by=By.CSS_SELECTOR, value='body > div:nth-last-child(2) > div > div > div > ul > li:nth-child(2) > span > a').click()
msg = "内测升级 {}_{}_{}".format(os_name, frame_num, batch)
else:
driver.find_element(by=By.CSS_SELECTOR, value='body > div:nth-last-child(2) > div > div > div > ul > li:nth-child(3) > span > a').click()
msg = "正式升级 {}_{}_{}".format(os_name, frame_num, batch)
resp_group_dict = {'msg_type': 'group', 'number': 'QQ群号', 'msg': msg}
send_msg(resp_group_dict)
效果截图: