timerfd加epoll封装定时器

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 1、用timerfd加epoll封装定时器的优点
  • 2、代码实现

1、用timerfd加epoll封装定时器的优点

定时器为什么需要timerfd
在设计定时器时,我们首先想到的就是设置一个定时任务触发的时间,然后不断判断(死循环)当前时间是否大于等于定时任务触发的时间,如果是,那么就处理定时任务。这就是最为简单的设计,在我之前的博客中[定时器的简单实现],就是这么实现的,但是这样设计会存在诸多问题

  1. CPU资源浪费
    使用死循环来检查时间意味着CPU必须不断地执行这段代码,即使大部分时间都是在做无用的比较。这会导致CPU资源的浪费,尤其是在高性能的服务器或多任务环境中。
  2. 响应性下降
    由于CPU忙于执行定时器的检查,它可能无法及时响应其他重要的事件或任务,导致系统响应性下降。
  3. 不准确
    依赖于系统的时钟分辨率和调度器延迟,使用死循环检查时间的方法可能无法实现精确的定时。例如,如果系统时钟的分辨率是毫秒级,而你尝试实现微秒级的定时,那么这种方法就无法满足需求。
  4. 不适合长时间等待
    如果定时任务触发的时间间隔很长(例如几小时或几天),那么使用死循环来等待这段时间是非常低效的。

为解决上述问题,就产生了timerfd,当使用timerfd_create创建timerfd时,并设置了定时任务,当定时任务时间到达时,那么timerfd就变成了可读,经常与 select/poll/epoll搭配使用

这里我们不需要轮询这个timerfd,判断timerfd是否有数据(是否可读),因为这样做也会带来上述问题,因此我们需要将timerfd加入到select/poll/epoll中,让它们轮询,一般来说使用epoll更高效

  1. 统一的事件处理:epoll是Linux下多路复用IO接口select/poll的增强版本,它可以高效地处理大量的文件描述符和I/O事件。通过将timerfd的文件描述符加入epoll的监控集合中,可以将定时器超时事件与其他I/O事件进行统一处理,简化了事件驱动编程的复杂性。
  2. 提高并发性能:在高并发的网络服务器中,使用epoll可以监控多个套接字的I/O事件,而使用timerfd可以实现定时任务(如心跳检测、超时处理等)。这种结合使用的方式可以提高系统的并发性能和吞吐量。
  3. 减少系统调用开销:由于epoll采用I/O多路复用机制,并且只在有事件发生时才进行通知,因此可以减少不必要的系统调用开销。同时,由于timerfd的精度较高,可以减少因轮询而产生的额外开销。

2、代码实现

定时任务

//TimerEvent.h
#pragma once
#include <cstdint>
#include <functional>
#include <sys/time.h>
#include <memory>
class TimerEvent
{
public:
    using s_ptr = std::shared_ptr<TimerEvent>;
    template<typename F, typename... Args>
    TimerEvent(int interval, bool is_repeated, F&& f, Args&&... args):interval_(interval), is_repeated_(is_repeated)
    {
        auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        task_ = task;
    }
    

    int64_t getArriveTime() const
    {
        return arrive_time_;
    }

    void setCancler(bool flag)
    {
        is_cancled_ = flag;
    }

    bool isCancle()
    {
        return is_cancled_;
    }

    bool isRepeated()
    {
        return is_repeated_;
    }

    std::function<void()> getCallBack()
    {
        return task_;
    }

    //重新设定任务到达时间
    void resetArriveTime();
    //获取当前时间
    static int64_t getNowMs();
private:
    int64_t arrive_time_;//ms 执行任务时毫秒级时间戳,达到对应的时间戳就执行对应的任务
    int64_t interval_;//ms 隔多少ms后执行
    bool is_repeated_{false};//是否为周期性的定时任务
    bool is_cancled_{false};//是否取消
    
    std::function<void()> task_;
};

