io_contextttttttttttt

创建上下文——io_context_t

它是一个上下文结构,在内部它包含一个完成队列,在线程之间是可以共享的。

提交请求——iocb

io回调数据结构,和io_submit配合使用。

处理结果

通过io_event处理结果,

struct io_event {
    void *data;
    struct iocb *obj;
    long long res;
};

2222222222222222222222222222222222222222222222222222222222222222222

Linux异步IO:AIO教程(libaio)

迹寒

迹寒

华中科技大学 计算机系统结构硕士

​关注他

10 人赞同了该文章

AIO使用顺序

  1. 打开一个I/O Context以提交或者获取I/O请求;
  2. 创建一个或多个请求对象并设置;
  3. 将这些请求提交到I/O Context,这些请求会被发送到设备;
  4. 以事件的方式获取完成的对象;
  5. 回到2,循环往复。

创建上下文——io_context_t

它是一个上下文结构,在内部它包含一个完成队列,在线程之间是可以共享的。

struct io_context {
  int32_t ctx_id;
  uint32_t aio_max_events;
  uint32_t aio_pendings;
  // for system io_context_t, the type is long, point to kernel aio_ring
  void* sys_ctx;
  pthread_mutex_t ring_lock;             // protect tail
  pthread_mutex_t completion_lock;       // protect head

  pthread_spinlock_t rlock;
  pthread_spinlock_t clock;
  pthread_spinlock_t nrlock;

  struct io_event** ring = nullptr;
  uint32_t head;
  uint32_t tail;
  io_context(uint32_t maxevents, uint32_t ctxid)
  : ctx_id(ctxid),
    aio_max_events(maxevents),
    aio_pendings(0),
    sys_ctx(nullptr),
    head(0),
    tail(0) {
    // init lock
    pthread_mutex_init(&ring_lock, nullptr);
    pthread_mutex_init(&completion_lock, nullptr);
    pthread_spin_init(&nrlock, 0);
    pthread_spin_init(&rlock, 0);
    pthread_spin_init(&clock, 0);
    // iocb ring
    ring = reinterpret_cast<struct io_event**>(calloc(maxevents, sizeof(struct io_event*)));
  }

  ~io_context() {
    pthread_mutex_destroy(&ring_lock);
    pthread_mutex_destroy(&completion_lock);
    pthread_spin_destroy(&nrlock);
    pthread_spin_destroy(&rlock);
    pthread_spin_destroy(&clock);
    free(ring);
  }
};

如果你需要创建一个io_context_t,请使用:

int io_setup(int maxevents, io_context_t *ctxp);

maxevents是最大事件数,也就是IO队列的长度。如果需要析构一个io_context_t:

int io_destroy(io_context_t ctx);

提交请求——iocb

io回调数据结构,和io_submit配合使用。

struct iocb {
    void *data;
    short aio_lio_opcode; // 表明一个op是读还是写,分别用IO_CMD_PREAD和IO_CMD_PWRITE表示
    int aio_fildes; // iocb读写文件的fd

    union {
        struct {
            void *buf; // 指向读写内存的指针
            unsigned long nbytes; // 请求的长度
            long long offset; // 请求的offset
        } c;
    } u;
};

初始化iocb需要用到io_prep_pread andio_prep_pwrite

io_prep_pwritev:为异步写建立iocb,也就是回调函数。

inline void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, 
                           long long offset);

io_prep_write() 是用于设置并行写入的便捷函数。

iocb->aio_fildes = fd 是描述符的文件的第一个 iocb->u.c.nbytes = count 字节从 iocb->u.c.buf = buf 开始的缓冲区写入。 写入从文件中的绝对位置 ioc->u.c.offset = offset 开始。

该函数立即返回。 要安排操作,必须调用函数 io_submit(3)。

io_prep_pread也类似:

inline void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, 
                          long long offset);

提交请求需要用到io_submit

int io_submit(io_context_t ctx, long nr, struct iocb *ios[]);

注意它提交的是一组iocb,nrios数组的长度。如果一次提交多个iocb,那么返回的iocb的顺序得不到保证。由于 CPU 使用率的降低,大批量提交有时会导致性能提升。有时还可以通过同时保持许多 I/O 的“运行”来提高性能。

