Runsisi's Blog
不念过去 不畏将来
Ceph 中的定时器

Ceph 中的时间

common/Timer.cc
  SafeTimer

common/ceph_timer.h
  ceph::timer

msg/async/Event.cc
  EventCenter::process_time_events

auth/cephx/CephxKeyServer.cc
  KeyServer::_rotate_secret

mon/MonClient.cc
  MonClient::_check_auth_rotating

auth/cephx/CephxProtocol.cc
  CephXTicketHandler::need_key

mon/Paxos.cc
  Paxos::extend_lease

Ceph 中的定时器

Ceph 中的定时器并没有统一的实现,有基于 pthread_cond_timedwait 原生接口的实现,也有基于 C++ std::condition_variable 标准库的接口实现,当然标准库在 Linux 平台上最终还是基于 pthread_cond_timedwait 原生接口的实现。

namespace std {

template<>
cv_status
condition_variable::wait_until(unique_lock<mutex>& __lock,
    const chrono::time_point<chrono::steady_clock, chrono::nanoseconds>& __atime)
{
  // DR 887 - Sync unknown clock to known clock.
  const typename chrono::steady_clock::time_point __c_entry = chrono::steady_clock::now();
  const __clock_t::time_point __s_entry = __clock_t::now();
  const auto __delta = __atime - __c_entry;
  const auto __s_atime = __s_entry + __delta;

  if (__wait_until_impl(__lock, __s_atime) == cv_status::no_timeout)
    return cv_status::no_timeout;
  // We got a timeout when measured against __clock_t but
  // we need to check against the caller-supplied clock
  // to tell whether we should return a timeout.
  if (chrono::steady_clock::now() < __atime)
    return cv_status::no_timeout;
  return cv_status::timeout;
}

} // namespace std
class condition_variable
{
  condition_variable() noexcept {
    int r = 0;
#if !defined(__APPLE__)
    pthread_condattr_t attr;
    pthread_condattr_init(&attr);
    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    r = pthread_cond_init(&_M_cond, &attr);
    pthread_condattr_destroy(&attr);
#else
    r = pthread_cond_init(&_M_cond, nullptr);
#endif
  }
  ~condition_variable() noexcept {
    /* int __e = */ pthread_cond_destroy(&_M_cond);
  }
};
// condition_variable.h

#pragma once

#include <chrono>
#include <mutex>
#include <condition_variable>

namespace cond {
namespace gcc {

namespace chrono = std::chrono;

using std::mutex;
using std::unique_lock;
using std::cv_status;

class condition_variable
{
  using steady_clock = chrono::steady_clock;
  using system_clock = chrono::system_clock;
  using __clock_t = steady_clock;
  typedef pthread_cond_t __native_type;

  __native_type _M_cond;

public:
  typedef __native_type* native_handle_type;

  condition_variable() noexcept {
    int r = 0;
#if !defined(__APPLE__)
    pthread_condattr_t attr;
    pthread_condattr_init(&attr);
    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    r = pthread_cond_init(&_M_cond, &attr);
    pthread_condattr_destroy(&attr);
#else
    r = pthread_cond_init(&_M_cond, nullptr);
#endif
  }
  ~condition_variable() noexcept {
    /* int __e = */ pthread_cond_destroy(&_M_cond);
  }

  condition_variable(const condition_variable&) = delete;
  condition_variable& operator=(const condition_variable&) = delete;

  void
  notify_one() noexcept {
    /* int __e = */ pthread_cond_signal(&_M_cond);
  }

  void
  notify_all() noexcept {
    /* int __e = */ pthread_cond_broadcast(&_M_cond);
  }

  void
  wait(unique_lock<mutex>& __lock) noexcept {
    /* int __e = */ pthread_cond_wait(&_M_cond, __lock.mutex()->native_handle());

    /*
    if (__e)
      std::terminate();
    */
  }

