Aeron:两个代理之间的单向IPC(One-way IPC between two agents)

一、概述

本例展示了如何通过 IPC 在调度于不同线程的两个代理之间传输缓冲区。在继续学习本示例之前,最好先复习一下Simplest Full Example ,因为该示例展示的是 IPC 通信,没有增加代理的复杂性。读者还应熟悉Media Driver

流程构建如下:

  • 以默认模式运行的嵌入式Media Driver(发送器、接收器和指挥器的代理(an agent for Sender, Receiver, Conductor))(an embedded media driver running in default mode (an agent for Sender, Receiver, Conductor))
  • 通过publication发送 IPC 数据的代理(SendAgent)(an agent to send the IPC data over a publication (SendAgent))
  • 通过subscription接收 IPC 数据的代理(ReceiveAgent)(an agent to receive the IPC data over a subscription (ReceiveAgent))

Code Sample overview

代码示例包含在 ipc-core 项目 com.aeroncookbook.ipc.agents 命名空间中的三个文件中。它们是:

  • StartHere.java - the class responsible for setting up Aeron and scheduling the agents;(负责设置 Aeron 和调度代理的类)
  • SendAgent.java - the class holding the Agent responsible for sending data;(负责发送数据的代理的类)
  • ReceiveAgent.java - the class holding the Agent responsible for receiving data.(负责接收数据的代理的类)

下文将对每个部分进行细分和讨论。

 Execution Output

15:13:42.814 [main] starting
15:13:42.964 [receiver] received: 1000000

二、StartHere.java

public static void main(String[] args)
{
 final String channel = "aeron:ipc";
 final int stream = 10;
 final int sendCount = 1_000_000;
 final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
 final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
 final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

 //construct Media Driver, cleaning up media driver folder on start/stop
 final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
         .dirDeleteOnStart(true)
         .threadingMode(ThreadingMode.SHARED)
         .sharedIdleStrategy(new BusySpinIdleStrategy())
         .dirDeleteOnShutdown(true);
 final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

 //construct Aeron, pointing at the media driver's folder
 final Aeron.Context aeronCtx = new Aeron.Context()
         .aeronDirectoryName(mediaDriver.aeronDirectoryName());
 final Aeron aeron = Aeron.connect(aeronCtx);

 //construct the subs and pubs
 final Subscription subscription = aeron.addSubscription(channel, stream);
 final Publication publication = aeron.addPublication(channel, stream);

 //construct the agents
 final SendAgent sendAgent = new SendAgent(publication, sendCount);
 final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier,
     sendCount);
 //construct agent runners
 final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,
         Throwable::printStackTrace, null, sendAgent);
 final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,
         Throwable::printStackTrace, null, receiveAgent);

 LOGGER.info("starting");

 //start the runners
 AgentRunner.startOnThread(sendAgentRunner);
 AgentRunner.startOnThread(receiveAgentRunner);

 //wait for the final item to be received before closing
 barrier.await();

 //close the resources
 receiveAgentRunner.close();
 sendAgentRunner.close();
 aeron.close();
 mediaDriver.close();
}

 Constructing support objects

final String channel = "aeron:ipc";
final int stream = 10;
final int sendCount = 1000;
final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

这部分代码构建了一些支持对象。(This section of the code constructs a few support objects.)

  • Line 1 holds the channel definition, in this case aeron:ipc
  • Line 2 holds the stream ID to use, in this case 10
  • Line 3 is the number of integers to send over IPC
  • 第 4、5 行构建了代理使用的空闲策略(IdleStrategy)。在这种情况下,只要 doWork 工作周期返回 0,空闲策略就会忙于旋转。(Line 4,5 constructs the IdleStrategy to be used by the agents. In this case, whenever the doWork duty cycle returns 0, the idle strategy will busy spin.)
  • 第 6 行是一个屏障,用于协调样本的关闭。一旦 ReceiveAgent 总共接收到一个发送计数整数,它就会向屏障发出信号,触发关闭。(Line 6 is a barrier that will be used to co-ordinate a shutdown of the sample. Once the ReceiveAgent has received a total of sendCount integers, it will signal the barrier, triggering the shutdown.)

