watch 与 notify 机制是 ceph 客户端之间通信的一种方式,librbd 在 image 共享访问、rbd-mirror 的协同工作等地方大量使用了这种机制,因此有必要对 watch/notify 的 rados 层实现进行分析理解。

数据结构

首先需要明确的是 watch/notify 是与单个 rados 对象关联的,当多个客户端 watch 同一个对象后,任一客户端(该客户端不需要事先 watch 该对象)发送 notify 消息将在 OSD 端进行消息复制并转发给所有的客户端(如果发送者已经 watch 了该对象,则还包括 notify 消息的发送者)。

watch/notify 关键的信息记录在 3 个地方:

  1. 磁盘数据结构 object_info_t 的如下字段:
1
map<pair<uint64_t, entity_name_t>, watch_info_t> watchers;
  1. 内存数据结构 ObjectContext 的如下字段:
1
map<pair<uint64_t, entity_name_t>, WatchRef> watchers;

注意 WatchRef,即 shared_ptr<Watch>,也有一个指向 ObjectContextshared_ptr<ObjectContext> 的指针;

  1. 还有一些辅助的内存数据存在于 PrimaryLogPG::OpContext 的如下字段中:
1
2
3
4
list<pair<watch_info_t, bool>> watch_connects;
list<watch_disconnect_t> watch_disconnects;
list<notify_info_t> notifies;
list<NotifyAck> notify_acks;
  1. 当通信的对端与 OSD 建立 socket 连接并完成 cephx 认证后,OSD 会新建 Session 实例并设置到 Connection::priv 字段,当客户端在 OSD 端注册 watcher 时,会将 watcher 对应的 Watch 实例加入到 Session::wstate 中;

大致关系如下图所示:

处理逻辑

主要的处理逻辑包括:

1
2
3
4
5
6
7
PrimaryLogPG::do_osd_ops
PrimaryLogPG::do_osd_op_effects
PrimaryLogPG::populate_obc_watchers
PrimaryLogPG::handle_watch_timeout
PrimaryLogPG::check_blacklisted_watchers
PrimaryLogPG::context_registry_on_change
PrimaryLogPG::get_watchers

注意 PrimaryLogPG 中 op 处理的处理顺序如下:

1
2
3
4
5
6
7
8
9
10
11
12
do_request
do_op
find_object_context
new OpContext
execute_ctx
prepare_transaction
do_osd_ops
make_writeable
finish_ctx
do_osd_op_effects
issue_repop
eval_repop

