python监控数据处理应用服务Socket心跳解决方案

1. 概述

从网页、手机App上抓取数据应用服务,涉及到多个系统集成协同工作,依赖工具较多。例如,使用Frida进行代码注入和动态分析,以实现对网络通信的监控和数据捕获。在这样的集成环境中,手机模拟器、手机中应用、消息侦听、数据获取服务等各自独立运行,任何一个环节出现问题,整个流程势必中断。除了必要的数据检验处理外,还需要实时侦听各个独立服务是否存活,并确认流程是否畅通。常用的监控工具往往只能监控进程和端口,无法深入系统内部进行监控,因此,我们采用Socket通讯方式,自主开发监控机制。

具体方案如下:

  1. Pika侦听与心跳机制

    • 手动启动Pika监听器,循环读取消息队列中的消息。
    • 每次读取消息后,调用心跳函数向监控端发送心跳信息。
    • 如果长时间未发送心跳信息(超过预设的时间阈值),则认为该服务已经死掉,此时重启数据应用服务进程和模拟器。
  2. 模拟器数据监控

    • 监控从模拟器端获取的数据流。
    • 如果长时间(超过预设的时间阈值)未成功获取到数据,则认为数据获取过程存在问题,此时同样重启数据应用服务进程和模拟器。

通过上述机制,确保在复杂的集成环境中,各个服务能够稳定运行,一旦出现问题能够及时发现并自动恢复,从而提高整体系统的可靠性和稳定性。
在这里插入图片描述

2. Socket通讯与心跳

2.1. 关于Socket

Python中的socket库是一种用于网络通信的标准库,它提供了丰富的函数和类来创建和管理网络连接

socket库概述

  • 功能:Python的socket库允许开发者创建客户端和服务器端应用程序,实现网络通信。

  • 协议支持:它支持多种协议,包括TCP(面向连接、可靠传输)和UDP(无连接、快速传输)。

  • 操作方式:支持同步和异步通信,其中同步通信是一种阻塞式的方式,而异步通信则不会阻塞程序的其他操作。

基本操作

  • 创建套接字:使用socket.socket()函数来创建一个套接字对象,这是进行网络通信的基础。例如,创建一个TCP套接字可以使用sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

  • 绑定地址:对于服务器而言,需要将套接字与特定的网络接口和端口绑定,使用bind()方法完成此操作。

  • 监听连接:服务器通过调用listen()方法开始监听来自客户端的连接请求。

  • 接受连接:使用accept()方法接受客户端的连接请求,并返回一个新的套接字对象和客户端地址信息。

  • 发送和接收数据:利用send()recv()方法在客户端和服务器之间发送和接收数据。

  • 关闭套接字:通信完成后,使用close()方法关闭套接字以释放资源。

总的来说,Python的socket库为网络编程提供了强大的工具,使得开发者能够轻松地构建各种类型的网络应用程序。无论是简单的TCP或UDP客户端和服务器,还是复杂的网络服务,socket库都能提供必要的支持。

2.2. 监控服务端Socket Server

import socket
from loguru import logger
from time import sleep
from datetime import datetime
import time
import json

logger.add("monitor_{time}.log",
            rotation="1 weeks",  # 1周分隔转日志文件  
            retention="2 month"  # 保留2个月的日志文件 
            )
  
def start_server(host='localhost', port=5005):
    # 启动应用程序
    # 应用函数()
    sleep(90)
    # 创建缓存    
    station_list = []  
    poi_list = []      
    heartbeat_list = [time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())]# 心跳缓存
    # 创建socket对象
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # 绑定地址和端口号
        server_socket.bind((host, port))        
        # 开始监听传入的连接请求
        server_socket.listen()
        logger.info(f"monitor Server listening on {host}:{port}")
       
        server_socket.settimeout(3)   # 超时3秒
        timeout = 0        
        while True:
            try:
                # 接受一个新的连接
                client_socket, client_address = server_socket.accept()
                logger.info(f"Connected by {client_address}")
                
                with client_socket:
                    while True:
                        # 接收数据
                        data = client_socket.recv(1024)
                        if not data:
                            sleep(1)
                            break
                        message = data.decode('utf-8')
                        print(f"Received from client: {message}")
                        sleep(1)
                        timeout = 0
                        # 监控处理函数
                        monitor(message, station_list, poi_list, heartbeat_list)          
            except socket.timeout:
                timeout += 1
                if timeout*5%10==0:
                    logger.info(f"app is starting, time out {timeout*5}")
                if timeout*5 > 150:   # 超过2分钟30秒没有启动,重启启动
                    timeout = 0
                    # 应用函数()
                    sleep(60)
                    heartbeat_list = [time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())]                               
            except Exception as e:
                logger.error(f"monitor Server error occurred: {e}")
                break

