6.python网络编程

文章目录

    • 1.生产者消费者-生成器版
    • 2.生产者消费者--异步版本
    • 3.客户端/服务端-多线程版
    • 4.IO多路复用TCPServer模型
      • 4.1Select
      • 4.2Epoll
    • 5.异步IO多路复用TCPServer模型

1.生产者消费者-生成器版

import time


# 消费者
def consumer():
    cnt = yield
    while True:
        if cnt <= 0:
            # 暂停、让出CPU
            cnt = yield cnt
        cnt -= 1
        time.sleep(1)
        print('consumer consum 1 cnt. cnt =', cnt)


# 生产者 (调度器)
def producer(cnt):
    gen = consumer()
    # 激活生成器
    next(gen)
    gen.send(cnt)
    while True:
        cnt += 1
        print('producer producer 5 cnt. cnt =', cnt)
        # 调度消费者
        current = int(time.time())
        if current % 5 == 0:
            cnt = gen.send(cnt)
        else:
            time.sleep(1)


if __name__ == '__main__':
    producer(0)

2.生产者消费者–异步版本

import asyncio
import time
from queue import Queue
from threading import Thread


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_sleep(x, queue):
    await asyncio.sleep(x)
    queue.put('ok')


def consumer(input_queue1, out_queue1):
    while True:
        task = input_queue1.get()
        if not task:
            time.sleep(1)
            continue
        asyncio.run_coroutine_threadsafe(do_sleep(int(task), out_queue1), new_loop)


if __name__ == '__main__':
    print(time.ctime())
    new_loop = asyncio.new_event_loop()
    loop_thread = Thread(target=start_loop, args=(new_loop,))
    loop_thread.daemon = True
    loop_thread.start()

    input_queue = Queue()
    input_queue.put(5)
    input_queue.put(3)
    input_queue.put(1)

    out_queue = Queue()

    consumer_thread = Thread(target=consumer, args=(input_queue, out_queue,))
    consumer_thread.daemon = True
    consumer_thread.start()

    while True:
        msg = out_queue.get()
        print("协程运行完...")
        print("当前时间:", time.ctime())

3.客户端/服务端-多线程版

客户端/服务模型

在这里插入图片描述

客户端

# -*- encoding=utf-8 -*-
# 客户端

import socket


client = socket.socket()
print('client.fileno:', client.fileno())

client.connect(('127.0.0.1', 8999))

while True:
    content = str(input('>>>'))
    client.send(content.encode())
    content = client.recv(1024)
    print('client recv content:', content)

服务端

import socket
import threading


def thread_process(s):
    while True:
        content = s.recv(1024)
        if len(content) == 0:
            break
        s.send(content.upper())
        print(str(content, encoding='utf-8'))  # 接受来自客户端的消息,并打印出来
        s.close()


server = socket.socket()  # 1. 新建socket
server.bind(('127.0.0.1', 8999))  # 2. 绑定IP和端口(其中127.0.0.1为本机回环IP)
server.listen(5)  # 3. 监听连接

while True:
    s, addr = server.accept()  # 4. 接受连接
    new_thread = threading.Thread(target=thread_process, args=(s,))
    print('new thread process connect addr:{}'.format(addr))
    new_thread.start()

注意:

  • AddressFamily=AF_INET:(用于 Internet 进程间通信)

  • AddressFamily=AF_UNIX(用于同一台机器进程间通信)

  • 现象:报错[WinError 10038],原因分析:socket 先 close 再调 recv 就会报错,解决办法:if not tcpCliSock._closed:

4.IO多路复用TCPServer模型

4.1Select

服务端

import select
import socket
from queue import Queue, Empty
from time import sleep

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(False)
server_address = ("127.0.0.1", 8999)
print('starting up on %s port %s' % server_address)
server.bind(server_address)
server.listen(5)
inputs = [server]
outputs = []
message_queues = {}

