runsisi's

technical notes

ceph-mgr 进程

2019-02-25 runsisi#ceph

mgr 是一个 active/standby 系统,通过心跳与 mon 保活,同一时刻所有的 mgr 进程中只有一个为 active 状态的 mgr 进程提供服务,mgr 进程运行在 public 网络上,它不断的收集集群信息,并将信息暴露给外部。

mgr 进程的一个非常重要的功能是通过 Python 模块的形式扩展 mgr,当然这样做的前提是内嵌 Python 解释器,并实现内置的 ceph_module 模块对外暴露 mgr C++ 部分提供的数据或接口供 mgr Python 扩展模块使用。

ceph_module 由 BaseMgrModule.cc / BaseMgrStandbyModule.cc / PyOSDMap.cc 3 个 C++ 文件定义,其分别定义了 5 个 Python 类型对象:BaseMgrModuleType / BaseMgrStandbyModuleType / BasePyOSDMapType / BasePyOSDMapIncrementalType / BasePyCRUSHType,通过 PyModule::init_ceph_module 静态方法注册为如下的 Python 类(class):

ceph_module.BaseMgrModule
ceph_module.BaseMgrStandbyModule
ceph_module.BasePyOSDMap
ceph_module.BasePyOSDMapIncremental
ceph_module.BasePyCRUSH

模块组成

在 mgr 的实现中,有四个主要的结构体:MgrStandby、PyModuleRegistry、Mgr、DaemonServer,其关系大概如下图所示:

mgr

  1. PyModuleRegistry 是 MgrStandby 的成员变量,用于管理内嵌的 Python 解释器以及 mgr Python 扩展模块;
  2. Mgr 是 MgrStandby 的指针类型的成员变量,当 mgr 进程为非 active 状态时 Mgr 实例为 nullptr,当 mgr 进程切换为 active 状态后,Mgr 实例才得以创建,负责处理集群的各种 map(MMonMap、MOSDMap、MServiceMap、MFSMap)以及 MMgrDigest、MLog 消息;
  3. DaemonServer 是 Mgr 的成员变量,主要负责处理集群服务进程的上报消息(MMgrReport)以及 PG 信息(MPGStats),并通过 mgr command 和 Python 扩展模块对外暴露这些信息;
  4. cluster_state(ClusterState)是 Mgr 的成员变量,负责记录 FSMap、ServiceMap、MgrMap、PGMap(注意 MonMap 和 OSDMap 存储在其成员指针变量 monc 与 objecter 中);
  5. daemon_state(DaemonStateIndex)是 Mgr 的成员变量,负责记录集群所有服务进程的信息(DaemonState);
  6. c/msgr 用于 mgr 作为 ceph 集群的客户端与 ceph 集群交互,比如:上电注册 mgr 自身、发送心跳消息、处理集群 map 等;
  7. s/msgr 用于 mgr 作为 ceph 集群的信息采集中心,接收集群服务进程的上报消息和 PG 信息。

main 函数

mgr 进程的 main 函数非常简单,大致如下(ceph 代码变化太快,这里是 2018 年 12 月上旬 master 分支代码):

int main(int argc, const char **argv)
{
  auto cct = global_init(&defaults, args, CEPH_ENTITY_TYPE_MGR,
      CODE_ENVIRONMENT_DAEMON, 0);

  pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
  common_init_finish(g_ceph_context);

  MgrStandby mgr(argc, argv);
  int rc = mgr.init();
  return mgr.main(args);
}

在 MgrStandby 的构造函数中会构造网络层 AsyncMessenger 实例以及集群 daemon 的客户端实例,MgrStandby 内包含了所有集群 daemon 的客户端实例:monc 对应 mon,mgrc 对应 mgr,objecter 对应 osd,client 对应 mds。

此时构造的 msgr 实例是作为 mon 的客户端创建的实例(MgrStandby::client_messenger),因此它并不会进行 bind 操作,当 mgr 进程切换成 active 状态后,会新建一个 msgr 实例并 bind 作为服务端进行监听(DaemonServer::msgr,s/msgr)。