2.3. 建立心跳线,客户端Socket Client

Socket Client

import socket
import time
from time import sleep
from loguru import logger

def start_client(message, host='localhost', port=5005):
    try:
        # 创建socket对象
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        
        # 连接到服务器
        client_socket.connect((host, port))
        logger.info(f"Connected to server at {host}:{port}")
        
        # 发送数据
        client_socket.sendall(message.encode())
        
        # 接收响应
        #data = client_socket.recv(1024)
        #print(f"Received from server: {data.decode()}")
    except socket.error as e:
        logger.info(f"Socket error occurred: {e}")        
    finally:    
        # 关闭连接
        client_socket.close()

建立心跳线,Socket Client应用

手动侦听消息,使用Pika库监听RabbitMQ中的消息,并循环读取消息。每次读取到消息时,调用心跳函数向监控端发送消息。如果长时间未发送消息,则认为服务已经死掉,触发重启控制软件和手机的操作。

# 消费消息
def startRabbitMQ():
    # 1.连接rabbit
    try:    
        credentials = pika.PlainCredentials('rabbit', '*****')  # 用户名和密码
        # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
        connection = pika.BlockingConnection(pika.ConnectionParameters('192.*.*.*',port = 55671,virtual_host = '/xxxxx-dev',credentials = credentials))
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error connecting to RabbitMQ in main process: {e}")
        exit(1)   
	# 建立心跳信息
    current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    message = json.dumps({"type": "heartbeat", "time": current_time})            
    start_client(message)
    
    try:        
        channel = connection.channel()
        time.sleep(1)
         
        channel.queue_declare(queue='xxxxx_poi_queue', durable=True)
        channel.basic_qos(prefetch_count=1)
        while True:
            logger.info('取消息开始时间')  
            method_frame, header_frame, body = channel.basic_get(queue='xxxxx_poi_queue', auto_ack=False)  
            if method_frame:  
                # 处理消息体 
                print('header_frame:',header_frame) 
                logger.info(f'body:,{body}')  
                _poi = body.decode('utf-8')  # 将 bytes 转换为字符串 
                
                current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

                datas = {"type":"poiid","time":current_time,"dat":_poi}
                message = json.dumps(datas)
                start_client(message)                                                 
                # 业务应用处理函数()      
                # 如果你设置了auto_ack=False,则需要手动确认消息  
                channel.basic_ack(delivery_tag=method_frame.delivery_tag)  
            else:  
                logger.info("没有消息可以获取,")  

            current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
            message = json.dumps({"type": "heartbeat", "time": current_time})            
            start_client(message)
            time.sleep(10)
                  
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error RabbitMQ in process: {e}")
        exit(1)    

监控应用、工具进程,在此略过,相关技术参见《Python监控服务进程及自启动服务方法与实践》。

3. 遇到的问题

3.1. 阻塞模式

在阻塞模式下,当调用某些socket API(如send、recv等)时,如果操作不能立即完成,调用线程会被挂起,直到操作完成或超时

在Python的socket编程中,settimeout()方法用于设置套接字操作的超时时间。当调用这个方法后,如果在指定的时间内没有完成相应的网络操作(如连接、发送或接收数据),程序将抛出一个socket.timeout异常。

  • 阻塞模式:默认情况下,套接字是阻塞模式的,这意味着如果进行的操作(如accept()recv()等)不能立即完成,程序会一直等待直到操作完成。

  • 非阻塞模式:通过设置超时时间,可以将套接字设置为非阻塞模式。在这种模式下,如果操作不能在指定时间内完成,程序会立即返回并抛出一个socket.timeout异常。

