Zookeeper分布式队列实战

目录

Zookeeper分布式队列

普通方式实现

设计思路

具体实现

使用Curator实现

具体实现

注意事项


Zookeeper分布式队列

       常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系统,同样能实现简单的队列功能。Zookeeper不适合大数据量存储,官方并不推荐作为队列使用,但由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中是比较好用的。

普通方式实现

设计思路

     

1.创建队列根节点
       在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下。
2.实现入队操作
       当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息。
3.实现出队操作
       当需要从队列中取出一个元素时,先获取根节点下的所有子节点。再找到具有最小序号的子节点,获取该节点的数据,删除该节点,然后返回节点的数据。

具体实现
/**
 * 入队
 * @param data
 * @throws Exception
 */
public void enqueue(String data) throws Exception {
    // 创建临时有序子节点
    zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}

/**
 * 出队
 * @return
 * @throws Exception
 */
public String dequeue() throws Exception {
    while (true) {
        List<String> children = zk.getChildren(QUEUE_ROOT, false);
        if (children.isEmpty()) {
            return null;
        }

        Collections.sort(children);

        for (String child : children) {
            String childPath = QUEUE_ROOT + "/" + child;
            try {
                byte[] data = zk.getData(childPath, false, null);
                zk.delete(childPath, -1);
                return new String(data, StandardCharsets.UTF_8);
            } catch (KeeperException.NoNodeException e) {
                // 节点已被其他消费者删除,尝试下一个节点
            }
        }
    }
}

使用Curator实现

Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,包括分布式队列。

具体实现
public class CuratorDistributedQueueDemo {
    private static final String QUEUE_ROOT = "/curator_distributed_queue";

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",
                new ExponentialBackoffRetry(1000, 3));
        client.start();

        // 定义队列序列化和反序列化
        QueueSerializer<String> serializer = new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };

        // 定义队列消费者
        QueueConsumer<String> consumer = new QueueConsumer<String>() {
            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息: " + message);
            }

            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {

            }
        };

        // 创建分布式队列
        DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT)
                .buildQueue();
        queue.start();

        // 生产消息
        for (int i = 0; i < 5; i++) {
            String message = "Task-" + i;
            System.out.println("生产消息: " + message);
            queue.put(message);
            Thread.sleep(1000);
        }

        Thread.sleep(10000);
        queue.close();
        client.close();
    }
}
注意事项

       使用Curator的DistributedQueue时,默认情况下不使用锁。当调用QueueBuilder的lockPath()方法并指定一个锁节点路径时,才会启用锁。如果不指定锁节点路径,那么队列操作可能会受到并发问题的影响。

       在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和顺序性。分布式环境中,多个消费者可能同时尝试消费队列中的消息。如果不使用锁来同步这些操作,可能会导致消息被多次处理或者处理顺序出现混乱。如果应用场景允许消息被多次处理,或者处理顺序不是关键问题,那么可以不使用锁。这样可以提高队列操作的性能,因为不再需要等待获取锁。

// 创建分布式队列
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/order");
//指定了一个锁节点路径/orderlock,用于实现分布式锁,以保证队列操作的原子性和顺序性。
queue = builder.lockPath("/orderlock").buildQueue();
//启动队列,这时队列开始监听ZooKeeper中/order节点下的消息。
queue.start();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/369985.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【LeetCode: 2670. 找出不同元素数目差数组 + 哈希表 + 前后缀处理】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

使用PHPStudy搭建Cloudreve网盘服务

文章目录 1、前言2、本地网站搭建2.1 环境使用2.2 支持组件选择2.3 网页安装2.4 测试和使用2.5 问题解决 3、本地网页发布3.1 cpolar云端设置3.2 cpolar本地设置 4、公网访问测试5、结语 1、前言 自云存储概念兴起已经有段时间了&#xff0c;各互联网大厂也纷纷加入战局&#…

问题:下列哪些属于历史文化资源的特征( ). #学习方法#学习方法

问题&#xff1a;下列哪些属于历史文化资源的特征( ). A、稀缺性 B、脆弱性 C、可再生性 D、多样性 参考答案如图所示

Apple Vision Pro:新的隐私噩梦?

长期以来&#xff0c;苹果被誉为最注重隐私的科技公司之一&#xff0c;但如今&#xff0c;凭借售价 3499 美元的 Vision Pro&#xff0c;苹果可能已经打造出了一款终极监控机器。 作为苹果首款头戴式“空间计算”显示设备&#xff0c;号称将打造数字世界与物理世界交汇的新空间…

STL篇三:list

文章目录 前言1.list的介绍和使用1.1 list的介绍1.2 list的使用1.3 list的迭代器的失效 2.list的模拟实现2.1 结点的封装2.2 迭代器的封装2.2.1 正向迭代器2.2.2 反向迭代器 2.3 list功能的实现2.3.1 迭代器的实例化及begin()、end() 2.3.2 构造函数2.3.3 赋值运算符重载2.3.4 …

Axure RP9原型设计工具使用记录:基础操作

Axure RP9使用记录一 &#x1f4da;第一章 前言&#x1f4d7;背景&#x1f4d7;目的 &#x1f4da;第二章 基础介绍及操作&#x1f4d7;页面功能总览&#x1f4d7;基础操作&#x1f4d5;设置样式&#x1f4d5;设置交互&#x1f4d5;设置组合&#x1f4d5;设置动态面板&#x1f…

PyTorch使用

前言 系统环境&#xff1a;win10 使用Anaconda&#xff0c;Anaconda的安装自行百度。 目录 前言 创建虚拟环境 1、查看当前有哪些虚拟环境 2、创建虚拟环境pytorch 3、激活及关闭pytorch虚拟环境 4、删除pytorch虚拟环境 使用yolov5测试 1、切换至yolov5目录下&…

