muduo之TimerQueue
生活随笔
收集整理的這篇文章主要介紹了
muduo之TimerQueue
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
? ? ? ? ?muduo的TimerQueue是基于timerfd_create實現,這樣超時很容易和epoll結合起來。等待超時事件保存在set集合中,注意set集合的有序性,從小到大排列,整個對TimerQueue的處理也就是對set集合的操作。實現TimerQueue用了3個set,分別是等待超時事件set,活躍事件set,被撤銷定時set。主要是STL的一些操作。
TimerQueue.h
// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file.// Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is an internal header file, you should not include this.#ifndef MUDUO_NET_TIMERQUEUE_H #define MUDUO_NET_TIMERQUEUE_H#include <set> #include <vector>#include "muduo/base/Mutex.h" #include "muduo/base/Timestamp.h" #include "muduo/net/Callbacks.h" #include "muduo/net/Channel.h"namespace muduo { namespace net {class EventLoop; class Timer; class TimerId;/// /// A best efforts timer queue. /// No guarantee that the callback will be on time. /// class TimerQueue : noncopyable {public:explicit TimerQueue(EventLoop* loop);~TimerQueue();////// Schedules the callback to be run at given time,/// repeats if @c interval > 0.0.////// Must be thread safe. Usually be called from other threads./* * 用于注冊定時任務* @param cb, 超時調用的回調函數* @param when,超時時間(絕對時間)* @interval,是否是周期性超時任務*/TimerId addTimer(TimerCallback cb,Timestamp when,double interval);/* 取消定時任務,每個定時任務都有對應的TimerId,這是addTimer返回給調用者的 */void cancel(TimerId timerId);private:// FIXME: use unique_ptr<Timer> instead of raw pointers.// This requires heterogeneous comparison lookup (N3465) from C++14// so that we can find an T* in a set<unique_ptr<T>>./* * 主要用于刪除操作,通過TimerId找到Timer*,再通過Timer*找到在timers_中的位置,將期刪除* 覺得可以省略*/typedef std::pair<Timestamp, Timer*> Entry;typedef std::set<Entry> TimerList;typedef std::pair<Timer*, int64_t> ActiveTimer;typedef std::set<ActiveTimer> ActiveTimerSet;void addTimerInLoop(Timer* timer);void cancelInLoop(TimerId timerId);// called when timerfd alarms/* 當timerfd被激活時調用的回調函數,表示超時 */void handleRead();// move out all expired timers/* 從timers_中拿出所有超時的Timer* */std::vector<Entry> getExpired(Timestamp now);/* 將超時任務中周期性的任務重新添加到timers_中 */void reset(const std::vector<Entry>& expired, Timestamp now);/* 插入到timers_中 */bool insert(Timer* timer);EventLoop* loop_; /* 所屬的事件驅動循環 */const int timerfd_; /* 由timerfd_create創建的文件描述符 */Channel timerfdChannel_; /* 用于監聽timerfd的Channel */// Timer list sorted by expirationTimerList timers_; /* 保存所有的定時任務 */// for cancel()ActiveTimerSet activeTimers_;bool callingExpiredTimers_; /* atomic */ActiveTimerSet cancelingTimers_; //保存被取消的定時器 };} // namespace net } // namespace muduo #endif // MUDUO_NET_TIMERQUEUE_HTimerQueue.cc
// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file.// Author: Shuo Chen (chenshuo at chenshuo dot com)#ifndef __STDC_LIMIT_MACROS #define __STDC_LIMIT_MACROS #endif#include "muduo/net/TimerQueue.h"#include "muduo/base/Logging.h" #include "muduo/net/EventLoop.h" #include "muduo/net/Timer.h" #include "muduo/net/TimerId.h"#include <sys/timerfd.h> #include <unistd.h>namespace muduo { namespace net { namespace detail {int createTimerfd() {int timerfd = ::timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK | TFD_CLOEXEC);if (timerfd < 0){LOG_SYSFATAL << "Failed in timerfd_create";}return timerfd; }struct timespec howMuchTimeFromNow(Timestamp when)//現在距離超時時間when還有多久 {int64_t microseconds = when.microSecondsSinceEpoch()- Timestamp::now().microSecondsSinceEpoch();if (microseconds < 100){microseconds = 100;}struct timespec ts;ts.tv_sec = static_cast<time_t>(microseconds / Timestamp::kMicroSecondsPerSecond);ts.tv_nsec = static_cast<long>((microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);return ts; }void readTimerfd(int timerfd, Timestamp now)//處理超時事件。超時后,timerfd變為可讀 {uint64_t howmany;ssize_t n = ::read(timerfd, &howmany, sizeof howmany);LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();if (n != sizeof howmany){LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";} }//timerfd是時間對于的文件描述符 void resetTimerfd(int timerfd, Timestamp expiration)//重新設置定時器 {// wake up loop by timerfd_settime()struct itimerspec newValue;struct itimerspec oldValue;memZero(&newValue, sizeof newValue);memZero(&oldValue, sizeof oldValue);newValue.it_value = howMuchTimeFromNow(expiration);int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);if (ret){LOG_SYSERR << "timerfd_settime()";} }} // namespace detail } // namespace net } // namespace muduousing namespace muduo; using namespace muduo::net; using namespace muduo::net::detail;TimerQueue::TimerQueue(EventLoop* loop): loop_(loop),timerfd_(createTimerfd()),timerfdChannel_(loop, timerfd_),timers_(),callingExpiredTimers_(false) {timerfdChannel_.setReadCallback(std::bind(&TimerQueue::handleRead, this));// we are always reading the timerfd, we disarm it with timerfd_settime.timerfdChannel_.enableReading(); }TimerQueue::~TimerQueue() {timerfdChannel_.disableAll();timerfdChannel_.remove();::close(timerfd_);// do not remove channel, since we're in EventLoop::dtor();for (const Entry& timer : timers_){delete timer.second;} }TimerId TimerQueue::addTimer(TimerCallback cb,Timestamp when,double interval) {Timer* timer = new Timer(std::move(cb), when, interval);/* * 在自己所屬線程調用addTimerInLoop函數 */loop_->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer));return TimerId(timer, timer->sequence()); }void TimerQueue::cancel(TimerId timerId) {loop_->runInLoop(std::bind(&TimerQueue::cancelInLoop, this, timerId)); }/* 向計時器隊列中添加超時事件 */ void TimerQueue::addTimerInLoop(Timer* timer) {loop_->assertInLoopThread();bool earliestChanged = insert(timer);//返回true,說明timer被添加到set的頂部,作為新的根節點,需要更新timerfd的激活時間// 只有在計時器為空的時候或者新加入的計時器的最早觸發時間小于當前計時器的堆頂的最小值// 才需要用最近時間去更新if (earliestChanged){resetTimerfd(timerfd_, timer->expiration());} }void TimerQueue::cancelInLoop(TimerId timerId) {loop_->assertInLoopThread();assert(timers_.size() == activeTimers_.size());ActiveTimer timer(timerId.timer_, timerId.sequence_);ActiveTimerSet::iterator it = activeTimers_.find(timer);if (it != activeTimers_.end())//要取消的在當前激活的Timer集合中{size_t n = timers_.erase(Entry(it->first->expiration(), it->first));//在timers_中取消assert(n == 1); (void)n;delete it->first; // FIXME: no delete pleaseactiveTimers_.erase(it);//在activeTimers_中取消}else if (callingExpiredTimers_)//如果正在執行超時定時器的回調函數,則加入到cancelingTimers集合中{cancelingTimers_.insert(timer);}assert(timers_.size() == activeTimers_.size()); }/* * 當定時器超時,保存timerfd的Channel激活,調用回調函數*/ void TimerQueue::handleRead() {loop_->assertInLoopThread();Timestamp now(Timestamp::now());readTimerfd(timerfd_, now);/* 從定時任務set中拿出所有超時任務 */std::vector<Entry> expired = getExpired(now);callingExpiredTimers_ = true;cancelingTimers_.clear();// safe to callback outside critical section/* 調用超時的事件回調函數 */for (const Entry& it : expired){it.second->run();}callingExpiredTimers_ = false;reset(expired, now); }/** 重新整理時間set中的任務,將所有超時的任務都拿出,然后調用其回調函數*/ std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now) {assert(timers_.size() == activeTimers_.size());std::vector<Entry> expired;Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));//返回第一個大于等于now的迭代器,小于now的都已經超時//lower_bound( begin,end,num):從數組的begin位置到end-1位置二分查找第一個大于或等于num的數字,找到返回該數字的地址//lower_bound(val):返回容器中第一個值【大于或等于】val的元素的iterator位置TimerList::iterator end = timers_.lower_bound(sentry);assert(end == timers_.end() || now < end->first);/* back_inserter:容器適配器,將數據插入到參數的尾部 *///一個序列(sequence)拷貝到一個容器(container)中去,通常用std::copy算法std::copy(timers_.begin(), end, back_inserter(expired));timers_.erase(timers_.begin(), end); //從timers_中移除for (const Entry& it : expired){ActiveTimer timer(it.second, it.second->sequence());size_t n = activeTimers_.erase(timer); //從activeTimers_中移除assert(n == 1); (void)n;}assert(timers_.size() == activeTimers_.size());return expired; }/* * 調用完所有超時的回調函數后,需要對這些超時任務進行整理* 將周期性的定時任務重新添加到set中*/ void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now) {Timestamp nextExpire;for (const Entry& it : expired){ActiveTimer timer(it.second, it.second->sequence());if (it.second->repeat() /* 是否是周期性的定時任務 */&& cancelingTimers_.find(timer) == cancelingTimers_.end()){/* 重新計算超時時間 */it.second->restart(now);/* 重新添加到set中 */insert(it.second);}else{// FIXME move to a free listdelete it.second; // FIXME: no delete please}}/* 計算下次timerfd被激活的時間 */if (!timers_.empty()){nextExpire = timers_.begin()->second->expiration();//set從小到大排序}/* 設置 */if (nextExpire.valid())//時間是有效的{resetTimerfd(timerfd_, nextExpire);} }bool TimerQueue::insert(Timer* timer) {loop_->assertInLoopThread();assert(timers_.size() == activeTimers_.size());bool earliestChanged = false;/* 獲取timer的UTC時間戳,和timer組成std::pair<Timestamp, Timer*> */Timestamp when = timer->expiration();/* timers_begin()是set頂層元素(紅黑樹根節點),是超時時間最近的Timer* */TimerList::iterator it = timers_.begin();/* 如果要添加的timer的超時時間比timers_中的超時時間近,更改新的超時時間 */if (it == timers_.end() || when < it->first){earliestChanged = true;}{/* 添加到定時任務的set中 */std::pair<TimerList::iterator, bool> result = timers_.insert(Entry(when, timer));assert(result.second); (void)result;}{/* 同時也添加到activeTimers_中,用于刪除時查找操作 */std::pair<ActiveTimerSet::iterator, bool> result = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));assert(result.second); (void)result;}assert(timers_.size() == activeTimers_.size());return earliestChanged; }?
總結
以上是生活随笔為你收集整理的muduo之TimerQueue的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: muduo采用计时函数gettimeof
- 下一篇: muduo之TcpClient