Constructing the Media Driver 

//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
 .dirDeleteOnStart(true)
 .threadingMode(ThreadingMode.SHARED)
 .sharedIdleStrategy(new BusySpinIdleStrategy())
 .dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

本节代码使用定义的上下文构建Media Driver。上下文是一个对象,其中包含Media Driver的所有可选配置参数。在本例中,有两项配置被重写,以确保Media Driver在启动和关闭时整理Media Driver目录。一旦上下文准备就绪,Media Driver就会作为嵌入式代理启动。

See also: Media Driver

 Constructing Aeron, the Publication and the Subscription

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
 .aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

这部分代码再次使用 Context 构建 Aeron 对象。有了这个上下文,我们就能让 Aeron 知道Media Driver的 Aeron 目录在哪里。一旦上下文准备就绪,Aeron 对象就会连接到Media Driver。接下来,我们将使用之前定义的通道和流 id 创建 IPC 发布和订阅(IPC publication and subscription)。

//construct the subs and pubs
final Subscription subscription = aeron.addSubscription(channel, stream);
final Publication publication = aeron.addPublication(channel, stream);

Constructing and scheduling the agents

//construct the agents
final SendAgent sendAgent = new SendAgent(publication, sendCount);
final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier, sendCount);

//construct agent runners
final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,
     Throwable::printStackTrace, null, sendAgent);
final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,
     Throwable::printStackTrace, null, receiveAgent);

//start the runners
AgentRunner.startOnThread(sendAgentRunner);
AgentRunner.startOnThread(receiveAgentRunner);

 

这部分代码构建发送代理(SendAgent)和接收代理(ReceiveAgent),创建代理运行程序来管理它们,然后在特定线程上启动它们。关键行如下:

  • 第 6-7 行和第 8-9 行:这两行分别构建了发送和接收的代理运行程序。请注意,每行都给出了空闲策略,用于控制线程在 doWork 工作周期后如何使用资源。
  • 第 12 和 13 行:这两行为每个代理创建新线程,并开始工作周期。

Shutting down cleanly

//wait for the final item to be received before closing
barrier.await();

//close the resources
receiveAgentRunner.close();
sendAgentRunner.close();
aeron.close();
mediaDriver.close();

代码的最后部分负责等待 ReceiveAgent 触发屏障,然后正确清理资源。首先关闭代理,然后关闭 aeron 对象,最后关闭Media Driver。如果不注意关闭过程中的执行顺序,可能会出现核心转储或其他看似严重的故障。 

三、SendAgent.java

public class SendAgent implements Agent
{
 private final Publication publication;
 private final int sendCount;
 private final UnsafeBuffer unsafeBuffer;
 private int currentCountItem = 1;
 private final Logger logger = LoggerFactory.getLogger(SendAgent.class);

 public SendAgent(final Publication publication, int sendCount)
 {
     this.publication = publication;
     this.sendCount = sendCount;
     this.unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocate(64));
     unsafeBuffer.putInt(0, currentCountItem);
 }

 @Override
 public int doWork()
 {
     if (currentCountItem > sendCount)
     {
         return 0;
     }

     if (publication.isConnected())
     {
         if (publication.offer(unsafeBuffer) > 0)
         {
             currentCountItem += 1;
             unsafeBuffer.putInt(0, currentCountItem);
         }
     }
     return 0;
 }

 @Override
 public String roleName()
 {
     return "sender";
 }
}

 

send 对象负责通过提供的 Aeron Publication 发送 sendCount 整数。doWork 方法用于保持代理的工作周期,该方法会被持续调用,直至代理关闭。一旦达到 sendCount 限制,它就会停止向publication发送更多信息,并开始闲置。

