mgr 是一个 active/standby 系统,通过心跳与 mon 保活,同一时刻所有的 mgr 进程中只有一个为 active 状态的 mgr 进程提供服务,mgr 进程运行在 public 网络上,它不断的收集集群信息,并将信息暴露给外部。
mgr 进程的一个非常重要的功能是通过 Python 模块的形式扩展 mgr,当然这样做的前提是内嵌 Python 解释器,并实现内置的 ceph_module
模块对外暴露 mgr C++ 部分提供的数据或接口供 mgr Python 扩展模块使用。
ceph_module
由 BaseMgrModule.cc / BaseMgrStandbyModule.cc / PyOSDMap.cc 3 个 C++ 文件定义,其分别定义了 5 个 Python 类型对象:BaseMgrModuleType / BaseMgrStandbyModuleType / BasePyOSDMapType / BasePyOSDMapIncrementalType / BasePyCRUSHType,通过 PyModule::init_ceph_module
静态方法注册为如下的 Python 类(class):
ceph_module.BaseMgrModule
ceph_module.BaseMgrStandbyModule
ceph_module.BasePyOSDMap
ceph_module.BasePyOSDMapIncremental
ceph_module.BasePyCRUSH
模块组成
在 mgr 的实现中,有四个主要的结构体:MgrStandby、PyModuleRegistry、Mgr、DaemonServer,其关系大概如下图所示:
- PyModuleRegistry 是 MgrStandby 的成员变量,用于管理内嵌的 Python 解释器以及 mgr Python 扩展模块;
- Mgr 是 MgrStandby 的指针类型的成员变量,当 mgr 进程为非 active 状态时 Mgr 实例为 nullptr,当 mgr 进程切换为 active 状态后,Mgr 实例才得以创建,负责处理集群的各种 map(MMonMap、MOSDMap、MServiceMap、MFSMap)以及 MMgrDigest、MLog 消息;
- DaemonServer 是 Mgr 的成员变量,主要负责处理集群服务进程的上报消息(MMgrReport)以及 PG 信息(MPGStats),并通过 mgr command 和 Python 扩展模块对外暴露这些信息;
cluster_state
(ClusterState)是 Mgr 的成员变量,负责记录 FSMap、ServiceMap、MgrMap、PGMap(注意 MonMap 和 OSDMap 存储在其成员指针变量 monc 与 objecter 中);daemon_state
(DaemonStateIndex)是 Mgr 的成员变量,负责记录集群所有服务进程的信息(DaemonState);- c/msgr 用于 mgr 作为 ceph 集群的客户端与 ceph 集群交互,比如:上电注册 mgr 自身、发送心跳消息、处理集群 map 等;
- s/msgr 用于 mgr 作为 ceph 集群的信息采集中心,接收集群服务进程的上报消息和 PG 信息。
main 函数
mgr 进程的 main 函数非常简单,大致如下(ceph 代码变化太快,这里是 2018 年 12 月上旬 master 分支代码):
int main(int argc, const char **argv)
{
auto cct = global_init(&defaults, args, CEPH_ENTITY_TYPE_MGR,
CODE_ENVIRONMENT_DAEMON, 0);
pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
common_init_finish(g_ceph_context);
MgrStandby mgr(argc, argv);
int rc = mgr.init();
return mgr.main(args);
}
在 MgrStandby 的构造函数中会构造网络层 AsyncMessenger 实例以及集群 daemon 的客户端实例,MgrStandby 内包含了所有集群 daemon 的客户端实例:monc 对应 mon,mgrc 对应 mgr,objecter 对应 osd,client 对应 mds。
此时构造的 msgr 实例是作为 mon 的客户端创建的实例(MgrStandby::client_messenger
),因此它并不会进行 bind 操作,当 mgr 进程切换成 active 状态后,会新建一个 msgr 实例并 bind 作为服务端进行监听(DaemonServer::msgr
,s/msgr)。
MgrStandby::main 的逻辑更为简单,注册 SIGINT 和 SIGTERM 信号处理,然后等待 msgr 退出,由 msgr 驱动整个事件机制的运行。
初始化
mgr 存在两种状态:active 和 standby,其初始化相应的也分为两部分。
standby 状态初始化
mgr 进程上电时处于 standby 状态,其初始化行为在 MgrStandby::init
中进行,主要逻辑如下:
- 注册 SIGHUP 信号处理(signal handler);
- 为客户端 msgr 实例(
MgrStandby::client_messenger
,c/msgr)注册 handler(所有的客户端实例及 MgrStandby 自身都作为 handler 进行注册),handler 的处理顺序为:monc -> objecter -> this -> client -> mgrc; - 初始化 monc 并通过 monc 向 mon 进行 cephx 认证;
- 将 msgr 的 entity name 设置为 mgr.<global id>;
- 初始化 mgrc、objecter、client、
py_module_registry
; - 启动 tick 定时器;
下面主要分析 Mgr::py_module_registry
的初始化过程。
PyModuleRegistry 的初始化(PyModuleRegistry::init
)过程就是初始化 Python 解释器并 import mgr Python 扩展模块的过程:
void PyModuleRegistry::init()
{
std::lock_guard locker(lock);
// Set up global python interpreter
Py_SetProgramName(const_cast<char*>(PYTHON_EXECUTABLE));
// Add more modules
if (g_conf().get_val<bool>("daemonize")) {
PyImport_AppendInittab("ceph_logger", PyModule::init_ceph_logger);
}
// 注意 PyModule::init_ceph_module 是一个 static 方法
PyImport_AppendInittab("ceph_module", PyModule::init_ceph_module); // 将 ceph_module 作为 builtin module 进行加载
Py_InitializeEx(0); // 初始化 Python 解释器
// Let CPython know that we will be calling it back from other
// threads in future.
if (! PyEval_ThreadsInitialized()) {
PyEval_InitThreads();
}
// Drop the GIL and remember the main thread state (current
// thread state becomes NULL)
pMainThreadState = PyEval_SaveThread();
ceph_assert(pMainThreadState != nullptr);
std::list<std::string> failed_modules;
// 搜索 mgr_module_path 路径下的所有文件夹,如果文件夹下存在 module.py 文件则认为是一个 mgr Python 模块
std::set<std::string> module_names = probe_modules();
// Load python code
for (const auto& module_name : module_names) { // import 所有的 mgr Python 模块
dout(1) << "Loading python module '" << module_name << "'" << dendl;
// Everything starts disabled, set enabled flag on module
// when we see first MgrMap
auto mod = std::make_shared<PyModule>(module_name); // 每个 PyModule 实例代表一个 import 的 Python 模块
// 每个 PyModule,即每个 import 的 mgr Python 模块,都有自己独立的 Python 解释器环境
int r = mod->load(pMainThreadState); // import mgr Python 模块,当然是通过调用 python-devel C API 的形式
// Record the module even if the load failed, so that we can
// report its loading error
modules[module_name] = std::move(mod);
}
}
mgr Python 扩展模块 import 过程:
int PyModule::load(PyThreadState *pMainThreadState)
{
ceph_assert(pMainThreadState != nullptr);
// Configure sub-interpreter
{
// Python 解释器总是在与 OS 线程绑定的 PyThreadState 环境运行,切换解释器就意味着切换 PyThreadState 环境,
// 如果 Python 解释器要在某个非绑定的 OS 线程上运行,则需要新建 PyThreadState 实例与该 OS 线程绑定并调用
// PyThreadState_Swap 显式切换至新建的 PyThreadState 环境
SafeThreadState sts(pMainThreadState); // 记录 Python 主解释器环境(PyThreadState)与当前 OS 线程(pthread_self())的绑定关系
Gil gil(sts); // 请求 GIL,切换 Python 解释器环境为 sts 记录的 PyThreadState 环境
// 每个 PyModule 实例新建一个由成员变量 pMyThreadState 管理的 Python 子解释器环境
auto thread_state = Py_NewInterpreter(); // 创建 Python 子解释器(即 PyThreadState 环境),自动与当前 OS 线程绑定
pMyThreadState.set(thread_state); // 记录 Python 子解释器与当前 OS 线程的绑定关系
// Some python modules do not cope with an unpopulated argv, so lets
// fake one. This step also picks up site-packages into sys.path.
const char *argv[] = {"ceph-mgr"};
PySys_SetArgv(1, (char**)argv);
// Configure sys.path to include mgr_module_path
string paths = (":" + get_site_packages() +
":" + g_conf().get_val<std::string>("mgr_module_path"));
string sys_path(Py_GetPath() + paths);
PySys_SetPath(const_cast<char*>(sys_path.c_str())); // 设置 sys.path
dout(10) << "Computed sys.path '" << sys_path << "'" << dendl;
}
// Environment is all good, import the external module
{
Gil gil(pMyThreadState); // 请求 GIL,切换解释器环境为新建的子解释器环境
// 所有的 mgr Python 模块都需要定义从 MgrModule/MgrStandbyModule 类(在 src/pybind/mgr/mgr_module.py 中定义)派生的子类
int r;
// 通过 import 当前 mgr Python 模块,将 mgr 模块定义的 MgrModule 子类(class)信息保存在 PyModule::pClass 中
r = load_subclass_of("MgrModule", &pClass);
r = load_commands(); // 根据 Python 类成员变量 COMMANDS 加载所有的 ModuleCommand 并保存在 PyModule::commands 中
r = load_options(); // 根据 Python 类成员变量 OPTIONS 加载所有的 ModuleCommand 并保存在 PyModule::options 中
// We've imported the module and found a MgrModule subclass, at this
// point the module is considered loaded. It might still not be
// runnable though, can_run populated later...
loaded = true;
// 通过 import 当前 mgr Python 模块,将 mgr 模块定义的 MgrStandbyModule 子类(class)信息保存在 PyModule::pStandbyClass 中
r = load_subclass_of("MgrStandbyModule", &pStandbyClass);
// 调用类方法 can_run,这样可以判断 mgr 模块是否加载成功
PyObject *pCanRunTuple = PyObject_CallMethod(pClass, const_cast<char*>("can_run"), const_cast<char*>("()"));
PyObject *pCanRun = PyTuple_GetItem(pCanRunTuple, 0);
PyObject *can_run_str = PyTuple_GetItem(pCanRunTuple, 1);
can_run = (pCanRun == Py_True);
if (!can_run) {
error_string = PyString_AsString(can_run_str);
dout(4) << "Module " << get_name()
<< " reported that it cannot run: "
<< error_string << dendl;
}
Py_DECREF(pCanRunTuple);
}
return 0;
}
active 状态初始化
MgrStandby 对 MMgrMap 消息的处理,可能导致 mgr 进程从非 active 切换成 active 或者从 active 切换成非 active,mgr 进程切换至 active 状态会新建 Mgr 实例,并调用 Mgr::background_init
进行初始化(在 Finisher 上下文调用 Mgr::init
而已):
void Mgr::init()
{
std::lock_guard l(lock);
// 创建 s/msgr 用于处理 MPGStats、MMgrReport、MMgrOpen、MMgrClose、MCommand 等种类型的消息
// mgr 在向 mon 发送心跳消息时(MgrStandby::send_beacon)如果 mgr 并没有初始化完成(即 Mgr::initialized != true),
// 心跳消息里并不会携带 mgr 的地址,初始化完成之后,心跳消息上报的地址是 DaemonServer::init 里创建的 s/msgr 的地址
int r = server.init(monc->get_global_id(), client_messenger->get_myaddrs());
// Preload all daemon metadata (will subsequently keep this
// up to date by watching maps, so do the initial load before
// we subscribe to any maps)
dout(4) << "Loading daemon metadata..." << dendl;
// 向 mon 请求所有 mon/osd/mds 的 metadata 并保存在 Mgr::daemon_state 中(即 DaemonStateIndex)
load_all_metadata();
// subscribe to all the maps
monc->sub_want("log-info", 0, 0);
monc->sub_want("mgrdigest", 0, 0);
monc->sub_want("fsmap", 0, 0);
monc->sub_want("servicemap", 0, 0);
dout(4) << "waiting for OSDMap..." << dendl;
// Subscribe to OSDMap update to pass on to ClusterState
objecter->maybe_request_map();
// reset the mon session. we get these maps through subscriptions which
// are stateful with the connection, so even if *we* don't have them a
// previous incarnation sharing the same MonClient may have.
monc->reopen_session();
// Start Objecter and wait for OSD map
lock.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
objecter->wait_for_osd_map();
lock.Lock();
// ClusterState 不直接保存 monmap 和 osdmap,而是保存有指向 monc 和 objecter 的指针类型成员变量,
// 因此 with_monmap/with_osdmap/with_osdmap_and_pgmap 会让 monc 和 objecter 对应的方法去处理
// ClusterState 直接保存 fsmap/servicemap/mgr_map/pg_map,因此 with_fsmap/with_servicemap/with_mgrmap/with_pgmap
// 以及 with_osdmap_and_pgmap 会直接传递这些保存的变量作为回调函数的参数
// Populate PGs in ClusterState
cluster_state.with_osdmap_and_pgmap([this](const OSDMap &osd_map, const PGMap& pg_map) {
// 根据 OSDMap 生成 PGMap(ClusterState::pg_map)
cluster_state.notify_osdmap(osd_map);
});
// Wait for FSMap
dout(4) << "waiting for FSMap..." << dendl;
while (!cluster_state.have_fsmap()) { // 等待 FSMap
fs_map_cond.Wait(lock);
}
// Wait for MgrDigest...
dout(4) << "waiting for MgrDigest..." << dendl;
while (!digest_received) { // 等待 Mgr::handle_mgr_digest 处理 MMgrDigest 消息完成
digest_cond.Wait(lock);
}
// Load module KV store
// 向 mon 请求所有的 ConfigKeyService 中保存的配置数据,过滤并得到所有 "mgr/" 和 "device/" 类型的配置数据,
// 如果是 device/ 数据,创建对应的 DeviceState 实例并保存在 DaemonStateIndex::devices 中,如果是普通的配置数据,
// 则直接保存在临时变量 kv_store 中(仅用于兼容性处理,见下面的 upgrade_config 调用)
auto kv_store = load_store();
// Migrate config from KV store on luminous->mimic
// drop lock because we do blocking config sets to mon
lock.Unlock();
// L -> M 升级的兼容性处理,L 版本的 mgr Python 扩展模块配置数据保存在 mon ConfigKeyService 中,而升级到
// M 版本之后要把 config-key 中的配置数据转成 mon ConfigMonitor 管理的配置数据
// 升级过程会把配置数据设置到 ConfigMonitor 中,以后会通过 MgrStandby::init 在 monc 中注册的回调获取配置数据
py_module_registry->upgrade_config(monc, kv_store);
lock.Lock();
// 遍历 mgr 扩展模块(由 PyModuleRegistry::init import 得到模块中派生的 MgrModule 子类并保存在 PyModuleRegistry::modules 中)
// 并创建 MgrModule 子类实例
py_module_registry->active_start(daemon_state, cluster_state,
kv_store, *monc, clog, audit_clog, *objecter, *client,
finisher, server);
initializing = false;
initialized = true;
}
PyModuleRegistry::active_start
实现如下,主要逻辑为遍历所有通过 PyModuleRegistry::init
import 的扩展模块,创建 MgrModule 子类实例并启动 serve 线程:
void PyModuleRegistry::active_start(
DaemonStateIndex &ds, ClusterState &cs,
const std::map<std::string, std::string> &kv_store,
MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
Objecter &objecter_, Client &client_, Finisher &f,
DaemonServer &server)
{
std::lock_guard locker(lock);
if (standby_modules != nullptr) { // mgr 切换为 active,停止之前启动的 standby 扩展模块(从 MgrStandbyModule 派生的子类实例)
standby_modules->shutdown();
standby_modules.reset();
}
// 创建 ActivePyModules 实例,用于管理所有的 active 扩展模块(从 MgrModule 派生的子类实例)
active_modules.reset(new ActivePyModules(module_config, kv_store, ds, cs, mc,
clog_, audit_clog_, objecter_, client_, f, server));
for (const auto &i : modules) {
// Anything we're skipping because of !can_run will be flagged
// to the user separately via get_health_checks
if (!(i.second->is_enabled() && i.second->is_loaded())) {
continue;
}
// 创建 ActivePyModule 实例并保存在 ActivePyModule::modules 中,通过 ActivePyModule 实例创建 MgrModule 子类实例
// 并启动 serve 线程(线程执行的函数是子类实例的 serve 方法)
int r = active_modules->start_one(i.second);
}
}
ActivePyModule::start_one
实现如下:
int ActivePyModules::start_one(PyModuleRef py_module)
{
std::lock_guard l(lock);
modules[py_module->get_name()].reset(new ActivePyModule(py_module, clog));
auto active_module = modules.at(py_module->get_name()).get();
// 注意与 PyModuleRegistry::init 中调用的 PyModule::load 方法区分开来,前者是 import 得到 MgrModule 子类(PyModule::pClass),
// 这里是基于得到的子类创建子类实例
int r = active_module->load(this);
// 创建 serve 线程,线程函数 PyModuleRunner::serve 调用 MgrModule 子类定义的 serve 方法对外提供服务
// 注意 ActivePyModule 是从 PyModuleRunner 派生的子类,因此线程函数是 PyModuleRunner::serve
active_module->thread.create(active_module->get_thread_name());
}
消息处理
由于 active/standby 状态的原因,消息处理也有两种不同的情况。
standby 状态消息处理
MgrStandby 的消息处理逻辑如下:
bool MgrStandby::ms_dispatch(Message *m)
{
std::lock_guard l(lock);
if (m->get_type() == MSG_MGR_MAP) {
handle_mgr_map(static_cast<MMgrMap*>(m)); // 处理 mgrmap,进行 active/non-active 状态切换
}
bool handled = false;
if (active_mgr) {
auto am = active_mgr;
lock.Unlock();
// 当前为 active 状态,处理 MMonMap、MOSDMap、MMgrDigest、MServiceMap、MFSMap、MLog 消息
handled = am->ms_dispatch(m);
lock.Lock();
}
if (m->get_type() == MSG_MGR_MAP) {
// let this pass through for mgrc
handled = false;
}
return handled;
}
显然,在 standby 状态下,MgrStandby 只能处理 MMgrMap 消息,且 MMgrMap 消息在处理完之后还会给 mgrc 去处理(前面提到过 mgrc 会作为最后一个 handler 挂载到 MgrStandby::client_messenger
的最后)。
正是对 MMgrMap 消息的处理,导致 mgr 进程从 standby 切换成 active 或者从 active 切换成 standby:
void MgrStandby::handle_mgr_map(MMgrMap* mmap) {
auto &map = mmap->get_map();
const bool active_in_map = map.active_gid == monc.get_global_id();
// PyModuleRegistry may ask us to respawn if it sees that
// this MgrMap is changing its set of enabled modules
bool need_respawn = py_module_registry.handle_mgr_map(map);
if (need_respawn) { // 启用或禁用了新的 Python 模块,重启 ceph-mgr 进程,进程 pid 不变
respawn();
}
if (active_in_map) {
if (!active_mgr) { // 从非 active 切换成 active
// 新建 Mgr 实例用于 active 状态下的业务消息处理,Mgr 实例将拿到第一张 mgrmap
active_mgr.reset(new Mgr(&monc, map, &py_module_registry, client_messenger.get(),
&objecter, &client, clog, audit_clog));
active_mgr->background_init(new FunctionContext([this](int r) {
// Advertise our active-ness ASAP instead of waiting for
// next tick.
std::lock_guard l(lock);
send_beacon();
}));
dout(1) << "I am now activating" << dendl;
} else {
dout(10) << "I was already active" << dendl;
bool need_respawn = active_mgr->got_mgr_map(map);
if (need_respawn) { // 启用或禁用了新的 Python 模块,重启 ceph-mgr 进程,进程 pid 不变
respawn();
}
}
if (!available_in_map && map.get_available()) {
dout(4) << "Map now says I am available" << dendl;
available_in_map = true;
}
} else if (active_mgr != nullptr) { // 从 active 切换成非 active,重启 ceph-mgr 进程,进程 pid 不变
derr << "I was active but no longer am" << dendl;
respawn();
} else {
if (map.active_gid != 0 && map.active_name != g_conf()->name.get_id()) {
// I am the standby and someone else is active, start modules
// in standby mode to do redirects if needed
if (!py_module_registry.is_standby_running()) {
// 遍历 mgr 扩展模块创建 MgrStandbyModule 子类实例及 serve 线程,具体实现与 PyModuleRegistry::active_start 类似,
// 因此可以参考下面对 PyModuleRegistry::active_start 的分析
py_module_registry.standby_start(monc);
}
}
}
}
active 状态消息处理
在 active 状态下,除了由 MgrStandby::ms_dispatch
继续处理 MMgrMap 消息外,其它的消息传递给 Mgr 实例进行处理:
bool Mgr::ms_dispatch(Message *m)
{
std::lock_guard l(lock);
switch (m->get_type()) {
case MSG_MGR_DIGEST:
handle_mgr_digest(static_cast<MMgrDigest*>(m));
break;
case CEPH_MSG_MON_MAP:
py_module_registry->notify_all("mon_map", ""); //
m->put();
break;
case CEPH_MSG_FS_MAP:
py_module_registry->notify_all("fs_map", "");
handle_fs_map((MFSMap*)m);
return false; // I shall let this pass through for Client
break;
case CEPH_MSG_OSD_MAP:
handle_osd_map();
py_module_registry->notify_all("osd_map", "");
// Continuous subscribe, so that we can generate notifications
// for our MgrPyModules
objecter->maybe_request_map();
m->put();
break;
case MSG_SERVICE_MAP:
handle_service_map(static_cast<MServiceMap*>(m));
py_module_registry->notify_all("service_map", "");
m->put();
break;
case MSG_LOG:
handle_log(static_cast<MLog *>(m));
break;
default:
return false;
}
return true;
}
注意 MMonMap、MOSDMap 消息实际上是由 Mgr 实例的指针成员变量 monc 和 objecter 处理的,Mgr 实例在这里只是间接的使用这些 map,Mgr 实例自身直接处理 MMgrDigest、MFSMap、MServiceMap 以及 MLog 等 4 个消息。
PyModuleRegistry::notify_all
通知扩展模块,中间过程调用的是 ActivePyModules::notify_all
,因此最终只调用了 MgrModule 子类的 notify 方法。
参考资料
Initialization, Finalization, and Threads
https://docs.python.org/2/c-api/init.html
Importing Modules
https://docs.python.org/2/c-api/import.html
Module Objects
https://docs.python.org/2/c-api/module.html
最后修改于 2019-02-25