如果提交包含太多iocb,以至于内部队列io_context_t在完成时会溢出,io_submit则将返回一个非零数字并设置errnoEAGAIN。务必使用O_DIRECT标志打开文件时使用该标志,并在原始块设备上进行操作。

处理结果

通过io_event处理结果,

struct io_event {
    void *data;
    struct iocb *obj;
    long long res;
};

data就是传入iocb的数据,obj是原始的iocb,res是读或写返回的结果。

通过以下函数得到io_event:

int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

ctx_id是获取的io_context

min_nr是返回的io_event的最小数量。除非这有min_nr个IO完成事件,否则该函数将会阻塞;

nr是IO完成事件的最大数量,即ios数组长度;

eventsio_events数组,

timeout是调用io_getevents可能阻塞直到它返回的最长时间。如果传入NULL,io_getevents则将阻塞,直到min_nr 个事件完成。

返回值表示报告了多少完成,即写入了多少事件。返回值将介于 0 和 之间nr。返回值可能低于min_nr超时时间;如果超时为NULL,则返回值将介于min_nr和之间nr。

  • min_nr = 0(或者,等效地,timeout = 0)。此选项形成了一种非阻塞轮询技术:无论是否有任何完成可用,它总是会立即返回。在每次迭代中作为应用程序主运行循环的一部分min_nr = 0调用时使用它是有意义的。io_getevents
  • min_nr = 1. 此选项会阻塞,直到有一个完成可用。该参数是产生阻塞调用的最小值,因此对于某些用户来说可能是低延迟操作的最佳值。当应用程序注意到eventfd触发了与 iocb 对应的一个(参见下一节epoll),然后应用程序可以调用io_getevents对应io_context_t的,并保证不会发生阻塞。
  • min_nr > 1. 此选项等待多个完成返回,除非超时到期。由于减少了 CPU 使用率,等待多个完成可能会提高吞吐量,这既是由于更少的io_getevents调用,也是因为如果完成队列中由于移除的完成而有更多的空间,那么稍后的io_submit调用可能具有更大的粒度,以及减少的当事件可用时,上下文切换回调用线程的次数。此选项存在增加操作延迟的风险,尤其是在操作率较低时。

即使min_nr = 0或 1,出于性能原因将 nr 设置得更大一点也是有用的:可能已经完成了多个事件,并且可以在不多次调用 的情况下对其进行处理io_getevents。更大的 nr 值库的唯一成本是用户必须分配更大的事件数组并准备好接受它们。

性能考虑

  • io_submit在ext4、缓冲操作、网络访问、管道等期间阻塞。AIO 接口不能很好地表示某些操作。对于缓冲读取、套接字或管道上的操作等完全不受支持的操作,整个操作将在 io_submit 系统调用期间执行,完成后可立即通过 io_getevents 访问。部分支持对文件系统(如 ext4)上文件的 AIO 访问:如果需要读取元数据来查找数据块(即,如果元数据尚未在内存中),则 io_submit 调用将阻塞读取元数据。某些类型的文件放大写入完全不受支持,并在整个操作期间阻塞。
  • CPU 开销。当在高性能设备上执行小操作并针对单个 CPU 的非常高的操作率时,可能会导致 CPU 瓶颈。这可以通过从多个线程提交和获取 AIO 来解决。
  • 当许多 CPU 或请求共享一个 io_context_t 时锁争用。在某些情况下,可以从多个 CPU 访问对应于 io_context_t 的内核数据结构。例如,多个线程可以从同一个 io_context_t 提交和获取事件。一些设备可能对所有完成使用单个中断线。这可能导致锁在内核之间反弹或锁被严重竞争,从而导致更高的 CPU 使用率和潜在的更低吞吐量。一种解决方案是分片成多个 io_context_t 对象,例如通过线程和地址的哈希。
  • 确保足够的并行性。一些设备需要许多并发操作才能达到峰值性能。这意味着要确保同时有几个“在进行中”的操作。在一些高性能存储设备上,当操作量较小时,必须并行提交数十或数百个,才能达到最大吞吐量。对于磁盘驱动器,如果电梯调度程序可以在飞行中同时进行更多操作做出更好的决策,则性能可能会随着更大的并行性而提高,但在许多情况下预计效果会很小。