//TimerEvent.cpp
#include"TimerEvent.h"

int64_t TimerEvent::getNowMs()
{
    timeval val;
    gettimeofday(&val, NULL);
    return val.tv_sec*1000 + val.tv_usec/1000;
}

void TimerEvent::resetArriveTime()
{
    arrive_time_ = getNowMs() + interval_;
}

对timerfd的封装

//Timer.h
#pragma once
#include <map>
#include <vector>
#include <iostream>
#include "TimerEvent.h"

class Timer
{
public:
    Timer();
    ~Timer();

    int getFd()
    {
        return fd_;
    }

    void addTimerEvent(TimerEvent::s_ptr event);
    void deleteTimerEvent(TimerEvent::s_ptr event);
    //时间到达就触发
    void onTimer();
    std::vector<std::function<void()>> &getCallbacks()
    {
        return callbacks_;
    }

    //重新设置任务的到达时间
    void resetArriveTime();

private:
    int fd_;
    std::multimap<int64_t, TimerEvent::s_ptr> pending_events_;
    std::vector<std::function<void()>> callbacks_;
};


//Timer.cpp
#include <sys/timerfd.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include "Timer.h"

Timer::Timer() : fd_(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK))
{
}

Timer::~Timer()
{

}

void Timer::resetArriveTime()
{
    if (pending_events_.empty())
    {
        return;
    }

    int64_t now = TimerEvent::getNowMs();
    auto it = pending_events_.begin();
    int64_t inteval = 0;
    // 第一个任务的定时时间比当前时间大,则重新设置
    if (it->second->getArriveTime() > now)
    {
        inteval = it->second->getArriveTime() - now;
    }
    else
    {
        // 第一个任务的定时时间比当前时间小或相等,说明第一个任务已经超时了,应该立马执行该任务
        inteval = 100; // ms
    }

    timespec ts;
    memset(&ts, 0, sizeof(ts));
    ts.tv_sec = inteval / 1000;//秒
    ts.tv_nsec = (inteval % 1000) * 1000000;//纳秒

    itimerspec value;
    memset(&value, 0, sizeof(value));
    value.it_value = ts;
    int result = timerfd_settime(fd_, 0, &value, NULL);
    if (result != 0)
    {
        printf("timerfd_settime error, errno=%d, error=%s", errno, strerror(errno));
    }
}

void Timer::addTimerEvent(TimerEvent::s_ptr event)
{
    bool is_reset_timerfd = false;
    if (pending_events_.empty())
    {
        is_reset_timerfd = true;
    }
    else
    {
        auto it = pending_events_.begin();
        // 当前需要插入的定时任务时间比已经存在的定时任务时间要早,那么就需要重新设定超时时间,防止任务延时
        if (it->first > event->getArriveTime())
        {
            is_reset_timerfd = true;
        }
    }
    pending_events_.emplace(event->getArriveTime(), event);
    if (is_reset_timerfd)
    {
        resetArriveTime();
    }
}

void Timer::deleteTimerEvent(TimerEvent::s_ptr event)
{
    event->setCancler(true);
    //pending_events_是multimap,key是时间,可能存在多个相同时间的event
    //将对应的event从pending_events_中删除
    auto begin = pending_events_.lower_bound(event->getArriveTime());
    auto end = pending_events_.upper_bound(event->getArriveTime());

    auto it = begin;

    for(;it != end; ++it)
    {
        if(it->second == event)
        {
            break;
        }
    }

    if(it != end)
    {
        pending_events_.erase(it);
    }

}