3.2. 数据传递编码与解码

在Python的socket编程中,传递字典类型数据时,通常需要将字典序列化为字符串或字节流进行传输。这是因为socket通信只接收bytes类型数据,而实际传过去的可能是str类型或其他非bytes类型。其中如下是关于字典类型数据的编码与解码的详细解析:

  • 编码:将字典转换为JSON字符串,然后将其编码为字节流进行发送。
import json
import socket

...
current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
message = json.dumps({"type": "heartbeat", "time": current_time}) 
...
# 发送数据
client_socket.sendall(message.encode())

  • 解码:接收到字节流后,先将其解码为字符串,再从字符串中解析出字典。
# 接受一个新的连接
client_socket, client_address = server_socket.accept()
logger.info(f"Connected by {client_address}")

with client_socket:
    while True:
        # 接收数据
        data = client_socket.recv(1024)
        if not data:
            sleep(1)
            break
        message = data.decode('utf-8')
        print(f"Received from client: {message}")

其中,编码使用data.encode,解码使用data.decode

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/943906.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C# GDI+数码管数字控件

调用方法 int zhi 15;private void button1_Click(object sender, EventArgs e){if (zhi > 19){zhi 0;}lcdDisplayControl1.DisplayText zhi.ToString();} 运行效果 控件代码 using System; using System.Collections.Generic; using System.Drawing.Drawing2D; using …

药片缺陷检测数据集,8625张图片,使用YOLO,PASICAL VOC XML,COCO JSON格式标注,可识别药品是否有缺陷,是否完整

药片缺陷检测数据集,8625张图片,使用YOLO,PASICAL VOC XML,COCO JSON格式标注,可识别药品是否有缺陷,是否完整 有缺陷的标注信息: 无缺陷的标注信息 数据集下载: yolov11:https://d…

jenkins集成工具(一)部署php项目

目录 什么是CI 、CD Jenkins集成工具 一、Jenkins介绍 二、jenkins的安装和部署 环境部署 安装jenkins 安装gitlab 配置镜像源进行安装 修改密码 安装git工具 上传测试代码 Jenkins部署php项目wordpress 发布php代码 安装插件 测试代码发布 实现发布成功发送邮件…

Web开发:ORM框架之使用Freesql的分表分页写法

一、自动分表(高版本可用) 特性写法 //假如是按月分表:[Table(Name "log_{yyyyMM}", AsTable "createtime2022-1-1(1 month)")]注意:①需包含log_202201这张表 ②递增规律是一个月一次,确保他们…

【UE5.3.2】生成vs工程并rider打开

Rider是跨平台的,UE也是,当前现在windows上测试首先安装ue5.3.2 会自动有右键的菜单: windows上,右键,生成vs工程 生成的结果 sln默认是vs打开的,我的是vs2022,可以open with 选择 rider :Rider 会弹出 RiderLink是什么插

FFmpeg在python里推流被处理过的视频流

链式算法处理视频流 视频源是本地摄像头 # codinggbk # 本地摄像头直接推流到 RTMP 服务器 import cv2 import mediapipe as mp import subprocess as sp# 初始化 Mediapipe mp_drawing mp.solutions.drawing_utils mp_drawing_styles mp.solutions.drawing_styles mp_holis…

【工具】—— SpringBoot3.x整合swagger

Swagger 是一个规范和完整的框架,用于生成、描述、调用和可视化 RESTful 风格的 Web 服务的接口文档。Swagger简单说就是可以帮助生成接口说明文档,操作比较简单添加注解说明,可以自动生成格式化的文档。 项目环境 jdk17SpringBoot 3.4.0Sp…

Docker 部署 plumelog 最新版本 实现日志采集

1.配置plumelog.yml version: 3 services:plumelog:#此镜像是基于plumelog-3.5.3版本image: registry.cn-hangzhou.aliyuncs.com/k8s-xiyan/plumelog:3.5.3container_name: plumelogports:- "8891:8891"environment:plumelog.model: redisplumelog.queue.redis.redi…

图像处理-Ch5-图像复原与重建