PrimaryLogPG::do_osd_ops,对客户端 WATCH, RECONNECT, PING, UNWATCH, NOTIFY, NOTIFY_ACK 等 6 个请求进行预处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
case CEPH_OSD_OP_NOTIFY:
++ctx->num_read;
{
uint32_t timeout;
bufferlist bl;

try {
uint32_t ver; // obsolete
::decode(ver, bp);
::decode(timeout, bp);
::decode(bl, bp);
} catch (const buffer::error &e) {
timeout = 0;
}
if (!timeout) // IoCtxImpl 构造函数设置的默认值 client_notify_timeout 是 10,除非设置 client_notify_timeout 默认值为 0,否则这里的值不会生效
timeout = cct->_conf->osd_default_notify_timeout; // 默认值为 30

notify_info_t n;
n.timeout = timeout;
n.notify_id = osd->get_next_id(get_osdmap()->get_epoch());
// 不管是 watch 还是 notify 都会在 Objecter 中 register 注册一个 LingerOp,当然 notify 在完成之后 LingerOp 会删除
n.cookie = op.watch.cookie; // LingerOp 的内存地址
n.bl = bl;
ctx->notifies.push_back(n);

// return our unique notify id to the client
::encode(n.notify_id, osd_op.outdata);
}
break;
case CEPH_OSD_OP_NOTIFY_ACK: // 其它 watcher 对 notify 消息的响应
++ctx->num_read;
{
try {
uint64_t notify_id = 0;
uint64_t watch_cookie = 0;
::decode(notify_id, bp);
::decode(watch_cookie, bp);
bufferlist reply_bl;
if (!bp.end()) {
::decode(reply_bl, bp);
}
OpContext::NotifyAck ack(notify_id, watch_cookie, reply_bl);
ctx->notify_acks.push_back(ack);
} catch (const buffer::error &e) {
OpContext::NotifyAck ack(
// op.watch.cookie is actually the notify_id for historical reasons
op.watch.cookie);
ctx->notify_acks.push_back(ack);
}
}
break;
case CEPH_OSD_OP_WATCH:
++ctx->num_write;
{
if (!obs.exists) {
result = -ENOENT;
break;
}
uint64_t cookie = op.watch.cookie;
entity_name_t entity = ctx->reqid.name;
ObjectContextRef obc = ctx->obc;

uint32_t timeout = cct->_conf->osd_client_watch_timeout;
if (op.watch.timeout != 0) {
timeout = op.watch.timeout;
}

watch_info_t w(cookie, timeout, ctx->op->get_req()->get_connection()->get_peer_addr());
if (op.watch.op == CEPH_OSD_WATCH_OP_WATCH || op.watch.op == CEPH_OSD_WATCH_OP_LEGACY_WATCH) { // 注册 watcher
if (oi.watchers.count(make_pair(cookie, entity))) {
dout(10) << " found existing watch " << w << " by " << entity << dendl;
} else {
dout(10) << " registered new watch " << w << " by " << entity << dendl;
oi.watchers[make_pair(cookie, entity)] = w;
t->nop(soid); // make sure update the object_info on disk!
}
bool will_ping = (op.watch.op == CEPH_OSD_WATCH_OP_WATCH);
ctx->watch_connects.push_back(make_pair(w, will_ping));
} else if (op.watch.op == CEPH_OSD_WATCH_OP_RECONNECT) { // 由于 osdmap 变化或 socket 链路中断,导致已注册的 watcher 重新发送 LingerOp 请求
if (!oi.watchers.count(make_pair(cookie, entity))) {
result = -ENOTCONN;
break;
}
dout(10) << " found existing watch " << w << " by " << entity << dendl;
ctx->watch_connects.push_back(make_pair(w, true));
} else if (op.watch.op == CEPH_OSD_WATCH_OP_PING) { // 已注册的 watcher 需要定时发送 ping 消息,否则 PrimaryLogPG::handle_watch_timeout 会删除该 watcher
/* Note: WATCH with PING doesn't cause may_write() to return true,
* so if there is nothing else in the transaction, this is going
* to run do_osd_op_effects, but not write out a log entry */
if (!oi.watchers.count(make_pair(cookie, entity))) { // 在 object_info_t 中查找
result = -ENOTCONN;
break;
}
map<pair<uint64_t, entity_name_t>, WatchRef>::iterator p = obc->watchers.find(make_pair(cookie, entity)); // 在 ObjectContext 中查找
if (p == obc->watchers.end() || !p->second->is_connected()) {
// client needs to reconnect
result = -ETIMEDOUT;
break;
}
dout(10) << " found existing watch " << w << " by " << entity << dendl;
p->second->got_ping(ceph_clock_now()); // 更新 Watch::last_ping 并重新注册 OSDService::watch_timer 的超时回调
result = 0;
} else if (op.watch.op == CEPH_OSD_WATCH_OP_UNWATCH) {
map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter =
oi.watchers.find(make_pair(cookie, entity));
if (oi_iter != oi.watchers.end()) { // 删除 watcher
dout(10) << " removed watch " << oi_iter->second << " by " << entity << dendl;
oi.watchers.erase(oi_iter);
t->nop(soid); // update oi on disk
ctx->watch_disconnects.push_back(watch_disconnect_t(cookie, entity, false));
} else {
dout(10) << " can't remove: no watch by " << entity << dendl;
}
}
}
break;

