Ceph mgr 传递 C++ 主程序指针给 C/C++ 扩展模块

ceph-mgr 是一个基于 C++ 构建的应用程序,其内嵌了 Python 解释器用于加载并运行 Python 插件(参考ceph-mgr 进程)。

ceph_module 是 ceph-mgr 对 Python 代码暴露的一个 C/C++ 扩展模块,其主要暴露了如下一些 Python 类:

class BaseMgrModule
class BaseMgrStandbyModule
class BasePyOSDMap
class BasePyOSDMapIncremental
class BasePyCRUSH

类实例的初始化函数分别如下:

BaseMgrModule_init
BaseMgrStandbyModule_init
BasePyOSDMap_init
BasePyOSDMapIncremental_init
BasePyCRUSH_init

实际上 ceph_module 并不直接对用户插件暴露,而是由 mgr_module 模块通过继承 ceph_module 模块中的类间接对用户插件暴露:

class MgrModule(ceph_module.BaseMgrModule)
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule)
class OSDMap(ceph_module.BasePyOSDMap)
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental)
class CRUSHMap(ceph_module.BasePyCRUSH)

各类的构造时机分别为:

ActivePyModule::load
StandbyPyModule::load
BaseMgrModule._ceph_get_osdmap -> ceph_get_osdmap -> ActivePyModules::get_osdmap / BasePyOSDMap._apply_incremental -> osdmap_apply_incremental
BasePyOSDMap._new_incremental -> osdmap_new_incremental
BasePyOSDMap._get_crush -> osdmap_get_crush

有时候,为了某些目的,可能需要向这些基类实例中暴露更多 C++ 主程序的信息,此时可以使用向构造函数传递主程序指针的方式进行实现,如下即为向 OSDMap 类实例传递主程序线程池的实现:

diff --git a/src/mgr/ActivePyModules.cc b/src/mgr/ActivePyModules.cc
index 7c67b38c1c..1e03b03e81 100644
--- a/src/mgr/ActivePyModules.cc
+++ b/src/mgr/ActivePyModules.cc
@@ -40,10 +40,10 @@
 ActivePyModules::ActivePyModules(PyModuleConfig const &config_,
           DaemonStateIndex &ds, ClusterState &cs,
 	  MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
-          Client &client_, Finisher &f, DaemonServer &server)
+          Client &client_, Finisher &f, DaemonServer &server, ThreadPool &tp)
   : config_cache(config_), daemon_state(ds), cluster_state(cs),
     monc(mc), clog(clog_), objecter(objecter_), client(client_), finisher(f),
-    server(server), lock("ActivePyModules")
+    server(server), tp(tp), lock("ActivePyModules")
 {}

 ActivePyModules::~ActivePyModules() = default;
@@ -676,7 +676,7 @@ PyObject *ActivePyModules::get_context()
 PyObject *construct_with_capsule(
     const std::string &module_name,
     const std::string &clsname,
-    void *wrapped)
+    void *wrapped, void *wrapped2)
 {
   // Look up the OSDMap type which we will construct
   PyObject *module = PyImport_ImportModule(module_name.c_str());
@@ -700,6 +700,17 @@ PyObject *construct_with_capsule(

   // Construct the python OSDMap
   auto pArgs = PyTuple_Pack(1, wrapped_capsule);
+
+  decltype(wrapped_capsule) wrapped_capsule2;
+  if (wrapped2) {
+    Py_DECREF(pArgs);
+
+    wrapped_capsule2 = PyCapsule_New(wrapped2, nullptr, nullptr);
+    assert(wrapped_capsule2);
+
+    pArgs = PyTuple_Pack(2, wrapped_capsule, wrapped_capsule2);
+  }
+
   auto wrapper_instance = PyObject_CallObject(wrapper_type, pArgs);
   if (wrapper_instance == nullptr) {
     derr << "Failed to construct python OSDMap:" << dendl;
@@ -709,6 +720,10 @@ PyObject *construct_with_capsule(
   Py_DECREF(pArgs);
   Py_DECREF(wrapped_capsule);

+  if (wrapped2) {
+    Py_DECREF(wrapped_capsule2);
+  }
+
   Py_DECREF(wrapper_type);
   Py_DECREF(module);

@@ -728,7 +743,7 @@ PyObject *ActivePyModules::get_osdmap()
   }
   PyEval_RestoreThread(tstate);

-  return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap);
+  return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap, (void*)&tp);
 }

 void ActivePyModules::set_health_checks(const std::string& module_name,
diff --git a/src/mgr/ActivePyModules.h b/src/mgr/ActivePyModules.h
index 004cebcda5..17d0306fd3 100644
--- a/src/mgr/ActivePyModules.h
+++ b/src/mgr/ActivePyModules.h
@@ -21,6 +21,7 @@
 #include "osdc/Objecter.h"
 #include "client/Client.h"
 #include "common/LogClient.h"
+#include "common/WorkQueue.h"
 #include "mon/MgrMap.h"
 #include "mon/MonCommand.h"

@@ -45,7 +46,7 @@ class ActivePyModules
   Client   &client;
   Finisher &finisher;
   DaemonServer &server;
-
+  ThreadPool &tp;

   mutable Mutex lock{"ActivePyModules::lock"};

@@ -53,7 +54,7 @@ public:
   ActivePyModules(PyModuleConfig const &config_,
             DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
             LogChannelRef clog_, Objecter &objecter_, Client &client_,
-            Finisher &f, DaemonServer &server);
+            Finisher &f, DaemonServer &server, ThreadPool &tp);

   ~ActivePyModules();

diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc
index 25cd658cb4..6dc19a25a9 100644
--- a/src/mgr/Mgr.cc
+++ b/src/mgr/Mgr.cc
@@ -57,6 +57,7 @@ Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
   cluster_state(monc, nullptr, mgrmap),
   server(monc, finisher, daemon_state, cluster_state, *py_module_registry,
          clog_, audit_clog_),
+  cpu_tp(g_ceph_context, "Mgr::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
   clog(clog_),
   audit_clog(audit_clog_),
   initialized(false),
@@ -219,10 +220,12 @@ void Mgr::init()
     digest_cond.Wait(lock);
   }

+  cpu_tp.start();
+
   // assume finisher already initialized in background_init
   dout(4) << "starting python modules..." << dendl;
   py_module_registry->active_start(loaded_config, daemon_state, cluster_state, *monc,
-      clog, *objecter, *client, finisher, server);
+      clog, *objecter, *client, finisher, server, cpu_tp);

   dout(4) << "Complete." << dendl;
   initializing = false;
@@ -375,6 +378,8 @@ void Mgr::shutdown()
   // to touch references to the things we're about to tear down
   finisher.wait_for_empty();
   finisher.stop();
+
+  cpu_tp.stop();
 }

 void Mgr::handle_osd_map()
diff --git a/src/mgr/Mgr.h b/src/mgr/Mgr.h
index a4d8aad39f..637d7105f3 100644
--- a/src/mgr/Mgr.h
+++ b/src/mgr/Mgr.h
@@ -66,6 +66,8 @@ protected:

   DaemonServer server;

+  ThreadPool cpu_tp;
+
   LogChannelRef clog;
   LogChannelRef audit_clog;

diff --git a/src/mgr/PyModuleRegistry.cc b/src/mgr/PyModuleRegistry.cc
index b57df3a3f8..0319dff44f 100644
--- a/src/mgr/PyModuleRegistry.cc
+++ b/src/mgr/PyModuleRegistry.cc
@@ -342,7 +342,7 @@ void PyModuleRegistry::active_start(
             PyModuleConfig &config_,
             DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
             LogChannelRef clog_, Objecter &objecter_, Client &client_,
-            Finisher &f, DaemonServer &server)
+            Finisher &f, DaemonServer &server, ThreadPool &tp)
 {
   Mutex::Locker locker(lock);

@@ -357,7 +357,7 @@ void PyModuleRegistry::active_start(
   }

   active_modules.reset(new ActivePyModules(
-              config_, ds, cs, mc, clog_, objecter_, client_, f, server));
+              config_, ds, cs, mc, clog_, objecter_, client_, f, server, tp));

   for (const auto &i : modules) {
     dout(4) << "Starting " << i.first << dendl;
diff --git a/src/mgr/PyModuleRegistry.h b/src/mgr/PyModuleRegistry.h
index 7aafa09041..632d3e8853 100644
--- a/src/mgr/PyModuleRegistry.h
+++ b/src/mgr/PyModuleRegistry.h
@@ -22,6 +22,7 @@
 #include <memory>

 #include "common/LogClient.h"
+#include "common/WorkQueue.h"

 #include "ActivePyModules.h"
 #include "StandbyPyModules.h"
@@ -110,7 +111,7 @@ public:
                 PyModuleConfig &config_,
                 DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
                 LogChannelRef clog_, Objecter &objecter_, Client &client_,
-                Finisher &f, DaemonServer &server);
+                Finisher &f, DaemonServer &server, ThreadPool &tp);
   void standby_start(
       MonClient *monc);