AIO的替代技术

  • 同步 I/O 线程的线程池。这适用于许多用例,并且可能更容易编程。与 AIO 不同,所有功能都可以通过线程池并行化。一些用户发现线程池不能很好地工作,因为线程在 CPU 和内存带宽使用方面的开销来自上下文切换。对于高性能存储设备上的小随机读取,这是一个特别大的问题。
  • POSIX AIO。另一个异步 I/O 接口是 POSIX AIO。它是作为 glibc 的一部分实现的。但是,glibc 实现在内部使用线程池。Joel Becker 实现了一个基于上述 Linux AIO 机制的 POSIX AIO版本。IBM DeveloperWorks对 POSIX AIO有很好的介绍。
  • epoll。Linux 对使用 epoll 作为异步 I/O 机制的支持有限。对于以缓冲模式(即没有 O_DIRECT)打开的文件的读取,如果文件以 O_NONBLOCK 方式打开,则读取将返回 EAGAIN,直到相关部分在内存中。对缓冲文件的写入通常是立即的,因为它们是通过另一个写回线程写出的。但是,这些机制并没有提供直接 I/O 提供的对 I/O 的控制级别。

示例代码

下面是一些使用 Linux AIO 的示例代码。我在谷歌写的,所以它使用谷歌 glog 日志库和谷歌 gflags 命令行标志库,以及对谷歌 C++ 编码约定的松散解释。使用 gcc 编译时,传递-laio给与 libaio 动态链接。(它不包含在 glibc 中,因此必须明确包含。)

// Code written by Daniel Ehrenberg, released into the public domain

#include <fcntl.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <libaio.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>

DEFINE_string(path, "/tmp/testfile", "Path to the file to manipulate");
DEFINE_int32(file_size, 1000, "Length of file in 4k blocks");
DEFINE_int32(concurrent_requests, 100, "Number of concurrent requests");
DEFINE_int32(min_nr, 1, "min_nr");
DEFINE_int32(max_nr, 1, "max_nr");

// The size of operation that will occur on the device
static const int kPageSize = 4096;

class AIORequest {
 public:
  int* buffer_;

  virtual void Complete(int res) = 0;

  AIORequest() {
    int ret = posix_memalign(reinterpret_cast<void**>(&buffer_),
                             kPageSize, kPageSize);
    CHECK_EQ(ret, 0);
  }

  virtual ~AIORequest() {
    free(buffer_);
  }
};

class Adder {
 public:
  virtual void Add(int amount) = 0;

  virtual ~Adder() { };
};

class AIOReadRequest : public AIORequest {
 private:
  Adder* adder_;

 public:
  AIOReadRequest(Adder* adder) : AIORequest(), adder_(adder) { }

  virtual void Complete(int res) {
    CHECK_EQ(res, kPageSize) << "Read incomplete or error " << res;
    int value = buffer_[0];
    LOG(INFO) << "Read of " << value << " completed";
    adder_->Add(value);
  }
};

class AIOWriteRequest : public AIORequest {
 private:
  int value_;

 public:
  AIOWriteRequest(int value) : AIORequest(), value_(value) {
    buffer_[0] = value;
  }

  virtual void Complete(int res) {
    CHECK_EQ(res, kPageSize) << "Write incomplete or error " << res;
    LOG(INFO) << "Write of " << value_ << " completed";
  }
};

class AIOAdder : public Adder {
 public:
  int fd_;
  io_context_t ioctx_;
  int counter_;
  int reap_counter_;
  int sum_;
  int length_;

  AIOAdder(int length)
      : ioctx_(0), counter_(0), reap_counter_(0), sum_(0), length_(length) { }