while inputs:
    print('waiting for the next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    for s in readable:
        if s is server:
            connection, client_address = s.accept()
            print(f"connection from {client_address}")
            connection.setblocking(False)
            inputs.append(connection)
            message_queues[connection] = Queue()
            continue

        data = s.recv(1024).decode()
        if data == "":
            print(f'closing:{s.getpeername()}')
            if s in outputs:
                outputs.remove(s)
            inputs.remove(s)
            s.close()
            message_queues.pop(s)
            continue

        print(f'received {data} from {s.getpeername()} ')
        message_queues[s].put(data)
        if s not in outputs:
            outputs.append(s)

    for s in writable:
        try:
            queue_item = message_queues.get(s)
            send_data = ''
            if queue_item:
                send_data = queue_item.get_nowait()
        except Empty:
            print(outputs.remove(s))
            print(f"{s.getpeername()} has closed")
        else:
            if queue_item:
                s.send(send_data.encode())

    for s in exceptional:
        print(f"Exception condition on {s.getpeername}")
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        message_queues.pop(s)

    sleep(1)

客户端

import socket

messages = ['This is the message ', 'It will be sent ', 'in parts ', ]
server_address = ("127.0.0.1", 8999)
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ]
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for index, message in enumerate(messages):
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message + str(index)))
        s.send((message + str(index)).encode('utf-8'))

for s in socks:
    data = s.recv(1024)
    print('%s: received "%s"' % (s.getsockname(), data))
    if data != "":
        print('closing socket', s.getsockname())
        s.close()

  • 为什么要将server放入到inputs中

在select模型中,将server放入到inputs中,当执行select时就会去检查server是否可读,就说明在缓冲区里有数据,对于server来说,有连接进入。使用accept获得客户端socket文件后,首先要放入到inputs当中,等待其发送消息。

  • readable

select会将所有可读的socket返回,包括server在内,假设一个客户端socket的缓冲区里有2000字节的内容,而这一次你只是读取了1024个字节,没有关系,下一次执行select模型时,由于缓冲区里还有数据,这个客户端socket还会被放入到readable列表中。因此,在读取数据时,不必再像之前那样使用一个while循环一直读取。

  • writable

在每一次写操作执行后,都从socket从writable中删除,这样做的原因很简单,该写的数据已经写完了,如果不删除,下一次select操作时,又会把他放入到writable中,可是现在已经没有数据需要写了啊,这样做没有意义,只会浪费select操作的时间,因为它要遍历outputs中的每一个socket,判断他们是否可写以决定是否将其放入到writtable中

  • 异常

在exceptional中,是发生错误和异常的socket,有了这个数组,就在也不用操心错误和异常了,不然程序写起来非常的复杂,有了统一的管理,发生错误后的清理工作将变得非常简单

4.2Epoll

服务端

# -*- encoding=utf-8 -*-
# IO多路复用TCPServer模型

import select
import socket


def serve():
    server = socket.socket()
    server.bind(('127.0.0.1', 8999))
    server.listen(1)

    epoll = select.epoll()
    epoll.register(server.fileno(), select.EPOLLIN)

    connections = {}
    contents = {}

    while True:
        events = epoll.poll(10)
        for fileno, event in events:
            if fileno == server.fileno():
                # 当fd为当前服务器的描述符时,获取新连接
                s, addr = server.accept()  # 获取套接字和地址
                print(f"new connection from addr:{addr},fileno:{s.fileno()},socket:{s}")
                epoll.register(s.fileno(), select.EPOLLIN)
                connections[s.fileno()] = s
            elif event == select.EPOLLIN:
                # 当fd不为服务器描述符为客户端描述符时,读事件就绪,有新数据可读
                s = connections[fileno]
                content = s.recv(1024)
                if content:
                    # 当客户端发送数据时
                    print(f"recv content is {content}")
                    print(f"fileno:{fileno} event:{event}")
                    epoll.modify(fileno, select.EPOLLOUT)
                    contents[fileno] = content
                else:
                    # 当客户端退出连接时
                    print(f"recv content is null")
                    print(f"fileno;{fileno} event:{event} ")
                    epoll.unregister(fileno)
                    s.close()
                    connections.pop(fileno)
            elif event == select.EPOLLOUT:
                # 当fd不为服务器描述符为客户端描述符时,写事件就绪
                try:
                    content = contents[fileno]
                    s = connections[fileno]
                    s.send(content)
                    epoll.modify(s.fileno(), select.EPOLLIN)
                    print(f"modify content is {content}")
                    print(f"fileno;{fileno} event:{event} ")
                except Exception as error:
                    epoll.unregister(fileno)
                    s.close()
                    connections.pop(fileno)
                    contents.pop(fileno)
                    print(f"modify content is failed")
                    print(f"fileno;{fileno} event:{event} ")


