Kafka 学习之:基于 flask 框架通过具体案例详解生产消费者模型,这一篇文章就够了

文章目录

  • 案例信息介绍
    • 后端异步处理请求和后端同步处理请求
      • 同步方式
      • 异步方式
  • 环境文件目录
  • 配置
    • .env
    • requirements.txt
  • 完整代码
    • ext.py
    • app.py
    • kafka_create_user.py
  • 运行方式
    • 本地安装 kafka
    • 运行 app.py
    • 使用 postman 测试
      • 建立 http 长连接,等待后端处理结果
      • 发送 RAW DATA

  • 在看这个文章之前,建议先学习 kafka的工作原理 这个系列视频讲得很好,虽然基于 Java 但是理解原理并不用区分语言。只需要看懂工作原理即可。

案例信息介绍

  • 假设我的网站需要高并发地处理 user 注册这个简单的功能。
  • 前端会发送 {"user_id": xxxx, "psw":xxx} 的信息到后端完成创建
    • 前端用 postman 来模拟
    • 后端用 flask 框架来简单演示
  • 下面我用一张大致的图来表示代码的架构:
    • 前端的原始数据进入后端之后,后端要用 kafka 的架构在有序地处理 user 的请求,在这个任务中所有 user 的请求都是 register,因此我们就创建一个 kafka 的 topic 专门用来处理 user 的这类请求
    • 同时由于 kafka 是通过队列的方式 异步地处理 user 的请求,所以当 kafka 处理完 user 的请求后,我们需要找到这个处理结果并返回给对应的 user

      如果大家对于 异步处理 user 请求和同步处理没有概念,那么下面一章我先给大家讲一下同步处理请求和异步处理的区别

架构图

后端异步处理请求和后端同步处理请求

同步方式

"""
 @file: app.py.py
 @Time    : 2024/3/30
 @Author  : Peinuan qin
"""
import threading
import json
from flask import Flask, jsonify, request
from flask_cors import CORS
from ext import FLASK_HOST, FLASK_PORT

app = Flask(__name__)
CORS(app)

@app.route("/login", methods=['POST'])
def create_user_post():
    data = request.json
	"""
	register user code ....
	"""
    return jsonify({"status": 200, "msg": "success"})



if __name__ == '__main__':
   	app.run(host=FLASK_HOST, port=FLASK_PORT, debug=True)
  • 上述方式可以看到我的 create_user_post 负责接受前端的数据并且即刻处理,处理之后将结果返回前端 jsonify({"status": 200, "msg": "success"}),这个过程是一行接着一行发生的,如果中途出现了很耗时的操作,那么程序会一直等着。
  • 在 Flask 应用中,如果 "register user code ...." 处理需要20秒,这确实会阻塞处理该请求的线程,直到该过程完成。由于 Flask 开发服务器默认是单线程的,这意味着在这20秒内,服务器将无法处理来自其他用户的任何其他请求。
  • 为了允许 Flask 同时处理多个请求,你可以启用多线程模式。这可以通过在 app.run() 中设置 threaded=True 来实现 app.run(host=FLASK_HOST, port=FLASK_PORT, debug=True, threaded=True)。这样,Flask 将能够为每个请求启动一个新的线程,从而允许同时处理多个请求。但这仍然并不是一种很好的方法,因为整个服务器来看,不具备扩展性。 假设我们服务器为每个 user 的请求开一个线程,那么服务器资源是有限的,当服务器宕机,也并不能很快的恢复这就导致扩展性很差。

异步方式

import threading
import json
from flask import Flask, jsonify, request
from flask_cors import CORS
from ext import FLASK_HOST, FLASK_PORT, users_streams, LOGIN_TOPIC, producer, logger, ResponseConsumer
from kafka_create_user import kafka_consumer_task

app = Flask(__name__)
CORS(app)