void Timer::onTimer()
{
    char buf[8];
    for(;;)
    {
        if((read(fd_, buf, 8) == -1) && errno == EAGAIN)
        {
            break;
        }
    }
    int64_t now = TimerEvent::getNowMs();
    std::vector<TimerEvent::s_ptr> tmps;
    std::vector<std::function<void()>>& callbacks_ = getCallbacks();
    auto it = pending_events_.begin();
    for(; it != pending_events_.end(); ++it)
    {
        // 任务已经到时或者超时,并且没有被取消,就需要执行
        if((it->first <= now) && !it->second->isCancle())
        {
            tmps.push_back(it->second);
            callbacks_.push_back(it->second->getCallBack());
        }
        else
        {
            break;// 因为定时任务是升序排的,只要第一个任务没到时,后面的都没到时
        }
    }

    //因为把任务已经保存好了,因此需要把m_pending_events中对应的定时任务删除,防止下次又执行了
    pending_events_.erase(pending_events_.begin(), it);
    // 需要把重复的TimerEvent再次添加进去
    for(auto i = tmps.begin(); i != tmps.end(); ++i)
    {
        if(!(*i)->isCancle())
        {
            //std::cout<<"重新添加"<<std::endl;
            (*i)->resetArriveTime();
            addTimerEvent(*i);
        }
    }
    resetArriveTime();
}

对epoll的封装

//TimerPollPoller.h
#pragma once
#include <sys/epoll.h>
#include <cstring>
#include <unistd.h>
#include <fcntl.h>
#include <atomic>
#include <iostream>
#include "ThreadPool.h"
#include "Timer.h"

class TimerPollPoller
{
public:
    TimerPollPoller(unsigned int num = std::thread::hardware_concurrency())
    :epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
    thread_pool_(ThreadPool::instance()),
    stop_(true)
    {
        timer_ = std::make_shared<Timer>();
        struct epoll_event event;
        memset(&event, 0, sizeof(event));
        event.data.ptr = reinterpret_cast<void*>(&timer_);
        event.events = EPOLLIN;
        ::epoll_ctl(epollfd_, EPOLL_CTL_ADD, timer_->getFd(), &event);
        start();
    }
    ~TimerPollPoller()
    {
        ::close(epollfd_);
        stop();
        if(t.joinable())
        {
            std::cout << "主线程 join thread " << t.get_id() << std::endl;
            t.join();
        }
    }
    void start();
    void stop();
    void addTimerEvent(TimerEvent::s_ptr event);
    void cancelTimeEvent(TimerEvent::s_ptr event);
    void handleTimerfdInEpoll();
private:
    const int epollfd_;
    std::shared_ptr<Timer> timer_;
    std::thread t;//单独起一个线程,进行轮询epoll
    ThreadPool& thread_pool_;
    std::atomic<bool> stop_;
};


//TimerPollPoller.cpp
#include "TimerPollPoller.h"

void TimerPollPoller::start()
{
    t = std::move(std::thread(&TimerPollPoller::handleTimerfdInEpoll, this));
}
void TimerPollPoller::stop()
{
    stop_.store(true);
}
void TimerPollPoller::addTimerEvent(TimerEvent::s_ptr event)
{
    timer_->addTimerEvent(event);
}
void TimerPollPoller::cancelTimeEvent(TimerEvent::s_ptr event)
{
    timer_->deleteTimerEvent(event);
}
void TimerPollPoller::handleTimerfdInEpoll()
{
    struct epoll_event event;
    stop_.store(false);
    while(!stop_.load())
    {
        int numEvents = ::epoll_wait(epollfd_, &event, 1, 0);
        if(numEvents == 1)
        {
            std::shared_ptr<Timer> timer_ptr = *reinterpret_cast<std::shared_ptr<Timer>*>(event.data.ptr);
            timer_ptr->onTimer();
            std::vector<std::function<void()>> callbacks = std::move(timer_ptr->getCallbacks());
            for(auto task:callbacks)
            {
                thread_pool_.commit(task);
            }
        }
    }
}

处理任务的线程池