if __name__ == '__main__':
    serve()

客户端

# -*- encoding=utf-8 -*-
# 客户端

import socket


client = socket.socket()
print('client.fileno:', client.fileno())

client.connect(('127.0.0.1', 8999))

while True:
    content = str(input('>>>'))
    client.send(content.encode())
    content = client.recv(1024)
    print('client recv content:', content.decode())

5.异步IO多路复用TCPServer模型

import socket
import select
from collections import deque


class Future:
    """可等待对象 Future"""

    def __init__(self, loop):
        self.loop = loop
        self.done = False
        self.co = None

    def set_done(self):
        self.done = True

    def set_coroutine(self, co):
        self.co = co

    def __await__(self):
        if not self.done:
            yield self
        return


class SocketWrapper:
    """套接字协程适配器"""

    def __init__(self, sock: socket.socket, loop):
        self.loop = loop
        self.sock = sock
        self.sock.setblocking(False)
        self.fileno = self.sock.fileno()

    def create_future_for_events(self, events):
        future: Future = Future(loop=self.loop)

        def handler():
            future.set_done()
            self.loop.unregister_handler(self.fileno)
            if future.co:
                self.loop.add_coroutine(future.co)

        self.loop.register_handler(self.fileno, events, handler)

        return future

    async def accept(self):
        while True:
            try:
                sock, addr = self.sock.accept()
                return SocketWrapper(sock, self.loop), addr
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLIN)
                await future

    async def recv(self, backlog):
        while True:
            try:
                return self.sock.recv(backlog)
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLIN)
                await future

    async def send(self, data):
        while True:
            try:
                return self.sock.send(data)
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLOUT)
                await future


class EventLoop:
    """调度器:epoll事件驱动"""
    current = None
    runnable = deque()
    epoll = select.epoll()
    handler = {}

    @classmethod
    def instance(cls):
        if not EventLoop.current:
            EventLoop.current = EventLoop()
        return EventLoop.current

    def register_handler(self, fileno, events, handler):
        self.handler[fileno] = handler
        self.epoll.register(fileno, events)

    def unregister_handler(self, fileno):
        self.epoll.unregister(fileno)
        self.handler.pop(fileno)

    def add_coroutine(self, co):
        self.runnable.append(co)

    def run_coroutine(self, co):
        try:
            future: Future = co.send(None)
            future.set_coroutine(co)
        except Exception as e:
            print(e)
            print('coroutine {} stopped'.format(co.__name__))

    def run_forever(self):
        while True:
            while self.runnable:
                self.run_coroutine(co=self.runnable.popleft())
            events = self.epoll.poll(1)
            for fileno, event in events:
                handler = self.handler.get(fileno)
                handler()


class TCPServer:

    def __init__(self, loop: EventLoop):
        self.loop = loop
        self.listen_sock: SocketWrapper = self.create_listen_socket()
        self.loop.add_coroutine(self.serve_forever())

    def create_listen_socket(self, ip='localhost', port=8999):
        sock = socket.socket()
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((ip, port))
        sock.listen()
        return SocketWrapper(sock, self.loop)

    async def handler_client(self, sock: SocketWrapper):
        while True:
            data = await sock.recv(1024)
            if not data:
                print('client disconnected')
                break
            await sock.send(data.upper())

    async def serve_forever(self):
        while True:
            sock, addr = await self.listen_sock.accept()
            print(f'client connect addr = {addr}')
            self.loop.add_coroutine(self.handler_client(sock))