@app.route("/login", methods=['POST'])
def create_user_post():
    data = request.json
    # 发送数据到Kafka
    producer.produce(LOGIN_TOPIC, key=str(data['user_id']).encode("utf-8"), value=json.dumps(data).encode("utf-8"))
    producer.flush()
    logger.info("send message to consumer")
    return jsonify({"msg": "你好,请求正在处理"})
  • 我们先忽略其他的代码,只看这一部分。
  • 这里相当于我们接受 user 的请求之后,通过 kafka 把处理请求的需要转移到外部的服务器集群上去了。而 kafka 的特性在于非常高的可扩展性。增加 kafka 的节点就可以线性地将任务处理的数量提高。
  • 如果你看我上面给的那张图,kafka 可以通过无限制增加 consumer 的数量来提高数据的处理能力。而后端的服务器需要做的就是把这些数据不断地派发出去,这个步骤相比于直接在后端将所有的请求处理来说可以忽略不计。

环境文件目录

.
├── app.py
├── ext.py
├── kafka_create_user.py
└── requirements.txt

配置

.env

  • 首先构建一个配置文件 .env 来存放基础的配置信息
FLASK_HOST=0.0.0.0
FLASK_PORT=9300
# LOGIN 这个 topic 是用来处理用户注册这个业务的
LOGIN_TOPIC=LOGIN

# RESPONSE_TOPIC 则是用来构建 response 来返回前端成功或者失败
RESPONSE_TOPIC=RESPONSE_TOPIC

requirements.txt

kafka-python==2.0.0
colorlog==6.7.0
configparser==5.3.0
flask==2.3.2
flask_basicauth==0.2.0
Flask-JWT-Extended==4.6.0
Flask-Limiter==3.5.1
Flask-PyMongo==2.3.0
requests==2.31.0
gunicorn==21.0.0
pymongo==4.6.0
pdfminer.six==20231228
flask_cors==4.0.0
python-dotenv
orjson==3.10.0
langchain
langchain-community
langchain_openai
chromadb
  • python=3.10

完整代码

ext.py

"""
 @file: ext.py.py
 @Time    : 2024/3/30
 @Author  : Peinuan qin
 """
import json
import logging
import os
import queue
import threading
import colorlog
from confluent_kafka import Producer, Consumer, KafkaError
from dotenv import load_dotenv
from confluent_kafka.admin import AdminClient, NewTopic

# 加载 .env 中的变量
load_dotenv()

FLASK_HOST = os.environ['FLASK_HOST']
FLASK_PORT = os.environ['FLASK_PORT']
LOGIN_TOPIC = os.environ['LOGIN_TOPIC']
RESPONSE_TOPIC = os.environ['RESPONSE_TOPIC']

TOPICS = [LOGIN_TOPIC, RESPONSE_TOPIC]


def create_topic():
    # Kafka服务器配置
    admin_client = AdminClient({
        "bootstrap.servers": "localhost:9092"
    })

    # 创建新主题的配置
    topic_list = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in TOPICS]
    # 注意: replication_factor 和 num_partitions 可能需要根据你的Kafka集群配置进行调整

    # 创建主题
    fs = admin_client.create_topics(topic_list)

    # 处理结果
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            logger.info(f"Topic {topic} created")
        except Exception as e:
            logger.error(f"Failed to create topic {topic}: {e}")



# Handler for logging
handler = colorlog.StreamHandler()
formatter = colorlog.ColoredFormatter(
    "%(log_color)s%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s",
    datefmt='%Y-%m-%d %H:%M:%S',
    log_colors={
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'red,bg_white',
    }
)
handler.setFormatter(formatter)

# Logger
logger = colorlog.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)


"""
尝试创建 topic
"""
create_topic()


# 初始化Kafka生产者
producer_config = {
    'bootstrap.servers': 'localhost:9092'
}
producer = Producer(**producer_config)



"""
定义专门用来回复 response 的 consumer
"""