PrimaryLogPG::do_osd_ops 的处理逻辑如下:

  1. WATCH,往 object_info_t::watchers 中插入 watcher,并 push back 到 OpContext::watch_connects 链表;

  2. UNWATCH,从 object_info_t::watchers 中移除 watcher,并 push back 到 OpContext::watch_disconnects 链表;

  3. RECONNECT,从 object_info_t::watchers 中查找是否已存在该 watcher,如果不存在则返回 -ENOTCONN,否则 push back 到 OpContext::watch_connects 链表;

  4. PING,从 object_info_t::watchers 中查找是否已存在该 watcher,如果不存在则返回 -ENOTCONN,如果存在则继续在内存记录 ObjectContext::watchers 中继续查找,如果不存在,返回 -ETIMEDOUT,如果存在,则调用 Watch::got_ping 更新定时器;

  5. NOTIFY,构造 notify_info_t 结构,并 push back 到 OpContext::notifies 链表;

  6. NOTIFY_ACK,构造 NotifyAck 结构,并 push back 到 OpContext::notify_acks 链表。

PrimaryLogPG::do_osd_op_effects,对 PrimaryLogPG::do_osd_ops 预处理之后记录在 OpContext 中的结果 watch_connects, watch_disconnects, notifies, notify_acks 进行后续处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
void PrimaryLogPG::do_osd_op_effects(OpContext *ctx,
const ConnectionRef& conn) {
entity_name_t entity = ctx->reqid.name;

// disconnects first
complete_disconnect_watches(ctx->obc, ctx->watch_disconnects);

for (list<pair<watch_info_t, bool> >::iterator i = ctx->watch_connects.begin();
i != ctx->watch_connects.end(); ++i) {
pair<uint64_t, entity_name_t> watcher(i->first.cookie, entity);

WatchRef watch;
if (ctx->obc->watchers.count(watcher)) { // 创建 ObjectContext 中的 watcher 信息,每个 watcher 对应一个 Watch 实例
watch = ctx->obc->watchers[watcher];
} else {
watch = Watch::makeWatchRef(this, osd, ctx->obc, i->first.timeout_seconds,
i->first.cookie, entity, conn->get_peer_addr());

ctx->obc->watchers.insert(make_pair(watcher, watch)); // <watcher, Watch>
}

// 将 Watch 实例加入到 OSD 连接的 Session::wstate 中,接受所有正在进行的 notify,并注册超时处理,
// 所有的状态将在 Watch::discard 中清除
watch->connect(conn, i->second); // 第二个参数表示是否会要求客户端定时 ping,对于非 legcy 类型的 watch 这里为 true
}

for (list<notify_info_t>::iterator p = ctx->notifies.begin();
p != ctx->notifies.end(); ++p) {
ConnectionRef conn(ctx->op->get_req()->get_connection());
NotifyRef notif(Notify::makeNotifyRef(conn, ctx->reqid.name.num(), p->bl, p->timeout,
p->cookie, p->notify_id, ctx->obc->obs.oi.user_version, osd));
for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i = ctx->obc->watchers.begin();
i != ctx->obc->watchers.end(); ++i) { // 遍历 ObjectContext::<watcher, Watch>,因为每个 notify 消息要发送给所有的 watcher
// 将 notif 加入 Watch::in_progress_notifies
// 将当前 Watch 实例加入 Notify::watchers
// 发送 MWatchNotify 消息给当前 watcher
i->second->start_notify(notif); // Watch::start_notify
}

// 注册超时回调,notif 此时实际上完全依赖定时器或者等待所有 watcher 的 ack 来释放
notif->init();
}