diff --git a/src/mgr/PyOSDMap.cc b/src/mgr/PyOSDMap.cc
index b67f2a1474..6d491bde8f 100644
--- a/src/mgr/PyOSDMap.cc
+++ b/src/mgr/PyOSDMap.cc
@@ -6,6 +6,7 @@
 #include "osd/OSDMap.h"
 #include "common/errno.h"
 #include "common/version.h"
+#include "common/WorkQueue.h"
 #include "include/stringify.h"

 #include "PyOSDMap.h"
@@ -19,6 +20,7 @@
 typedef struct {
   PyObject_HEAD
   OSDMap *osdmap;
+  ThreadPool *tp;
 } BasePyOSDMap;

 typedef struct {
@@ -189,19 +191,24 @@ static int
 BasePyOSDMap_init(BasePyOSDMap *self, PyObject *args, PyObject *kwds)
 {
     PyObject *osdmap_capsule = nullptr;
-    static const char *kwlist[] = {"osdmap_capsule", NULL};
+    PyObject *tp_capsule = nullptr;
+    static const char *kwlist[] = {"osdmap_capsule", "tp_capsule", NULL};

-    if (! PyArg_ParseTupleAndKeywords(args, kwds, "O",
+    if (! PyArg_ParseTupleAndKeywords(args, kwds, "OO",
                                       const_cast<char**>(kwlist),
-                                      &osdmap_capsule)) {
+                                      &osdmap_capsule, &tp_capsule)) {
       assert(0);
         return -1;
     }
     assert(PyObject_TypeCheck(osdmap_capsule, &PyCapsule_Type));
+    assert(PyObject_TypeCheck(tp_capsule, &PyCapsule_Type));

     self->osdmap = (OSDMap*)PyCapsule_GetPointer(
         osdmap_capsule, nullptr);
+    self->tp = (ThreadPool*)PyCapsule_GetPointer(
+        tp_capsule, nullptr);
     assert(self->osdmap);
+    assert(self->tp);

     return 0;
 }
diff --git a/src/mgr/PyOSDMap.h b/src/mgr/PyOSDMap.h
index 09e5b041c8..dcf5ac46a5 100644
--- a/src/mgr/PyOSDMap.h
+++ b/src/mgr/PyOSDMap.h
@@ -16,5 +16,5 @@ extern PyTypeObject BasePyCRUSHType;
 PyObject *construct_with_capsule(
     const std::string &module,
     const std::string &clsname,
-    void *wrapped);
+    void *wrapped, void *wrapped2=nullptr);

显然,上面的关键代码只有两处:

@@ -728,7 +743,7 @@ PyObject *ActivePyModules::get_osdmap()
   }
   PyEval_RestoreThread(tstate);

-  return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap);
+  return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap, (void*)&tp);
 BasePyOSDMap_init(BasePyOSDMap *self, PyObject *args, PyObject *kwds)
 {
     PyObject *osdmap_capsule = nullptr;
-    static const char *kwlist[] = {"osdmap_capsule", NULL};
+    PyObject *tp_capsule = nullptr;
+    static const char *kwlist[] = {"osdmap_capsule", "tp_capsule", NULL};

-    if (! PyArg_ParseTupleAndKeywords(args, kwds, "O",
+    if (! PyArg_ParseTupleAndKeywords(args, kwds, "OO",
                                       const_cast<char**>(kwlist),
-                                      &osdmap_capsule)) {
+                                      &osdmap_capsule, &tp_capsule)) {
       assert(0);
         return -1;
     }
     assert(PyObject_TypeCheck(osdmap_capsule, &PyCapsule_Type));
+    assert(PyObject_TypeCheck(tp_capsule, &PyCapsule_Type));

     self->osdmap = (OSDMap*)PyCapsule_GetPointer(
         osdmap_capsule, nullptr);
+    self->tp = (ThreadPool*)PyCapsule_GetPointer(
+        tp_capsule, nullptr);

在构造类实例时,我们将线程池指针通过 args 参数(PyTuple_Pack)传递给类的初始化函数(BasePyOSDMap_init),然后在初始化函数中通过 PyArg_ParseTupleAndKeywordsargskwargs 参数解析出来,然后转回 C++ 指针以备后续使用。显然,这里使用 PyArg_ParseTuple 就够了,因为我们并没有传递 kwargs 进来,不过需要注意的是,在使用 PyArg_ParseTupleAndKeywordskwlist 中的关键字参数需要包括 args 中的位置参数。

参考资料

Extending Python with C or C++

https://docs.python.org/2.7/extending/extending.html

Parsing arguments and building values

https://docs.python.org/2.7/c-api/arg.html

Fancy Argument Parsing

https://llllllllll.github.io/c-extension-tutorial/fancy-argument-parsing.html


最后修改于 2019-12-16

- 目录 -