librados watch/notify 机制

watch 与 notify 机制是 ceph 客户端之间通信的一种方式,librbd 在 image 共享访问、rbd-mirror 的协同工作等地方大量使用了这种机制,因此有必要对 watch/notify 的 rados 层实现进行分析理解。

数据结构

首先需要明确的是 watch/notify 是与单个 rados 对象关联的,当多个客户端 watch 同一个对象后,任一客户端(该客户端不需要事先 watch 该对象)发送 notify 消息将在 OSD 端进行消息复制并转发给所有的客户端(如果发送者已经 watch 了该对象,则还包括 notify 消息的发送者)。

watch/notify 关键的信息记录在 3 个地方:

  1. 磁盘数据结构 object_info_t 的如下字段:
map<pair<uint64_t, entity_name_t>, watch_info_t> watchers;
  1. 内存数据结构 ObjectContext 的如下字段:
map<pair<uint64_t, entity_name_t>, WatchRef> watchers;

注意 WatchRef,即 shared_ptr<Watch>,也有一个指向 ObjectContextshared_ptr<ObjectContext> 的指针;

  1. 还有一些辅助的内存数据存在于 PrimaryLogPG::OpContext 的如下字段中:
list<pair<watch_info_t, bool>> watch_connects;
list<watch_disconnect_t> watch_disconnects;
list<notify_info_t> notifies;
list<NotifyAck> notify_acks;
  1. 当通信的对端与 OSD 建立 socket 连接并完成 cephx 认证后,OSD 会新建 Session 实例并设置到 Connection::priv 字段,当客户端在 OSD 端注册 watcher 时,会将 watcher 对应的 Watch 实例加入到 Session::wstate 中;

大致关系如下图所示:

处理逻辑

主要的处理逻辑包括:

PrimaryLogPG::do_osd_ops
PrimaryLogPG::do_osd_op_effects
PrimaryLogPG::populate_obc_watchers
PrimaryLogPG::handle_watch_timeout
PrimaryLogPG::check_blacklisted_watchers
PrimaryLogPG::context_registry_on_change
PrimaryLogPG::get_watchers

注意 PrimaryLogPG 中 op 处理的处理顺序如下:

do_request
    do_op
        find_object_context
        new OpContext
        execute_ctx
            prepare_transaction
                do_osd_ops
                make_writeable
                finish_ctx
            do_osd_op_effects
            issue_repop
            eval_repop

PrimaryLogPG::do_osd_ops,对客户端 WATCH, RECONNECT, PING, UNWATCH, NOTIFY, NOTIFY_ACK 等 6 个请求进行预处理:

case CEPH_OSD_OP_NOTIFY:
  ++ctx->num_read;
  {
    uint32_t timeout;
    bufferlist bl;

    try {
      uint32_t ver; // obsolete
      ::decode(ver, bp);
      ::decode(timeout, bp);
      ::decode(bl, bp);
    } catch (const buffer::error &e) {
      timeout = 0;
    }
    if (!timeout) // IoCtxImpl 构造函数设置的默认值 client_notify_timeout 是 10,除非设置 client_notify_timeout 默认值为 0,否则这里的值不会生效
      timeout = cct->_conf->osd_default_notify_timeout; // 默认值为 30

    notify_info_t n;
    n.timeout = timeout;
    n.notify_id = osd->get_next_id(get_osdmap()->get_epoch());
    // 不管是 watch 还是 notify 都会在 Objecter 中 register 注册一个 LingerOp,当然 notify 在完成之后 LingerOp 会删除
    n.cookie = op.watch.cookie; // LingerOp 的内存地址
    n.bl = bl;
    ctx->notifies.push_back(n);

    // return our unique notify id to the client
    ::encode(n.notify_id, osd_op.outdata);
  }
  break;