#pragma once
#include <atomic>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <functional>
class ThreadPool {
public:
	static ThreadPool& instance() 
	{
		static ThreadPool ins;
		return ins;
	}
	using Task = std::packaged_task<void()>;
	~ThreadPool() 
	{
		stop();
	}
	template <class F, class... Args>
	auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
		using RetType = decltype(f(args...));
		if (stop_.load())
			return std::future<RetType>{};
		auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
		std::future<RetType> ret = task->get_future();
		{
			std::lock_guard<std::mutex> cv_mt(cv_mt_);
			//将任务放进任务队列中
			tasks_.emplace([task] { (*task)(); });
		}
		//唤醒一个线程
		cv_lock_.notify_one();
		return ret;
	}
	int idleThreadCount() {
		return thread_num_;
	}
private:
	ThreadPool(const ThreadPool&) = delete;
	ThreadPool& operator=(const ThreadPool&) = delete;
	ThreadPool(unsigned int num = std::thread::hardware_concurrency())
	: stop_(false) {
		{
			if (num < 1)
				thread_num_ = 1;
			else
				thread_num_ = num;
		}
		start();
	}
	//启动所有线程
	void start() 
	{
		for (int i = 0; i < thread_num_; ++i) {
			pool_.emplace_back([this]() {
				while (!this->stop_.load()) {
					Task task;
					{
						std::unique_lock<std::mutex> cv_mt(cv_mt_);
						this->cv_lock_.wait(cv_mt, [this] {
							//stop_为true或者tasks_不为空(return 返回true),则进行下一步,否则阻塞在条件变量上
							return this->stop_.load() || !this->tasks_.empty();
						});
						if (this->tasks_.empty())
							return;
						task = std::move(this->tasks_.front());
						this->tasks_.pop();
					}
					this->thread_num_--;
					task();
					this->thread_num_++;
				}
			});
		}
	}
	void stop() 
	{
		stop_.store(true);
		cv_lock_.notify_all();
		for (auto& td : pool_) {
			if (td.joinable()) {
				std::cout << "join thread " << td.get_id() << std::endl;
				td.join();
			}
		}
	}
private:
	std::mutex               cv_mt_;
	std::condition_variable  cv_lock_;
	std::atomic_bool         stop_;
	std::atomic_int          thread_num_;
	std::queue<Task>         tasks_;
	std::vector<std::thread> pool_;
};

测试代码

#include "TimerPollPoller.h"
#include <iostream>
void print()
{
    std::cout << "I love psy" << std::endl;
}
void print1()
{
    std::cout << "I love fl" << std::endl;
}

int main()
{
    TimerPollPoller timerPollPoller;
    TimerEvent::s_ptr timer1 = std::make_shared<TimerEvent>(500, true, print);
    TimerEvent::s_ptr timer2 = std::make_shared<TimerEvent>(1000, true, print1);
    timerPollPoller.addTimerEvent(timer1);
    timerPollPoller.addTimerEvent(timer2);
    std::this_thread::sleep_for(std::chrono::seconds(2));
    timerPollPoller.cancelTimeEvent(timer1);
    std::this_thread::sleep_for(std::chrono::seconds(2));

    return 0;
}

makefile

PATH_SRC := .
PATH_BIN = bin
PATH_OBJ = obj

CXX := g++
CXXFLAGS := -g -O0 -std=c++11 -lpthread -Wall -Wno-deprecated -Wno-unused-but-set-variable
CXXFLAGS += -I./