for (list<OpContext::NotifyAck>::iterator p = ctx->notify_acks.begin();
p != ctx->notify_acks.end(); ++p) {
for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i = ctx->obc->watchers.begin();
i != ctx->obc->watchers.end(); ++i) { // 遍历 ObjectContext::<watcher, Watch> 找到是哪个客户端 watcher 发送的 ack
if (i->first.second != entity) // entity_name_t
continue;
if (p->watch_cookie && p->watch_cookie.get() != i->first.first) // watch cookie
continue;

// 根据 notify_id 从 Watch::in_progress_notifies 中找到对应的 Notify 实例
// 调用 Notify::complete_watcher 从 Notify::watchers 移除 Watch 实例,并检查是否已经接收到了所有的 watcher 的 ack
// 如果所有 ack 都已收到,则给发送 notify 消息的客户端响应
i->second->notify_ack(p->notify_id, p->reply_bl); // Watch::notify_ack
}
}
}

PrimaryLogPG::do_osd_op_effects 的处理逻辑如下:

  1. OpContext::watch_disconnects
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void PrimaryLogPG::complete_disconnect_watches(ObjectContextRef obc,
const list<watch_disconnect_t> &to_disconnect) {
for (list<watch_disconnect_t>::const_iterator i = to_disconnect.begin();
i != to_disconnect.end(); ++i) {
pair<uint64_t, entity_name_t> watcher(i->cookie, i->name);
auto watchers_entry = obc->watchers.find(watcher);
if (watchers_entry != obc->watchers.end()) {
WatchRef watch = watchers_entry->second;

obc->watchers.erase(watcher);
watch->remove(i->send_disconnect);
}
}
}
  1. OpContext::watch_connects

  2. OpContext::notifies

  3. OpContext::notify_acks

PrimaryLogPG::populate_obc_watchers

PrimaryLogPG::handle_watch_timeout

PrimaryLogPG::check_blacklisted_watchers

PrimaryLogPG::context_registry_on_change,遍历并移除 PG 中的所有 ObjectContext::watchers,其在如下一些情况下被调用:

  1. PG 开始新的 peering interval(PG::start_peering_interval -> PrimaryLogPG::on_change);

  2. PG 删除(OSD::_remove_pg -> PrimaryLogPG::on_removal -> PrimaryLogPG::on_shutdown);

  3. OSD 下电(OSD::shutdown -> PrimaryLogPG::on_shutdown

PrimaryLogPG::get_watchers,遍历并记录 PG 中的所有 ObjectContext::watchers,以支持 OSD admin socket 命令 dump_watchers

librados 客户端实现

客户端主要的接口如下(为了兼容性增加的带数字后缀的扩展版本、C++ 接口、同步接口以及非重要接口均未列出):

1
2
3
rados_aio_watch
rados_aio_notify
rados_aio_unwatch

这些外部接口最终都会调用到 IoCtxImpl 模块(librados/IoCtxImpl.cc)的实现:

1
2
3
IoCtxImpl::aio_watch
IoCtxImpl::aio_notify
IoCtxImpl::aio_unwatch

IoCtxImpl::aio_watch,调用 Objecter::linger_register 注册 LingerOp(注册的 LingerOp 将通过 IoCtxImpl::aio_unwatch 删除),并调用 Objecter::_linger_submit 构造 OSD Op 在服务端注册 watcher 信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int librados::IoCtxImpl::aio_watch(const object_t& oid,
AioCompletionImpl *c, // 异步 api 的回调
uint64_t *handle,
librados::WatchCtx *ctx, // 兼容性处理,用户 watch 处理回调,用于接收 notify 通知消息和错误处理
librados::WatchCtx2 *ctx2, // 用户 watch 处理回调,用于接收 notify 通知消息和错误处理
uint32_t timeout,
bool internal)
{
// new 分配 LingerOp,设置 target、linger_id,并注册到 Objecter::linger_ops 及 Objecter::linger_ops_set 中
Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);

// 为 api 回调 c 设置指向 IoCtxImpl 的引用
c->io = this;

// oncomplete 将赋值给 LingerOp::on_reg_commit
// 底层 io 完成时(AsyncMessenger 上下文)将异步 api 回调 c 通过 C_AioComplete 封装入 c->io->client->finisher(即 RadosClient::finisher)
// 队列,在 RadosClient::finisher 上下文异步处理异步 api 回调 c 中的用户回调,当 r < 0 时 LingerOp 将通过 C_aio_linger_Cancel 删除
Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false);