if __name__ == '__main__':
    loop = EventLoop.instance()
    server = TCPServer(loop)
    loop.run_forever()



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/588076.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LLM优化:开源星火13B显卡及内存占用优化

1. 背景 本qiang~这两天接了一个任务&#xff0c;部署几个开源的模型&#xff0c;并且将本地经过全量微调的模型与开源模型做一个效果对比。 部署的开源模型包括&#xff1a;星火13B&#xff0c;Baichuan2-13B, ChatGLM6B等 其他两个模型基于transformers架构封装&#xff0…

使用Git把写好的项目放到github上

把之前的文章差缺补漏了一下&#xff0c;发现少一个TUserController文件&#xff0c;然后加上了。 以及发现前后端交互时的跨域问题需要处理。 在Controller文件里加入注释 CrossOrigin(origins "*")即可。 不然数据在Vue里显示不出来。 ** 壹 首先先把前端项目 de…

ElasticSearch教程入门到精通——第四部分(基于ELK技术栈elasticsearch 7.x新特性)

ElasticSearch教程入门到精通——第四部分&#xff08;基于ELK技术栈elasticsearch 7.x新特性&#xff09; 1. Elasticsearch进阶1.1 核心概念1.1.1 索引Index1.1.1.1 索引创建原则1.1.1.2 Inverted Index 1.1.2 类型Type1.1.3 文档Document1.1.4 字段Field1.1.5 映射Mapping1.…

AI项目二十一:视频动态手势识别

若该文为原创文章&#xff0c;转载请注明原文出处。 一、简介 人工智能的发展日新月异&#xff0c;也深刻的影响到人机交互领域的发展。手势动作作为一种自然、快捷的交互方式&#xff0c;在智能驾驶、虚拟现实等领域有着广泛的应用。手势识别的任务是&#xff0c;当操作者做出…

翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深度学习六

合集 ChatGPT 通过图形化的方式来理解 Transformer 架构 翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深度学习一翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深度学习二翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深…

设备能源数据采集新篇章

在当今这个信息化、智能化的时代&#xff0c;设备能源数据的采集已经成为企业高效运营、绿色发展的重要基石。而今天&#xff0c;我们要向大家介绍的就是一款颠覆传统、引领未来的设备能源数据采集神器——HiWoo Box网关&#xff01; 一、HiWoo Box网关&#xff1a;一站式解决…

virtualbox kafka nat + host-only集群 + windows 外网 多网卡

virtualbox kafka nat + host-only集群 + windows 映射访问 kafka集群搭建背景kafka集群搭建 背景 使用virtualbox搭建kafka集群,涉及到不同网络策略的取舍 首先 桥接 网络虽说 啥都可以,但是涉及到过多ip的时候,而且还不能保证使用的ip不被占用,所以个人选择kafka虚拟机…

2024.5.5 机器学习周报

引言 Abstract 文献阅读 1、题目 SuperGlue: Learning Feature Matching with Graph Neural Networks 2、引言 本文介绍了SuperGlue&#xff0c;这是一种神经网络&#xff0c;它通过联合寻找对应关系并拒绝不匹配的点来匹配两组局部特征。通过求解一个可微的最优运输问题…

【Mac】Photoshop 2024 for mac最新安装教程

软件介绍 Photoshop 2024是Adobe公司推出的一款图像处理软件&#xff0c;它支持Windows和Mac OS系统。Adobe Photoshop是业界领先的图像编辑和处理软件之一&#xff0c;广泛用于设计、摄影、数字绘画等领域。 Photoshop 2024的功能包括&#xff1a; 1.图像编辑&#xff1a;提…

如何提高商务认知与情商口才(3篇)

如何提高商务认知与情商口才&#xff08;3篇&#xff09; **篇&#xff1a;提高商务认知 商务认知的提升是一个系统工程&#xff0c;需要我们不断地积累知识、理解市场和关注行业动态。以下是一些具体的方法&#xff1a; 持续学习&#xff1a;通过阅读商业书籍、参加行业研讨…