SRCS := $(wildcard $(PATH_SRC)/*.cpp) 
OBJS := $(patsubst $(PATH_SRC)/%.cpp,$(PATH_OBJ)/%.o,$(SRCS))  

TARGET := $(PATH_BIN)/main

# 默认目标:生成可执行文件
all : $(TARGET)

# 链接规则
$(TARGET): $(OBJS)
	$(CXX) $(CXXFLAGS) $(OBJS) -o $@ 

$(PATH_OBJ)/%.o: $(PATH_SRC)/%.cpp  
	$(CXX) $(CXXFLAGS) -c $< -o $@ 


clean:
	rm -rf $(PATH_OBJ)/*.o $(TARGET)

.PHONY : clean

在这里插入图片描述

使用之间,在当前目录下需要创建bin目录和obj目录,然后再进行make,就能在bin目录下生产可执行程序main

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/618959.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HNU-操作系统OS-2024期中考试

前言 该卷为22计科/智能OS期中考卷。 感谢智能22毕宿同学记忆了考卷考题。 同学评价&#xff1a;总体简单&#xff1b;第1&#xff0c;7概念题较难需要看书&#xff1b;第4&#xff0c;5题原题。 欢迎同学分享答案。 【1】共10分 操作系统的设计目标有哪些&#xff1f; 【…

Attention-guided Feature Distillation for Semantic Segmentation

摘要 与现有的复杂方法相比&#xff0c;该方法通常用于从老师那里提取知识给学生&#xff0c;该方法展示了一种简单而强大的方法&#xff0c;可以利用精细的特征映射来转移注意力。事实证明&#xff0c;该方法在提取丰富信息方面是有效的&#xff0c;在作为密集预测任务的语义…

springfox.documentation.spi.DocumentationType没有OAS_30(从swagger2转到swagger3出现的问题)

直接开讲&#xff1a; 查看源码根本没有OAS_30的类型选择 右键package的springfox找到maven下载的包&#xff0c;打开到资源管理器 可以看到项目优先使用2版本的jar包&#xff0c;但是OAS_30只在3版本中才有&#xff0c;意思就是让项目优先使用以下图片中的3.0.0jar包 解决办法…

智能文件夹改名助手:一键秒级恢复原始名称,轻松告别繁琐操作,提升文件管理效率

文件夹管理成为了我们日常工作和生活中不可或缺的一部分。然而&#xff0c;随着文件数量的不断增加和文件夹命名的复杂性&#xff0c;我们经常面临着重命名文件夹的繁琐操作。你是否曾经因为误改文件夹名称而头疼不已&#xff1f;是否曾经为了找回原始名称而耗费大量时间&#…

docker容器实现https访问

前言&#xff1a; 【云原生】docker容器实现https访问_docker ssl访问-CSDN博客 一术语介绍 ①key 私钥 明文--自己生成&#xff08;genrsa &#xff09; ②csr 公钥 由私钥生成 ③crt 证书 公钥 签名&#xff08;自签名或者由CA签名&#xff09; ④证书&#xf…

【Java】:向上转型、向下转型和ClassCastException异常

目录 先用一个生动形象的例子来解释向上转型和向下转型 向上转型&#xff08;Upcasting&#xff09; 向下转型&#xff08;Downcasting&#xff09; 向上转型 概念 例子 发生向上转型的情况 1.子类对象赋值给父类引用 2.方法参数传递 3.返回值 向下转型 概念 注意…

SpringSecurity的核心原理使用总结

1. SpringSecurity的核心原理 对于最原始Servlet请求处理的层次结构 客户端->过滤器链->Servlet 对于在SpringMVC中处理请求的层次结构 如何让Filter与Spring建立连接呢? 因此它增加了一个DelegatingFilterProxy 它是SpringMVC提供的的Filter,它内部代理了一个原生的F…

代码随想录——二叉树的层序遍历(Leetcode102)二叉树层序遍历的模板

题目链接 层序遍历&#xff08;队列&#xff09; 层序遍历一个二叉树。就是从左到右一层一层的去遍历二叉树。这种遍历的方式和我们之前讲过的都不太一样。 需要借用一个辅助数据结构即队列来实现&#xff0c;队列先进先出&#xff0c;符合一层一层遍历的逻辑&#xff0c;而用…

java项目之企业OA管理系统源码(springboot+vue+mysql)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的企业OA管理系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 企业OA管理系统的主要使用…

搭建Springboot的基础开发框架-02

本系列专题虽然是按教学的深度来定稿的&#xff0c;但在项目结构和代码组织方面是按公司系统的要求来书定的。在本章中主要介绍下基础开发框架的功能。后续所有章节的项目全是在本基础框架的基础上演进的。 工程结构介绍 SpringbootSeries&#xff1a;父工程&#xff0c;定义一…

语言:C#

一、VSCode生成exe 二、

【计算机毕业设计】基于微信小程序校园服务平台

随着 计算机技术的成熟&#xff0c;互联网的建立&#xff0c;如今&#xff0c;PC平台上有许多关于校园服务方面的应用程序&#xff0c;但由于使用时间和地点上的限制&#xff0c;用户在使用上存在着种种不方便&#xff0c;而开发一款基于微信小程序的校园服务平台&#xff0c;能…

Loongnix系统替换内核操作

Loongnix系统替换内核操作 一、终端下执行命令 sudo apt search linux-image* 返回结果中格式如: linux-image-4.19.0-19-loongson-3 为最新的内核源码。 二、下载内核源码包 sudo apt source linux-image-4.19.0-19-loongson-3 如提示&#xff1a;E: 您必须在 sources.li…

文件系统(未打开的文件)

之前我们讲述的一些文件操作都是在文件被打开的基础上的&#xff0c;因为用户想要对某个文件做操作的话&#xff0c;这个文件一定是被打开的&#xff0c;也就是一定是内存级的文件。 但是有的没有被操作的文件&#xff0c;是在磁盘中的&#xff0c;我们的笔记本是在SSD中&…

红米K60Pro/K50/K40系列澎湃OS解锁BL降级出厂MIUI14稳定版本方法

最新红米K60/60pro/K50/K50至尊/K40等多个系列手机都已经推送了澎湃OS系统&#xff0c;但新版的系统适配周期短或者等其他原因&#xff0c;导致很多小伙伴希望降级回到MIUI14系统&#xff0c;多个小米售后都拒绝降级服务&#xff0c;并且官方也没有开通1个自助降级的方法&#…

rt-thread 挂载romfs与ramfs

参考&#xff1a; https://www.rt-thread.org/document/site/#/rt-thread-version/rt-thread-standard/tutorial/qemu-network/filesystems/filesystems?id%e4%bd%bf%e7%94%a8-romfs https://www.rt-thread.org/document/site/#/rt-thread-version/rt-thread-standard/tutor…

AI回答总不满意?你的提问方式可能完全错误!

大家好&#xff0c;我是卷福同学&#xff0c;一个专注AI大模型整活的前阿里程序员&#xff0c;腾讯云社区2023新秀突破作者 向AI提问想写一篇论文&#xff0c;结果AI就生成2000字左右的文章后就完了。小伙伴们是不是也会遇到这类情况呢。今天来教大家AI提示词的技巧&#xff0c…

FANUC机器人故障诊断—报警代码(五)

FANUC机器人故障诊断中关于报警代码的介绍更新如下&#xff1a; 一、报警代码&#xff08;SRVO-214&#xff09; SRVO-214 6轴放大器保险丝熔断 [原因]6轴伺服放大器上的保险丝(FS2,FS3)已熔断。括号内的数字表示在第几台6轴伺服放大器上检测出了保险丝熔断。 [对策] 1.保险…

实现字符串比较函数(C语言)

一、N-S流程图&#xff1b; 二、运行结果&#xff1b; 三、源代码&#xff1b; # define _CRT_SECURE_NO_WARNINGS # include <stdio.h>int main() {//初始化变量值&#xff1b;int i, result;char s1[100], s2[100];//填充数组&#xff1b;printf("请输入数组s1的…

java线程局部变量使用方式

线程局部变量是Java中用于存储线程本地信息的变量。这种变量仅在线程的生命周期内存在&#xff0c;并且每个线程都有自己的一份拷贝。换句话说&#xff0c;线程局部变量是线程私有的&#xff0c;其他线程无法访问。 使用场景主要包括&#xff1a; 1. 存储线程状态信息&#xff…