  template<typename _Predicate>
    void
    wait(unique_lock<mutex>& __lock, _Predicate __p)
    {
      while (!__p())
        wait(__lock);
    }

  template<typename _Duration>
    cv_status
    wait_until(unique_lock<mutex>& __lock,
               const chrono::time_point<steady_clock, _Duration>& __atime)
    { return __wait_until_impl(__lock, __atime); }

  template<typename _Duration>
    cv_status
    wait_until(unique_lock<mutex>& __lock,
               const chrono::time_point<system_clock, _Duration>& __atime)
    {
      // return __wait_until_impl(__lock, __atime);
      return wait_until<system_clock, _Duration>(__lock, __atime);
    }

  template<typename _Clock, typename _Duration>
    cv_status
    wait_until(unique_lock<mutex>& __lock,
               const chrono::time_point<_Clock, _Duration>& __atime)
    {
#if __cplusplus > 201703L
      static_assert(chrono::is_clock_v<_Clock>);
#endif
      using __s_dur = typename __clock_t::duration;
      const typename _Clock::time_point __c_entry = _Clock::now();
      const __clock_t::time_point __s_entry = __clock_t::now();
      const auto __delta = __atime - __c_entry;
      auto __reldelta = chrono::duration_cast<__s_dur>(__delta);
      if (__reldelta < __delta)
        ++__reldelta;
      const auto __s_atime = __s_entry + __reldelta;

      if (__wait_until_impl(__lock, __s_atime) == cv_status::no_timeout)
        return cv_status::no_timeout;
      // We got a timeout when measured against __clock_t but
      // we need to check against the caller-supplied clock
      // to tell whether we should return a timeout.
      if (_Clock::now() < __atime)
        return cv_status::no_timeout;
      return cv_status::timeout;
    }

  template<typename _Clock, typename _Duration, typename _Predicate>
    bool
    wait_until(unique_lock<mutex>& __lock,
               const chrono::time_point<_Clock, _Duration>& __atime,
               _Predicate __p)
    {
      while (!__p())
        if (wait_until(__lock, __atime) == cv_status::timeout)
          return __p();
      return true;
    }

  template<typename _Rep, typename _Period>
    cv_status
    wait_for(unique_lock<mutex>& __lock,
             const chrono::duration<_Rep, _Period>& __rtime)
    {
      using __dur = typename steady_clock::duration;
      auto __reltime = chrono::duration_cast<__dur>(__rtime);
      if (__reltime < __rtime)
        ++__reltime;
      return wait_until(__lock, steady_clock::now() + __reltime);
    }

  template<typename _Rep, typename _Period, typename _Predicate>
    bool
    wait_for(unique_lock<mutex>& __lock,
             const chrono::duration<_Rep, _Period>& __rtime,
             _Predicate __p)
    {
      using __dur = typename steady_clock::duration;
      auto __reltime = chrono::duration_cast<__dur>(__rtime);
      if (__reltime < __rtime)
        ++__reltime;
      return wait_until(__lock, steady_clock::now() + __reltime, std::move(__p));
    }

  native_handle_type
  native_handle()
  { return &_M_cond; }

private:
  template<typename _Dur>
    cv_status
    __wait_until_impl(unique_lock<mutex>& __lock,
                      const chrono::time_point<steady_clock, _Dur>& __atime)
    {
      auto __s = chrono::time_point_cast<chrono::seconds>(__atime);
      auto __ns = chrono::duration_cast<chrono::nanoseconds>(__atime - __s);

      struct timespec __ts =
        {
          static_cast<std::time_t>(__s.time_since_epoch().count()),
          static_cast<long>(__ns.count())
        };

      pthread_cond_timedwait(&_M_cond, __lock.mutex()->native_handle(),
                                       &__ts);

      return (steady_clock::now() < __atime
              ? cv_status::no_timeout : cv_status::timeout);
    }

