runsisi's

technical notes

Cinder RBD Driver and Coroutines

2020-02-26 runsisi #ceph #virt

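Recently I have been investigating why batch-creating Ceph-backed volumes through Cinder takes a long time. Strictly speaking, the absolute time is not excessive. The real issue is that the upstream (community) RBD driver is slower than a driver that wraps a RESTful interface.

A brief analysis: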

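  1. cinder-volume is a single-threaded program driven by eventlet coroutines (green threads). Each external request gets its own coroutine, created by eventlet.wsgi.server. Unlike Go goroutines, which the Go runtime multiplexes across multiple OS threads, eventlet green threads share one OS thread and switch only cooperatively;
  2. cinder-volume calls eventlet.monkey_patch so that coroutines can still switch while a synchronous (blocking) call in modules such as socket and os is waiting;
  3. The rbd module used by the RBD driver is not pure Python. It calls into C/C++ (librbd), and monkey patching has no effect on C/C++ extension modules;
  4. The rbd module cannot be monkey patched, so to work around this the RBD driver runs some time-consuming operations in the tpool thread pool;
  5. Some RBD methods (including RADOS methods), however, still run in the coroutine context. While such a method is executing, all coroutines take turns on the same main thread. When concurrent volume creations arrive in a burst, the performance impact is very noticeable;
  6. The RESTful driver sends requests with the httplib module, which is built on Python's socket library. Monkey patching therefore takes effect for it, and also for other modules the driver uses, such as time. The same holds for requests;
  7. A batch operation is not very sensitive to growth in per-operation latency. In theory, as long as the operations really run concurrently, the whole batch VM creation takes only 29 s longer even if each volume creation grows from 1 s to 30 s;
  8. Calls in a coroutine that cannot be monkey patched and are not offloaded to tpool, however, (partially) serialize the concurrent operations. This has a much larger effect on the total time of the batch.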
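Items 7 and 8 can be put in numbers with a deliberately idealized model. It assumes that all operations have the same latency and that nothing else contends. If at most `concurrency` operations can be in flight, a batch of `n_ops` takes `ceil(n_ops / concurrency)` rounds of the per-operation latency. `batch_makespan` is a hypothetical helper for illustration, not Cinder code.

```python
import math


def batch_makespan(n_ops, latency_s, concurrency):
    """Idealized wall-clock time for n_ops equal-latency operations when at
    most `concurrency` of them can be in flight at the same time."""
    rounds = math.ceil(n_ops / concurrency)
    return rounds * latency_s


# Fully concurrent batch of 20: the batch costs as much as one operation.
fast = batch_makespan(n_ops=20, latency_s=1, concurrency=20)
slow = batch_makespan(n_ops=20, latency_s=30, concurrency=20)
print(slow - fast)  # 29

# Fully serialized (concurrency=1): latency is paid once per operation.
print(batch_makespan(n_ops=20, latency_s=1, concurrency=1))   # 20
print(batch_makespan(n_ops=20, latency_s=30, concurrency=1))  # 600
```

With full concurrency, the batch cost tracks the slowest single operation. Once calls are serialized, it tracks the sum. That is why a few non-yielding calls matter more than per-call latency.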

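Therefore, for batch volume-creation requests to cinder-volume:

  1. With the native RBD driver, coroutine scheduling becomes serialized, which easily turns into a bottleneck;
  2. With the RESTful driver, the coroutine can switch out to handle another concurrent request while an underlying call has not returned, achieving real concurrency;
  3. Once RADOSClient, RBDVolumeProxy and the rbd.Image.__init__ method are run in tpool, the RBD driver performs slightly better than the RESTful driver.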
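Conclusions 1 and 2 can be sketched using Python's standard asyncio as a stand-in for eventlet. This substitution is an assumption made so the example runs without eventlet or a Ceph cluster; both libraries schedule coroutines cooperatively on one thread. In the sketch, `await asyncio.sleep()` plays the role of a monkey-patched socket wait in the RESTful driver. `time.sleep()` plays the role of a librbd call that monkey patching cannot reach. The function names are hypothetical.

```python
import asyncio
import time


async def restful_style_request(seconds):
    # Cooperative wait: like a monkey-patched socket read in the RESTful
    # driver, the coroutine yields and other requests keep running.
    await asyncio.sleep(seconds)


async def native_rbd_style_request(seconds):
    # Blocking wait: like a librbd call that monkey patching cannot reach,
    # the whole event-loop thread stops until the call returns.
    time.sleep(seconds)


async def run_batch(request, n, seconds):
    """Start n concurrent requests and return the total elapsed time."""
    start = time.monotonic()
    await asyncio.gather(*(request(seconds) for _ in range(n)))
    return time.monotonic() - start


if __name__ == "__main__":
    for request in (restful_style_request, native_rbd_style_request):
        elapsed = asyncio.run(run_batch(request, 5, 0.2))
        print("%s: %.2f s" % (request.__name__, elapsed))
```

With 5 requests of 0.2 s each, the cooperative batch should finish in about 0.2 s. The blocking batch needs at least 5 × 0.2 = 1 s, because each request holds the only thread until its call returns.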

Simulation code

Using tpool.Proxy to proxy calls on object instances:

#!/usr/bin/env python

import eventlet
from eventlet import tpool
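# Patch everything except time, so that time.sleep() in the fake clone()
# below blocks the whole OS thread, as a call into librbd would.
eventlet.monkey_patch(time=False)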

import rados
import rbd
import random
import time
import threading

pool = eventlet.GreenPool(1024)

# tpool runs calls on a pool of native threads (20 by default, see
# eventlet/tpool.py); the pool size can be set with, e.g.:
# export EVENTLET_THREADPOOL_SIZE=20


class RADOSClient(object):
    def __init__(self, pool):
        self.pool = pool
        self.cluster = None
        self.ioctx = None

    def __enter__(self):
        self.cluster, self.ioctx = self._connect_to_rados(self.pool)
        return self

    def __exit__(self, type_, value, traceback):
        self._disconnect_from_rados(self.cluster, self.ioctx)

    def _connect_to_rados(self, pool):
        client = rados.Rados(
            rados_id='cinder',
            conffile='/etc/ceph/ceph.conf')

        try:
            client.connect()
            ioctx = client.open_ioctx(pool)
            return client, ioctx
        except rados.Error:
            print("Error connecting to ceph cluster.")
            client.shutdown()
            raise

    def _disconnect_from_rados(self, client, ioctx):
        # closing an ioctx cannot raise an exception
        ioctx.close()
        client.shutdown()


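class XXX(object):
    """Fake rbd.RBD: clone() blocks for 0.5 s with the unpatched time.sleep()."""

    def clone(self, *args, **kwargs):
        # args: (p_ioctx, p_name, p_snapname, c_ioctx, c_name)
        c_name = args[4]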
        print('begin clone {}'.format(c_name))
        start = time.time()
        time.sleep(0.5)
        end = time.time()
        print('end clone {}'.format(c_name))
        print('clone {0} elapsed: {1}'.format(c_name, end - start))


def RADOSProxy(pool):
    return tpool.Proxy(RADOSClient(pool))


def RBDProxy():
    #return tpool.Proxy(rbd.RBD())
    return tpool.Proxy(XXX())


def create_cloned_volume(pool, p_name, p_snapname, c_name):
    #client = RADOSClient(pool)
    #if True:
    #with RADOSClient(pool) as client:
    with RADOSProxy(pool) as client:
        start = time.time()
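        # tpool's proxy_call() pops nonblocking=False: it is not passed to
        # clone(), and the call is dispatched to the thread pool.
        RBDProxy().clone(client.ioctx, p_name, p_snapname,
                         client.ioctx, c_name, nonblocking=False)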
        end = time.time()
        print('clone {0} elapsed(includes waiting): {1}'.format(c_name, end - start))


def process_request(i):
    print('coroutine: {0}'.format(i))
    child_name = 'c{0}'.format(i)
    create_cloned_volume('volumes', 'i1', 's1', child_name)


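if __name__ == '__main__':
    # Run one request on its own first (e.g. as a warm-up).
    pool.spawn_n(process_request, 100000)
    pool.waitall()

    # Then spawn 21 requests, 10-100 ms apart, to mimic a burst of creations.
    for i in range(0, 21):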
        pool.spawn(process_request, i)
        r = random.randint(1, 10)
        eventlet.sleep(0.01 * r)
    pool.waitall()
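
The script relies on `tpool.Proxy` forwarding method calls on the wrapped object to the thread pool. The sketch below reproduces that idea with asyncio and `concurrent.futures`. It uses a hypothetical `AsyncProxy` and is not eventlet's implementation. Callable attributes run in a worker thread, while plain attributes such as `client.ioctx` are returned directly. One difference matters here. Under eventlet, the calling green thread simply blocks on the proxied call. With asyncio, an explicit `await` is required.

```python
import asyncio
import functools
import threading
from concurrent.futures import ThreadPoolExecutor


class AsyncProxy(object):
    """Hypothetical asyncio analogue of eventlet's tpool.Proxy.

    Callable attributes of the wrapped object run in a worker thread and must
    be awaited; plain data attributes are returned directly.
    """

    def __init__(self, obj, executor):
        self._obj = obj
        self._executor = executor

    def __getattr__(self, name):
        # Only called for names not found on the proxy itself.
        attr = getattr(self._obj, name)
        if not callable(attr):
            return attr  # e.g. client.ioctx: no thread hop needed

        async def call_in_thread(*args, **kwargs):
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                self._executor, functools.partial(attr, *args, **kwargs))
        return call_in_thread


class FakeRBD(object):
    """Hypothetical stand-in for rbd.RBD; reports which thread ran clone()."""

    pool_name = "volumes"

    def clone(self, c_name):
        return c_name, threading.current_thread().name


async def demo():
    with ThreadPoolExecutor(max_workers=4, thread_name_prefix="tpool") as executor:
        proxy = AsyncProxy(FakeRBD(), executor)
        pool_name = proxy.pool_name               # plain attribute
        c_name, thread = await proxy.clone("c1")  # runs in a worker thread
        return pool_name, c_name, thread


if __name__ == "__main__":
    print(asyncio.run(demo()))
```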

Using tpool.execute to run __init__ in the thread pool:

#!/usr/bin/env python

import eventlet
from eventlet import tpool
eventlet.monkey_patch(time=False)

import rados
import rbd
import random
import time
import threading

pool = eventlet.GreenPool(1024)

# refer to eventlet/tpool.py
# export EVENTLET_THREADPOOL_SIZE=20


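def new(x):
    """Create an instance of class x without running its __init__.

    types.InstanceType only exists on Python 2 (for old-style classes); for
    new-style classes, and on Python 3, fall back to x.__new__(x).
    """
    try: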
        import types
        _new = types.InstanceType
        _x = _new(x)
        return _x
    except (AttributeError, TypeError):
        _new = lambda cls: cls.__new__(cls)
        _x = _new(x)
        return _x


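class RADOSClient(object):
    """Run _RADOSClient.__init__ (which connects to the cluster) in a tpool
    thread, then wrap the instance in tpool.Proxy so that its later method
    calls also run in the thread pool."""

    def __init__(self, *args, **kwargs):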
        tmp = new(_RADOSClient)
        tpool.execute(tmp.__init__, *args, **kwargs)
        self.proxy = tpool.Proxy(tmp)

    def __enter__(self):
        return self.proxy

    def __exit__(self, type_, value, traceback):
        self.proxy.__exit__(type_, value, traceback)


class _RADOSClient(object):
    def __init__(self, pool):
        self.cluster, self.ioctx = self._connect_to_rados(pool)

    def __enter__(self):
        return self

    def __exit__(self, type_, value, traceback):
        self._disconnect_from_rados(self.cluster, self.ioctx)

    def _connect_to_rados(self, pool):
        client = rados.Rados(
            rados_id='cinder',
            conffile='/etc/ceph/zyc-ceph.conf')

        try:
            client.connect()
            ioctx = client.open_ioctx(pool)
            return client, ioctx
        except rados.Error:
            print("Error connecting to ceph cluster.")
            client.shutdown()
            raise

    def _disconnect_from_rados(self, client, ioctx):
        # closing an ioctx cannot raise an exception
        ioctx.close()
        client.shutdown()


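class XXX(object):
    """Fake rbd.RBD: clone() blocks for 0.5 s with the unpatched time.sleep()."""

    def clone(self, *args, **kwargs):
        # args: (p_ioctx, p_name, p_snapname, c_ioctx, c_name)
        c_name = args[4]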
        print('begin clone {}'.format(c_name))
        start = time.time()
        time.sleep(0.5)
        end = time.time()
        print('end clone {}'.format(c_name))
        print('clone {0} elapsed: {1}'.format(c_name, end - start))


def RADOSProxy(pool):
    return tpool.Proxy(RADOSClient(pool))


def RBDProxy():
    #return tpool.Proxy(rbd.RBD())
    return tpool.Proxy(XXX())


def create_cloned_volume(pool, p_name, p_snapname, c_name):
    #client = RADOSClient(pool)
    #if True:
    #with RADOSClient(pool) as client:
    with RADOSProxy(pool) as client:
        start = time.time()
        RBDProxy().clone(client.ioctx, p_name, p_snapname,
                         client.ioctx, c_name, nonblocking=False)
        end = time.time()
        print('clone {0} elapsed(includes waiting): {1}'.format(c_name, end - start))


def ImageProxy(ioctx, name):
    tmp = new(rbd.Image)
    tpool.execute(tmp.__init__, ioctx, name)
    return tpool.Proxy(tmp)


def image_id(pool, name):
    with RADOSProxy(pool) as client:
        #vol = ImageProxy(client.ioctx, name)
        vol = rbd.Image(client.ioctx, name)
        print(vol.id())
        vol.close()


def process_request(i):
    print('coroutine: {0}'.format(i))
    child_name = 'c{0}'.format(i)
    image_id('volumes', 'i1')
    #create_cloned_volume('volumes', 'i1', 's1', child_name)


if __name__ == '__main__':
    pool.spawn_n(process_request, 100000)
    pool.waitall()

    for i in range(0, 21):
        pool.spawn(process_request, i)
        r = random.randint(1, 10)
        eventlet.sleep(0.01 * r)
    pool.waitall()
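
The `new()` + `tpool.execute(tmp.__init__, ...)` pattern splits construction into two steps, so that only the blocking `__init__` goes to a worker thread. Below is a stdlib-only sketch of the same two steps. `concurrent.futures` stands in for tpool, and a hypothetical `SlowInit` class stands in for `rbd.Image`/`_RADOSClient`. `construct_in_thread` is likewise a hypothetical helper.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SlowInit(object):
    """Hypothetical stand-in for rbd.Image / _RADOSClient, whose __init__
    does blocking work (opening an image, connecting to the cluster)."""

    def __init__(self, name):
        self.name = name
        self.init_thread = threading.current_thread().name


def construct_in_thread(executor, cls, *args, **kwargs):
    # Step 1: allocate the object without running __init__ (what new() does).
    obj = cls.__new__(cls)
    # Step 2: run __init__ in a worker thread, like tpool.execute(obj.__init__, ...).
    executor.submit(obj.__init__, *args, **kwargs).result()
    return obj


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker") as executor:
        img = construct_in_thread(executor, SlowInit, "i1")
    print(img.name, img.init_thread)
```

Here `.result()` blocks the calling OS thread. Under eventlet, `tpool.execute` suspends only the calling green thread, so other coroutines keep running while `__init__` is in progress.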

Pure simulation code (does not access a Ceph cluster):

#!/usr/bin/env python

import eventlet
from eventlet import tpool
# time stays unpatched, so the fake time.sleep() calls block the OS thread.
eventlet.monkey_patch(time=False)

import time
import random
import threading

pool = eventlet.GreenPool(1024)

# refer to eventlet/tpool.py
# export EVENTLET_THREADPOOL_SIZE=20

process_start = time.time()  # reset in __main__ before each run


class RADOSClient(object):
    def __init__(self, pool):
        self.pool = pool
        self.ioctx = None
        self.cluster = None

    def __enter__(self):
        # Fake connect: blocks the calling OS thread for 0.1 s.
        time.sleep(0.1)
        return self

    def __exit__(self, type_, value, traceback):
        pass


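class RBD(object):
    """Fake rbd.RBD: clone() blocks for 0.5 s with the unpatched time.sleep()."""

    def clone(self, *args, **kwargs):
        # args: (p_ioctx, p_name, p_snapname, c_ioctx, c_name)
        c_name = args[4]
        start = time.time()
        print('begin clone {0}, @{1}'.format(c_name, start - process_start))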
        time.sleep(0.5)
        end = time.time()
        print('end clone {0}'.format(c_name))
        print('clone {0} elapsed: {1}'.format(c_name, end - start))


def RADOSProxy(pool):
    return tpool.Proxy(RADOSClient(pool))


def RBDProxy():
    return tpool.Proxy(RBD())


def create_cloned_volume(pool, p_name, p_snapname, c_name):
    start = time.time()
    # Alternative: connect directly in the calling green thread, without tpool.
    #with RADOSClient(pool) as client:
    with RADOSProxy(pool) as client:
        RBDProxy().clone(client.ioctx, p_name, p_snapname,
                         client.ioctx, c_name, nonblocking=False)
        end = time.time()
        print('clone {0} elapsed(includes waiting): {1}'.format(c_name, end - start))


def process_request(i):
    start = time.time()
    print('coroutine: {0}, @{1}'.format(i, start - process_start))
    child_name = 'c{0}'.format(i)
    create_cloned_volume('volumes', 'i1', 's1', child_name)


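if __name__ == '__main__':
    process_start = time.time()

    # Run one request on its own first, then reset the clock so the
    # timestamps printed for the batch are relative to its start.
    pool.spawn_n(process_request, 100000)
    pool.waitall()

    process_start = time.time()

    # Spawn 50 requests, 10-100 ms apart.
    for i in range(0, 50):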
        pool.spawn(process_request, i)
        r = random.randint(1, 10)
        eventlet.sleep(0.01 * r)
    pool.waitall()
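
Item 5 of the analysis, where some calls still run in the coroutine context, can also be reproduced without eventlet. This asyncio sketch is an assumed analogue of the pure simulation above, and its names are hypothetical. It always offloads the 0.5 s clone. The 0.1 s connect is either offloaded too or run on the event-loop thread.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

CONNECT_S = 0.1  # like RADOSClient.__enter__ in the simulation above
CLONE_S = 0.5    # like RBD.clone in the simulation above


async def simulated_create(executor, offload_connect):
    loop = asyncio.get_running_loop()
    if offload_connect:
        # Connect in a worker thread (what tpool.Proxy does for __enter__).
        await loop.run_in_executor(executor, time.sleep, CONNECT_S)
    else:
        # Connect on the event-loop thread: every other request waits.
        time.sleep(CONNECT_S)
    # The clone itself is always offloaded (what RBDProxy() does).
    await loop.run_in_executor(executor, time.sleep, CLONE_S)


async def run_batch(n, offload_connect):
    # 20 workers, matching tpool's default thread count.
    with ThreadPoolExecutor(max_workers=20) as executor:
        start = time.monotonic()
        await asyncio.gather(*(simulated_create(executor, offload_connect)
                               for _ in range(n)))
        return time.monotonic() - start


if __name__ == "__main__":
    for offload_connect in (False, True):
        elapsed = asyncio.run(run_batch(10, offload_connect))
        print("offload_connect=%s: %.2f s" % (offload_connect, elapsed))
```

With 10 requests, the inline-connect batch should take at least 10 × 0.1 + 0.5 = 1.5 s, because the connects run one after another. Offloading the connect as well should bring it down to roughly 0.1 + 0.5 = 0.6 s.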

References

OpenStack Threading model

https://docs.openstack.org/cinder/latest/contributor/threading.html

gevent + requests blocks when using socks

https://github.com/psf/requests/issues/3861

cinder-volume monkey patch

https://github.com/openstack/cinder/blob/queens-em/cinder/cmd/volume.py#L32

Greening The World

http://eventlet.net/doc/patching.html

How to detect gevent blocking with greenlet.settrace

https://www.rfk.id.au/blog/entry/detect-gevent-blocking-with-greenlet-settrace/

monkey patch解惑 (Demystifying monkey patch, in Chinese)

https://zhuanlan.zhihu.com/p/37679547

Gevent/Eventlet monkey patching for DB drivers

https://stackoverflow.com/questions/14491400/gevent-eventlet-monkey-patching-for-db-drivers

gevent.monkey - Make the standard library cooperative

http://www.gevent.org/api/gevent.monkey.html

Comparing gevent to eventlet

https://blog.gevent.org/2010/02/27/why-gevent/

Understanding __new__ and __init__

https://spyhce.com/blog/understanding-new-and-init