基于python-socket构建任务服务器(基于socket发送指令创建、停止任务)

在实现ia业务服务器时需要构建一个python-socket客户端,1、要求能与服务器保持心跳连接,每10秒钟发送一次心跳信号;2、要求能根据socket服务器发送的指令创建或终止一个定时任务。
为此以3个类实现该功能,分别为socket通信类(用于实现通信连接与任务创建)、任务池类(用于管理任务)、任务类(用于实现具体任务)。

1、socket通信客户端

这里定义的MySocket类主体结构如下图所示,共包含4个函数,2个线程(其本身继承Thread类实现主任务流程——run函数、接收服务器信息并创建任务添加到任务池;同时又在__init__函数中将self.thread_msg类封装为一个线程,每隔10秒钟向socket服务器发送一次心跳包)。check_connection函数用于检测socket是否与服务器断开连接,在send_msg函数中调用,当发现客户端掉线后则立刻进行重连。send_msg函数用于发送信息给服务器,因为run函数与thread_msg函数2个线程都需要调用连接与服务器发送数据,为避免冲突故而定义为函数在内部进行加锁。
在这里插入图片描述

#socket客户端
class MySocket(Thread):
    def __init__(self,config):
        super().__init__()
        # 1.创建套接字
        self.tcp_socket = socket(AF_INET,SOCK_STREAM)
        self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护
        # 2.准备连接服务器,建立连接
        self.serve_ip = config["serve_ip"]#当前"118.24.111,149"
        self.serve_port = config["serve_port"]  #端口当前7900
        self.sleep_time = config["sleep_time"]
        print("connect to : ",self.serve_ip,self.serve_port)
        self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        self.lock = threading.RLock()
        
        self.taskpool=TaskPool()

        task_msg=threading.Thread(target=self.thread_msg)
        task_msg.daemon = True
        task_msg.start()
            #定时发送信息
    def run(self):
        while True:
            a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
            a=a.decode('utf-8')
            print("------主线程-----",a)
            jdata=json.loads(a)
            #jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
            task=OCRTask(jdata)
            self.taskpool.append(task)
            
            json_data={  
                "type":"OCR_STATE_ACK",
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "streamAddr": jdata["streamAddr"]
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)
            self.send_msg(data)

    def check_connection(self):
        try:
            self.tcp_socket.getpeername()
            return True
        except socket.error:
            return False
    
    #定时发送心跳信息
    def thread_msg(self):
        while True:
            #message=input('You can say:')
            #json标注的模板
            json_data={  
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "type":"HEARBEAT"
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)

            #进行定时发送
            self.send_msg(data)
            # self.lock.acquire()
            # self.tcp_socket.send(data)#将发送的数据进行编码
            # self.lock.release()
            try:
                #进行定时发送
                self.lock.acquire()
                a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
                self.lock.release()
                time.sleep(self.sleep_time)
                print("ack: ",a.decode('utf-8'))
            except ConnectionRefusedError:
                print('服务器拒绝本次连接!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except TimeoutError:
                print('连接超时!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except OSError:
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
                print('智能终端无网络连接!!!!!')

    def send_msg(self,msg):
        if self.check_connection() is False:
            print('服务器掉线!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        try:
            #进行定时发送
            self.lock.acquire()
            self.tcp_socket.send(msg)
            self.lock.release()
        except ConnectionRefusedError:
            print('服务器拒绝本次连接!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except TimeoutError:
            print('连接超时!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except OSError:
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            print('智能终端无网络连接!!!!!')

2、任务池实现

任务池的实现代码如下所示,主要包含3个函数(其中将remove_task封装为一个子线程,用于实时移除已经完成计算任务的线程),append函数用于将新创建的任务添加大任务池pool中,stop函数用于停止并移除正在运行中的任务。
在这里插入图片描述
其具体实现代码如下所示,其作为MySocket类中的一个成员属性,每当MySocket接收到服务器信息创建任务ocrtask后都调用TaskPool.append(ocrtask)将任务添加到任务池中。由任务池管理任务的声明周期,具体可见其append函数可以启动task或终止task。remove_task线程会自动将已经完成的任务移除。

#ocr任务线程池
class TaskPool:
    def __init__(self,sleep_time=0.5):
        self.pool=[]
        self.sleep_time=sleep_time
        task_msg=threading.Thread(target=self.remove_task)
        task_msg.daemon = True
        task_msg.start()

    #删除已经结束的任务
    def remove_task(self):
        while True:
            names=[]
            for task in self.pool:
                if task.get_count()==0: #生存时间为0,认为该任务已经完成需要被删除
                    task.stop()
                    self.pool.remove(task)
                else:
                    names.append(task.taskname)
            if len(names)>0:
                print(names)
            time.sleep(self.sleep_time)
            
    def append(self,ocrtask):
        if ocrtask.state==0:
            #终止任务
            self.stop(ocrtask)
        else:
            #启动任务
            ocrtask.start()
            self.pool.append(ocrtask)

    #终止任务
    def stop(self,ocrtask):
        for task in self.pool:
            if task.taskname==ocrtask.taskname:
                task.stop()
                self.pool.remove(task)

3、具体任务线程

任务的实现代码如下所示,其支持3中任务模式,使用state区分任务,state为0-停止识别,1-连续识别count张,2-持续识别(故而在state为2时将count设置的特别大)。这里以count控制任务的运行,任务每运行一次count减少1。当count小于等于0,则表示任务运行完成。在TaskPool的remove_task中检测到count为0时则会自动删除任务。

#ocr任务
class OCRTask(Thread):
    def __init__(self,json):
        super().__init__()
        self.streamAddr=json["streamAddr"]
        self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别
        if json["state"]==2:
            self.count=9999999999999999999999999
        else:
            self.count=json["count"]
        if "taskname" in json.keys():
            self.taskname=json["taskname"]
        else:
            self.taskname=json["streamAddr"]

        self.jsonname=json["jsonname"]
        self.lock = threading.RLock()

    def run(self):
        while self.get_count()>0:
            print('run %s'%self.taskname,end='*')
            time.sleep(2)
            self.lock.acquire()
            self.count-=1
            self.lock.release()
        print('%s finish!! '%self.taskname)

    #获取任务的生存时间
    def get_count(self):
        self.lock.acquire()
        now_count=self.count
        self.lock.release()
        #削减count
        return now_count

    #停止任务
    def stop(self):
        self.lock.acquire()
        self.count=-1
        self.lock.release()
        #停止任务
        pass

4、完整代码与使用效果

完整代码如下所示

from socket import *
import time,json
import yaml
import threading,struct
from threading import Thread
 
def hex_to_bytes(hex_str):
    """
    :param hex_str: 16进制字符串
    :return: byte_data 字节流数据
    """
    bytes_data = bytes()
    while hex_str :
        """16进制字符串转换为字节流"""
        temp = hex_str[0:2]
        s = int(temp, 16)
        bytes_data += struct.pack('B', s)
        hex_str = hex_str[2:]
    return bytes_data

# 读取Yaml文件方法
def read_yaml(yaml_path):
    with open(yaml_path, encoding="utf-8", mode="r") as f:
        result = yaml.load(stream=f,Loader=yaml.FullLoader)
        return result

#ocr任务
class OCRTask(Thread):
    def __init__(self,json):
        super().__init__()
        self.streamAddr=json["streamAddr"]
        self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别
        if json["state"]==2:
            self.count=9999999999999999999999999
        else:
            self.count=json["count"]
        if "taskname" in json.keys():
            self.taskname=json["taskname"]
        else:
            self.taskname=json["streamAddr"]

        self.jsonname=json["jsonname"]
        self.lock = threading.RLock()

    def run(self):
        while self.get_count()>0:
            print('run %s'%self.taskname,end='*')
            time.sleep(2)
            self.lock.acquire()
            self.count-=1
            self.lock.release()
        print('%s finish!! '%self.taskname)

    #获取任务的生存时间
    def get_count(self):
        self.lock.acquire()
        now_count=self.count
        self.lock.release()
        #削减count
        return now_count

    #停止任务
    def stop(self):
        self.lock.acquire()
        self.count=-1
        self.lock.release()
        #停止任务
        pass

#ocr任务线程池
class TaskPool:
    def __init__(self,sleep_time=0.5):
        self.pool=[]
        self.sleep_time=sleep_time
        task_msg=threading.Thread(target=self.remove_task)
        task_msg.daemon = True
        task_msg.start()

    #删除已经结束的任务
    def remove_task(self):
        while True:
            names=[]
            for task in self.pool:
                if task.get_count()==0:
                    task.stop()
                    self.pool.remove(task)
                else:
                    names.append(task.taskname)
            if len(names)>0:
                print(names)
            time.sleep(self.sleep_time)
            
    def append(self,ocrtask):
        if ocrtask.state==0:
            #终止任务
            self.stop(ocrtask)
        else:
            #启动任务
            ocrtask.start()
            self.pool.append(ocrtask)

    #终止任务
    def stop(self,ocrtask):
        for task in self.pool:
            if task.taskname==ocrtask.taskname:
                task.stop()
                self.pool.remove(task)

#socket客户端
class MySocket(Thread):
    def __init__(self,config):
        super().__init__()
        # 1.创建套接字
        self.tcp_socket = socket(AF_INET,SOCK_STREAM)
        self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护
        # 2.准备连接服务器,建立连接
        self.serve_ip = config["serve_ip"]#当前"118.24.111,149"
        self.serve_port = config["serve_port"]  #端口当前7900
        self.sleep_time = config["sleep_time"]
        print("connect to : ",self.serve_ip,self.serve_port)
        self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        self.lock = threading.RLock()
        
        self.taskpool=TaskPool()

        task_msg=threading.Thread(target=self.thread_msg)
        task_msg.daemon = True
        task_msg.start()
            #定时发送信息
    
    #通信线程-用于接收服务器的指令
    def run(self):
        while True:
            a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
            a=a.decode('utf-8')
            print("------主线程-----",a)
            jdata=json.loads(a)
            #jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
            task=OCRTask(jdata)
            self.taskpool.append(task)
            
            json_data={  
                "type":"OCR_STATE_ACK",
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "streamAddr": jdata["streamAddr"]
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)
            self.send_msg(data)

    #检测socket连接是否断开
    def check_connection(self):
        try:
            self.tcp_socket.getpeername()
            return True
        except socket.error:
            return False
    
    #定时发送心跳信息--子线程
    def thread_msg(self):
        while True:
            #message=input('You can say:')
            #json标注的模板
            json_data={  
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "type":"HEARBEAT"
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)

            #进行定时发送
            self.send_msg(data)
            # self.lock.acquire()
            # self.tcp_socket.send(data)#将发送的数据进行编码
            # self.lock.release()
            try:
                #进行定时发送
                self.lock.acquire()
                a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
                self.lock.release()
                time.sleep(self.sleep_time)
                print("ack: ",a.decode('utf-8'))
            except ConnectionRefusedError:
                print('服务器拒绝本次连接!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except TimeoutError:
                print('连接超时!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except OSError:
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
                print('智能终端无网络连接!!!!!')

    #发送信息
    def send_msg(self,msg):
        if self.check_connection() is False:
            print('服务器掉线!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        try:
            #进行定时发送
            self.lock.acquire()
            self.tcp_socket.send(msg)
            self.lock.release()
        except ConnectionRefusedError:
            print('服务器拒绝本次连接!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except TimeoutError:
            print('连接超时!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except OSError:
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            print('智能终端无网络连接!!!!!')

if "__main__"==__name__:
    #进行定时通信测试
    config=read_yaml("config.yaml")
    socket_client=MySocket(config)
    socket_client.start()

使用效果如下所示,这里基于socket调试工具作为客户端

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/400619.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ONLYOFFICE编辑器升级大揭秘:v8.0版新特性实测与评价

ONLYOFFICE编辑器升级大揭秘:v8.0版新特性实测与评价 一个人简介二前言三操作步骤创建房间我的文档 四开发人员工具应用程序接口Javascript开发工具包插件SDK网络钩子 五测评总结六体验地址 一个人简介 🏘️🏘️个人主页:以山河作…

阿里云OSS对象存储使用教程

一.OSS介绍 阿里云对象存储 OSS(Object Storage Service)是一款海量、安全、低成本、高可靠的云存储服务,提供最高可达 99.995 % 的服务可用性。多种存储类型供选择,全面优化存储成本。 官网地址:https://www.aliyun.com/product/…

【Unity】双击C#脚本文件以单个文件打开(Visual Studio)、父类找不到、引用找不到、无法跳转等问题

问题:新安装一个Unity后,突然发现在工程里双击C#脚本,会一个一个打开,虽然也是用VS软件打开了,但是它无法被正确识别为Unity工程的C#脚本,也就是说所有命名空间无效,因为没关联上整个工程的解决…

STM32使用软件SPI协议操作TFT18彩屏

时间记录:2024/2/20 一、SPI协议介绍 (1)SPI设备通过4根线进行通信,CS片选线,选择从设备,SCK时钟线,由主设备产生时钟,主机MOSI线连从机MISO线,由主机向从机发送信息&am…

单片机03--按键--寄存器版

GPIO端口相关寄存器(STM32F40x芯片) 目标: 开关KEY1控制开灯。 分析: KEY1---PA0--->输入---->浮空输入/下拉输入 KEY1不导通时,PA0输入为低电平,KEY1导通时,PA0输入为高电平。 实现…

vitis安装及遇到的问题

ubuntu不能上网安装不了 我开始遇到:win 和 ubuntu可以互ping, 但是无法上网 1. 试一下:这里改成禁用 disable 2. 试一下这个 ping 8.8.8.8和ping www.baidu.com都OK,但是打不开网页-CSDN博客 安装过程: 安装文件:F…

C++从入门到精通 第十四章(STL容器)【下】

写在前面: 本系列专栏主要介绍C的相关知识,思路以下面的参考链接教程为主,大部分笔记也出自该教程,笔者的原创部分主要在示例代码的注释部分。除了参考下面的链接教程以外,笔者还参考了其它的一些C教材(比…

大数据信用报告查询方式一般有几种?哪种比较好?

在了解这个问题之前,想必你对大数据信用与人行信用的区别都是比较清楚了,本文呢就着重讲一下大数据信用报告查询方式有几种,哪种比较好,感兴趣的朋友不妨一起去看看。 大数据信用报告常见的三种查询方式: 一、二维码分…

SSH连接密码问题:原因、表现与解决方案

SSH连接密码问题:原因、表现与解决方案 写在最前面1. 密码错误2. SSH服务配置问题3. 账户锁定或禁用4. 密钥认证问题5. SSH版本不兼容6. 服务器负载或连接数过多7. IP地址被限制 小结 写在最前面 SSH(Secure Shell)是一种网络协议&#xff0…

Flutter学习4 - Dart数据类型

1、基本数据类型 num、int、double (1)常用数据类型 num类型,是数字类型的父类型,有两个子类 int 和 double 通过在函数名前加下划线,可以将函数变成私有函数,私有函数只能在当前文件中调用 //常用数据…

C++ bfs 的状态表示(六十二)【第九篇】

今天我们来学习一下bfs的复杂状态表示 1.bfs状态表示 无论是深度优先搜索还是广度优先搜索,搜索的过程均会建立一棵 搜索树,搜索树上的每一个结点都是一个 状态,而搜索的过程又可以看作是 状态的转移。 对于 BFS,搜索过程中产生…

力扣 309. 买卖股票的最佳时机含冷冻期

题目来源:https://leetcode.cn/problems/best-time-to-buy-and-sell-stock-with-cooldown/description/ C题解:动态规划 状态1:表示持有股票。更新为之前持有股票(dp[i-1][0])或者不持有股票且不处于冷冻期后买入&…

【网络安全】漏洞挖掘入门教程(非常详细),小白是如何挖漏洞(技巧篇)从零基础入门到精通!

温馨提示: 初学者最好不要上手就去搞漏洞挖掘,因为漏洞挖掘需要很多的系统基础知识和一些理论知识做铺垫,而且难度较大…… 较合理的途径应该从漏洞利用入手,不妨分析一些公开的CVE漏洞。很多漏洞都有比较好的资料,分…

Windows Server 2012 安装

1.镜像安装 镜像安装:Windows Server 2012 2.安装过程(直接以图的形式呈现) 2012激活秘钥:J7TJK-NQPGQ-Q7VRH-G3B93-2WCQD

利用RBI(Remote Browser Isolation)技术访问ChatGPT

系统组网图 #mermaid-svg-Bza2puvd8MudMbqR {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-Bza2puvd8MudMbqR .error-icon{fill:#552222;}#mermaid-svg-Bza2puvd8MudMbqR .error-text{fill:#552222;stroke:#552222;…

VMware ESXi 8.0的安装、配置、使用

VMware ESXi 8.0的安装、配置、使用 ESXi的安装与配置下载镜像安装网络配置 Web控制台的管理操作激活开启直通网络配置修改电源模式创建虚拟机 其他ESXI秘钥克隆虚拟机 ESXi的安装与配置 下载镜像 官网:https://www.vmware.com/ 文档:https://docs.vm…

Java基础(二十六):Java8 Stream流及Optional类

Java基础系列文章 Java基础(一):语言概述 Java基础(二):原码、反码、补码及进制之间的运算 Java基础(三):数据类型与进制 Java基础(四):逻辑运算符和位运算符 Java基础(五):流程控制语句 Java基础(六)&#xff1…

外汇天眼:交易讲究时机,不要在这几个时间交易

每个交易者都想知道,什么时候是入场买卖的最好时机。 到底是1.1800入场呢? 还是等到1.1900? 但是,交易中不仅仅是关于从哪里入场,同样的,知道什么时候不去交易也是非常重要的。 这听起来像是一回事&#x…

私房菜|私房菜定制上门服务系统|基于springboot+vue私房菜定制上门服务系统设计与实现(源码+数据库+文档)

私房菜定制上门服务系统目录 目录 基于springbootvue私房菜定制上门服务系统设计与实现 一、前言 二、系统功能设计 三、系统实现 1、管理员功能实现 (1)菜品管理 (2)公告管理 (3) 厨师管理 2、用…

04 动力云客之登录后获取用户信息+JWT存进Redis+Filter验证Token + token续期

1. 登录后获取用户信息 非常好实现. 只要新建一个controller, 并调用SS提供的Authentication对象即可 package com.sunsplanter.controller;RestController public class UserController {GetMapping(value "api/login/info")public R loginInfo(Authentication a…