MgrStandby::main 的逻辑更为简单,注册 SIGINT 和 SIGTERM 信号处理,然后等待 msgr 退出,由 msgr 驱动整个事件机制的运行。

初始化

mgr 存在两种状态:active 和 standby,其初始化相应的也分为两部分。

standby 状态初始化

mgr 进程上电时处于 standby 状态,其初始化行为在 MgrStandby::init 中进行,主要逻辑如下:

  1. 注册 SIGHUP 信号处理(signal handler);
  2. 为客户端 msgr 实例(MgrStandby::client_messenger,c/msgr)注册 handler(所有的客户端实例及 MgrStandby 自身都作为 handler 进行注册),handler 的处理顺序为:monc -> objecter -> this -> client -> mgrc;
  3. 初始化 monc 并通过 monc 向 mon 进行 cephx 认证;
  4. 将 msgr 的 entity name 设置为 mgr.<global id>;
  5. 初始化 mgrc、objecter、client、py_module_registry
  6. 启动 tick 定时器;

下面主要分析 Mgr::py_module_registry 的初始化过程。

PyModuleRegistry 的初始化(PyModuleRegistry::init)过程就是初始化 Python 解释器并 import mgr Python 扩展模块的过程:

void PyModuleRegistry::init()
{
  std::lock_guard locker(lock);

  // Set up global python interpreter
  Py_SetProgramName(const_cast<char*>(PYTHON_EXECUTABLE));

  // Add more modules
  if (g_conf().get_val<bool>("daemonize")) {
    PyImport_AppendInittab("ceph_logger", PyModule::init_ceph_logger);
  }
  // 注意 PyModule::init_ceph_module 是一个 static 方法
  PyImport_AppendInittab("ceph_module", PyModule::init_ceph_module); // 将 ceph_module 作为 builtin module 进行加载

  Py_InitializeEx(0); // 初始化 Python 解释器

  // Let CPython know that we will be calling it back from other
  // threads in future.
  if (! PyEval_ThreadsInitialized()) {
    PyEval_InitThreads();
  }

  // Drop the GIL and remember the main thread state (current
  // thread state becomes NULL)
  pMainThreadState = PyEval_SaveThread();
  ceph_assert(pMainThreadState != nullptr);

  std::list<std::string> failed_modules;

  // 搜索 mgr_module_path 路径下的所有文件夹,如果文件夹下存在 module.py 文件则认为是一个 mgr Python 模块
  std::set<std::string> module_names = probe_modules();
  // Load python code
  for (const auto& module_name : module_names) { // import 所有的 mgr Python 模块
    dout(1) << "Loading python module '" << module_name << "'" << dendl;

    // Everything starts disabled, set enabled flag on module
    // when we see first MgrMap
    auto mod = std::make_shared<PyModule>(module_name); // 每个 PyModule 实例代表一个 import 的 Python 模块
    // 每个 PyModule,即每个 import 的 mgr Python 模块,都有自己独立的 Python 解释器环境
    int r = mod->load(pMainThreadState); // import mgr Python 模块,当然是通过调用 python-devel C API 的形式

    // Record the module even if the load failed, so that we can
    // report its loading error
    modules[module_name] = std::move(mod);
  }
}

mgr Python 扩展模块 import 过程:

int PyModule::load(PyThreadState *pMainThreadState)
{
  ceph_assert(pMainThreadState != nullptr);

  // Configure sub-interpreter
  {
    // Python 解释器总是在与 OS 线程绑定的 PyThreadState 环境运行,切换解释器就意味着切换 PyThreadState 环境,
    // 如果 Python 解释器要在某个非绑定的 OS 线程上运行,则需要新建 PyThreadState 实例与该 OS 线程绑定并调用
    // PyThreadState_Swap 显式切换至新建的 PyThreadState 环境
    SafeThreadState sts(pMainThreadState); // 记录 Python 主解释器环境(PyThreadState)与当前 OS 线程(pthread_self())的绑定关系
    Gil gil(sts); // 请求 GIL,切换 Python 解释器环境为 sts 记录的 PyThreadState 环境

    // 每个 PyModule 实例新建一个由成员变量 pMyThreadState 管理的 Python 子解释器环境
    auto thread_state = Py_NewInterpreter(); // 创建 Python 子解释器(即 PyThreadState 环境),自动与当前 OS 线程绑定
    pMyThreadState.set(thread_state); // 记录 Python 子解释器与当前 OS 线程的绑定关系


    // Some python modules do not cope with an unpopulated argv, so lets
    // fake one.  This step also picks up site-packages into sys.path.
    const char *argv[] = {"ceph-mgr"};
    PySys_SetArgv(1, (char**)argv);

    // Configure sys.path to include mgr_module_path
    string paths = (":" + get_site_packages() +
          ":" + g_conf().get_val<std::string>("mgr_module_path"));
    string sys_path(Py_GetPath() + paths);
    PySys_SetPath(const_cast<char*>(sys_path.c_str())); // 设置 sys.path
    dout(10) << "Computed sys.path '" << sys_path << "'" << dendl;
  }
  // Environment is all good, import the external module
  {
    Gil gil(pMyThreadState); // 请求 GIL,切换解释器环境为新建的子解释器环境

    // 所有的 mgr Python 模块都需要定义从 MgrModule/MgrStandbyModule 类(在 src/pybind/mgr/mgr_module.py 中定义)派生的子类
    int r;

    // 通过 import 当前 mgr Python 模块,将 mgr 模块定义的 MgrModule 子类(class)信息保存在 PyModule::pClass 中
    r = load_subclass_of("MgrModule", &pClass);

    r = load_commands(); // 根据 Python 类成员变量 COMMANDS 加载所有的 ModuleCommand 并保存在 PyModule::commands 中

    r = load_options(); // 根据 Python 类成员变量 OPTIONS 加载所有的 ModuleCommand 并保存在 PyModule::options 中

    // We've imported the module and found a MgrModule subclass, at this
    // point the module is considered loaded.  It might still not be
    // runnable though, can_run populated later...
    loaded = true;

    // 通过 import 当前 mgr Python 模块,将 mgr 模块定义的 MgrStandbyModule 子类(class)信息保存在 PyModule::pStandbyClass 中
    r = load_subclass_of("MgrStandbyModule", &pStandbyClass);

    // 调用类方法 can_run,这样可以判断 mgr 模块是否加载成功
    PyObject *pCanRunTuple = PyObject_CallMethod(pClass, const_cast<char*>("can_run"), const_cast<char*>("()"));
    PyObject *pCanRun = PyTuple_GetItem(pCanRunTuple, 0);
    PyObject *can_run_str = PyTuple_GetItem(pCanRunTuple, 1);
    can_run = (pCanRun == Py_True);
    if (!can_run) {
      error_string = PyString_AsString(can_run_str);
      dout(4) << "Module " << get_name()
              << " reported that it cannot run: "
              << error_string << dendl;
    }

    Py_DECREF(pCanRunTuple);
  }
  return 0;
}

active 状态初始化

MgrStandby 对 MMgrMap 消息的处理,可能导致 mgr 进程从非 active 切换成 active 或者从 active 切换成非 active,mgr 进程切换至 active 状态会新建 Mgr 实例,并调用 Mgr::background_init 进行初始化(在 Finisher 上下文调用 Mgr::init 而已):