case CEPH_OSD_OP_NOTIFY_ACK: // 其它 watcher 对 notify 消息的响应
  ++ctx->num_read;
  {
    try {
      uint64_t notify_id = 0;
      uint64_t watch_cookie = 0;
      ::decode(notify_id, bp);
      ::decode(watch_cookie, bp);
      bufferlist reply_bl;
      if (!bp.end()) {
        ::decode(reply_bl, bp);
      }
      OpContext::NotifyAck ack(notify_id, watch_cookie, reply_bl);
      ctx->notify_acks.push_back(ack);
    } catch (const buffer::error &e) {
      OpContext::NotifyAck ack(
      // op.watch.cookie is actually the notify_id for historical reasons
          op.watch.cookie);
      ctx->notify_acks.push_back(ack);
    }
  }
  break;
case CEPH_OSD_OP_WATCH:
  ++ctx->num_write;
  {
    if (!obs.exists) {
      result = -ENOENT;
      break;
    }
    uint64_t cookie = op.watch.cookie;
    entity_name_t entity = ctx->reqid.name;
    ObjectContextRef obc = ctx->obc;

    uint32_t timeout = cct->_conf->osd_client_watch_timeout;
    if (op.watch.timeout != 0) {
      timeout = op.watch.timeout;
    }

    watch_info_t w(cookie, timeout, ctx->op->get_req()->get_connection()->get_peer_addr());
    if (op.watch.op == CEPH_OSD_WATCH_OP_WATCH || op.watch.op == CEPH_OSD_WATCH_OP_LEGACY_WATCH) { // 注册 watcher
      if (oi.watchers.count(make_pair(cookie, entity))) {
        dout(10) << " found existing watch " << w << " by " << entity << dendl;
      } else {
        dout(10) << " registered new watch " << w << " by " << entity << dendl;
        oi.watchers[make_pair(cookie, entity)] = w;
        t->nop(soid);  // make sure update the object_info on disk!
      }
      bool will_ping = (op.watch.op == CEPH_OSD_WATCH_OP_WATCH);
      ctx->watch_connects.push_back(make_pair(w, will_ping));
    } else if (op.watch.op == CEPH_OSD_WATCH_OP_RECONNECT) { // 由于 osdmap 变化或 socket 链路中断,导致已注册的 watcher 重新发送 LingerOp 请求
      if (!oi.watchers.count(make_pair(cookie, entity))) {
        result = -ENOTCONN;
        break;
      }
      dout(10) << " found existing watch " << w << " by " << entity << dendl;
      ctx->watch_connects.push_back(make_pair(w, true));
    } else if (op.watch.op == CEPH_OSD_WATCH_OP_PING) { // 已注册的 watcher 需要定时发送 ping 消息,否则 PrimaryLogPG::handle_watch_timeout 会删除该 watcher
      /* Note: WATCH with PING doesn't cause may_write() to return true,
       * so if there is nothing else in the transaction, this is going
       * to run do_osd_op_effects, but not write out a log entry */
      if (!oi.watchers.count(make_pair(cookie, entity))) { // 在 object_info_t 中查找
        result = -ENOTCONN;
        break;
      }
      map<pair<uint64_t, entity_name_t>, WatchRef>::iterator p = obc->watchers.find(make_pair(cookie, entity)); // 在 ObjectContext 中查找
      if (p == obc->watchers.end() || !p->second->is_connected()) {
        // client needs to reconnect
        result = -ETIMEDOUT;
        break;
      }
      dout(10) << " found existing watch " << w << " by " << entity << dendl;
      p->second->got_ping(ceph_clock_now()); // 更新 Watch::last_ping 并重新注册 OSDService::watch_timer 的超时回调
      result = 0;
    } else if (op.watch.op == CEPH_OSD_WATCH_OP_UNWATCH) {
      map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter =
          oi.watchers.find(make_pair(cookie, entity));
      if (oi_iter != oi.watchers.end()) { // 删除 watcher
        dout(10) << " removed watch " << oi_iter->second << " by " << entity << dendl;
        oi.watchers.erase(oi_iter);
        t->nop(soid);  // update oi on disk
        ctx->watch_disconnects.push_back(watch_disconnect_t(cookie, entity, false));
      } else {
        dout(10) << " can't remove: no watch by " << entity << dendl;
      }
    }
  }
  break;