::ObjectOperation wr;
*handle = linger_op->get_cookie(); // cookie 实际上就是 LingerOp 的内存地址

// 注意上面的回调 c 是 librados 异步 api 接口的回调,这里的回调是 watch 在服务端注册成功后用于处理错误/通知消息的回调
linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal);

prepare_assert_ops(&wr);
wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
bufferlist bl;

// 针对 watch 调用为 LingerOp 设置相关的字段,然后调用 _linger_submit 构造 OSDOp 并发送
objecter->linger_watch(linger_op, wr,
snapc, ceph::real_clock::now(), bl,
oncomplete, &c->objver);

return 0;
}

IoCtxImpl::aio_notify,主要流程和 aio_watch 类似,调用 Objecter::linger_register 注册 LingerOp(注册的 LingerOp 将通过 IoCtxImpl::aio_unwatch 删除),并调用 Objecter::_linger_submit 构造 OSD Op 在服务端给所有注册的 watcher 发送 notify 消息,主要的不同在于用户回调需要等待 OSD Op 处理完成以及 notify 消息得到所有 watcher 的响应才能完成,且完成后注册的 LingerOp 将被删除:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
bufferlist& bl, uint64_t timeout_ms, bufferlist *preply_bl,
char **preply_buf, size_t *preply_buf_len)
{
// new 分配 LingerOp,设置 target、linger_id,并注册到 Objecter::linger_ops 及 Objecter::linger_ops_set 中
Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);

// 为异步 api 回调 c 设置指向 IoCtxImpl 的引用
c->io = this;

// C_aio_notify_Complete 是由 C_aio_linger_Complete 派生的子类,最大的不同是 C_aio_notify_Complete 完成时会设置 C_aio_linger_Complete::cancel = true,
// 从而在调用父类 C_aio_linger_Complete::finish 完成时会入队 C_aio_linger_cancel 回调 Objecter::linger_cancel 来删除上面注册的 LingerOp
// oncomplete 的完成由 onack 和 onnotify 来驱动,即只有 LingerOp::on_reg_commit 以及 LingerOp::on_notify_finish 都完成才能完成
// 底层 io 完成时(AsyncMessenger 上下文)将 api 回调 c 通过 C_AioComplete 封装入 c->io->client->finisher(即 RadosClient::finisher)
// 队列,在 RadosClient::finisher 上下文异步处理 api 回调 c 中的用户回调
C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op);

// C_notify_Finish 在构造函数内部将 this 赋值给 LingerOp::on_notify_finish,onnotify 的完成将调用 oncomplete->complete(注意
// 只有 oncomplete->handle_ack 和 oncomplete->complete 都调用之后 oncomplete 才真正的完成)
C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
objecter, linger_op, preply_bl, preply_buf, preply_buf_len);

// onack 将赋值给 LingerOp::on_reg_commit,onack 完成时调用 oncomplete->handle_ack
Context *onack = new C_aio_notify_Ack(client->cct, onnotify, oncomplete);

// 默认为 10
uint32_t timeout = notify_timeout;
if (timeout_ms)
timeout = timeout_ms / 1000;

// Construct RADOS op
::ObjectOperation rd;
prepare_assert_ops(&rd);
bufferlist inbl;
rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);

// 针对 notify 调用为 LingerOp 设置相关的字段,然后调用 _linger_submit 构造 OSDOp 并发送
// Issue RADOS op
objecter->linger_notify(linger_op, rd, snap_seq, inbl, NULL, onack,
&c->objver);
return 0;
}

