在实现ia业务服务器时需要构建一个python-socket客户端,1、要求能与服务器保持心跳连接,每10秒钟发送一次心跳信号;2、要求能根据socket服务器发送的指令创建或终止一个定时任务。
为此以3个类实现该功能,分别为socket通信类(用于实现通信连接与任务创建)、任务池类(用于管理任务)、任务类(用于实现具体任务)。
1、socket通信客户端
这里定义的MySocket类主体结构如下图所示,共包含4个函数,2个线程(其本身继承Thread类实现主任务流程——run函数、接收服务器信息并创建任务添加到任务池
;同时又在__init__函数中将self.thread_msg类封装为一个线程,每隔10秒钟向socket服务器发送一次心跳包
)。check_connection函数用于检测socket是否与服务器断开连接,在send_msg函数中调用,当发现客户端掉线后则立刻进行重连。send_msg函数用于发送信息给服务器,因为run函数与thread_msg函数2个线程都需要调用连接与服务器发送数据,为避免冲突故而定义为函数在内部进行加锁。
#socket客户端
class MySocket(Thread):
def __init__(self,config):
super().__init__()
# 1.创建套接字
self.tcp_socket = socket(AF_INET,SOCK_STREAM)
self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护
# 2.准备连接服务器,建立连接
self.serve_ip = config["serve_ip"]#当前"118.24.111,149"
self.serve_port = config["serve_port"] #端口当前7900
self.sleep_time = config["sleep_time"]
print("connect to : ",self.serve_ip,self.serve_port)
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
self.lock = threading.RLock()
self.taskpool=TaskPool()
task_msg=threading.Thread(target=self.thread_msg)
task_msg.daemon = True
task_msg.start()
#定时发送信息
def run(self):
while True:
a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
a=a.decode('utf-8')
print("------主线程-----",a)
jdata=json.loads(a)
#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
task=OCRTask(jdata)
self.taskpool.append(task)
json_data={
"type":"OCR_STATE_ACK",
"timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
"streamAddr": jdata["streamAddr"]
}
#print( json_data)
message = json.dumps(json_data)
data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
data=hex_to_bytes(data)
self.send_msg(data)
def check_connection(self):
try:
self.tcp_socket.getpeername()
return True
except socket.error:
return False
#定时发送心跳信息
def thread_msg(self):
while True:
#message=input('You can say:')
#json标注的模板
json_data={
"timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
"type":"HEARBEAT"
}
#print( json_data)
message = json.dumps(json_data)
data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
data=hex_to_bytes(data)
#进行定时发送
self.send_msg(data)
# self.lock.acquire()
# self.tcp_socket.send(data)#将发送的数据进行编码
# self.lock.release()
try:
#进行定时发送
self.lock.acquire()
a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
self.lock.release()
time.sleep(self.sleep_time)
print("ack: ",a.decode('utf-8'))
except ConnectionRefusedError:
print('服务器拒绝本次连接!!!!!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
except TimeoutError:
print('连接超时!!!!!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
except OSError:
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
print('智能终端无网络连接!!!!!')
def send_msg(self,msg):
if self.check_connection() is False:
print('服务器掉线!!!!!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
try:
#进行定时发送
self.lock.acquire()
self.tcp_socket.send(msg)
self.lock.release()
except ConnectionRefusedError:
print('服务器拒绝本次连接!!!!!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
except TimeoutError:
print('连接超时!!!!!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
except OSError:
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
print('智能终端无网络连接!!!!!')
2、任务池实现
任务池的实现代码如下所示,主要包含3个函数(其中将remove_task封装为一个子线程,用于实时移除已经完成计算任务的线程),append函数用于将新创建的任务添加大任务池pool中,stop函数用于停止并移除正在运行中的任务。
其具体实现代码如下所示,其作为MySocket类中的一个成员属性,每当MySocket接收到服务器信息创建任务ocrtask后都调用TaskPool.append(ocrtask)将任务添加到任务池中。由任务池管理任务的声明周期,具体可见其append函数可以启动task或终止task。remove_task线程会自动将已经完成的任务移除。
#ocr任务线程池
class TaskPool:
def __init__(self,sleep_time=0.5):
self.pool=[]
self.sleep_time=sleep_time
task_msg=threading.Thread(target=self.remove_task)
task_msg.daemon = True
task_msg.start()
#删除已经结束的任务
def remove_task(self):
while True:
names=[]
for task in self.pool:
if task.get_count()==0: #生存时间为0,认为该任务已经完成需要被删除
task.stop()
self.pool.remove(task)
else:
names.append(task.taskname)
if len(names)>0:
print(names)
time.sleep(self.sleep_time)
def append(self,ocrtask):
if ocrtask.state==0:
#终止任务
self.stop(ocrtask)
else:
#启动任务
ocrtask.start()
self.pool.append(ocrtask)
#终止任务
def stop(self,ocrtask):
for task in self.pool:
if task.taskname==ocrtask.taskname:
task.stop()
self.pool.remove(task)
3、具体任务线程
任务的实现代码如下所示,其支持3中任务模式,使用state区分任务,state为0-停止识别,1-连续识别count张,2-持续识别(故而在state为2时将count设置的特别大)。这里以count控制任务的运行,任务每运行一次count减少1。当count小于等于0,则表示任务运行完成。在TaskPool的remove_task中检测到count为0时则会自动删除任务。
#ocr任务
class OCRTask(Thread):
def __init__(self,json):
super().__init__()
self.streamAddr=json["streamAddr"]
self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别
if json["state"]==2:
self.count=9999999999999999999999999
else:
self.count=json["count"]
if "taskname" in json.keys():
self.taskname=json["taskname"]
else:
self.taskname=json["streamAddr"]
self.jsonname=json["jsonname"]
self.lock = threading.RLock()
def run(self):
while self.get_count()>0:
print('run %s'%self.taskname,end='*')
time.sleep(2)
self.lock.acquire()
self.count-=1
self.lock.release()
print('%s finish!! '%self.taskname)
#获取任务的生存时间
def get_count(self):
self.lock.acquire()
now_count=self.count
self.lock.release()
#削减count
return now_count
#停止任务
def stop(self):
self.lock.acquire()
self.count=-1
self.lock.release()
#停止任务
pass
4、完整代码与使用效果
完整代码如下所示
from socket import *
import time,json
import yaml
import threading,struct
from threading import Thread
def hex_to_bytes(hex_str):
"""
:param hex_str: 16进制字符串
:return: byte_data 字节流数据
"""
bytes_data = bytes()
while hex_str :
"""16进制字符串转换为字节流"""
temp = hex_str[0:2]
s = int(temp, 16)
bytes_data += struct.pack('B', s)
hex_str = hex_str[2:]
return bytes_data
# 读取Yaml文件方法
def read_yaml(yaml_path):
with open(yaml_path, encoding="utf-8", mode="r") as f:
result = yaml.load(stream=f,Loader=yaml.FullLoader)
return result
#ocr任务
class OCRTask(Thread):
def __init__(self,json):
super().__init__()
self.streamAddr=json["streamAddr"]
self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别
if json["state"]==2:
self.count=9999999999999999999999999
else:
self.count=json["count"]
if "taskname" in json.keys():
self.taskname=json["taskname"]
else:
self.taskname=json["streamAddr"]
self.jsonname=json["jsonname"]
self.lock = threading.RLock()
def run(self):
while self.get_count()>0:
print('run %s'%self.taskname,end='*')
time.sleep(2)
self.lock.acquire()
self.count-=1
self.lock.release()
print('%s finish!! '%self.taskname)
#获取任务的生存时间
def get_count(self):
self.lock.acquire()
now_count=self.count
self.lock.release()
#削减count
return now_count
#停止任务
def stop(self):
self.lock.acquire()
self.count=-1
self.lock.release()
#停止任务
pass
#ocr任务线程池
class TaskPool:
def __init__(self,sleep_time=0.5):
self.pool=[]
self.sleep_time=sleep_time
task_msg=threading.Thread(target=self.remove_task)
task_msg.daemon = True
task_msg.start()
#删除已经结束的任务
def remove_task(self):
while True:
names=[]
for task in self.pool:
if task.get_count()==0:
task.stop()
self.pool.remove(task)
else:
names.append(task.taskname)
if len(names)>0:
print(names)
time.sleep(self.sleep_time)
def append(self,ocrtask):
if ocrtask.state==0:
#终止任务
self.stop(ocrtask)
else:
#启动任务
ocrtask.start()
self.pool.append(ocrtask)
#终止任务
def stop(self,ocrtask):
for task in self.pool:
if task.taskname==ocrtask.taskname:
task.stop()
self.pool.remove(task)
#socket客户端
class MySocket(Thread):
def __init__(self,config):
super().__init__()
# 1.创建套接字
self.tcp_socket = socket(AF_INET,SOCK_STREAM)
self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护
# 2.准备连接服务器,建立连接
self.serve_ip = config["serve_ip"]#当前"118.24.111,149"
self.serve_port = config["serve_port"] #端口当前7900
self.sleep_time = config["sleep_time"]
print("connect to : ",self.serve_ip,self.serve_port)
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
self.lock = threading.RLock()
self.taskpool=TaskPool()
task_msg=threading.Thread(target=self.thread_msg)
task_msg.daemon = True
task_msg.start()
#定时发送信息
#通信线程-用于接收服务器的指令
def run(self):
while True:
a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
a=a.decode('utf-8')
print("------主线程-----",a)
jdata=json.loads(a)
#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
task=OCRTask(jdata)
self.taskpool.append(task)
json_data={
"type":"OCR_STATE_ACK",
"timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
"streamAddr": jdata["streamAddr"]
}
#print( json_data)
message = json.dumps(json_data)
data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
data=hex_to_bytes(data)
self.send_msg(data)
#检测socket连接是否断开
def check_connection(self):
try:
self.tcp_socket.getpeername()
return True
except socket.error:
return False
#定时发送心跳信息--子线程
def thread_msg(self):
while True:
#message=input('You can say:')
#json标注的模板
json_data={
"timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
"type":"HEARBEAT"
}
#print( json_data)
message = json.dumps(json_data)
data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
data=hex_to_bytes(data)
#进行定时发送
self.send_msg(data)
# self.lock.acquire()
# self.tcp_socket.send(data)#将发送的数据进行编码
# self.lock.release()
try:
#进行定时发送
self.lock.acquire()
a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
self.lock.release()
time.sleep(self.sleep_time)
print("ack: ",a.decode('utf-8'))
except ConnectionRefusedError:
print('服务器拒绝本次连接!!!!!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
except TimeoutError:
print('连接超时!!!!!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
except OSError:
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
print('智能终端无网络连接!!!!!')
#发送信息
def send_msg(self,msg):
if self.check_connection() is False:
print('服务器掉线!!!!!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
try:
#进行定时发送
self.lock.acquire()
self.tcp_socket.send(msg)
self.lock.release()
except ConnectionRefusedError:
print('服务器拒绝本次连接!!!!!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
except TimeoutError:
print('连接超时!!!!!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
except OSError:
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式
print('智能终端无网络连接!!!!!')
if "__main__"==__name__:
#进行定时通信测试
config=read_yaml("config.yaml")
socket_client=MySocket(config)
socket_client.start()
使用效果如下所示,这里基于socket调试工具作为客户端