watch 与 notify 机制是 ceph 客户端之间通信的一种方式,librbd 在 image 共享访问、rbd-mirror 的协同工作等地方大量使用了这种机制,因此有必要对 watch/notify 的 rados 层实现进行分析理解。
数据结构 首先需要明确的是 watch/notify 是与单个 rados 对象关联的,当多个客户端 watch 同一个对象后,任一客户端(该客户端不需要事先 watch 该对象)发送 notify 消息,OSD 端会将该消息复制并转发给所有 watch 了该对象的客户端(如果发送者自己也 watch 了该对象,则发送者同样会收到这条 notify 消息)。
watch/notify 关键的信息记录在 3 个地方:
磁盘数据结构 object_info_t
的如下字段:
1 map<pair<uint64_t , entity_name_t >, watch_info_t > watchers;
内存数据结构 ObjectContext
的如下字段:
1 map<pair<uint64_t , entity_name_t >, WatchRef> watchers;
注意 WatchRef
,即 shared_ptr<Watch>
,也有一个指向 ObjectContext
的 shared_ptr<ObjectContext>
的指针;
还有一些辅助的内存数据存在于 PrimaryLogPG::OpContext
的如下字段中:
1 2 3 4 list<pair<watch_info_t , bool >> watch_connects; list<watch_disconnect_t > watch_disconnects; list<notify_info_t > notifies; list<NotifyAck> notify_acks;
当通信的对端与 OSD 建立 socket 连接并完成 cephx 认证后,OSD 会新建 Session
实例并设置到 Connection::priv
字段,当客户端在 OSD 端注册 watcher 时,会将 watcher 对应的 Watch
实例加入到 Session::wstate
中;
大致关系如下图所示:
处理逻辑 主要的处理逻辑包括:
1 2 3 4 5 6 7 PrimaryLogPG::do_osd_ops PrimaryLogPG::do_osd_op_effects PrimaryLogPG::populate_obc_watchers PrimaryLogPG::handle_watch_timeout PrimaryLogPG::check_blacklisted_watchers PrimaryLogPG::context_registry_on_change PrimaryLogPG::get_watchers
注意 PrimaryLogPG
中 op 处理的处理顺序如下:
1 2 3 4 5 6 7 8 9 10 11 12 do_request do_op find_object_context new OpContext execute_ctx prepare_transaction do_osd_ops make_writeable finish_ctx do_osd_op_effects issue_repop eval_repop
PrimaryLogPG::do_osd_ops
,对客户端 WATCH
, RECONNECT
, PING
, UNWATCH
, NOTIFY
, NOTIFY_ACK
等 6 个请求进行预处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 case CEPH_OSD_OP_NOTIFY: ++ctx->num_read; { uint32_t timeout; bufferlist bl; try { uint32_t ver; ::decode (ver, bp); ::decode (timeout, bp); ::decode (bl, bp); } catch (const buffer::error &e) { timeout = 0 ; } if (!timeout) timeout = cct->_conf->osd_default_notify_timeout; notify_info_t n; n.timeout = timeout; n.notify_id = osd->get_next_id (get_osdmap ()->get_epoch ()); n.cookie = op.watch.cookie; n.bl = bl; ctx->notifies.push_back (n); ::encode (n.notify_id, osd_op.outdata); } break ; case CEPH_OSD_OP_NOTIFY_ACK: ++ctx->num_read; { try { uint64_t notify_id = 0 ; uint64_t watch_cookie = 0 ; ::decode (notify_id, bp); ::decode (watch_cookie, bp); bufferlist reply_bl; if (!bp.end ()) { ::decode (reply_bl, bp); } OpContext::NotifyAck ack (notify_id, watch_cookie, reply_bl) ; ctx->notify_acks.push_back (ack); } catch (const buffer::error &e) { OpContext::NotifyAck ack ( op.watch.cookie) ; ctx->notify_acks.push_back (ack); } } break ; case CEPH_OSD_OP_WATCH: ++ctx->num_write; { if (!obs.exists) { result = -ENOENT; break ; } uint64_t cookie = op.watch.cookie; entity_name_t entity = ctx->reqid.name; ObjectContextRef obc = ctx->obc; uint32_t timeout = cct->_conf->osd_client_watch_timeout; if (op.watch.timeout != 0 ) { timeout = op.watch.timeout; } watch_info_t w (cookie, timeout, ctx->op->get_req()->get_connection()->get_peer_addr()) ; if (op.watch.op == CEPH_OSD_WATCH_OP_WATCH || op.watch.op == CEPH_OSD_WATCH_OP_LEGACY_WATCH) { if (oi.watchers.count (make_pair (cookie, entity))) { dout (10 ) << " found existing watch " << w << " by " << entity << dendl; } else { dout (10 ) << " registered new watch " << w << " by " << 
entity << dendl; oi.watchers[make_pair (cookie, entity)] = w; t->nop (soid); } bool will_ping = (op.watch.op == CEPH_OSD_WATCH_OP_WATCH); ctx->watch_connects.push_back (make_pair (w, will_ping)); } else if (op.watch.op == CEPH_OSD_WATCH_OP_RECONNECT) { if (!oi.watchers.count (make_pair (cookie, entity))) { result = -ENOTCONN; break ; } dout (10 ) << " found existing watch " << w << " by " << entity << dendl; ctx->watch_connects.push_back (make_pair (w, true )); } else if (op.watch.op == CEPH_OSD_WATCH_OP_PING) { if (!oi.watchers.count (make_pair (cookie, entity))) { result = -ENOTCONN; break ; } map<pair<uint64_t , entity_name_t >, WatchRef>::iterator p = obc->watchers.find (make_pair (cookie, entity)); if (p == obc->watchers.end () || !p->second->is_connected ()) { result = -ETIMEDOUT; break ; } dout (10 ) << " found existing watch " << w << " by " << entity << dendl; p->second->got_ping (ceph_clock_now ()); result = 0 ; } else if (op.watch.op == CEPH_OSD_WATCH_OP_UNWATCH) { map<pair<uint64_t , entity_name_t >, watch_info_t >::iterator oi_iter = oi.watchers.find (make_pair (cookie, entity)); if (oi_iter != oi.watchers.end ()) { dout (10 ) << " removed watch " << oi_iter->second << " by " << entity << dendl; oi.watchers.erase (oi_iter); t->nop (soid); ctx->watch_disconnects.push_back (watch_disconnect_t (cookie, entity, false )); } else { dout (10 ) << " can't remove: no watch by " << entity << dendl; } } } break ;
PrimaryLogPG::do_osd_ops
的处理逻辑如下:
WATCH
,往 object_info_t::watchers
中插入 watcher,并 push back 到 OpContext::watch_connects
链表;
UNWATCH
,从 object_info_t::watchers
中移除 watcher,并 push back 到 OpContext::watch_disconnects
链表;
RECONNECT
,从 object_info_t::watchers
中查找是否已存在该 watcher,如果不存在则返回 -ENOTCONN
,否则 push back 到 OpContext::watch_connects
链表;
PING
,从 object_info_t::watchers
中查找是否已存在该 watcher,如果不存在则返回 -ENOTCONN
,如果存在则继续在内存记录 ObjectContext::watchers
中继续查找,如果不存在,返回 -ETIMEDOUT
,如果存在,则调用 Watch::got_ping
更新定时器;
NOTIFY
,构造 notify_info_t
结构,并 push back 到 OpContext::notifies
链表;
NOTIFY_ACK
,构造 NotifyAck
结构,并 push back 到 OpContext::notify_acks
链表。
PrimaryLogPG::do_osd_op_effects
,对 PrimaryLogPG::do_osd_ops
预处理之后记录在 OpContext
中的结果 watch_connects
, watch_disconnects
, notifies
, notify_acks
进行后续处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 void PrimaryLogPG::do_osd_op_effects (OpContext *ctx, const ConnectionRef& conn) { entity_name_t entity = ctx->reqid.name; complete_disconnect_watches (ctx->obc, ctx->watch_disconnects); for (list<pair<watch_info_t , bool > >::iterator i = ctx->watch_connects.begin (); i != ctx->watch_connects.end (); ++i) { pair<uint64_t , entity_name_t > watcher (i->first.cookie, entity) ; WatchRef watch; if (ctx->obc->watchers.count (watcher)) { watch = ctx->obc->watchers[watcher]; } else { watch = Watch::makeWatchRef (this , osd, ctx->obc, i->first.timeout_seconds, i->first.cookie, entity, conn->get_peer_addr ()); ctx->obc->watchers.insert (make_pair (watcher, watch)); } watch->connect (conn, i->second); } for (list<notify_info_t >::iterator p = ctx->notifies.begin (); p != ctx->notifies.end (); ++p) { ConnectionRef conn (ctx->op->get_req()->get_connection()) ; NotifyRef notif (Notify::makeNotifyRef(conn, ctx->reqid.name.num(), p->bl, p->timeout, p->cookie, p->notify_id, ctx->obc->obs.oi.user_version, osd)) ; for (map<pair<uint64_t , entity_name_t >, WatchRef>::iterator i = ctx->obc->watchers.begin (); i != ctx->obc->watchers.end (); ++i) { i->second->start_notify (notif); } notif->init (); } for (list<OpContext::NotifyAck>::iterator p = ctx->notify_acks.begin (); p != ctx->notify_acks.end (); ++p) { for (map<pair<uint64_t , entity_name_t >, WatchRef>::iterator i = ctx->obc->watchers.begin (); i != ctx->obc->watchers.end (); ++i) { if (i->first.second != entity) continue ; if (p->watch_cookie && p->watch_cookie.get () != i->first.first) continue ; i->second->notify_ack (p->notify_id, p->reply_bl); } } }
PrimaryLogPG::do_osd_op_effects
的处理逻辑如下:
OpContext::watch_disconnects
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void PrimaryLogPG::complete_disconnect_watches (ObjectContextRef obc, const list<watch_disconnect_t > &to_disconnect) { for (list<watch_disconnect_t >::const_iterator i = to_disconnect.begin (); i != to_disconnect.end (); ++i) { pair<uint64_t , entity_name_t > watcher (i->cookie, i->name) ; auto watchers_entry = obc->watchers.find (watcher); if (watchers_entry != obc->watchers.end ()) { WatchRef watch = watchers_entry->second; obc->watchers.erase (watcher); watch->remove (i->send_disconnect); } } }
OpContext::watch_connects
OpContext::notifies
OpContext::notify_acks
PrimaryLogPG::populate_obc_watchers
PrimaryLogPG::handle_watch_timeout
PrimaryLogPG::check_blacklisted_watchers
PrimaryLogPG::context_registry_on_change
,遍历并移除 PG 中的所有 ObjectContext::watchers
,其在如下一些情况下被调用:
PG 开始新的 peering interval(PG::start_peering_interval
-> PrimaryLogPG::on_change
);
PG 删除(OSD::_remove_pg
-> PrimaryLogPG::on_removal
-> PrimaryLogPG::on_shutdown
);
OSD 下电(OSD::shutdown
-> PrimaryLogPG::on_shutdown
)
PrimaryLogPG::get_watchers
,遍历并记录 PG 中的所有 ObjectContext::watchers
,以支持 OSD admin socket 命令 dump_watchers
;
librados 客户端实现 客户端主要的接口如下(为了兼容性增加的带数字后缀的扩展版本、C++ 接口、同步接口以及非重要接口均未列出):
1 2 3 rados_aio_watch rados_aio_notify rados_aio_unwatch
这些外部接口最终都会调用到 IoCtxImpl
模块(librados/IoCtxImpl.cc)的实现:
1 2 3 IoCtxImpl::aio_watch IoCtxImpl::aio_notify IoCtxImpl::aio_unwatch
IoCtxImpl::aio_watch
,调用 Objecter::linger_register
注册 LingerOp
(注册的 LingerOp
将通过 IoCtxImpl::aio_unwatch
删除),并调用 Objecter::_linger_submit
构造 OSD Op 在服务端注册 watcher 信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 int librados::IoCtxImpl::aio_watch (const object_t & oid, AioCompletionImpl *c, uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2, uint32_t timeout, bool internal) { Objecter::LingerOp *linger_op = objecter->linger_register (oid, oloc, 0 ); c->io = this ; Context *oncomplete = new C_aio_linger_Complete (c, linger_op, false ); ::ObjectOperation wr; *handle = linger_op->get_cookie (); linger_op->watch_context = new WatchInfo (this , oid, ctx, ctx2, internal); prepare_assert_ops (&wr); wr.watch (*handle, CEPH_OSD_WATCH_OP_WATCH, timeout); bufferlist bl; objecter->linger_watch (linger_op, wr, snapc, ceph::real_clock::now (), bl, oncomplete, &c->objver); return 0 ; }
IoCtxImpl::aio_notify
,主要流程和 aio_watch
类似,调用 Objecter::linger_register
注册 LingerOp
(注册的 LingerOp
将通过 IoCtxImpl::aio_unwatch
删除),并调用 Objecter::_linger_submit
构造 OSD Op 在服务端给所有注册的 watcher 发送 notify 消息,主要的不同在于用户回调需要等待 OSD Op 处理完成以及 notify 消息得到所有 watcher 的响应才能完成,且完成后注册的 LingerOp
将被删除:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 int librados::IoCtxImpl::aio_notify (const object_t & oid, AioCompletionImpl *c, bufferlist& bl, uint64_t timeout_ms, bufferlist *preply_bl, char **preply_buf, size_t *preply_buf_len) { Objecter::LingerOp *linger_op = objecter->linger_register (oid, oloc, 0 ); c->io = this ; C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete (c, linger_op); C_notify_Finish *onnotify = new C_notify_Finish (client->cct, oncomplete, objecter, linger_op, preply_bl, preply_buf, preply_buf_len); Context *onack = new C_aio_notify_Ack (client->cct, onnotify, oncomplete); uint32_t timeout = notify_timeout; if (timeout_ms) timeout = timeout_ms / 1000 ; ::ObjectOperation rd; prepare_assert_ops (&rd); bufferlist inbl; rd.notify (linger_op->get_cookie (), 1 , timeout, bl, &inbl); objecter->linger_notify (linger_op, rd, snap_seq, inbl, NULL , onack, &c->objver); return 0 ; }
IoCtxImpl::aio_unwatch
,从服务端删除 watcher 信息,并删除 IoCtxImpl::aio_watch
注册的 LingerOp
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 int librados::IoCtxImpl::aio_unwatch (uint64_t cookie, AioCompletionImpl *c){ c->io = this ; Objecter::LingerOp *linger_op = reinterpret_cast <Objecter::LingerOp*>(cookie); Context *oncomplete = new C_aio_linger_Complete (c, linger_op, true ); ::ObjectOperation wr; prepare_assert_ops (&wr); wr.watch (cookie, CEPH_OSD_WATCH_OP_UNWATCH); objecter->mutate (linger_op->target.base_oid, oloc, wr, snapc, ceph::real_clock::now (), 0 , oncomplete, &c->objver); return 0 ; }
Objecter::linger_register
,在 Objecter 中分配并注册 LingerOp
;
Objecter::linger_cancel
,在 Objecter 中移除 Objecter::linger_register
注册的 LingerOp
;
Objecter::linger_watch
,针对 watch 调用为 LingerOp
设置相关的字段,然后调用 Objecter::_linger_submit
构造 OSDOp
并发送;
Objecter::linger_notify
,针对 notify 调用为 LingerOp
设置相关的字段,然后调用 Objecter::_linger_submit
构造 OSDOp
并发送;
Objecter::_linger_submit
,通过 _calc_target
计算 t->osd
,创建 OSDSession 并建立 LingerOp
与 OSDSession
之间的联系:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul){ assert (sul.owns_lock () && sul.mutex () == &rwlock); assert (info->linger_id); OSDSession *s = NULL ; _calc_target(&info->target, nullptr ); int r = _get_session(info->target.osd, &s, sul); assert (r == 0 ); OSDSession::unique_lock sl (s->lock) ; _session_linger_op_assign(s, info); sl.unlock (); put_session (s); _send_linger(info, sul); }
Objecter::_send_linger
,基于 LingerOp
构造 OSDOp
并发送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 void Objecter::_send_linger(LingerOp *info, shunique_lock& sul){ assert (sul.owns_lock () && sul.mutex () == &rwlock); vector<OSDOp> opv; Context *oncommit = NULL ; LingerOp::shared_lock watchl (info->watch_lock) ; bufferlist *poutbl = NULL ; if (info->registered && info->is_watch) { ldout (cct, 15 ) << "send_linger " << info->linger_id << " reconnect" << dendl; opv.push_back (OSDOp ()); opv.back ().op.op = CEPH_OSD_OP_WATCH; opv.back ().op.watch.cookie = info->get_cookie (); opv.back ().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT; opv.back ().op.watch.gen = ++info->register_gen; oncommit = new C_Linger_Reconnect (this , info); } else { ldout (cct, 15 ) << "send_linger " << info->linger_id << " register" << dendl; opv = info->ops; C_Linger_Commit *c = new C_Linger_Commit (this , info); if (!info->is_watch) { info->notify_id = 0 ; poutbl = &c->outbl; } oncommit = c; } watchl.unlock (); Op *o = new Op (info->target.base_oid, info->target.base_oloc, opv, info->target.flags | CEPH_OSD_FLAG_READ, oncommit, info->pobjver); o->outbl = poutbl; o->snapid = info->snap; o->snapc = info->snapc; o->mtime = info->mtime; o->target = info->target; o->tid = ++last_tid; o->should_resend = false ; if (info->register_tid) { OSDSession::unique_lock sl (info->session->lock) ; if (info->session->ops.count (info->register_tid)) { Op *o = info->session->ops[info->register_tid]; _op_cancel_map_check(o); _cancel_linger_op(o); } sl.unlock (); _op_submit(o, sul, &info->register_tid); } else { _op_submit_with_budget(o, sul, &info->register_tid); } logger->inc (l_osdc_linger_send); }
Objecter::_linger_commit
,非 watch 重连情况下 LingerOp
对应的 OSDOp
的回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl){ LingerOp::unique_lock wl (info->watch_lock) ; ldout (cct, 10 ) << "_linger_commit " << info->linger_id << dendl; if (info->on_reg_commit) { info->on_reg_commit->complete (r); info->on_reg_commit = NULL ; } if (r < 0 && info->on_notify_finish) { info->on_notify_finish->complete (r); info->on_notify_finish = nullptr ; } info->registered = true ; info->pobjver = NULL ; if (!info->is_watch) { bufferlist::iterator p = outbl.begin (); try { ::decode (info->notify_id, p); ldout (cct, 10 ) << "_linger_commit notify_id=" << info->notify_id << dendl; } catch (buffer::error& e) { } } }
Objecter::_linger_reconnect
,watch 重连情况下 LingerOp
对应的 OSDOp
的回调(watch 能重连说明首次 watch 肯定是成功的,见 Objecter::_linger_commit
的分析):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void Objecter::_linger_reconnect(LingerOp *info, int r){ ldout (cct, 10 ) << __func__ << " " << info->linger_id << " = " << r << " (last_error " << info->last_error << ")" << dendl; if (r < 0 ) { LingerOp::unique_lock wl (info->watch_lock) ; if (!info->last_error) { r = _normalize_watch_error(r); info->last_error = r; if (info->watch_context) { finisher->queue (new C_DoWatchError (this , info, r)); } } wl.unlock (); } }
Objecter::_send_linger_ping
,由 Objecter::tick
定时遍历所有 OSDSession
下所有已注册的 watch LingerOp
(注册失败的已经被删除,参考 Objecter::_linger_commit
的处理)并发送 ping OSDOp
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 void Objecter::_send_linger_ping(LingerOp *info){ if (cct->_conf->objecter_inject_no_watch_ping) { ldout (cct, 10 ) << __func__ << " " << info->linger_id << " SKIPPING" << dendl; return ; } if (osdmap->test_flag (CEPH_OSDMAP_PAUSERD)) { ldout (cct, 10 ) << __func__ << " PAUSERD" << dendl; return ; } ceph::mono_time now = ceph::mono_clock::now (); ldout (cct, 10 ) << __func__ << " " << info->linger_id << " now " << now << dendl; vector<OSDOp> opv (1 ) ; opv[0 ].op.op = CEPH_OSD_OP_WATCH; opv[0 ].op.watch.cookie = info->get_cookie (); opv[0 ].op.watch.op = CEPH_OSD_WATCH_OP_PING; opv[0 ].op.watch.gen = info->register_gen; C_Linger_Ping *onack = new C_Linger_Ping (this , info); Op *o = new Op (info->target.base_oid, info->target.base_oloc, opv, info->target.flags | CEPH_OSD_FLAG_READ, onack, NULL , NULL ); o->target = info->target; o->should_resend = false ; _send_op_account(o); MOSDOp *m = _prepare_osd_op(o); o->tid = ++last_tid; _session_op_assign(info->session, o); _send_op(o, m); info->ping_tid = o->tid; onack->sent = now; logger->inc (l_osdc_linger_ping); }
Objecter::_linger_ping
,Objecter::_send_linger_ping
的回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent, uint32_t register_gen){ LingerOp::unique_lock l (info->watch_lock) ; ldout (cct, 10 ) << __func__ << " " << info->linger_id << " sent " << sent << " gen " << register_gen << " = " << r << " (last_error " << info->last_error << " register_gen " << info->register_gen << ")" << dendl; if (info->register_gen == register_gen) { if (r == 0 ) { info->watch_valid_thru = sent; } else if (r < 0 && !info->last_error) { r = _normalize_watch_error(r); info->last_error = r; if (info->watch_context) { finisher->queue (new C_DoWatchError (this , info, r)); } } } else { ldout (cct, 20 ) << " ignoring old gen" << dendl; } }
LingerOp
的重传有两个原因,一是 osdmap 发生变化,一是 socket 连接出错,分别是在如下两个函数中进行处理:
1 2 Objecter::handle_osd_map Objecter::ms_handle_reset
Objecter::handle_osd_map
,调用 _scan_requests
扫描得到所有需要重发的 LingerOp
, OSDOp
以及 CommandOp
,并在 Objecter::handle_osd_map
内部依次进行重发;
Objecter::ms_handle_reset
, 调用 _kick_requests
扫描得到所有需要重发的 OSDOp
, LingerOp
以及 CommandOp
,并在 _kick_requests
内部进行 OSDOp
和 CommandOp
的重发,而 LingerOp
在 Objecter::ms_handle_reset
中通过调用 _linger_ops_resend
进行重发。