IoCtxImpl::aio_unwatch,从服务端删除 watcher 信息,并删除 IoCtxImpl::aio_watch 注册的 LingerOp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c)
{
c->io = this;
Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
// C_aio_linger_Complete::cancel 为 true,因此完成时会入队 C_aio_linger_cancel 回调 Objecter::linger_cancel 来删除
// IoCtxImpl::aio_watch 注册的 LingerOp
Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);

::ObjectOperation wr;
prepare_assert_ops(&wr);
wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
objecter->mutate(linger_op->target.base_oid, oloc, wr, snapc,
ceph::real_clock::now(), 0, oncomplete, &c->objver);
return 0;
}

Objecter::linger_register,在 Objecter 中分配并注册 LingerOp

Objecter::linger_cancel,在 Objeter 中移除 Objecter::linger_register 注册的 LingerOp

Objecter::linger_watch,针对 watch 调用为 LingerOp 设置相关的字段,然后调用 Objecter::_linger_submit 构造 OSDOp 并发送;

Objecter::linger_notify,针对 notiy 调用为 LingerOp 设置相关的字段,然后调用 Objecter::_linger_submit 构造 OSDOp 并发送;

Objecter::_linger_submit,通过 _calc_target 计算 t->osd,创建 OSDSession 并建立 LingerOpOSDSession 之间的联系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
assert(sul.owns_lock() && sul.mutex() == &rwlock);
assert(info->linger_id);

// 通过 oid 计算 pgid,然后通过 pgid 和 crush(osdmap->pg_to_up_acting_osds) 得到 t->osd
// Populate Op::target
OSDSession *s = NULL;
_calc_target(&info->target, nullptr);

// Create LingerOp<->OSDSession relation
int r = _get_session(info->target.osd, &s, sul);
assert(r == 0);
OSDSession::unique_lock sl(s->lock);
// 设置 LingerOp::session = s 以及 OSDSession[LingerOp::linger_id] = info
_session_linger_op_assign(s, info);
sl.unlock();
put_session(s);

// 基于 LingerOp 构造 OSDOp 并发送
_send_linger(info, sul);
}