PrimaryLogPG::do_osd_ops 的处理逻辑如下:

  1. WATCH,往 object_info_t::watchers 中插入 watcher,并 push back 到 OpContext::watch_connects 链表;

  2. UNWATCH,从 object_info_t::watchers 中移除 watcher,并 push back 到 OpContext::watch_disconnects 链表;

  3. RECONNECT,从 object_info_t::watchers 中查找是否已存在该 watcher,如果不存在则返回 -ENOTCONN,否则 push back 到 OpContext::watch_connects 链表;

  4. PING,从 object_info_t::watchers 中查找是否已存在该 watcher,如果不存在则返回 -ENOTCONN,如果存在则继续在内存记录 ObjectContext::watchers 中继续查找,如果不存在,返回 -ETIMEDOUT,如果存在,则调用 Watch::got_ping 更新定时器;

  5. NOTIFY,构造 notify_info_t 结构,并 push back 到 OpContext::notifies 链表;

  6. NOTIFY_ACK,构造 NotifyAck 结构,并 push back 到 OpContext::notify_acks 链表。

PrimaryLogPG::do_osd_op_effects,对 PrimaryLogPG::do_osd_ops 预处理之后记录在 OpContext 中的结果 watch_connects, watch_disconnects, notifies, notify_acks 进行后续处理:

void PrimaryLogPG::do_osd_op_effects(OpContext *ctx,
    const ConnectionRef& conn) {
  entity_name_t entity = ctx->reqid.name;

  // disconnects first
  complete_disconnect_watches(ctx->obc, ctx->watch_disconnects);

  for (list<pair<watch_info_t, bool> >::iterator i = ctx->watch_connects.begin();
      i != ctx->watch_connects.end(); ++i) {
    pair<uint64_t, entity_name_t> watcher(i->first.cookie, entity);

    WatchRef watch;
    if (ctx->obc->watchers.count(watcher)) { // 创建 ObjectContext 中的 watcher 信息,每个 watcher 对应一个 Watch 实例
      watch = ctx->obc->watchers[watcher];
    } else {
      watch = Watch::makeWatchRef(this, osd, ctx->obc, i->first.timeout_seconds,
          i->first.cookie, entity, conn->get_peer_addr());

      ctx->obc->watchers.insert(make_pair(watcher, watch)); // <watcher, Watch>
    }

    // 将 Watch 实例加入到 OSD 连接的 Session::wstate 中,接受所有正在进行的 notify,并注册超时处理,
    // 所有的状态将在 Watch::discard 中清除
    watch->connect(conn, i->second); // 第二个参数表示是否会要求客户端定时 ping,对于非 legcy 类型的 watch 这里为 true
  }

  for (list<notify_info_t>::iterator p = ctx->notifies.begin();
      p != ctx->notifies.end(); ++p) {
    ConnectionRef conn(ctx->op->get_req()->get_connection());
    NotifyRef notif(Notify::makeNotifyRef(conn, ctx->reqid.name.num(), p->bl, p->timeout,
            p->cookie, p->notify_id, ctx->obc->obs.oi.user_version, osd));
    for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i = ctx->obc->watchers.begin();
        i != ctx->obc->watchers.end(); ++i) { // 遍历 ObjectContext::<watcher, Watch>,因为每个 notify 消息要发送给所有的 watcher
      // 将 notif 加入 Watch::in_progress_notifies
      // 将当前 Watch 实例加入 Notify::watchers
      // 发送 MWatchNotify 消息给当前 watcher
      i->second->start_notify(notif); // Watch::start_notify
    }

    // 注册超时回调,notif 此时实际上完全依赖定时器或者等待所有 watcher 的 ack 来释放
    notif->init();
  }

  for (list<OpContext::NotifyAck>::iterator p = ctx->notify_acks.begin();
      p != ctx->notify_acks.end(); ++p) {
    for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i = ctx->obc->watchers.begin();
        i != ctx->obc->watchers.end(); ++i) { // 遍历 ObjectContext::<watcher, Watch> 找到是哪个客户端 watcher 发送的 ack
      if (i->first.second != entity) // entity_name_t
        continue;
      if (p->watch_cookie && p->watch_cookie.get() != i->first.first) // watch cookie
        continue;

      // 根据 notify_id 从 Watch::in_progress_notifies 中找到对应的 Notify 实例
      // 调用 Notify::complete_watcher 从 Notify::watchers 移除 Watch 实例,并检查是否已经接收到了所有的 watcher 的 ack
      // 如果所有 ack 都已收到,则给发送 notify 消息的客户端响应
      i->second->notify_ack(p->notify_id, p->reply_bl); // Watch::notify_ack
    }
  }
}

