Python 版分布式消息队列 Kafka 实现图片数据传输

1、Kafka 介绍

在使用 Kafka 之前,通常需要先安装和配置 ZooKeeper。ZooKeeper 是 Kafka 的依赖项之一,它用于协调和管理 Kafka 集群的状态。

ZooKeeper 是一个开源的分布式协调服务,它提供了可靠的数据存储和协调机制,用于协调分布式系统中的各个节点。Kafka 使用 ZooKeeper 来存储和管理集群的元数据、配置信息和状态。

2、Kafka 环境搭建

环境:

  • Windows11
  • Java 1.8 及以上
  • Anaconda
  • Python10
  • Kafka 2.0.2 (kafka-python)
2.1、安装 Python 版本 Kafka
pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

至此,Windows 环境下还不能运行 Kafka,一般情况下,程序会提示超时(60ms)等报错。原因是,还需要启动 Kafka 服务。

2.2、启动 Kafka 服务

从 Kafka 官网下下载对应的文件:Apache Kafka 官网下载地址

在这里插入图片描述

下载红色箭头所指向的文件到本地并解压。

在这里插入图片描述

注意:

从 Kafka 官网上下载的 kafka_2.12-3.2.1 文件需要放置在路径较浅文件夹下解压,一旦放置的路径较深,会报错:

在这里插入图片描述

输入行太长。
命令语法不正确。

本案例放在 E 盘下。

2.2.1、启动 Zookeeper 服务

在上图路径下打开 cmd 命令窗口,执行如下命令:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

在这里插入图片描述

出现如下信息,表示 Zookeeper 服务启动成功:

在这里插入图片描述

2.2.2、启动 Kafka 服务

在上图路径下打开 cmd 命令窗口,执行如下命令:

.\bin\windows\kafka-server-start.bat .\config\server.properties

出现如下信息,表示 Kafka 服务启动成功:

在这里插入图片描述

3、构建图片传输队列

在这里插入图片描述

3.1、配置文件

Properties/config.yaml:

kafka:
 host: "127.0.0.1"
 port: 9092
 parameter:
   bootstrap_servers: '127.0.0.1:9092'
   api_version: "2.5.0"
   log_path: "KafkaLog/log.txt"
workspace:
  path: "E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00"
input:
  images_path: "DataSource/Images"
output:
  output_path: "DataSource/Output"
3.2、Kafka 创建分区

KafkaModule/ProducerConsumer/KafkaClient.py:

from kafka.admin import KafkaAdminClient, NewPartitions


client = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")

# 在已有的 topic 中创建分区
new_partitions = NewPartitions(3)
client.create_partitions({"kafka_demo": new_partitions})
3.3、生产者、消费者(单线程版)

生产者:

KafkaModule/ProducerConsumer/KafkaDemoProducer.py:

# -*- coding: utf-8 -*-
import json
import yaml
import base64
import os.path
import logging
import random
import traceback
from kafka.errors import kafka_errors
from kafka import KafkaProducer

def producer_demo(cfg):
    """
    :param cfg:
    :return:
    """
    # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
    producer = KafkaProducer(bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
                             key_serializer=lambda k: json.dumps(k).encode(),
                             value_serializer=lambda v: json.dumps(v).encode())
    logging.info("Kafka Producer Starting")
    images_path = cfg['input']['images_path']
    workspace_path = cfg['workspace']['path']
    for i, img in enumerate(os.listdir(os.path.join(workspace_path, images_path))):
        print(f"img: {img}")
        workspace_path = cfg['workspace']['path']
        image_path = os.path.join(workspace_path, images_path, img)
        with open(image_path, "rb") as image_file:
            image_data = image_file.read()
        encode_image = base64.b64encode(image_data)
        json_data = encode_image.decode("utf-8")
        json_string = json.dumps(json_data)

        future = producer.send('kafka_demo',
                               key=str(i),  # 同一个key值,会被送至同一个分区
                               value=json_string,
                               partition=random.randint(0, 2))  # 向分区1发送消息
        producer.flush()
        logging.info("Send {}".format(str(i)))
        try:
            future.get(timeout=10)  # 监控是否发送成功
        except kafka_errors:  # 发送失败抛出 kafka_errors
            traceback.format_exc()