Objecter::_send_linger,基于 LingerOp 构造 OSDOp 并发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
void Objecter::_send_linger(LingerOp *info, shunique_lock& sul)
{
assert(sul.owns_lock() && sul.mutex() == &rwlock);

vector<OSDOp> opv;
Context *oncommit = NULL;
LingerOp::shared_lock watchl(info->watch_lock);
bufferlist *poutbl = NULL;
// LingerOp::registered 由回调 Objecter::_linger_commit 设置,且无论返回值 r 如何,registered 都会
// 被置成 true,显然 r < 0 会导致上层异步 api 接口的回调报错,且 IoCtxImpl 模块中的 C_aio_linger_Complete 当
// r < 0 时会删除 LingerOp,因此即使 r < 0 时 registered 置成 true 也无所谓
if (info->registered && info->is_watch) { // watch 是一个长连接,当 osdmap 发生改变(Objecter::handle_osd_map)
// 或者底层 socket 连接出错(Objecter::ms_handle_reset)时需要重连
ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect" << dendl;
opv.push_back(OSDOp());
opv.back().op.op = CEPH_OSD_OP_WATCH;
opv.back().op.watch.cookie = info->get_cookie();
opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
opv.back().op.watch.gen = ++info->register_gen; // 服务端没用到,但用于 ping 的回调(Objecter::_linger_ping)检测 ping 的 watch 和注册的 watch 是否能对上

// 回调函数为 Objecter::_linger_reconnect
oncommit = new C_Linger_Reconnect(this, info);
} else { // 非 watch 重连
ldout(cct, 15) << "send_linger " << info->linger_id << " register" << dendl;

// LingerOp::ops 由 IoCtxImpl::aio_watch/aio_notify 构造并在 Objecter::linger_watch 或者 Objecter::linger_notify 中设置
opv = info->ops;

// 回调函数为 Objecter::_linger_commit
C_Linger_Commit *c = new C_Linger_Commit(this, info);
if (!info->is_watch) {
info->notify_id = 0;
poutbl = &c->outbl;
}
oncommit = c;
}
watchl.unlock();

Op *o = new Op(info->target.base_oid, info->target.base_oloc, opv,
info->target.flags | CEPH_OSD_FLAG_READ, oncommit, info->pobjver);
o->outbl = poutbl;
o->snapid = info->snap;
o->snapc = info->snapc;
o->mtime = info->mtime;

// 直接结构体赋值,因此 LingerOp 中的 t->paused 永远保持为默认值 false,因为 Objecter::_linger_submit 中虽然调用了
// _calc_target 进行计算,但 op_target_t::paused 只会在 Objecter::_op_submit 中可能被置成 true
o->target = info->target;
o->tid = ++last_tid;

// do not resend this; we will send a new op to reregister
o->should_resend = false; // 由 LingerOp 生成的 OSDOp 不主动重发,而是由 LingerOp 的重发来触发新的 OSDOp 发送

if (info->register_tid) { // 之前已基于该 LingerOp 构造并发送过 OSDOp
// repeat send. cancel old registeration op, if any.
OSDSession::unique_lock sl(info->session->lock);
if (info->session->ops.count(info->register_tid)) {
Op *o = info->session->ops[info->register_tid];
_op_cancel_map_check(o);

// 取消之前下发的 OSDOp,由下面的 _op_submit 进行重发
_cancel_linger_op(o);
}
sl.unlock();

_op_submit(o, sul, &info->register_tid);
} else {
// first send
_op_submit_with_budget(o, sul, &info->register_tid);
}

logger->inc(l_osdc_linger_send);
}

Objecter::_linger_commit,非 watch 重连情况下 LingerOp 对应的 OSDOp 的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
{
LingerOp::unique_lock wl(info->watch_lock);
ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;

if (info->on_reg_commit) { // 即 IoCtxImpl 模块中的 C_aio_linger_Complete(用于完成异步 api 回调)
info->on_reg_commit->complete(r);
info->on_reg_commit = NULL;
}

if (r < 0 && info->on_notify_finish) { // 即 IoCtxImpl 模块中的 C_notify_Finish
info->on_notify_finish->complete(r); // on_reg_commit 和 on_notify_finish 都完成才会完成 IoCtxImpl 模块中的 C_aio_notify_Complete(用于完成异步 api 回调)
info->on_notify_finish = nullptr;
}

// 如果 r < 0,IoCtxImpl 层的回调 C_aio_linger_Complete/C_aio_notify_Complete(C_aio_notify_Complete 的子类)会删除当前 LingerOp 并完成异步 api 回调,
// 因此即使 r < 0,针对该 LingerOp 也不会再有任何操作,置上 registered = true 并不会导致任何问题

// only tell the user the first time we do this
// registered 只对 watch 有意义,用于 osdmap 发生改变(Objecter::handle_osd_map),或者底层 socket 连接
// 出错(Objecter::ms_handle_reset)时判断是否是重连还是首次连接
info->registered = true;
info->pobjver = NULL;

if (!info->is_watch) {
// make note of the notify_id
bufferlist::iterator p = outbl.begin();
try {
::decode(info->notify_id, p);
ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
<< dendl;
}
catch (buffer::error& e) {
}
}
}

