用Disruptor框架实现生产者-消费者模式

ConcurrentLinkedQueue队列的秘诀就在于大量使用了无锁CAS操作。
现成的Disruptor框架实现CAS进行编程。
无锁的缓存框架:Disruptor
它使用无锁的方式实现了一个环形队列,非常适合实现生产者-消费者模式,
比如事件和消息的发布。如果队列是环形的,则只需要对外提供一个当前位置cursor,
利用这个指针即可用进入队操作,也可用进行出队操作。
由于环形队列的缘故,队列的总大小必须事先指定,不能动态扩展。
为了能够快速从一个序列对于到数组的实际位置,每次有元素入队,序列就加1.
Disruptor框架要求我们必须将数组的大小设置为2的整数次方。这样通过sequence&(queueSize-1)
就能立即定位到实际的元素位置index,这比取余%操作快得多。
如果搭建不理解上面的sequence&(queueSize-1),那么我在这里再简单说明一下。
如果queueSize是2的整数次幂。则这个数字的二进制表示必然是10、100、1000、10000等形式。

果一个数字是2的整数次幂,其二进制表示的确是形如10、100、1000、10000等的形式。
这是因为2的整数次幂在二进制中只有一个比特位为1,其余都为0。例如:
2^1 = 2,二进制表示为10
2^2 = 4,二进制表示为100
2^3 = 8,二进制表示为1000
2^4 = 16,二进制表示为10000
这种规律一直持续下去。这样的性质在计算机科学和计算机工程中经常会被利用,
特别是在处理队列(Queue)等数据结构的大小时。
Disruptor框架


生产者需要一个 RingBuffer的引用,也就是环形缓冲区。
它有一个重要的方法pushData()将产生的数据推入缓冲区。
方法pushData()接收一个ByteBuffer对象。在ByteBuffer对象中
可用用来包装任何数据类型。这里用来存储long整数,
pushData()方法的功能就是将传入的ByteBuffer对象中的数据提取出来,
并转载到环形缓冲区中。
只有发布后的数据才会真正被消费者看见。

public class Consumer implements WorkHandler<PCData> {

    @Override
    public void onEvent(PCData event) throws Exception{
        System.out.println(Thread.currentThread().getId()+":Event: --"
            + event.get() * event.get() +"--");
    }

}
public class PCData {
    private long value;
    public void set(long value){
        this.value = value;
    }
    public long get(){
        return value;
    }

}
public class PCDataFactory implements EventFactory<PCData> {
    public PCData newInstance(){
        return new PCData();
    }
}
public class Producer {
    private final RingBuffer<PCData> ringBuffer;

    public Producer(RingBuffer<PCData> ringBuffer){
        this.ringBuffer = ringBuffer;
    }
    public void pushData(ByteBuffer bb){
        long sequence = ringBuffer.next();
        try{
            PCData event = ringBuffer.get(sequence);
            event.set(bb.getLong(0));
        }
        finally {
            ringBuffer.publish(sequence);
        }
    }

}
public static void main(String[] args) {
        Executor executors =  Executors.newCachedThreadPool();
        PCDataFactory factory = new PCDataFactory();
        int bufferSize = 1024;
        Disruptor<PCData> disruptor = new Disruptor<PCData>((EventFactory<PCData>) factory,
                bufferSize,
                (Executor) executors,
                ProducerType.MULTI,
                new BlockingWaitStrategy()
        );
        disruptor.handleEventsWithWorkerPool(
                new Consumer(),
                new Consumer(),
                new Consumer(),
                new Consumer());
        disruptor.start();
        RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for(long l = 0;true;l++){
            bb.putLong(0,l);
            producer.pushData(bb);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("add data "+ l);
        }
    }

main 函数是一个无限循环,因为它包含了一个 for 循环,其中的条件是 true。因此,这个 main 函数会一直执行下去,直到程序被手动中断或出现异常导致程序终止。

在循环中,你不断地向 RingBuffer 中发布数据,由 Producer 类的 pushData 方法完成。同时,RingBuffer 中的数据会被多个消费者(Consumer 类)并发地处理。