void Mgr::init()
{
  std::lock_guard l(lock);

  // 创建 s/msgr 用于处理 MPGStats、MMgrReport、MMgrOpen、MMgrClose、MCommand 等种类型的消息
  // mgr 在向 mon 发送心跳消息时(MgrStandby::send_beacon)如果 mgr 并没有初始化完成(即 Mgr::initialized != true),
  // 心跳消息里并不会携带 mgr 的地址,初始化完成之后,心跳消息上报的地址是 DaemonServer::init 里创建的 s/msgr 的地址
  int r = server.init(monc->get_global_id(), client_messenger->get_myaddrs());

  // Preload all daemon metadata (will subsequently keep this
  // up to date by watching maps, so do the initial load before
  // we subscribe to any maps)
  dout(4) << "Loading daemon metadata..." << dendl;

  // 向 mon 请求所有 mon/osd/mds 的 metadata 并保存在 Mgr::daemon_state 中(即 DaemonStateIndex)
  load_all_metadata();

  // subscribe to all the maps
  monc->sub_want("log-info", 0, 0);
  monc->sub_want("mgrdigest", 0, 0);
  monc->sub_want("fsmap", 0, 0);
  monc->sub_want("servicemap", 0, 0);

  dout(4) << "waiting for OSDMap..." << dendl;
  // Subscribe to OSDMap update to pass on to ClusterState
  objecter->maybe_request_map();

  // reset the mon session.  we get these maps through subscriptions which
  // are stateful with the connection, so even if *we* don't have them a
  // previous incarnation sharing the same MonClient may have.
  monc->reopen_session();

  // Start Objecter and wait for OSD map
  lock.Unlock();  // Drop lock because OSDMap dispatch calls into my ms_dispatch
  objecter->wait_for_osd_map();
  lock.Lock();

  // ClusterState 不直接保存 monmap 和 osdmap,而是保存有指向 monc 和 objecter 的指针类型成员变量,
  // 因此 with_monmap/with_osdmap/with_osdmap_and_pgmap 会让 monc 和 objecter 对应的方法去处理
  // ClusterState 直接保存 fsmap/servicemap/mgr_map/pg_map,因此 with_fsmap/with_servicemap/with_mgrmap/with_pgmap
  // 以及 with_osdmap_and_pgmap 会直接传递这些保存的变量作为回调函数的参数

  // Populate PGs in ClusterState
  cluster_state.with_osdmap_and_pgmap([this](const OSDMap &osd_map, const PGMap& pg_map) {
    // 根据 OSDMap 生成 PGMap(ClusterState::pg_map)
    cluster_state.notify_osdmap(osd_map);
  });

  // Wait for FSMap
  dout(4) << "waiting for FSMap..." << dendl;
  while (!cluster_state.have_fsmap()) { // 等待 FSMap
    fs_map_cond.Wait(lock);
  }

  // Wait for MgrDigest...
  dout(4) << "waiting for MgrDigest..." << dendl;
  while (!digest_received) { // 等待 Mgr::handle_mgr_digest 处理 MMgrDigest 消息完成
    digest_cond.Wait(lock);
  }

  // Load module KV store
  // 向 mon 请求所有的 ConfigKeyService 中保存的配置数据,过滤并得到所有 "mgr/" 和 "device/" 类型的配置数据,
  // 如果是 device/ 数据,创建对应的 DeviceState 实例并保存在 DaemonStateIndex::devices 中,如果是普通的配置数据,
  // 则直接保存在临时变量 kv_store 中(仅用于兼容性处理,见下面的 upgrade_config 调用)
  auto kv_store = load_store();

  // Migrate config from KV store on luminous->mimic
  // drop lock because we do blocking config sets to mon
  lock.Unlock();
  // L -> M 升级的兼容性处理,L 版本的 mgr Python 扩展模块配置数据保存在 mon ConfigKeyService 中,而升级到
  // M 版本之后要把 config-key 中的配置数据转成 mon ConfigMonitor 管理的配置数据
  // 升级过程会把配置数据设置到 ConfigMonitor 中,以后会通过 MgrStandby::init 在 monc 中注册的回调获取配置数据
  py_module_registry->upgrade_config(monc, kv_store);
  lock.Lock();

  // 遍历 mgr 扩展模块(由 PyModuleRegistry::init import 得到模块中派生的 MgrModule 子类并保存在 PyModuleRegistry::modules 中)
  // 并创建 MgrModule 子类实例
  py_module_registry->active_start(daemon_state, cluster_state,
      kv_store, *monc, clog, audit_clog, *objecter, *client,
      finisher, server);

  initializing = false;
  initialized = true;
}