Objecter::_linger_reconnect,watch 重连情况下 LingerOp 对应的 OSDOp 的回调(watch 能重连说明首次 watch 肯定是成功的,见 Objecter::_linger_commit 的分析):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void Objecter::_linger_reconnect(LingerOp *info, int r)
{
ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r << " (last_error "
<< info->last_error << ")" << dendl;
if (r < 0) { // watch 重连失败,将发送给 LingerOp::watch_context-handle_error 处理
LingerOp::unique_lock wl(info->watch_lock);
if (!info->last_error) {
r = _normalize_watch_error(r);
info->last_error = r;
if (info->watch_context) {
finisher->queue(new C_DoWatchError(this, info, r)); // 给用户在调用 watch api 时注册的处理错误/通知消息回调进行处理
}
}
wl.unlock();
}
}

Objecter::_send_linger_ping,由 Objecter::tick 定时遍历所有 OSDSession 下所有已注册的 watch LingerOp(注册失败的已经被删除,参考 Objecter::_linger_commit 的处理)并发送 ping OSDOp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void Objecter::_send_linger_ping(LingerOp *info)
{
// rwlock is locked unique
// info->session->lock is locked

if (cct->_conf->objecter_inject_no_watch_ping) {
ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING" << dendl;
return;
}
if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
return;
}

ceph::mono_time now = ceph::mono_clock::now();
ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now << dendl;

vector<OSDOp> opv(1);
opv[0].op.op = CEPH_OSD_OP_WATCH;
opv[0].op.watch.cookie = info->get_cookie();
opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
opv[0].op.watch.gen = info->register_gen; // 服务端没用到,当 watch 由于 osdmap/socket 导致重连时 ++info->register_gen,在 C_Linger_Ping 回调中用来做 ping 响应的检测
C_Linger_Ping *onack = new C_Linger_Ping(this, info);
Op *o = new Op(info->target.base_oid, info->target.base_oloc, opv,
info->target.flags | CEPH_OSD_FLAG_READ, onack, NULL, NULL);
o->target = info->target;
o->should_resend = false; // 由 LingerOp 生成的 OSDOp 不主动重发,而是由 LingerOp 的重发来触发新的 OSDOp 发送
_send_op_account(o);
MOSDOp *m = _prepare_osd_op(o);
o->tid = ++last_tid;
_session_op_assign(info->session, o);
_send_op(o, m);
info->ping_tid = o->tid;

onack->sent = now;
logger->inc(l_osdc_linger_ping);
}

Objecter::_linger_pingObjecter::_send_linger_ping 的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent, uint32_t register_gen)
{
LingerOp::unique_lock l(info->watch_lock);
ldout(cct, 10) << __func__ << " " << info->linger_id << " sent " << sent << " gen "
<< register_gen << " = " << r << " (last_error " << info->last_error
<< " register_gen " << info->register_gen << ")" << dendl;

if (info->register_gen == register_gen) {
if (r == 0) {
info->watch_valid_thru = sent;
} else if (r < 0 && !info->last_error) {
r = _normalize_watch_error(r);
info->last_error = r;
if (info->watch_context) {
finisher->queue(new C_DoWatchError(this, info, r)); // 给用户在调用 watch api 时注册的处理错误/通知消息回调进行处理
}
}
} else { // watch 已重连,忽略老的 ping 消息的响应
ldout(cct, 20) << " ignoring old gen" << dendl;
}
}

LingerOp 的重传有两个原因,一是 osdmap 发生变化,一是 socket 连接出错,分别是在如下两个函数中进行处理:

1
2
Objecter::handle_osd_map
Objecter::ms_handle_reset

Objecter::handle_osd_map,调用 _scan_requests 扫描得到所有需要重发的 LingerOp, OSDOp 以及 CommandOp,并在 Objecter::handle_osd_map 内部依次进行重发;

Objecter::ms_handle_reset, 调用 _kick_requests 扫描得到所有需要重发的 OSDOp, LingerOp 以及 CommandOp,并在 _kick_requests 内部进行 OSDOpCommandOp 的重发,而 LingerOpObjecter::ms_handle_reset 中通过调用 _linger_ops_resend 进行重发。