这段代码中最有趣的部分是:

  • Line 18 to 34: the doWork method holding the duty cycle for this agent
  • 第 22 行和第 34 行:这两条返回语句都返回 0,这将导致选定的空闲策略 BusySpinIdleStrategy 调用 ThreadHints.onSpinWait()
  • 第 25 行:只有当publication已连接时,才会返回 true。一旦连接,就可以安全地向publication提供信息。
  • 第 27 行:这将为publication提供缓冲数据。
  • Line 30: this logs the last sent integer, for example 15:13:42.818 [sender] sent: 123
  • Line 41: this sets the thread name to sender, as is visible in the log output.

四、ReceiveAgent.java

public class ReceiveAgent implements Agent
{
 private final Subscription subscription;
 private final ShutdownSignalBarrier barrier;
 private final int sendCount;
 private final Logger logger = LoggerFactory.getLogger(ReceiveAgent.class);

 public ReceiveAgent(final Subscription subscription,
                     ShutdownSignalBarrier barrier, int sendCount)
 {
     this.subscription = subscription;
     this.barrier = barrier;
     this.sendCount = sendCount;
 }

 @Override
 public int doWork() throws Exception
 {
     subscription.poll(this::handler, 1000);
     return 0;
 }

 private void handler(DirectBuffer buffer, int offset, int length,
                     Header header)
 {
     final int lastValue = buffer.getInt(offset);

     if (lastValue >= sendCount)
     {
         logger.info("received: {}", lastValue);
         barrier.signal();
     }
 }

 @Override
 public String roleName()
 {
     return "receiver";
 }
}

接收代理负责轮询所提供的订阅并记录接收到的值。一旦达到 sendCount 值,接收代理就会发出屏障信号。该对象中最有趣的部分是:

  • 第 17-21 行 - doWork 方法保持着该代理的duty cycle 。duty cycle由两部分组成,一部分是轮询订阅,将事件传递给提供的处理程序,另一部分是返回 0。通过配置的 IdleStrategy,返回 0 将导致线程停顿一微秒。
  • Line 26 - this logs the integer value received, for example: 15:13:42.814 [receiver] received: 5
  • Lines 29-32 - this signals the barrier, triggering the clean shutdown of the process.
  • Line 38 - this sets the role name to receiver, as visible in log output.

 五、Performance

在英特尔笔记本电脑上,本示例每秒可传输约 1 千万条 4 字节信息。如果使用的是 Linux 系统,且有可用的 /dev/shm,代码会自动使用。通过交换 NoOpIdleStrategy,并将media driver线程移至 DEDICATED,每秒可传输超过 2000 万条信息。主要更改见下文。请注意,您需要确保硬件上至少有 8 个物理内核。

final IdleStrategy idleStrategySend = new NoOpIdleStrategy();
final IdleStrategy idleStrategyReceive = new NoOpIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
     .dirDeleteOnStart(true)
     .threadingMode(ThreadingMode.DEDICATED)
     .conductorIdleStrategy(new BusySpinIdleStrategy())
     .senderIdleStrategy(new NoOpIdleStrategy())
     .receiverIdleStrategy(new NoOpIdleStrategy())
     .dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
     .idleStrategy(new NoOpIdleStrategy())
     .aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

 

有一个相关的 Two Agent example of OneToOneRingBuffer 非常相似,只不过它使用了 Agrona 的 OneToOneRingBuffer,并通过 BusySpinIdleStrategy 每秒发送大约 1800 万条 4 字节信息,或通过 NoOpIdleStrategy 每秒发送超过 5000 万条信息。

六、Using the C Media Driver

要使用 Aeron 的 C Media Driver测试此示例,您需要执行以下操作:

首先,从源代码中构建 C Media Driver(说明因操作系统而异,此部分参考博客即可,建议翻墙进行操作,贴出cookbook只是帮助借阅):

  • Building the C Media Driver on macOS
  • Building the C Media Driver on CentOS Linux 8
  • Building the C Media Driver on Ubuntu 20.04
  • Building the C Media Driver on Windows 10

Next, start the C Media Driver with default settings

  • ./aeronmd (Linux/macOS)
  • aeronmd (Windows)

Then, remove the Media Driver from StartHere.java, and reduce the Aeron context to defaults:

//construct Media Driver, cleaning up media driver folder on start/stop
//final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
//        .dirDeleteOnStart(true)
//        .threadingMode(ThreadingMode.SHARED)
//        .sharedIdleStrategy(new BusySpinIdleStrategy())
//        .dirDeleteOnShutdown(true);
//final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);
//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context();
final Aeron aeron = Aeron.connect(aeronCtx);

Aeron 和Media Driver将默认使用同一目录。

最后,正常运行 StartHere.java。进程应正常运行,输出应包括类似内容:

14:30:00.293 [main] starting
14:30:00.758 [receiver] received: 10000000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/718658.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

结合Boosting理论与深度ResNet:ICML2018论文代码详解与实现

代码见:JordanAsh/boostresnet: A PyTorch implementation of BoostResNet 原始论文:Huang F, Ash J, Langford J, et al. Learning deep resnet blocks sequentially using boosting theory[C]//International Conference on Machine Learning. PMLR, 2…

英特尔 “AI” 科通:英特尔AI大模型应用前瞻

亲爱的科技探险家、前沿探索者、对未来深具好奇心的您, 身处人工智能引领的时代,我们目睹着行业的革命性变革。技术的创新不仅改变着我们的日常,更重新定义着我们对未来的期许。今天,怀着无限激情和期待,我们邀请您参…

国际数字影像产业园:建设与推动企业孵化与梯次培育

国际数字影像产业园在建设与推动企业孵化及梯次培育方面取得了显著成效。未来,随着技术的不断进步和市场的不断扩大,园区将继续发挥其在数字经济产业中的引领作用,为文化产业的发展贡献更多力量。 一、企业孵化与入驻 企业入驻情况&#xff…

物联边缘网关如何助力工厂实现智能化生产?以某智能制造工厂为例-天拓四方

随着工业4.0的深入推进,智能制造工厂成为了工业发展的重要方向。在这个背景下,物联边缘网关以其独特的优势在智能制造工厂中发挥着越来越重要的作用。以下将通过一个具体的智能制造工厂应用案例,来阐述物联边缘网关如何助力工厂实现智能化生产…

Milvus跨集群数据迁移

将 Milvus 数据从 A 集群(K8S集群)迁到 B 集群(K8S集群),解决方案很多,这里提供一个使用官方 milvus-backup 工具进行数据迁移的方案。 注意:此方案为非实时同步方案,但借助 MinIO 客…

在3D视觉技术的帮助下,轻松实现纸箱拆码垛

在繁忙的物流仓库中,纸箱的拆码垛工作常常让人头疼不已。但是,现在有了富唯智能的3D视觉引导纸箱拆码垛解决方案,这一切都变得轻松简单! 想象一下,那些堆积如山的纸箱,在3D视觉技术的帮助下,仿…

黄仁勋:下一波AI的浪潮是物理AI

B站:啥都会一点的研究生公众号:啥都会一点的研究生 最近AI圈又发生了啥? 快手视频生成大模型“可灵”开放邀测,效果对标 Sora 在OpenAl文生视频大模型Sora发布后,国内企业争相入局,快手视频生成大模型可…

Confluence安装

Confluence安装 1.安装 #下载confluence版本(8.5.11) https://www.atlassian.com/software/confluence/download-archives #修改权限 chmod x atlassian-confluence-8.5.11-x64.bin #执行安装 ./atlassian-confluence-8.5.11-x64.bin按照以下提示输入&…

NettyのEventLoopChannel

Netty的重要组件:EventLoop、Channel、Future & Promise、Handler & Pipeline、ByteBuf 本篇主要介绍Netty的EventLoop和Channel组件。 1、Netty入门案例 服务器端的创建,主要分为以下步骤: 创建serverBootstrap对象。配置服务器的…

Avalonia for VSCode

1、在VSCode中编辑AvaloniaUI界面,在VSCode中搜索Avalonia,并安装。如下图,可以发现Avalonia for VSCode还是预览版。 2、 创建一个Avalonia 项目。 选择项目类型 输入项目名称 选择项目所在文件夹 打开项目 3、项目架构如下图。 4、builde…