def process():
    with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
                                 "Properties/config.yaml"), "r") as config:
        cfg = yaml.safe_load(config)
    logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
                        level=logging.INFO)
    producer_demo(cfg)

if __name__ == '__main__':
    process()

消费者:

KafkaModule/ProducerConsumer/KafkaDemoConsumer.py:

import json
import yaml
import base64
import logging
import os.path
from kafka import KafkaConsumer

def consumer_demo0(cfg):
    """
    :param cfg:
    :return:
    """
    consumer = KafkaConsumer('kafka_demo',
                             bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
                             api_version=cfg['kafka']['parameter']['api_version'],
                             group_id='test')
    logging.info("consumer_demo0 starting")
    for message in consumer:
        key_json_string = json.loads(message.key.decode())
        value_json_string = json.loads(message.value.decode())
        name_data = "test0" + key_json_string + ".jpg"
        image_data = base64.b64decode(value_json_string)
        logging.info(f"Receiving {name_data} data.")
        workspace_path = cfg['workspace']['path']
        output_path = cfg['output']['output_path']
        image_path = os.path.join(workspace_path, output_path, name_data)
        with open(image_path, 'wb') as jpg_file:
            jpg_file.write(image_data)
            logging.info(f"Save {name_data} data finished.")

def process():
    """
    :return:
    """
    with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
                                 "Properties/config.yaml"), "r") as config:
        cfg = yaml.safe_load(config)
    logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
                        level=logging.INFO)
    consumer_demo0(cfg)

if __name__ == '__main__':
    process()
3.4、生产者、消费者(线程池版)

生产者:

KafkaModule/ProducerConsumer/KafkaDemoProducerMultiThread.py:

# -*- coding: utf-8 -*-
import json
import yaml
import base64
import os.path
import logging
import random
import traceback
from kafka.errors import kafka_errors
from kafka import KafkaProducer

def producer_demo(cfg):
    """
    :param cfg:
    :return:
    """
    # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
    producer = KafkaProducer(bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
                             key_serializer=lambda k: json.dumps(k).encode(),
                             value_serializer=lambda v: json.dumps(v).encode())

    logging.info("Kafka Producer Starting")

    images_path = cfg['input']['images_path']
    workspace_path = cfg['workspace']['path']

    for i, img in enumerate(os.listdir(os.path.join(workspace_path, images_path))):
        print(f"img: {img}")
        workspace_path = cfg['workspace']['path']
        image_path = os.path.join(workspace_path, images_path, img)

        with open(image_path, "rb") as image_file:
            image_data = image_file.read()

        encode_image = base64.b64encode(image_data)
        json_data = encode_image.decode("utf-8")
        json_string = json.dumps(json_data)

        future = producer.send('kafka_demo',
                               key=str(i),  # 同一个key值,会被送至同一个分区
                               value=json_string,
                               partition=random.randint(0, 2))  # 向分区1发送消息
        producer.flush()
        logging.info("Send {}".format(str(i)))
        try:
            future.get(timeout=10)  # 监控是否发送成功
        except kafka_errors:  # 发送失败抛出 kafka_errors
            traceback.format_exc()

def process():
    with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
                                 "Properties/config.yaml"), "r") as config:
        cfg = yaml.safe_load(config)
    logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
                        level=logging.INFO)
    producer_demo(cfg)

if __name__ == '__main__':
    process()

消费者:

KafkaModule/ProducerConsumer/KafkaDemoConsumerMultiThread.py:

import json
import yaml
import base64
import logging
import os.path
from kafka import KafkaConsumer
from concurrent.futures import ThreadPoolExecutor, as_completed