  template<typename _Dur>
    cv_status
    __wait_until_impl(unique_lock<mutex>& __lock,
                      const chrono::time_point<system_clock, _Dur>& __atime)
    {
      auto __s = chrono::time_point_cast<chrono::seconds>(__atime);
      auto __ns = chrono::duration_cast<chrono::nanoseconds>(__atime - __s);

      struct timespec __ts =
        {
          static_cast<std::time_t>(__s.time_since_epoch().count()),
          static_cast<long>(__ns.count())
        };

      pthread_cond_timedwait(&_M_cond, __lock.mutex()->native_handle(),
                               &__ts);

      return (system_clock::now() < __atime
              ? cv_status::no_timeout : cv_status::timeout);
    }
};

} // namespace gcc
} // namespace cond
// main.cc

#include "condition_variable.h"

#include <iostream>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <chrono>
using namespace std::chrono_literals;
using namespace std::chrono;
using namespace std;

ceph::gcc::condition_variable cv;
//std::condition_variable cv;
std::mutex cv_mutex;
std::atomic<int> i{0};

void print_now() {
    auto now = std::chrono::steady_clock::now();
    auto epoch = std::chrono::duration_cast<seconds>(now.time_since_epoch()).count();
    cout << epoch << endl;
}

void waits(int idx)
{
    std::unique_lock<std::mutex> lk(cv_mutex);
    cout << "waiting: " << idx << endl;

    print_now();

    if(cv.wait_until(lk, std::chrono::system_clock::now() + idx*20s) != std::cv_status::timeout) {
        std::cerr << "Thread " << idx << " finished waiting. i == " << i << '\n';

        print_now();
    } else {
        std::cerr << "Thread " << idx << " timed out. i == " << i << '\n';

        print_now();
    }
}

void signals()
{
    std::this_thread::sleep_for(232s);
    std::cerr << "Notifying...\n";
    cv.notify_all();
    std::this_thread::sleep_for(5s);
    i = 1;
    std::cerr << "Notifying again...\n";
    cv.notify_all();
}

int main()
{
    std::thread t1(waits, 1), t2(waits, 2), t3(waits, 3), t4(signals);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}

参考资料

CLOCK_MONOTONIC and pthread_mutex_timedlock / pthread_cond_timedwait

https://stackoverflow.com/questions/14248033/clock-monotonic-and-pthread-mutex-timedlock-pthread-cond-timedwait

0001216: Adding clockid parameter to functions that accept absolute struct timespec timeouts

https://www.austingroupbugs.net/view.php?id=1216

N2999: Background for issue 887: Clocks and Condition Variables (Rev. 1)

http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2009/n2999.html

Bug 41861 (DR887) - [DR 887][C++0x] <condition_variable> does not use monotonic_clock

https://gcc.gnu.org/bugzilla/show_bug.cgi?id=41861

Implement std::condition_variable via pthread_cond_clockwait() where available

https://reviews.llvm.org/D65339

std::condition_variable::wait_for prone to early/late timeout with libstdc++

http://randombitsofuselessinformation.blogspot.com/2018/06/its-about-time-monotonic-time.html

PR libstdc++/68519 use native duration to avoid rounding errors

https://github.com/gcc-mirror/gcc/commit/83fd5e73b3c16296e0d7ba54f6c547e01c7eae7b

libstdc++: Avoid rounding errors on custom clocks in condition_variable

https://github.com/gcc-mirror/gcc/commit/e05ff30078e80869f2bf3af6dbdbea134c252158

PR libstdc++/41861 Add full steady_clock support to condition_variable

https://github.com/gcc-mirror/gcc/commit/ad4d1d21ad5c515ba90355d13b14cbb74262edd2

Use the monotonic clock for thread conditions on POSIX platforms

https://bugs.python.org/issue23428

Uses and Abuses of Access Rights

http://www.gotw.ca/gotw/076.htm


最后修改于 2020-12-20