Ch5 图像复原 文章目录 Ch5 图像复原图像退化与复原(Image Degradation and Restoration)噪声模型(Noise Models)i.i.d.空间随机噪声(Generating Spatial Random Noise with a Specified Distribution)周期噪声(Periodic Noise)估计噪声参数(Estimating Noise Parameters) 在仅…

在vscode的ESP-IDF中使用自定义组件

以hello-world为例,演示步骤和注意事项 1、新建ESP-IDF项目 选择模板 从hello-world模板创建 2、打开项目 3、编译结果没错 正在执行任务: /home/azhu/.espressif/python_env/idf5.1_py3.10_env/bin/python /home/azhu/esp/v5.1/esp-idf/tools/idf_size.py /home…

WordPress网站中如何修复504错误

504网关超时错误是非常常见的一种网站错误。这种错误发生在上游服务器未能在规定时间内完成请求的情况下,对访问者而言,出现504错误无疑会对访问体验大打折扣,从而对网站的转化率和收入造成负面影响。 504错误通常源于服务器端或网站本身的问…

C++——运算符重载

一、运算符重载 ①含义 函数重载或函数多态:同名函数完成相同的基本操作 C将重载的概念扩展到运算符上,于是出现了运算符重载 C中有很多运算符已经被重载 *运算符,运用于地址,可以得到存储在这个地址的值;运用于两个…

抖去推碰一碰系统技术源码/open SDK转发技术开发

抖去推碰一碰系统技术源码/open SDK转发技术开发 碰一碰智能系统#碰碰卡系统#碰一碰系统#碰一碰系统技术源头开发 碰碰卡智能营销系统开发是一种集成了人工智能和NFC技术的工具,碰碰卡智能营销系统通过整合数据分析、客户关系管理、自动化营销活动、多渠道整合和个…

【Unity3D】ECS入门学习(六)状态组件 ISystemStateComponentData

当需要获知组件是否被销毁时,ECS是没有回调告知的,因此可以将组件继承于ISystemStateComponentData接口,这样即使组件的实体被销毁了,该组件本身是不会消失的,所以可以通过在组件实体销毁后,去设置状态组件…

期权懂|如何计算期权卖方平仓后的盈利?

锦鲤三三每日分享期权知识,帮助期权新手及时有效地掌握即市趋势与新资讯! 如何计算期权卖方平仓后的盈利? 期权卖方平仓后的盈利计算涉及多个因素,包括期权的交易价格、平仓价格以及权利金的变动等。 交易价格:期权卖…

ARM64 Windows 10 IoT工控主板运行x86程序效率测试

ARM上的 Windows 10 IoT 企业版支持仿真 x86 应用程序,而 ARM上的 Windows 11 IoT 企业版则支持仿真 x86 和 x64 应用程序。英创推出的名片尺寸ARM64工控主板ESM8400,可预装正版Windows 10 IoT企业版操作系统,x86程序可无需修改而直接在ESM84…

【Ubuntu 20.4安装截图软件 flameshot 】

步骤一: 安装命令: sudo apt-get install flameshot 步骤二: 设置快捷方式: Ubuntu20.4 设置菜单,点击 号 步骤三: 输入软件名称, 软件快捷命令(flameshot gui)&am…

NAT 技术如何解决 IP 地址短缺问题?

NAT 技术如何解决 IP 地址短缺问题? 前言 这是我在这个网站整理的笔记,有错误的地方请指出,关注我,接下来还会持续更新。 作者:神的孩子都在歌唱 随着互联网的普及和发展,IP 地址的需求量迅速增加。尤其是 IPv4 地址&…

算法题(17):删除有序数组中的重复项

审题: 需要我们原地删除数组中的重复数据,并输出有效数据个数 思路: 方法一:原地解法(双指针) 设置left指针指向当前的非重复数据,right负责遍历数组,遇到和left指向的数据不同的数据…

LaTeXChecker:使用 Python 实现以主 TEX 文件作为输入的 LaTeX 检查和统计工具

使用 Python 实现以主 TEX 文件作为输入的 LaTeX 检查和统计工具,适用于包括但不限于一稿多模板的复杂排版方式,工具以只读模式运行。 Github 链接:https://github.com/BatchClayderman/LaTeXChecker import os from sys import argv, exec…