PrimaryLogPG::do_osd_op_effects 的处理逻辑如下:

  1. OpContext::watch_disconnects
void PrimaryLogPG::complete_disconnect_watches(ObjectContextRef obc,
    const list<watch_disconnect_t> &to_disconnect) {
  for (list<watch_disconnect_t>::const_iterator i = to_disconnect.begin();
      i != to_disconnect.end(); ++i) {
    pair<uint64_t, entity_name_t> watcher(i->cookie, i->name);
    auto watchers_entry = obc->watchers.find(watcher);
    if (watchers_entry != obc->watchers.end()) {
      WatchRef watch = watchers_entry->second;

      obc->watchers.erase(watcher);
      watch->remove(i->send_disconnect);
    }
  }
}
  1. OpContext::watch_connects

  2. OpContext::notifies

  3. OpContext::notify_acks

PrimaryLogPG::populate_obc_watchers

PrimaryLogPG::handle_watch_timeout

PrimaryLogPG::check_blacklisted_watchers

PrimaryLogPG::context_registry_on_change,遍历并移除 PG 中的所有 ObjectContext::watchers,其在如下一些情况下被调用:

  1. PG 开始新的 peering interval(PG::start_peering_interval -> PrimaryLogPG::on_change);

  2. PG 删除(OSD::_remove_pg -> PrimaryLogPG::on_removal -> PrimaryLogPG::on_shutdown);

  3. OSD 下电(OSD::shutdown -> PrimaryLogPG::on_shutdown

PrimaryLogPG::get_watchers,遍历并记录 PG 中的所有 ObjectContext::watchers,以支持 OSD admin socket 命令 dump_watchers

librados 客户端实现

客户端主要的接口如下(为了兼容性增加的带数字后缀的扩展版本、C++ 接口、同步接口以及非重要接口均未列出):

rados_aio_watch
rados_aio_notify
rados_aio_unwatch

这些外部接口最终都会调用到 IoCtxImpl 模块(librados/IoCtxImpl.cc)的实现:

IoCtxImpl::aio_watch
IoCtxImpl::aio_notify
IoCtxImpl::aio_unwatch

IoCtxImpl::aio_watch,调用 Objecter::linger_register 注册 LingerOp(注册的 LingerOp 将通过 IoCtxImpl::aio_unwatch 删除),并调用 Objecter::_linger_submit 构造 OSD Op 在服务端注册 watcher 信息:

int librados::IoCtxImpl::aio_watch(const object_t& oid,
                                   AioCompletionImpl *c, // 异步 api 的回调
                                   uint64_t *handle,
                                   librados::WatchCtx *ctx, // 兼容性处理,用户 watch 处理回调,用于接收 notify 通知消息和错误处理
                                   librados::WatchCtx2 *ctx2, // 用户 watch 处理回调,用于接收 notify 通知消息和错误处理
                                   uint32_t timeout,
                                   bool internal)
{
  // new 分配 LingerOp,设置 target、linger_id,并注册到 Objecter::linger_ops 及 Objecter::linger_ops_set 中
  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);

  // 为 api 回调 c 设置指向 IoCtxImpl 的引用
  c->io = this;

  // oncomplete 将赋值给 LingerOp::on_reg_commit
  // 底层 io 完成时(AsyncMessenger 上下文)将异步 api 回调 c 通过 C_AioComplete 封装入 c->io->client->finisher(即 RadosClient::finisher)
  // 队列,在 RadosClient::finisher 上下文异步处理异步 api 回调 c 中的用户回调,当 r < 0 时 LingerOp 将通过 C_aio_linger_Cancel 删除
  Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false);

  ::ObjectOperation wr;
  *handle = linger_op->get_cookie(); // cookie 实际上就是 LingerOp 的内存地址

  // 注意上面的回调 c 是 librados 异步 api 接口的回调,这里的回调是 watch 在服务端注册成功后用于处理错误/通知消息的回调
  linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal);

  prepare_assert_ops(&wr);
  wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
  bufferlist bl;

  // 针对 watch 调用为 LingerOp 设置相关的字段,然后调用 _linger_submit 构造 OSDOp 并发送
  objecter->linger_watch(linger_op, wr,
                         snapc, ceph::real_clock::now(), bl,
                         oncomplete, &c->objver);

  return 0;
}