你在循环中使用了 Thread.sleep(100),这会导致每次循环执行后线程暂停 100 毫秒。因此,每次数据被发布后,你会看到 "add data" 的输出,并且在消费者的 onEvent 方法中会输出相应的信息。

由于这是一个生产者-消费者模型,生产者生产数据,消费者处理数据,程序在生产和消费之间持续运行。

如果你希望在某个条件下结束程序,你需要在循环中添加相应的退出条件,并在满足条件时使用 break 语句跳出循环。否则,这个程序将一直执行下去。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/270581.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【网络安全 | 网络协议】结合Wireshark讲解TCP三次握手

TCP三次握手在Wireshark数据包中是如何体现的&#xff1f;在此之前&#xff0c;先熟悉TCP三次握手的流程。 TCP三次握手流程 TCP&#xff08;传输控制协议&#xff09;是一种面向连接的、可靠的传输层协议。在建立 TCP 连接时&#xff0c;需要进行三次握手&#xff0c;防止因为…

Python 直方图的绘制-`hist()`方法(Matplotlib篇-第7讲)

Python 直方图的绘制-hist()方法(Matplotlib篇-第7讲)         🍹博主 侯小啾 感谢您的支持与信赖。☀️ 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹…

【MySQL】脏读、不可重复读、幻读介绍及代码解释

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a; 数 据 库 ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言 正文 结语 我的其他博客 前言 数据库事务隔离级别是关系数据库管理系统中一个重要的概念&#xff0c;它涉及到多个事务并发执行…

14、Qt使用Eigen3

一、下载Eigen Eigen 二、创建项目 创建一个"Qt Widget Application"项目&#xff0c;基类选择“QMainWindow“&#xff0c;把Eigen拷贝到项目中 三、更改代码 在.pro中添加 INCLUDEPATH $$PWD\Eigen 在界面上添加一个pushButton&#xff0c;并转到槽&#xff0…

SpringIOC之AbstractResourceBasedMessageSource

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

simulink代码生成(四)——SCI发送模块(串口通信)

C2000中的SCI模块分为两种&#xff0c;一种是接收模块&#xff0c;一种是发送模块&#xff1b; 1 发送模块 发送模块如下图所示&#xff1a; SCI传输块使用指定的SCI硬件模块传输标量或矢量数据。采样率和数据类型是与输入端口一致&#xff1b; 注意&#xff1a;一个模型只能…

让某个页面一直处于最前面,可以屏蔽切屏检测

前言 学习通智慧树网课分屏&#xff0c;让某个页面一直处于最前面&#xff0c;可以屏蔽切屏检测。 页面一直处于最前面 前言1 安装包2 使用 1 安装包 https://download.csdn.net/download/qq_44850489/76684366 2 使用 一直下一步就可以 选择要放到前面的窗口&#xff0c…

【SSM】Spring MVC

Spring MVC 文章目录 Spring MVC1. 简介2. 核心组件与调用流程3. 入门使用4. SpringMVC接收数据4.1 访问路径设置4.2 接收参数&#xff08;重点&#xff09;4.2.1 param 和 json参数比较4.2.2 param参数接收4.2.3 路径参数接收4.2.4 json参数接收 4.3 接收Cookie数据和接收请求…

蓝桥杯备赛 day 1 —— 递归 、递归、枚举算法(C/C++,零基础,配图)

目录 &#x1f308;前言 &#x1f4c1; 枚举的概念 &#x1f4c1;递归的概念 例题&#xff1a; 1. 递归实现指数型枚举 2. 递归实现排列型枚举 3. 递归实现组合型枚举 &#x1f4c1; 递推的概念 例题&#xff1a; 斐波那契数列 &#x1f4c1;习题 1. 带分数 2. 反硬币 3. 费解的…

SQL实践篇(二):为什么微信用SQLite存储聊天记录?

文章目录 简介什么是SQLite在python中使用SQLite通过SQLite查询微信的聊天记录参考文献 简介 SQLite是一个嵌入式的开源数据库引擎&#xff0c;大小只有3M左右&#xff0c;因此我们可以将整个SQLite嵌入到应用中&#xff0c;而不再需要采用传统的客户端/服务器&#xff08;CS&…