class ResponseConsumer:
    """
    专门用来将各种处理好的结果返回给 user 作为 response; 也就是图中针对 RESPONSE TOPIC 的 consumer
    """
    def __init__(self):
        self.users_streams = {}
        self.config = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'user-response',			# 设置 groupid,如果不知道为什么要设置 groupid 可以去先看 kafka 的讲解视频
        'auto.offset.reset': 'earliest'}			# 告诉 Kafka 消费者在找不到初始偏移量(offset)或者偏移量无效时(比如,指定的偏移量已经被删除),应该从哪里开始消费消息。它可以设置为 'earliest' 或 'latest'。设置为 'earliest' 意味着消费者将从主题的开始处开始读取数据,即尽可能不漏掉任何消息;设置为 'latest' 意味着消费者将从新产生的消息开始读取,即只消费自启动之后产生的消息。
        
        self.consumer = Consumer(**self.config)
        logger.info("Create Response Consumer")
        self.consumer.subscribe([RESPONSE_TOPIC])
        logger.info("Subscribe Response Topic")
		
		# 因为可能有多个线程一起操作 consumer,所以通过 lock 来保证线程安全
        self.lock = threading.Lock()

    def get_or_make(self, user_id):
        """
        获取某个 user_id 的 response queue, 如果当前 user_id 的 response queue 不存在就创建一个
        每个 user_id 的 response queue 中都是返回给前端 user 的信息,也就是图中的  RESPONSE MSG
        :param user_id:
        :return:
        """
        with self.lock:
            # 如果当前 user_id 还没有 queue,就构建一个
            q = self.users_streams.get(user_id, queue.Queue())
            self.users_streams[user_id] = q
        return q

    def pop(self, user_id):
        with self.lock:
            self.users_streams.pop(user_id, None)

    def put(self, user_id, msg_dict):
	    """
        当 user_id 的请求处理完,产生的 RESPONSE MSG 放到 user_id 的队列里面
        :param user_id: 
        :param msg_dict: 
        :return: 
        """
        q = self.get_or_make(user_id)
        
        if q:
            with self.lock:
                self.users_streams[user_id].put(msg_dict)
                logger.info(f"put {msg_dict} into {user_id}'s queue")
            return True

        else:
            return False

    def listen_for_response(self):
        """
        不断拉取 RESPONSE TOPIC 的 producer 生成的结果
        :return:
        """

        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)  # 1秒超时
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        continue
                    else:
                        print(msg.error())
                        break
				"""
				如果拉取到了就放到对应的 user_id 的 queue 里面
				"""
                if msg:
                    logger.info(f"received data: {msg}")

                    msg_data = json.loads(msg.value().decode("utf-8"))
                    user_id = msg.key().decode("utf-8")

                    logger.info(f"msg_data: {msg_data}")
                    logger.info(f"user_id: {user_id}")

                    put_flag = self.put(user_id, msg_data)

                    if not put_flag:
                        logger.error(f"Create RESPONSE MSG for {user_id} failed")

                    else:
                        logger.info(f"create RESPONSE MSG response for {user_id}")

        except Exception as e:
            self.consumer.close()

app.py

"""
 @file: app.py.py
 @Time    : 2024/3/30
 @Author  : Peinuan qin
"""
import threading
import json
from flask import Flask, jsonify, request
from flask_cors import CORS
from ext import FLASK_HOST, FLASK_PORT, users_streams, LOGIN_TOPIC, producer, logger, ResponseConsumer
from kafka_create_user import kafka_consumer_task

app = Flask(__name__)
CORS(app)


response_consumer = ResponseConsumer()

@app.route("/login", methods=['POST'])
def create_user_post():
    data = request.json
    # 发送数据到Kafka
    producer.produce(LOGIN_TOPIC, key=str(data['user_id']).encode("utf-8"), value=json.dumps(data).encode("utf-8"))
    producer.flush()
    logger.info("send message to login consumer")
    return jsonify({"msg": "你好,请求正在处理"})