IoCtxImpl::aio_notify,主要流程和 aio_watch 类似,调用 Objecter::linger_register 注册 LingerOp(注册的 LingerOp 将通过 IoCtxImpl::aio_unwatch 删除),并调用 Objecter::_linger_submit 构造 OSD Op 在服务端给所有注册的 watcher 发送 notify 消息,主要的不同在于用户回调需要等待 OSD Op 处理完成以及 notify 消息得到所有 watcher 的响应才能完成,且完成后注册的 LingerOp 将被删除:

int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
    bufferlist& bl, uint64_t timeout_ms, bufferlist *preply_bl,
    char **preply_buf, size_t *preply_buf_len)
{
  // new 分配 LingerOp,设置 target、linger_id,并注册到 Objecter::linger_ops 及 Objecter::linger_ops_set 中
  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);

  // 为异步 api 回调 c 设置指向 IoCtxImpl 的引用
  c->io = this;

  // C_aio_notify_Complete 是由 C_aio_linger_Complete 派生的子类,最大的不同是 C_aio_notify_Complete 完成时会设置 C_aio_linger_Complete::cancel = true,
  // 从而在调用父类 C_aio_linger_Complete::finish 完成时会入队 C_aio_linger_cancel 回调 Objecter::linger_cancel 来删除上面注册的 LingerOp
  // oncomplete 的完成由 onack 和 onnotify 来驱动,即只有 LingerOp::on_reg_commit 以及 LingerOp::on_notify_finish 都完成才能完成
  // 底层 io 完成时(AsyncMessenger 上下文)将 api 回调 c 通过 C_AioComplete 封装入 c->io->client->finisher(即 RadosClient::finisher)
  // 队列,在 RadosClient::finisher 上下文异步处理 api 回调 c 中的用户回调
  C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op);

  // C_notify_Finish 在构造函数内部将 this 赋值给 LingerOp::on_notify_finish,onnotify 的完成将调用 oncomplete->complete(注意
  // 只有 oncomplete->handle_ack 和 oncomplete->complete 都调用之后 oncomplete 才真正的完成)
  C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
      objecter, linger_op, preply_bl, preply_buf, preply_buf_len);

  // onack 将赋值给 LingerOp::on_reg_commit,onack 完成时调用 oncomplete->handle_ack
  Context *onack = new C_aio_notify_Ack(client->cct, onnotify, oncomplete);

  // 默认为 10
  uint32_t timeout = notify_timeout;
  if (timeout_ms)
    timeout = timeout_ms / 1000;

  // Construct RADOS op
  ::ObjectOperation rd;
  prepare_assert_ops(&rd);
  bufferlist inbl;
  rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);

  // 针对 notify 调用为 LingerOp 设置相关的字段,然后调用 _linger_submit 构造 OSDOp 并发送
  // Issue RADOS op
  objecter->linger_notify(linger_op, rd, snap_seq, inbl, NULL, onack,
      &c->objver);
  return 0;
}