  void Init() {
    LOG(INFO) << "Opening file";
    fd_ = open(FLAGS_path.c_str(), O_RDWR | O_DIRECT | O_CREAT, 0644);
    PCHECK(fd_ >= 0) << "Error opening file";
    LOG(INFO) << "Allocating enough space for the sum";
    PCHECK(fallocate(fd_, 0, 0, kPageSize * length_) >= 0) << "Error in fallocate";
    LOG(INFO) << "Setting up the io context";
    PCHECK(io_setup(100, &ioctx_) >= 0) << "Error in io_setup";
  }

  virtual void Add(int amount) {
    sum_ += amount;
    LOG(INFO) << "Adding " << amount << " for a total of " << sum_;
  }

  void SubmitWrite() {
    LOG(INFO) << "Submitting a write to " << counter_;
    struct iocb iocb;
    struct iocb* iocbs = &iocb;
    AIORequest *req = new AIOWriteRequest(counter_);
    io_prep_pwrite(&iocb, fd_, req->buffer_, kPageSize, counter_ * kPageSize);
    iocb.data = req;
    int res = io_submit(ioctx_, 1, &iocbs);
    CHECK_EQ(res, 1);
  }

  void WriteFile() {
    reap_counter_ = 0;
    for (counter_ = 0; counter_ < length_; counter_++) {
      SubmitWrite();
      Reap();
    }
    ReapRemaining();
  }

  void SubmitRead() {
    LOG(INFO) << "Submitting a read from " << counter_;
    struct iocb iocb;
    struct iocb* iocbs = &iocb;
    AIORequest *req = new AIOReadRequest(this);
    io_prep_pread(&iocb, fd_, req->buffer_, kPageSize, counter_ * kPageSize);
    iocb.data = req;
    int res = io_submit(ioctx_, 1, &iocbs);
    CHECK_EQ(res, 1);
  }

  void ReadFile() {
    reap_counter_ = 0;
    for (counter_ = 0; counter_ < length_; counter_++) {
        SubmitRead();
        Reap();
    }
    ReapRemaining();
  }

  int DoReap(int min_nr) {
    LOG(INFO) << "Reaping between " << min_nr << " and "
              << FLAGS_max_nr << " io_events";
    struct io_event* events = new io_event[FLAGS_max_nr];
    struct timespec timeout;
    timeout.tv_sec = 0;
    timeout.tv_nsec = 100000000;
    int num_events;
    LOG(INFO) << "Calling io_getevents";
    num_events = io_getevents(ioctx_, min_nr, FLAGS_max_nr, events,
                              &timeout);
    LOG(INFO) << "Calling completion function on results";
    for (int i = 0; i < num_events; i++) {
      struct io_event event = events[i];
      AIORequest* req = static_cast<AIORequest*>(event.data);
      req->Complete(event.res);
      delete req;
    }
    delete events;
    
LOG(INFO) << "Reaped " << num_events << " io_events";
    reap_counter_ += num_events;
    return num_events;
  }

  void Reap() {
    if (counter_ >= FLAGS_min_nr) {
      DoReap(FLAGS_min_nr);
    }
  }

  void ReapRemaining() {
    while (reap_counter_ < length_) {
      DoReap(1);
    }
  }

  ~AIOAdder() {
    LOG(INFO) << "Closing AIO context and file";
    io_destroy(ioctx_);
    close(fd_);
  }

  int Sum() {
    LOG(INFO) << "Writing consecutive integers to file";
    WriteFile();
    LOG(INFO) << "Reading consecutive integers from file";
    ReadFile();
    return sum_;
  }
};

int main(int argc, char* argv[]) {
  google::ParseCommandLineFlags(&argc, &argv, true);
  AIOAdder adder(FLAGS_file_size);
  adder.Init();
  int sum = adder.Sum();
  int expected = (FLAGS_file_size * (FLAGS_file_size - 1)) / 2;
  LOG(INFO) << "AIO is complete";
  CHECK_EQ(sum, expected) << "Expected " << expected << " Got " << sum;
  printf("Successfully calculated that the sum of integers from 0"
         " to %d is %d\n", FLAGS_file_size - 1, sum);
  return 0;
}

参考资料:

https://github.com/littledan/linux-aio

编辑于 2022-06-28 11:06

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/772100.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