@app.route('/stream')
def stream():
    user_id = request.args.get('user_id')  # 假设用户ID通过查询参数传入
    logger.info(f"uid: {user_id}")
    logger.info(f"user_streams: {response_consumer.users_streams}")

    def event_stream(user_id):
        # 这里需要一种机制来持续发送数据给特定用户的流
        q = response_consumer.get_or_make(user_id)
        logger.info(f"{user_id} 's queue is: {q}")
        while True:
            if not q.empty():
                message = q.get()
                logger.info(f"message: {message}")
                yield f"data: {json.dumps(message)}\n\n"

    return app.response_class(event_stream(user_id), content_type='text/event-stream')


def run_multi_thread():

    consumer_thread = threading.Thread(target=kafka_consumer_task)
    response_thread = threading.Thread(target=response_consumer.listen_for_response, daemon=True)
    logger.info("Start APP ...")
    consumer_thread.start()
    logger.info("Create User Consumer start ...")
    response_thread.start()
    logger.info("Response Consumer start ...")
    app.run(host=FLASK_HOST, port=FLASK_PORT, debug=True, use_reloader=False)

if __name__ == '__main__':
    run_multi_thread()

kafka_create_user.py

"""
 @file: kafka_create_user.py
 @Time    : 2024/3/30
 @Author  : Peinuan qin
 """
import json
import os
from queue import Queue
import threading

# 初始化全局消息队列
from confluent_kafka import Consumer, KafkaError
from kafka import KafkaConsumer, KafkaProducer
from dotenv import load_dotenv
from ext import logger, LOGIN_TOPIC, RESPONSE_TOPIC, producer



def kafka_consumer_task():
	"""
    这里定义了 LOGIN TOPIC 的 consumer 的行为;也就是对 user_id 传过来的 RAW DATA 如何处理
    :return: 
    """
    
    # Kafka配置
    config = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'user-login-group',
        'auto.offset.reset': 'earliest'
    }
    consumer = Consumer(**config)
    consumer.subscribe([LOGIN_TOPIC])

    # 读取数据
    try:
        while True:
            msg = consumer.poll(timeout=1.0)  # 1秒超时
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    continue
                else:
                    print(msg.error())
                    break

            if msg:
                data = json.loads(msg.value().decode("utf-8"))
                key = msg.key().decode("utf-8")
                print("key:", key)

				"""
				为了观察,我们将 user 传过来的数据保存到本地
				"""
                with open(f"{key}.json", 'w') as f:
                    json.dump(data, f, ensure_ascii=False, indent=4)
                    logger.info(f"successfully saved the {key}.json")

                """
                完成任务后,通过 RESPONSE TOPIC 的 producer 生成 response,并发送给 RESPONSE TOPIC 等待对应的 consumer 来取,并且返回给前端
                """
                producer.produce(RESPONSE_TOPIC, key=msg.key(), value=json.dumps({"msg": f"successfully create user {key}"}).encode("utf-8"))
                producer.flush()
                logger.info("send processed data to response consumer")


    except KeyboardInterrupt:
        pass
    finally:
        # 清理操作
        consumer.close()
        producer.flush()
        producer.close()

强调一下:

  • 如果你也是基于 Flask 框架,虽然这里的 debug=True 可以保证每次更改代码后对代码进行重载,方便你进行调试。但是关于内存中的一些变量会消失,所以保证我上面的代码能够顺利运行,我设置了 use_reloader=False 否则 response_consumer.users_streams 总是为空,因为重载变量会造成混淆,引发未知的程序错误。
  • app.py 中的 stream 是以 SSE 的方式让服务器可以主动通知 user,本质是 user 向服务器建立长连接,然后 kafka 完成任务后通过这个端口将信息发送给 user

运行方式

本地安装 kafka

  • 不知道如何安装请 参考

运行 app.py

  • 直接用 pycharm 运行就可以

使用 postman 测试

建立 http 长连接,等待后端处理结果

  • + 新建窗口
  • 建立 http 连接,针对 stream 端口,并且是 GET 方法(注意选中 http 协议哦,通过左上角的符号,不要选择其他协议)
  • 同时在 Params 下面的 keyvalue 输入你 user_id 的信息(要和下面的 /login 的一致)
  • 然后点击 send
  • 长连接就会成功建立了
    在这里插入图片描述