def consumer_demo0(cfg, thread_id):
    """ 线程池版的消费者
    :param cfg: 配置文件
    :param thread_id: 线程序号
    :return:
    """
    consumer = KafkaConsumer('kafka_demo',
                             bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
                             api_version=cfg['kafka']['parameter']['api_version'],
                             group_id='test')

    logging.info("consumer_demo0 starting")
    for message in consumer:
        key_json_string = json.loads(message.key.decode())
        value_json_string = json.loads(message.value.decode())
        name_data = f"test_{thread_id}_" + key_json_string + ".jpg"
        image_data = base64.b64decode(value_json_string)
        logging.info(f"Receiving {name_data} data.")
        workspace_path = cfg['workspace']['path']
        output_path = cfg['output']['output_path']
        image_path = os.path.join(workspace_path, output_path, name_data)
        with open(image_path, 'wb') as jpg_file:
            jpg_file.write(image_data)
            logging.info(f"Save {name_data} data finished.")

def process():
    """
    :return:
    """
    with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
                                 "Properties/config.yaml"), "r") as config:
        cfg = yaml.safe_load(config)

    logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
                        level=logging.INFO)
    # 线程池
    thread_pool_executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="thread_test_")

    all_task = [thread_pool_executor.submit(consumer_demo0, cfg, i) for i in range(10)]
    for future in as_completed(all_task):
        res = future.result()
        print("res", str(res))
    thread_pool_executor.shutdown(wait=True)

if __name__ == '__main__':
    process()

运行顺序:

  • 首先运行 KafkaDemoConsumer.py 或者 KafkaDemoConsumerMultiThread.py
  • 然后运行 KafkaDemoProducer.py 或者 KafkaDemoProducerMultiThread.py
  • DataSource/Output 中会接受生产者发送的图片数据,ProducerConsumer/KafkaLog 路径也会产生运行日志。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/547856.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

超越GPT-4V!马斯克发布Grok-1.5 With Vision

在 Grok-1 开源后不到一个月,xAI 的首个多模态模型就问世了。Grok-1.5V是XAI的第一代多模态模型,除了其强大的文本处理能力之外,Grok现在还能够处理包括文档、图表、图形、屏幕截图和照片在内的各种视觉信息。相信Grok-1.5V将很快提供给现有的…

基于ssm的社区再就业培训管理系统的设计与实现论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本社区再就业培训管理系统就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数…

【Spring】依赖注入(DI)时常用的注解@Autowired和@Value

目录 1、Autowired 自动装配 1.1、要实现自动装配不是一定要使用Autowired 1.2、Autowired的特性 (1)首先会根据类型去spring容器中找(bytype),如果有多个类型,会根据名字再去spring容器中找(byname) (2)如果根据名…

Sony Camera Remote SDK在Windows上的使用

Sony官方提供了相机遥控软件开发包,允许用户自行开发应用软件,实现对相机的远程控制,包括拍摄、监看和文件传输等。截至目前最新的版本是2024.4.12发布的1.12.00版本,下载链接如下:Camera Remote SDK | LICENSE AGREEM…

温湿度传感器(DHT11)以及光照强度传感器(BH1750)的使用

前言 对于一些单片机类的环境检测或者智能家居小项目中,温湿度传感器(DHT11)以及光照强度传感器(BH1750)往往是必不可少的两个外设,下面我们来剖析这两个外设的原理,以及使用。 1. 温湿度传感…

elasticsearch 下载、启动和账号密码登录

因为我的电脑是 window,以下都是以 window 环境举例。 一、下载 Elasticsearch 是使用 java 开发的,且 7.8 版本的 ES 需要 JDK 版本 1.8 以上,安装前注意java环境的准备。 官网地址:https://www.elastic.co/cn/ 下载地址&#xf…

云安全与网络安全:有什么区别?

云计算已经存在了一段时间,但某些术语的正确含义仍然存在混乱。一个例子是区分云安全与网络安全。 首先,让我们看一下网络安全一词 ,以了解它的含义。然后,我们将将该术语与云安全进行比较,以了解两者在几个关键领域的…

OpenHarmony实战开发-如何使用AKI轻松实现跨语言调用。

介绍 针对JS与C/C跨语言访问场景,NAPI使用比较繁琐。而AKI提供了极简语法糖使用方式,一行代码完成JS与C/C的无障碍跨语言互调,使用方便。本示例将介绍使用AKI编写C跨线程调用JS函数场景。通过调用C全局函数,创建子线程来调用JS函…