TIS人人都会用的数据集成产品

文章目录 1.TIS是什么&#xff1f;1.1 简介1.2 官网及项目地址1.3 架构 2.功能特性2.1 基于Web UI的开箱即用2.2 支持分布式任务分发2.3 全新的基于微内核的运行环境2.4 功能覆盖DataX大部分&#xff08;Reader/Writer&#xff09;Plugin2.5 重构DataX的Classloader2.6 支持RDB…

科比老大职业生涯数据预测(基于随机森林模型)

1.实验背景 科比布莱恩特&#xff0c;作为NBA历史上最伟大的篮球运动员之一&#xff0c;他的职业生涯充满了无数精彩瞬间。 科比于1996年以13顺位的选秀身份进入联盟&#xff0c;一生都效力于洛杉矶湖人队。于2016年宣布退役&#xff0c;职业生涯获奖无数&#xff0c;5次NBA总…

99. 岛屿数量

题目描述&#xff1a;给定一个由 1&#xff08;陆地&#xff09;和 0&#xff08;水&#xff09;组成的矩阵&#xff0c;你需要计算岛屿的数量。岛屿由水平方向或垂直方向上相邻的陆地连接而成&#xff0c;并且四周都是水域。你可以假设矩阵外均被水包围。 输入描述&#xff1a…

EXTI寄存器,AFIO的简洁,EXTI配置的流程

一&#xff0c;AFIO简介 AFIO是Alternate Function Input/Output 的缩写&#xff0c;表示复用功能IO&#xff0c;主要用于实现IO端口的复用功能以及外部中断的控制 STM32外设有很多I/O以及内置外设&#xff08;如12C&#xff0c;ADC,ISP,USART等&#xff09;。为节省引出管脚的…

命令行运行git reflog(reference log)报错的解决办法

文章目录 1. 检查 Git 是否已安装2. 检查 PATH 环境变量3. 重新安装 Git 在Git中&#xff0c; reflog的英文全称是 “ reference log”。意思是 引用日志&#xff08;参考日志&#xff09;。它记录了本地仓库中HEAD和分支引用所指向的提交的变更历史。这包括了你所有的提交&…

经典低功耗四通道运算放大器LM324

前言&#xff1a; SOP14封装LM324 这个LM324运放有几十年的历史了吧&#xff1f;很普通&#xff0c;很常用&#xff0c;搞电路的避免不了接触运放&#xff0c;怎么选择运放&#xff0c;是工程师关心的问题吧&#xff1f; 从本文开始&#xff0c;将陆续发一些常用的运放&#xf…

【AI学习】无线AI的问题和挑战

无线AI&#xff0c;即无线人工智能&#xff0c;是指内生于未来&#xff08;6G&#xff09;无线通信系统并通过无线架构、无线数据、无线算法和无线应用所呈现出来的新的人工智能技术体系。 最近一直在进行无线AI的调研&#xff0c;感觉真的是路漫漫其修远兮。业界有一些探索&a…

【人工智能】--生成对抗网络

个人主页&#xff1a;欢迎来到 Papicatch的博客 课设专栏 &#xff1a;学生成绩管理系统 专业知识专栏&#xff1a; 专业知识 文章目录 &#x1f349;引言 &#x1f349;GAN 的基本原理 &#x1f348;生成器&#xff08;Generator&#xff09; &#x1f348;判别器&…

【前端知识】一篇速成 建议收藏

HTML基础概念 正式敲代码之前呢,我们先来看几个概念: 0 静态网页和动态网页 静态网页: 页面的内容和显示效果就基本上不会发生变化了--除非你修改页面代码。 动态网页: 页面代码虽然没有变&#xff0c;但是显示的内容却是可以随着时间、环境或者数据库操作的结果而发生改变的…

解决SeaTunnel 2.3.4版本写入S3文件报错问题

在使用Apache SeaTunnel时&#xff0c;我遇到了一个写入S3文件的报错问题。通过深入调试和分析&#xff0c;找到了问题所在&#xff0c;并提出了相应的解决方案。 本文将详细介绍报错情况、参考资料、解决思路以及后续研究方向&#xff0c;希望对大家有帮助&#xff01; 一、…