发送 RAW DATA

  • 打开另一个新的窗口
  • 输入你本地运行的地址和端口,并且选择 post 方法
  • 选择 bodyraw 选择 json 的格式并在文本框中键入 json 数据
  • 发送
  • 就会收到 阶段性的服务器回复 ,这代表后端已经通过 kafka 来异步处理数据
    在这里插入图片描述
  • 这个时候,很快你应该可以看到在长连接的那个 postman 窗口里面出现 {"msg": "successfully create user peinuan"};并且每次你在 /login send 一次,这里就会成功获得一次结果(前端获得成功的信息)
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/505935.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python之Opencv进阶教程(1):图片模糊

1、Opencv提供了多种模糊图片的方法 加载原始未经模糊处理的图片 import cv2 as cvimg cv.imread(../Resources/Photos/girl.jpg) cv.imshow(girl, img)1.1 平均值 关键代码 # Averaging 平均值 average cv.blur(img, (3, 3)) cv.imshow(Average Blur, average)实现效果 1.2…

this.$route.back()时的组件缓存

1.this.$route.back()回到上一个路径会重新加载 跳转时,前一个路由的内容会被销毁,当回来时,重新创建树,组件内有保存了距离,没有一开始是0. 2.keep-alive写在router-view上面,这个地方所代表的路由会被保存,因此可以写在上面,保存,当返回时,如果是这个路由,里面的内容是一样…

Linux学习:进程(4)程序地址空间(笔记)

目录 1. Linux下各种资源的内存分布2. 物理地址与虚拟(线性)地址3. 程序地址空间的区域划分4. 地址映射与页表5. 缺页中断 1. Linux下各种资源的内存分布 2. 物理地址与虚拟(线性)地址 在有关进程创建的初步学习中,我们了解了fork函数创建子进程的方式。此种进程的创…

云计算探索-如何在服务器上配置RAID(附模拟器)

一,引言 RAID(Redundant Array of Independent Disks)是一种将多个物理硬盘组合成一个逻辑单元的技术,旨在提升数据存取速度、增大存储容量以及提高数据可靠性。在服务器环境中配置RAID尤其重要,它不仅能够应对高并发访…

力扣Lc25--- 821. 字符的最短距离(java版)-2024年3月31日

1.题目描述 2.知识点 从左向右遍历: 这一遍历可以帮助我们找到每个位置到其左边最近的目标字符的距离。 从右向左遍历: 这一遍历可以帮助我们找到每个位置到其右边最近的目标字符的距离,并将这个距离与之前从左向右遍历得到的距离进行比较&…

腾讯云轻量2核2G3M云服务器优惠价格61元一年,配置详解

腾讯云轻量2核2G3M云服务器优惠价格61元一年,配置为轻量2核2G、3M带宽、200GB月流量、40GB SSD盘,腾讯云优惠活动 yunfuwuqiba.com/go/txy 活动链接打开如下图: 腾讯云轻量2核2G云服务器优惠价格 腾讯云:轻量应用服务器100%CPU性能…

Linux速览(2)——环境基础开发工具篇(其一)

本章我们来介绍一些linux的常用工具 目录 一. Linux 软件包管理器 yum 1.什么是软件包? 2. 查看软件包 3. 如何安装软件 4. 如何卸载软件 5.yum补充 6. 关于 rzsz 二. Linux编辑器-vim使用 1. vim的基本概念 2. vim的基本操作 3. vim正常模式命令集 4. vim末行模式…

【核弹级软安全事件】XZ Utils库中发现秘密后门,影响主要Linux发行版,软件供应链安全大事件

Red Hat 发布了一份“紧急安全警报”,警告称两款流行的数据压缩库XZ Utils(先前称为LZMA Utils)的两个版本已被植入恶意代码后门,这些代码旨在允许未授权的远程访问。 此次软件供应链攻击被追踪为CVE-2024-3094,其CVS…

数据结构——二叉树——堆