ubuntu22.04 下载路径

ftp下载路径 csdn下载 ubuntu22.04下载路径ubuntu-22.04-desktop-amd64.7z.001资源-CSDN文库 ubuntu22.04下载路径ubuntu-22.04-desktop-amd64.7z.002资源-CSDN文库 【免费】ubuntu-22.04-desktop-amd64.7z.003资源-CSDN文库 【免费】ubuntu-22.04-desktop-amd64.7z.004资源-…

Jave EE 网络原理之网络层与数据链路层

文章目录 1. 网络层1.1 IP 协议1.1.1 协议头格式1.1.2 地址管理1.1.2.1 认识 IP 地址 1.1.3 路由选择 2. 数据链路层2.1 认识以太网2.1.1 以太网帧格式2.1.2 DNS 应用层协议 1. 网络层 网络层要做的事情&#xff0c;主要是两个方面 地址管理 &#xff08;制定一系列的规则&am…

数字化协同在服装行业:监狱服装生产的量身解决方案

内容来自演讲&#xff1a;苗子实 | 北京宜通华瑞科技有限公司 | 产品经理 摘要 这篇文章介绍了宜通世纪子公司北京宜通华瑞在服装行业智能制造方面的业务&#xff0c;以及明道云提供的业务支持。文章提到了服装行业的痛点及解决方案&#xff0c;并详细介绍了优化监狱服装企业的…

算法练习Day20 (Leetcode/Python-回溯算法)

虽然看似进入了一个新章节&#xff0c;但其实还是前几天二叉树章节的延续。。 回溯算法 &#xff08;以下内容摘抄自代码随想录&#xff09;&#xff1a; 回溯法解决的问题都可以抽象为树形结构&#xff0c;是的&#xff0c;我指的是所有回溯法的问题都可以抽象为树形结构&…

C#获取企业微信《会话内容存档》

因为公司某些原因需要使用企业微信的会话内容存档内容&#xff0c;看微信的文档踩了一些坑&#xff0c;现在将项目代码记录下来&#xff0c;以备各位码农同行查阅。 项目使用 .NET8.0架构&#xff0c;节本结构如下图&#xff1a; 项目中的Lib是下载的微信SDK&#xff0c;项目地…

9道软件测试面试题,刷掉90%的测试程序员

经历了“金9银10”&#xff0c;转眼2024年招聘季就要来了&#xff0c;没点真本事真技术&#xff0c;没点面试经验&#xff0c;不了解点职场套路&#xff0c;如何过五关斩六将&#xff1f;如何打败面试官&#xff1f;如何拿下那梦寐以求的offer&#xff1f; 如果你的跳槽意向已…

[C/C++]数据结构 希尔排序

&#x1f966;前言: 希尔排序也称 “缩小增量排序”&#xff0c;它也是一种插入类排序的方法,在学习希尔排序之前我们首先了解一下直接插入排序. 一: &#x1f6a9;直接插入排序 1.1 &#x1f31f;排序思路 直接插入排序的基本原理是将一条记录插入到已排好的有序表中&#x…

EDSR训练及测试教程

EDSR训练及测试教程 超分重建经典算法EDSR开源代码使用教程。 论文名称:Enhanced Deep Residual Networks for Single Image Super-Resolution,CVPR2017。 训练自己的数据集 由于EDSR开源代码只针对DIV2K数据集,在数据集加载时很多代码已经固定,因此在这里使用固定的文…

Android Studio 如何实现软件英文变中文教程

目录 前言 一、确认版本号 二、下载汉化包 三、汉化包安装 四、如何实现中英文切换 五、更多资源 前言 Android Studio是一款功能强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;用于开发Android应用程序。默认情况下&#xff0c;Android Studio的界面和…

STM32实战之深入理解I²C通信协议

目录 IC的物理层 IC的协议层 IC特点 IC 总线时序图 软件模拟IC时序分享 例程简介 例程分享 STM32的IC外设 IIC&#xff08;Inter-Integrated Circuit&#xff09;&#xff0c;也称为IC或TWI&#xff08;Two-Wire Interface&#xff09;&#xff0c;是一种广泛使用的串行…