IoCtxImpl::aio_unwatch,从服务端删除 watcher 信息,并删除 IoCtxImpl::aio_watch 注册的 LingerOp

int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c)
{
  c->io = this;
  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
  // C_aio_linger_Complete::cancel 为 true,因此完成时会入队 C_aio_linger_cancel 回调 Objecter::linger_cancel 来删除
  // IoCtxImpl::aio_watch 注册的 LingerOp
  Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);

  ::ObjectOperation wr;
  prepare_assert_ops(&wr);
  wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
  objecter->mutate(linger_op->target.base_oid, oloc, wr, snapc,
      ceph::real_clock::now(), 0, oncomplete, &c->objver);
  return 0;
}

Objecter::linger_register,在 Objecter 中分配并注册 LingerOp

Objecter::linger_cancel,在 Objeter 中移除 Objecter::linger_register 注册的 LingerOp

Objecter::linger_watch,针对 watch 调用为 LingerOp 设置相关的字段,然后调用 Objecter::_linger_submit 构造 OSDOp 并发送;

Objecter::linger_notify,针对 notiy 调用为 LingerOp 设置相关的字段,然后调用 Objecter::_linger_submit 构造 OSDOp 并发送;

Objecter::_linger_submit,通过 _calc_target 计算 t->osd,创建 OSDSession 并建立 LingerOpOSDSession 之间的联系:

void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
  assert(sul.owns_lock() && sul.mutex() == &rwlock);
  assert(info->linger_id);

  // 通过 oid 计算 pgid,然后通过 pgid 和 crush(osdmap->pg_to_up_acting_osds) 得到 t->osd
  // Populate Op::target
  OSDSession *s = NULL;
  _calc_target(&info->target, nullptr);

  // Create LingerOp<->OSDSession relation
  int r = _get_session(info->target.osd, &s, sul);
  assert(r == 0);
  OSDSession::unique_lock sl(s->lock);
  // 设置 LingerOp::session = s 以及 OSDSession[LingerOp::linger_id] = info
  _session_linger_op_assign(s, info);
  sl.unlock();
  put_session(s);

  // 基于 LingerOp 构造 OSDOp 并发送
  _send_linger(info, sul);
}

Objecter::_send_linger,基于 LingerOp 构造 OSDOp 并发送:

void Objecter::_send_linger(LingerOp *info, shunique_lock& sul)
{
  assert(sul.owns_lock() && sul.mutex() == &rwlock);

  vector<OSDOp> opv;
  Context *oncommit = NULL;
  LingerOp::shared_lock watchl(info->watch_lock);
  bufferlist *poutbl = NULL;
  // LingerOp::registered 由回调 Objecter::_linger_commit 设置,且无论返回值 r 如何,registered 都会
  // 被置成 true,显然 r < 0 会导致上层异步 api 接口的回调报错,且 IoCtxImpl 模块中的 C_aio_linger_Complete 当
  // r < 0 时会删除 LingerOp,因此即使 r < 0 时 registered 置成 true 也无所谓
  if (info->registered && info->is_watch) { // watch 是一个长连接,当 osdmap 发生改变(Objecter::handle_osd_map)
                                            // 或者底层 socket 连接出错(Objecter::ms_handle_reset)时需要重连
    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect" << dendl;
    opv.push_back(OSDOp());
    opv.back().op.op = CEPH_OSD_OP_WATCH;
    opv.back().op.watch.cookie = info->get_cookie();
    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
    opv.back().op.watch.gen = ++info->register_gen; // 服务端没用到,但用于 ping 的回调(Objecter::_linger_ping)检测 ping 的 watch 和注册的 watch 是否能对上

    // 回调函数为 Objecter::_linger_reconnect
    oncommit = new C_Linger_Reconnect(this, info);
  } else { // 非 watch 重连
    ldout(cct, 15) << "send_linger " << info->linger_id << " register" << dendl;

    // LingerOp::ops 由 IoCtxImpl::aio_watch/aio_notify 构造并在 Objecter::linger_watch 或者 Objecter::linger_notify 中设置
    opv = info->ops;

    // 回调函数为 Objecter::_linger_commit
    C_Linger_Commit *c = new C_Linger_Commit(this, info);
    if (!info->is_watch) {
      info->notify_id = 0;
      poutbl = &c->outbl;
    }
    oncommit = c;
  }
  watchl.unlock();

  Op *o = new Op(info->target.base_oid, info->target.base_oloc, opv,
      info->target.flags | CEPH_OSD_FLAG_READ, oncommit, info->pobjver);
  o->outbl = poutbl;
  o->snapid = info->snap;
  o->snapc = info->snapc;
  o->mtime = info->mtime;

  // 直接结构体赋值,因此 LingerOp 中的 t->paused 永远保持为默认值 false,因为 Objecter::_linger_submit 中虽然调用了
  // _calc_target 进行计算,但 op_target_t::paused 只会在 Objecter::_op_submit 中可能被置成 true
  o->target = info->target;
  o->tid = ++last_tid;

  // do not resend this; we will send a new op to reregister
  o->should_resend = false; // 由 LingerOp 生成的 OSDOp 不主动重发,而是由 LingerOp 的重发来触发新的 OSDOp 发送

  if (info->register_tid) { // 之前已基于该 LingerOp 构造并发送过 OSDOp
    // repeat send.  cancel old registeration op, if any.
    OSDSession::unique_lock sl(info->session->lock);
    if (info->session->ops.count(info->register_tid)) {
      Op *o = info->session->ops[info->register_tid];
      _op_cancel_map_check(o);

      // 取消之前下发的 OSDOp,由下面的 _op_submit 进行重发
      _cancel_linger_op(o);
    }
    sl.unlock();

    _op_submit(o, sul, &info->register_tid);
  } else {
    // first send
    _op_submit_with_budget(o, sul, &info->register_tid);
  }

  logger->inc(l_osdc_linger_send);
}

Objecter::_linger_commit,非 watch 重连情况下 LingerOp 对应的 OSDOp 的回调:

void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
{
  LingerOp::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;

  if (info->on_reg_commit) { // 即 IoCtxImpl 模块中的 C_aio_linger_Complete(用于完成异步 api 回调)
    info->on_reg_commit->complete(r);
    info->on_reg_commit = NULL;
  }

  if (r < 0 && info->on_notify_finish) { // 即 IoCtxImpl 模块中的 C_notify_Finish
    info->on_notify_finish->complete(r); // on_reg_commit 和 on_notify_finish 都完成才会完成 IoCtxImpl 模块中的 C_aio_notify_Complete(用于完成异步 api 回调)
    info->on_notify_finish = nullptr;
  }

  // 如果 r < 0,IoCtxImpl 层的回调 C_aio_linger_Complete/C_aio_notify_Complete(C_aio_notify_Complete 的子类)会删除当前 LingerOp 并完成异步 api 回调,
  // 因此即使 r < 0,针对该 LingerOp 也不会再有任何操作,置上 registered = true 并不会导致任何问题

  // only tell the user the first time we do this
  // registered 只对 watch 有意义,用于 osdmap 发生改变(Objecter::handle_osd_map),或者底层 socket 连接
  // 出错(Objecter::ms_handle_reset)时判断是否是重连还是首次连接
  info->registered = true;
   info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    bufferlist::iterator p = outbl.begin();
    try {
      ::decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit  notify_id=" << info->notify_id
             << dendl;
    }
    catch (buffer::error& e) {
    }
  }
}