DaVinci Fusion Studio:专业影视后期特效合成的得力助手

在当今影视制作领域,后期特效合成是不可或缺的一环。而DaVinci Fusion Studio作为一款功能强大的影视后期特效合成软件,以其出色的性能、直观的操作界面和丰富的特效库,受到了广大影视制作人员的喜爱。 首先,DaVinci Fusion Stud…

使用python在本地指定的目录临时模拟服务器(3),2024年最新网易 面经

先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞…

NodeJS特点

NodeJS特点 web服务器的主要特点是:事件驱动,非阻塞I/O,单线程,跨平台自身非常简单,通过通信协议来组织许多node,通过拓展来达成构建大型网络应用的目的。每一个node进程都构成这个网络的一个节点适用于io…

4个步骤:如何使用 SwiftSoup 和爬虫代理获取网站视频

摘要/导言 在本文中,我们将探讨如何使用 SwiftSoup 库和爬虫代理技术来获取网站上的视频资源。我们将介绍一种简洁、可靠的方法,以及实现这一目标所需的步骤。 背景/引言 随着互联网的迅速发展,爬虫技术在今天的数字世界中扮演着越来越重要…

微信小程序wx.getLocation 真机调试不出现隐私弹窗

在小程序的开发过程中,首页中包含要获取用户地理位置的功能,所以在这里的onLoad()中调用了wx.getLocation(),模拟调试时一切正常,但到了真机环境中就隐私框就不再弹出,并且出现了报错&#xff0…

浏览器跨标签页通信的方式都有哪些

跨标签页的实际应用场景: 1. 共享登录状态: 用户登录后,多个标签页中需要及时获取到登录状态,以保持一致的用户信息。这种情况,可以使用浏览器的 localStorage 或者 sessionStorage 来存储登录状态,并通过…

无线测温技术在高炉炉壳温度检测中的应用/无线测温监控系统

安科瑞薛瑶瑶18701709087 摘要:应用方便灵活的无线测温和热成像技术对高炉炉壳进行检测,利用热成像进行检测,发现了温度异常区域后对关注部位进行点的检测,预防炉壳的烧穿,对温度数据采集及存储,通过查看历史趋势来对…

树莓派安装Nginx服务结合内网穿透实现无公网IP远程访问

文章目录 1. Nginx安装2. 安装cpolar3.配置域名访问Nginx4. 固定域名访问5. 配置静态站点 安装 Nginx(发音为“engine-x”)可以将您的树莓派变成一个强大的 Web 服务器,可以用于托管网站或 Web 应用程序。相比其他 Web 服务器,Ngi…

波奇学Linux:ip协议

ip报头是c语言的结构体 报头和有效载荷如何分离? 固定长度四位首部长度 4位版本号就是IPV4 8位服务类型:4位TOS位段和位保留字段 4位TOS分别表示:最小延时,最大吞吐量,最高可靠性,最小成本 给路由器提…

零基础学Python专栏文章导航站

零基础学Python专栏文章导航站 专栏导读零基础入门篇 专栏导读 本文是零基础学Python的文章导航站。专栏分为零基础入门篇、模块篇、网络爬虫篇、Web开发篇、办公自动化篇、数据分析篇… 为了方便专栏订阅者更方便的阅读专栏文章,点击链接即可跳转到具体文章&#…

谷歌浏览器的开发者插件vue-devtools

在这里我留下一个git地址用来下载插件包,首先在自己喜欢的位置创建一个新的文件夹,起一个自己喜欢的文件夹名字,下载到包后,然后点进文件夹里下载依赖,npm install,下载后如下面这个样子 git clone https://gitee.com…

深入详解GRACE CPU架构

深入详解GRACE CPU架构 NVIDIA Grace CPU 是 NVIDIA 开发的第一款数据中心 CPU。 通过将 NVIDIA 专业知识与 Arm 处理器、片上结构、片上系统 (SoC) 设计和弹性高带宽低功耗内存技术相结合,NVIDIA Grace CPU 从头开始构建,以创建世界上第一个超级芯片 用…