基于jeecgboot-vue3的Flowable流程-所有任务

因为这个项目license问题无法开源&#xff0c;更多技术支持与服务请加入我的知识星球。 这个部分主要讲所有任务的功能 1、主要列表界面如下&#xff1a; <template><div class"p-2"><!--查询区域--><div class"jeecg-basic-table-form-…

创建型模式--抽象工厂模式

产品族创建–抽象工厂模式 工厂方法模式通过引入工厂等级结构,解决了简单工厂模式中工厂类职责太重的问题。 但由于工厂方法模式中的每个工厂只生产一类产品,可能会导致系统中存在大量的工厂类,势必会增加系统的开销。此时,可以考虑将一些相关的产品组成一个“产品族”,…

什么是Vue开发技术

概述 Vue.js 是一个用于构建用户界面的渐进式框架&#xff0c;它设计得非常灵活&#xff0c;可以轻松地被集成到任何项目中。 vue是视图的发音&#xff0c;其目的是帮助开发者易于上手&#xff0c;提供强大的功能构建复杂的应用程序 示例 以下是vue基本的语法概述 声明式渲…

示例:WPF中TreeView自定义TreeNode泛型绑定对象来实现级联勾选

一、目的&#xff1a;在绑定TreeView的功能中经常会遇到需要在树节点前增加勾选CheckBox框&#xff0c;勾选本节点的同时也要同步显示父节点和子节点状态 二、实现 三、环境 VS2022 四、示例 定义如下节点类 public partial class TreeNodeBase<T> : SelectBindable<…

探秘提交任务到线程池后源码的执行流程

探秘提交任务到线程池后源码的执行流程 1、背景2、任务提交2、Worker线程获取任务执行流程3、Worker线程的退出时机1、背景 2、任务提交 线程池任务提交有两种方式,execute()和submit()。首先看一下execute方法的源码。我们发现它接收的入参是一个Runnable类型。我们按照代码…

常见的Redis使用问题及解决方案

目录 1. 缓存穿透 1.1 解决方案 2. 缓存击穿 2.1 解决方案 3. 缓存雪崩 3.1 概念图及问题描述 ​编辑3.2 解决方案 4. 分布式锁 4.1 概念 4.2 基于redis来实现分布式锁 4.3 用idea来操作一遍redis分布式锁 4.4 分布式上锁的情况下&#xff0c;锁释放了服务器b中的锁…

水滴式粉碎机:粉碎干性物料的理想选择

在工业生产中&#xff0c;干性物料的粉碎是一个重要的环节&#xff0c;其对于提升产品质量、优化生产流程和提高生产效率具有显著的影响。近年来&#xff0c;水滴式粉碎机因其粉碎原理和工作性能&#xff0c;逐渐在干性物料粉碎领域崭露头角&#xff0c;成为众多企业的理想选择…

《Java2实用教程》 期末考试整理

作用域 当前类 当前包 子类 其他包 public √ √ √ √ protected √ √ √ default √ √ private √ 三、问答题&#xff08;每小题4分&#xff0c;共8分&#xff09; 1.类与对象的关系 对象&#xff1a;对象是类的一个实例&#xff0c;有状…

JavaScript事件类型和事件处理程序

● 之前我们用过了很多此的点击事件&#xff0c;这次让我们来学习另一种事件类型 mouseenter “mouseenter” 是一个鼠标事件类型&#xff0c;它在鼠标指针进入指定元素时触发。 const h1 document.querySelector(h1); h1.addEventListener(mouseenter, function (e) {aler…

【将xml文件转yolov5训练数据txt标签文件】连classes.txt都可以生成

将xml文件转yolov5训练数据txt标签文件 前言一、代码解析 二、使用方法总结 前言 找遍全网&#xff0c;我觉得写得最详细的就是这个博文⇨将xml文件转yolov5训练数据txt标签文件 虽然我还是没有跑成功。那个正则表达式我不会改QWQ&#xff0c;但是不妨碍我会训练ai。 最终成功…