前言: 在前面我们已经学习了数据结构的基础操作:顺序表和链表及其相关内容,今天我们来学一点有些难度的知识——数据结构中的二叉树,今天我们先来学习二叉树中堆的知识,这部分内容还是非常有意思的,下面我们…

docker-compose命令管理docker命令集合

Docker-Compose 简介 Docker-Compose 项目是Docker官方的开源项目,负责实现对Docker容器集群的快速编排。 Docker-Compose 项目由 Python 编写,调用 Docker 服务提供的API来对容器进行管理。因此,只要所操作的平台支持 Docker API&#xff…

zookeeper如何管理客户端与服务端之间的链接?(zookeeper sessions)

zookeeper客户端与服务端之间的链接用zookeeper session表示。 zookeeper session有三个状态: CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED(start时的状态) 1、CONNECTING 。 表明客户…

期货学习笔记-MACD指标学习1

MACD常规参数与优化参数降低容错率的使用技巧 MACD的基本概念及组成 概念 MACD是杰拉德 阿佩尔于1979年提出的,利用收盘价的短期(常用为12日)指数移动平均线与长期(常用为26日)指数移动平均线之间的聚合与分离情况&a…

【Gd2O3】Gd2O3栅极电介质增强GaN器件的可靠性

【Effects of Gd2O3 Gate Dielectric on Proton-Irradiated AlGaN/GaN HEMTs】 概括总结: 该研究探讨了质子辐射对使用Gd2O3作为栅极电介质的AlGaN/GaN高电子迁移率晶体管(HEMTs)的影响。通过对比肖特基栅极HEMTs和MOS-HEMTs在2 MeV质子辐射…

基础布局之LinearLayout线性布局

目录 一、基础属性二、重点属性2.1 weight(权重)属性:2.2 gravity 一、基础属性 LinearLayout默认方向是水平排放 属性作用android:id控件的ID,可以通过这个ID号来找到对应的控件android:layout_width控件的宽度android:layout_height控件的高度androi…

HarmonyOS实战开发-switch、chart组件的使用

介绍 本篇Codelab基于switch组件和chart组件,实现线形图、占比图、柱状图,并通过switch切换chart组件数据的动静态显示。要求实现以下功能: 实现静态数据可视化图表。打开开关,实现静态图切换为动态可视化图表。 相关概念 swit…

浅谈WPF之路由事件

为了降低由事件订阅带来的耦合度,和代码量,WPF推出了路由事件机制。路由事件与直接事件的区别在于,直接事件激发时,发送者直接将消息通过事件订阅者交给事件响应者,事件响应者对事件的发生做出响应。路由事件的订阅者和…

pnpm比npm、yarn好在哪里?

前言 pnpm对比npm/yarn的优点: 更快速的依赖下载更高效的利用磁盘空间更优秀的依赖管理 我们按照包管理工具的发展历史,从 npm2 开始讲起: npm2 使用早期的npm1/2安装依赖,node_modules文件会以递归的形式呈现,严格…

简单了解波 Mono-repo Multi-repo(Poly-repo)

Mono-repo 和 Multi-repo 是软件开发中代码管理的两个不同策略。Mono-repo & Multi-repo 孰优孰劣是个老生常谈得话题了,这里就不 PK 了,“略微”看下两者区别。 当我们使用 Git 作为版本控制系统管理项目的代码时,那么 monorepo 与 mul…

模拟游戏《幸福工厂》好玩吗?《幸福工厂》怎么在mac电脑上打开?

关于《幸福工厂》这款游戏是否好玩,普遍的玩家反馈和评价表明,《幸福工厂》(Satisfactory)因其深度的工厂建造模拟、自由度极高的探索以及精美的图形表现而受到许多玩家的喜爱。它允许玩家在一个开放的世界中规划并建立复杂的生产…

02-JDK新特性-Lambda表达式

JDK新特性 Lambda表达式 什么是Lambda表达式 Lambda表达式是一个匿名代码块,用于简单的传递一段代码片段。 Lambda表达式标准格式 格式:(形式参数) -> {代码块} 形式参数 如果有多个参数,参数只见用逗号隔开;如果没有&…