PyTorch - 神经网络基础

神经网络的主要原理包括一组基本元素&#xff0c;即人工神经元或感知器。它包括几个基本输入&#xff0c;例如 x1、x2… xn &#xff0c;如果总和大于激活电位&#xff0c;则会产生二进制输出。 样本神经元的示意图如下所述。 产生的输出可以被认为是具有激活电位或偏差的加权…

Java通过GeoLite2-City.mmdb 进行IP信息查询地理定位和经纬度筛选。

引入依赖 <dependency><groupId>com.maxmind.geoip2</groupId><artifactId>geoip2</artifactId><version>4.2.0</version> </dependency>下载数据文件&#xff1a;https://download.lin2ur.cn/GeoLite2/ package com.cqclo…

经典递归分析

在前面一篇中, 已经看过许多直观的递归的例子, 在这篇里, 将分析两个经典的递归问题, 阶乘与菲波那契数列数列, 在此过程中, 还将对比递归与循环(迭代)间的异同, 探讨递归与内存中的栈的关系, 以及递归的效率等问题. 如无特别说明, 示例使用的是 Java, IDE 则为 Eclipse. 阶乘(…

Matplotlib 简介

import matplotlib.pyplot as plt plt.plot([1, 2, 3, 4]) plt.ylabel(some numbers) plt.show() 当使用plot只传入单个数组时&#xff0c;matplotlib会认为这是y的值&#xff0c;并自动生成长度相同&#xff0c;但是从0开始的x值&#xff0c;所以这里的x会自动生成为 [0,1,2,…

python自动化办公之BeautifulSoup爬取并解析html文本

用到的库&#xff1a;BeautifulSoup 实现效果&#xff1a;爬取网站内容&#xff0c;拿到html文本并解析html文本 代码&#xff1a; 先爬取 # 先导入requests包 import requests urlhttps://www.baidu.com responserequests.get(url) # 做1个断言&#xff0c;如果执行成功&a…

java的工厂设备管理系统-计算机毕业设计源码16179

摘要 在现代制造业中&#xff0c;高效的设备管理对于确保生产过程的顺利进行至关重要。为了满足工厂对于设备管理的需求&#xff0c;我们设计并实现了一个基于 Java 的工厂设备管理系统。 该系统旨在提供一个全面、可靠且易于使用的解决方案&#xff0c;以帮助工厂有效地管理…

QT截屏,截取控件为图片,指定范围截屏三种截屏方式

项目中我们常用到截取屏幕&#xff0c;Qt给我的们多种方式&#xff1a; 主要有以下三种&#xff1a; 截取全屏&#xff1b;截取控件为图片&#xff1b;指定位置截屏三种截屏方式&#xff1b; 1.截取全屏 常用&#xff1a; 实现&#xff1a; QScreen *screen QGuiApplicat…

【超万卡GPU集群关键技术深度分析 2024】

文末有福利&#xff01; 1. 集群高能效计算技术 随着大模型从千亿参数的自然语言模型向万亿参数的多模态模型升级演进&#xff0c;超万卡集群吸需全面提升底层计算能力。 具体而言&#xff0c;包括增强单芯片能力、提升超节点计算能力、基于 DPU (Data Processing Unit) 实现…

波动方程 - 在三维图中动态显示二维波动方程的解就像水面波澜起伏

波动方程 - 在三维图中动态显示二维波动方程的解就像水面波澜起伏 flyfish 波动方程的求解结果通常不是一个单一的数值&#xff0c;而是一个函数或一组函数&#xff0c;这些函数描述了波随时间和空间的传播情况。具体来说&#xff0c;波动方程的解可以是关于时间和空间变量的…

#LinuxC高级 笔记二

makefile gcc gdb makefile 1. 分文件编程 1.1 源文件&#xff1a;.c结尾的文件 包含main函数的.c 包含子函数的.c 1.2 头文件&#xff1a;.h结尾的文件 头文件、宏定义、typedef 、结构体、共用体、枚举、函数声明 include引用时“”和<>的区别&#xff1a; <>去系…