相机知识的补充

一&#xff1a;镜头 1.1MP的概念 相机中MP的意思是指百万像素。MP是mega pixel的缩写。mega意为一百万&#xff0c;mega pixel 指意为100万像素。“像素”是相机感光器件上的感光最小单位。就像是光学相机的感光胶片的银粒一样&#xff0c;记忆在数码相机的“胶片”&#xff…

【深耕 Python】Data Science with Python 数据科学(17)Scikit-learn机器学习(二)

写在前面 关于数据科学环境的建立&#xff0c;可以参考我的博客&#xff1a; 【深耕 Python】Data Science with Python 数据科学&#xff08;1&#xff09;环境搭建 往期数据科学博文一览&#xff1a; 【深耕 Python】Data Science with Python 数据科学&#xff08;2&…

论文精读-基于FPGA的卷积神经网络和视觉Transformer通用加速器

论文精读-基于FPGA的卷积神经网络和视觉Transformer通用加速器 优势&#xff1a; 1.针对CNN和Transformer提出了通用的计算映射&#xff08;共用计算单元&#xff0c;通过不同的映射指令&#xff0c;指导数据通路和并行计算&#xff09; 2.非线性与归一化加速单元&#xff0…

路由器的构成

一、路由器简介 路由器是互联网中的关键设备&#xff1a; 连接不同的网络路由器是多个输入端口和多个输出端口的专用计算机&#xff0c;其任务是转发分组&#xff08;转发给下一跳路由器&#xff09;下一跳路由器也按照这种方法处理分组&#xff0c;直到该分组到达终点为止 …

线程的概念

文章目录 1、什么是线程2、进程和线程的区别3、多线程的概述4、在Java中实现多线程的方法1.继承Thread类2.实现Runnable接口3.使用匿名内部类来继承Thread类&#xff0c;实现run方法4.使用匿名内部类来实现Runnable接口&#xff0c;实现run方法5.使用 lambda表达式 1、什么是线…

STM32 DMA直接存储器存取

单片机学习&#xff01; 目录 文章目录 前言 一、DMA简介 1.1 DMA是什么 1.2 DMA作用 1.3 DMA通道 1.4 软硬件触发 1.5 芯片资源 二、存储器映像 2.1 存储器 2.2 STM32存储器 三、DMA框图 3.1 内核与存储器 3.2 寄存器 3.3 DMA数据转运 3.4 DMA总线作用 3.5 DMA请求 3.6 DMA结构…

linux的常见命令

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;Linux ⛺️稳中求进&#xff0c;晒太阳 Linux中检查进程是否存在&#xff1a; ps -ef | grep [进程名或进程ID] pgrep -f [进程名|进程ID] pidof [进程名] Linux中检查某个端口是否被…

neo4j 的插入速度为什么越来越慢,可能是使用了过多图谱查询操作

文章目录 背景描述分析解决代码参考neo4j 工具类Neo4jDriver知识图谱构建效果GuihuaNeo4jClass 背景描述 使用 tqdm 显示&#xff0c;处理的速度&#xff1b; 笔者使用 py2neo库&#xff0c;调用 neo4j 的API 完成节点插入&#xff1b; 有80万条数据需要插入到neo4j图数据中&am…

目标检测发展概述

前言 本篇文章只是简单介绍一下目标检测这一计算机视觉方向的发展历史&#xff0c;因此重点在于介绍而不是完整阐述各个时期的代表算法&#xff0c;只要能够简单了解到目标检测的发展历史那么本文的目的就达到了。 目标检测的任务 从上图不难看出&#xff0c;目标检测是计算机…

第十五届蓝桥杯

经历半年以来的学习&#xff0c;终于出结果了。期间无数次想要放弃&#xff0c;但是我都挺过来了&#xff0c;因为我还不能倒下。期间有很多次焦虑&#xff0c;一直在想&#xff0c;我要是没拿奖我是不是白学了。但是学到最后就释怀了&#xff0c;因为在备赛期间&#xff0c;我…