import json import socket import threading import logging thislist = [] thisneednum = {} class ChatUdpMain: def __init__(self): #其他原有逻辑 begin #其他原有逻辑 end # 1.创建socket套接字 收 self.udp_socket_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # AF_INET表示使用ipv4,默认不变,SOCK_DGRAM表示使用UDP通信协议 # 2.绑定端口port 收 local_addr_receive = ("127.0.0.1", 45678) # 默认本机任何ip ,指定端口号45678,用于接收数据 self.udp_socket_receive.bind(local_addr_receive) # 绑定端口 # 1.创建socket套接字 发 self.udp_socket_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # AF_INET表示使用ipv4,默认不变,SOCK_DGRAM表示使用UDP通信协议 # 2.绑定端口port 发 local_addr_send = ("127.0.0.1", 45671) # 默认本机任何ip,初始化使用,实际45671没有用 self.udp_socket_send.bind(local_addr_send) # 绑定端口 self.start() self.thislist = thislist self.thisneednum = thisneednum def close(self): try: self.udp_socket_send.close() self.udp_socket_receive.close() except Exception as e: logging.info(e) def send(self, val: str): try: data = str.encode(str(val)) self.udp_socket_send.sendto(data, ("127.0.0.1", 45679)) # 默认本机任何ip ,45679是发数据的端口 except Exception as e: logging.info(e) # 开启线程 def start(self): # 启动新线程接收数据 threading.Thread(target=self._recv, name='recv').start() #接收qt发来的实时数据 def _recv(self): while 1: try: self.recevalue = self.udp_socket_receive.recv(5000) print("收到数据 ") print(self.recevalue) try: i = 1 for k, v in json.loads(self.recevalue).items(): if k in self.thislist: self.thisneednum[i] = v i += 1 print("数据解析成功") self.jisuan(self.thisneednum) except ValueError: print("ValueError") except Exception as e: logging.info(e) def jisuan(self, d: dict): for i in self.thisneednum: print(self.thisneednum[i]) #这里就是计算 #得到结果 # JSON 字符串 json_string = '{"Type": "PCA","name": "Kyrie", "age": 31}' # 解析 JSON 字符串 self.data = json.loads(json_string) self.data = json.dumps(self.data) # 给qt发结果 self.send(self.data) #self.close() def needcs(): with open("config-PCA.ini", "r") as f: str = f.read() strlist = str.split(',') for item in strlist: thislist.append(item) #主函数 def main(): needcs() chatUdp = ChatUdpMain() if __name__ == '__main__': main()