PyModuleRegistry::active_start 实现如下,主要逻辑为遍历所有通过 PyModuleRegistry::init import 的扩展模块,创建 MgrModule 子类实例并启动 serve 线程:

void PyModuleRegistry::active_start(
            DaemonStateIndex &ds, ClusterState &cs,
            const std::map<std::string, std::string> &kv_store,
            MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
            Objecter &objecter_, Client &client_, Finisher &f,
            DaemonServer &server)
{
  std::lock_guard locker(lock);

  if (standby_modules != nullptr) { // mgr 切换为 active,停止之前启动的 standby 扩展模块(从 MgrStandbyModule 派生的子类实例)
    standby_modules->shutdown();
    standby_modules.reset();
  }

  // 创建 ActivePyModules 实例,用于管理所有的 active 扩展模块(从 MgrModule 派生的子类实例)
  active_modules.reset(new ActivePyModules(module_config, kv_store, ds, cs, mc,
              clog_, audit_clog_, objecter_, client_, f, server));

  for (const auto &i : modules) {
    // Anything we're skipping because of !can_run will be flagged
    // to the user separately via get_health_checks
    if (!(i.second->is_enabled() && i.second->is_loaded())) {
      continue;
    }

    // 创建 ActivePyModule 实例并保存在 ActivePyModule::modules 中,通过 ActivePyModule 实例创建 MgrModule 子类实例
    // 并启动 serve 线程(线程执行的函数是子类实例的 serve 方法)
    int r = active_modules->start_one(i.second);
  }
}

ActivePyModule::start_one 实现如下:

int ActivePyModules::start_one(PyModuleRef py_module)
{
  std::lock_guard l(lock);

  modules[py_module->get_name()].reset(new ActivePyModule(py_module, clog));

  auto active_module = modules.at(py_module->get_name()).get();

  // 注意与 PyModuleRegistry::init 中调用的 PyModule::load 方法区分开来,前者是 import 得到 MgrModule 子类(PyModule::pClass),
  // 这里是基于得到的子类创建子类实例
  int r = active_module->load(this);

  // 创建 serve 线程,线程函数 PyModuleRunner::serve 调用 MgrModule 子类定义的 serve 方法对外提供服务
  // 注意 ActivePyModule 是从 PyModuleRunner 派生的子类,因此线程函数是 PyModuleRunner::serve
  active_module->thread.create(active_module->get_thread_name());
}

消息处理

由于 active/standby 状态的原因,消息处理也有两种不同的情况。

standby 状态消息处理

MgrStandby 的消息处理逻辑如下:

bool MgrStandby::ms_dispatch(Message *m)
{
  std::lock_guard l(lock);

  if (m->get_type() == MSG_MGR_MAP) {
    handle_mgr_map(static_cast<MMgrMap*>(m)); // 处理 mgrmap,进行 active/non-active 状态切换
  }
  bool handled = false;
  if (active_mgr) {
    auto am = active_mgr;
    lock.Unlock();
    // 当前为 active 状态,处理 MMonMap、MOSDMap、MMgrDigest、MServiceMap、MFSMap、MLog 消息
    handled = am->ms_dispatch(m);
    lock.Lock();
  }
  if (m->get_type() == MSG_MGR_MAP) {
    // let this pass through for mgrc
    handled = false;
  }
  return handled;
}

显然,在 standby 状态下,MgrStandby 只能处理 MMgrMap 消息,且 MMgrMap 消息在处理完之后还会给 mgrc 去处理(前面提到过 mgrc 会作为最后一个 handler 挂载到 MgrStandby::client_messenger 的最后)。

正是对 MMgrMap 消息的处理,导致 mgr 进程从 standby 切换成 active 或者从 active 切换成 standby:

void MgrStandby::handle_mgr_map(MMgrMap* mmap) {
  auto &map = mmap->get_map();
  const bool active_in_map = map.active_gid == monc.get_global_id();

  // PyModuleRegistry may ask us to respawn if it sees that
  // this MgrMap is changing its set of enabled modules
  bool need_respawn = py_module_registry.handle_mgr_map(map);
  if (need_respawn) { // 启用或禁用了新的 Python 模块,重启 ceph-mgr 进程,进程 pid 不变
    respawn();
  }

  if (active_in_map) {
    if (!active_mgr) { // 从非 active 切换成 active
      // 新建 Mgr 实例用于 active 状态下的业务消息处理,Mgr 实例将拿到第一张 mgrmap
      active_mgr.reset(new Mgr(&monc, map, &py_module_registry, client_messenger.get(),
              &objecter, &client, clog, audit_clog));
      active_mgr->background_init(new FunctionContext([this](int r) {
          // Advertise our active-ness ASAP instead of waiting for
          // next tick.
          std::lock_guard l(lock);
          send_beacon();
        }));
      dout(1) << "I am now activating" << dendl;
    } else {
      dout(10) << "I was already active" << dendl;
      bool need_respawn = active_mgr->got_mgr_map(map);
      if (need_respawn) { // 启用或禁用了新的 Python 模块,重启 ceph-mgr 进程,进程 pid 不变
        respawn();
      }
    }

    if (!available_in_map && map.get_available()) {
      dout(4) << "Map now says I am available" << dendl;
      available_in_map = true;
    }
  } else if (active_mgr != nullptr) { // 从 active 切换成非 active,重启 ceph-mgr 进程,进程 pid 不变
    derr << "I was active but no longer am" << dendl;
    respawn();
  } else {
    if (map.active_gid != 0 && map.active_name != g_conf()->name.get_id()) {
      // I am the standby and someone else is active, start modules
      // in standby mode to do redirects if needed
      if (!py_module_registry.is_standby_running()) {
        // 遍历 mgr 扩展模块创建 MgrStandbyModule 子类实例及 serve 线程,具体实现与 PyModuleRegistry::active_start 类似,
        // 因此可以参考下面对 PyModuleRegistry::active_start 的分析
        py_module_registry.standby_start(monc);
      }
    }
  }
}

active 状态消息处理

在 active 状态下,除了由 MgrStandby::ms_dispatch 继续处理 MMgrMap 消息外,其它的消息传递给 Mgr 实例进行处理:

bool Mgr::ms_dispatch(Message *m)
{
  std::lock_guard l(lock);
  switch (m->get_type()) {
    case MSG_MGR_DIGEST:
      handle_mgr_digest(static_cast<MMgrDigest*>(m));
      break;
    case CEPH_MSG_MON_MAP:
      py_module_registry->notify_all("mon_map", ""); //
      m->put();
      break;
    case CEPH_MSG_FS_MAP:
      py_module_registry->notify_all("fs_map", "");
      handle_fs_map((MFSMap*)m);
      return false; // I shall let this pass through for Client
      break;
    case CEPH_MSG_OSD_MAP:
      handle_osd_map();

      py_module_registry->notify_all("osd_map", "");

      // Continuous subscribe, so that we can generate notifications
      // for our MgrPyModules
      objecter->maybe_request_map();
      m->put();
      break;
    case MSG_SERVICE_MAP:
      handle_service_map(static_cast<MServiceMap*>(m));
      py_module_registry->notify_all("service_map", "");
      m->put();
      break;
    case MSG_LOG:
      handle_log(static_cast<MLog *>(m));
      break;
    default:
      return false;
  }
  return true;
}

注意 MMonMap、MOSDMap 消息实际上是由 Mgr 实例的指针成员变量 monc 和 objecter 处理的,Mgr 实例在这里只是间接的使用这些 map,Mgr 实例自身直接处理 MMgrDigest、MFSMap、MServiceMap 以及 MLog 等 4 个消息。

PyModuleRegistry::notify_all 通知扩展模块,中间过程调用的是 ActivePyModules::notify_all,因此最终只调用了 MgrModule 子类的 notify 方法。

参考资料

Initialization, Finalization, and Threads

https://docs.python.org/2/c-api/init.html

Importing Modules

https://docs.python.org/2/c-api/import.html

Module Objects

https://docs.python.org/2/c-api/module.html