淘宝镜像到期如何切换镜像及如何安装淘宝镜像

淘宝镜像到期如何切换镜像及如何安装淘宝镜像 一、淘宝镜像到期如何切换新镜像二、第一次使用淘宝镜像如何配置镜像 一、淘宝镜像到期如何切换新镜像 清空缓存&#xff1a;npm cache clean --force切换镜像源&#xff1a;npm config set registry https://registry.npmmirror.…

nodejs+vue+ElementU教师科研管理系统l33wm

本次开发一套高校教师科研管理系统有管理员&#xff0c;教师&#xff0c;学院三个角色。管理员功能有个人中心&#xff0c;教师管理&#xff0c;学院管理&#xff0c;科研课题管理&#xff0c;软件著作权管理&#xff0c;论文信息管理&#xff0c;专利信息管理&#xff0c;科研…

AI大模型专题:OWASP大语言模型应用程序十大风险V1.0

今天分享的是AI大模型系列深度研究报告&#xff1a;《AI大模型专题&#xff1a;OWASP大语言模型应用程序十大风险V1.0》。 &#xff08;报告出品方&#xff1a;OWASP&#xff09; 报告共计&#xff1a;14页 LM01:2023_ 提示词注入 描述&#xff1a;提示词注入包括绕过过滤器…

稀疏场景高性能训练方案演变|京东广告算法架构体系最佳实践

近年来&#xff0c;推荐场域为提升模型的表达能力和计算能力&#xff0c;模型规模和计算复杂度大幅增加&#xff0c;同时&#xff0c;高规格硬件资源为模型迭代、算法优化带来了更大的机遇和挑战。为了应对模型规模和算力升级带来的存储、IO和计算挑战&#xff0c;京东零售广告…

docker 安装minio

MinIO 是一款高性能、分布式的对象存储系统. 它是一款软件产品, 可以100%的运行在标准硬件。即X86等低成本机器也能够很好的运行MinIO。 MinIO与传统的存储和其他的对象存储不同的是&#xff1a;它一开始就针对性能要求更高的私有云标准进行软件架构设计。因为MinIO一开始就只…

Arthas-Java应用生产可用诊断神器

一、背景与简介 1、介绍 如果你的程序是Java开发&#xff0c;有时候生产环境出现性能瓶颈或者接口访问缓慢、又或者本地环境无法进行复现&#xff0c;只会在线上产生bug或者问题&#xff0c;这时候我们需要进行在线debug排查问题。但是生产环境又不能轻易重启、或者使用传统方…

django区县网络安全执法模式研究flask python

作为一款区县网络安全执法模式研究&#xff0c;面向的是大多数学者&#xff0c;软件的界面设计简洁清晰&#xff0c;用户可轻松掌握使用技巧。在调查之后&#xff0c;获得用户以下需求&#xff1a; &#xff08;1&#xff09;用户注册登录后&#xff0c;可进入系统解锁更多功能…

软件成本度量

1. 软件成本度量的意义 软评的意义主要在于其在软件项目的预算、招投标、实施及决算后评估阶段的重要作用。 在预算及招投标阶段&#xff0c;软评通过软件成本度量有助于制定合理的项目预算&#xff0c;规范招投标行为。这一阶段&#xff0c;甲方需要准确估算项目成本和合理的投…

加速数字化之旅:MessageBox赋能HubSpot与微信公众号的无缝整合

在数字化时代&#xff0c;企业需要整合关键平台以适应快速变化的市场。HubSpot和微信公众号的整合成为数字化营销的核心策略之一。MessageBox作为整合的关键力量&#xff0c;通过其卓越的能力&#xff0c;极大地加速了HubSpot与微信公众号的融合过程。今天运营坛将深入探讨Mess…

【OpenCV人脸检测】写了个智能锁屏小工具!人离开电脑自动锁屏

文章目录 1. 写在前面2. 设计思路3. 人脸检测4. 程序实现 【作者主页】&#xff1a;吴秋霖 【作者介绍】&#xff1a;Python领域优质创作者、阿里云博客专家、华为云享专家。长期致力于Python与爬虫领域研究与开发工作&#xff01; 【作者推荐】&#xff1a;对JS逆向感兴趣的朋…

React Hooks 学习笔记

1.useState&#xff08;&#xff09; 实现对页面数据的存储&#xff0c;当数据改变时候&#xff0c;自动触发render函数 2.useRef 用来解决两个问题&#xff1a; 1).是获取DOM元素或子组件的实例对象 2).存储渲染周期之间共享的数据 3.useEffect 4.useLayoutEffect 5…

数据结构(C语言)代码实现(六)——单链表的实现

目录 参考、格式 头文件LinkList.h 一、将函数的小括号写成中括号 二、读取权限冲突 三、L->Last指针没有移动 四、函数指针的使用 头文件完整代码 测试函数&#xff08;主函数&#xff09;test.cpp 测试结果 参考、格式 数据结构课本2.3节&#xff08;严蔚敏版&a…

【PLC一体机】PLC一体机中如何实现触摸屏和PC电脑的通讯

博主今天准备把之前买的PLC一体机拿出来玩一下&#xff0c;翻看以前的博文&#xff0c;发现没有记录分享PLC一体机中如何实现触摸屏程序下载的内容。 如之前博文介绍的那样&#xff0c;PLC一体机由PLC和触摸屏两部分集成的设备&#xff0c;因此设备内部已经做好了PLC和触摸屏之…