Objecter::_linger_reconnect,watch 重连情况下 LingerOp 对应的 OSDOp 的回调(watch 能重连说明首次 watch 肯定是成功的,见 Objecter::_linger_commit 的分析):

void Objecter::_linger_reconnect(LingerOp *info, int r)
{
  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r << " (last_error "
                 << info->last_error << ")" << dendl;
  if (r < 0) { // watch 重连失败,将发送给 LingerOp::watch_context-handle_error 处理
    LingerOp::unique_lock wl(info->watch_lock);
    if (!info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
        finisher->queue(new C_DoWatchError(this, info, r)); // 给用户在调用 watch api 时注册的处理错误/通知消息回调进行处理
      }
    }
    wl.unlock();
  }
}

Objecter::_send_linger_ping,由 Objecter::tick 定时遍历所有 OSDSession 下所有已注册的 watch LingerOp(注册失败的已经被删除,参考 Objecter::_linger_commit 的处理)并发送 ping OSDOp

void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  if (cct->_conf->objecter_inject_no_watch_ping) {
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING" << dendl;
    return;
  }
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  ceph::mono_time now = ceph::mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now << dendl;

  vector<OSDOp> opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen; // 服务端没用到,当 watch 由于 osdmap/socket 导致重连时 ++info->register_gen,在 C_Linger_Ping 回调中用来做 ping 响应的检测
  C_Linger_Ping *onack = new C_Linger_Ping(this, info);
  Op *o = new Op(info->target.base_oid, info->target.base_oloc, opv,
      info->target.flags | CEPH_OSD_FLAG_READ, onack, NULL, NULL);
  o->target = info->target;
  o->should_resend = false; // 由 LingerOp 生成的 OSDOp 不主动重发,而是由 LingerOp 的重发来触发新的 OSDOp 发送
  _send_op_account(o);
  MOSDOp *m = _prepare_osd_op(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o, m);
  info->ping_tid = o->tid;

  onack->sent = now;
  logger->inc(l_osdc_linger_ping);
}

Objecter::_linger_pingObjecter::_send_linger_ping 的回调:

void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent, uint32_t register_gen)
{
  LingerOp::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id << " sent " << sent << " gen "
                 << register_gen << " = " << r << " (last_error " << info->last_error
                 << " register_gen " << info->register_gen << ")" << dendl;

  if (info->register_gen == register_gen) {
    if (r == 0) {
      info->watch_valid_thru = sent;
    } else if (r < 0 && !info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
        finisher->queue(new C_DoWatchError(this, info, r)); // 给用户在调用 watch api 时注册的处理错误/通知消息回调进行处理
      }
    }
  } else { // watch 已重连,忽略老的 ping 消息的响应
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}

LingerOp 的重传有两个原因,一是 osdmap 发生变化,一是 socket 连接出错,分别是在如下两个函数中进行处理:

Objecter::handle_osd_map
Objecter::ms_handle_reset

Objecter::handle_osd_map,调用 _scan_requests 扫描得到所有需要重发的 LingerOp, OSDOp 以及 CommandOp,并在 Objecter::handle_osd_map 内部依次进行重发;

Objecter::ms_handle_reset, 调用 _kick_requests 扫描得到所有需要重发的 OSDOp, LingerOp 以及 CommandOp,并在 _kick_requests 内部进行 OSDOpCommandOp 的重发,而 LingerOpObjecter::ms_handle_reset 中通过调用 _linger_ops_resend 进行重发。


最后修改于 2019-02-21