runsisi's

technical notes

qemu 协程

2019-03-02 runsisi#openstack#qemu

协程类似于线程,也是一种调度的实体,但不同之处在于线程由操作系统内核的调度器进行线程间的运行切换,而协程是一种用户态的调度实体,必须由其自身主动放弃执行实现执行逻辑在多个协程间的切换。

C/C++ 的协程根据具体的需求、条件限制有多种实现方式,业界也有比较成熟的开源库支持,如 libco、libgo、boost-coroutine,而 qemu 根据需求,也实现了自己的协程支持。

需要注意的是,协程自身的执行仍然是承载于系统线程之上的,在一个软件系统中必然存在许多同步调用(如 blocking socket),调用同步接口的线程会阻塞而被系统调度出去,当同步接口完成时线程会得到通知并再次得到调度,显然这对协程的调度而言是灾难性的,一个协程调用同步接口会导致其它协程完全得不到调度,当然前提是所有协程承载在同一个系统线程上,通常这个问题有多种解决方案,如将协程承载在多个系统线程之上(如 Go 语言的协程),或者 hook 常见的同步接口并在 hook 中实现协程切换(如 Python gevent 库的 monkey patch,以及 libco 和 libgo)。

基本原理

qemu 的协程依赖于 setjmp 和 ucontext 两个组件,其中 setjmp 实现协程间的切换,ucontext 实现协程栈空间的创建。其主要的 API 如下:

sigsetjmp/siglongjmp
getcontext/setcontext/makecontext/swapcontext

qemu 协程

co

协程退出调度(不管是主动释放 cpu 还是协程函数执行完成导致协程终止)总是把控制权返回给协程的上一个调用者,主线程实际上并不是一个协程(只是一个普通的系统线程,虽然在代码里叫 leader 协程)。当控制权返回到 leader 时,说明当前所有协程已结束或协程在等待 io 等而主动释放 cpu。

测试代码

ucontext + jmp

/*
 * ucontext.cc
 *
 *  Created on: Aug 8, 2018
 *      Author: runsisi
 */

#include <stdio.h>
#include <stdlib.h>
#include <ucontext.h>
#include <setjmp.h>

/* The three contexts:
 *    (1) main_context1 : Captured in main with getcontext() and installed as
 *                        loop_context.uc_link.
 *    (2) main_context2 : The point in main to which control from loop will
 *                        flow by switching contexts.
 *    (3) loop_context  : The point in loop to which control from main will
 *                        flow by switching contexts. */
ucontext_t main_context1, main_context2, loop_context;

/* Jump buffer filled by sigsetjmp() in main and consumed by siglongjmp() in
 * loop, which uses it to leave the loop context. */
sigjmp_buf env;

/* The iterator return value. */
volatile int i_from_iterator;

/* Iterator function that runs on its own context.
 *
 * It is entered on the first swapcontext into loop_context. For each loop
 * counter value (the loop as written runs for i == 0 only) it stores the
 * counter through i_from_iterator, prints a message, and switches to
 * other_context while saving its own position in loop_context. When it is
 * switched back in, it prints a second message and continues.
 *
 * After the loop it jumps to the sigsetjmp() point recorded in the global
 * `env` with siglongjmp(env, 1) instead of returning.
 *
 * Parameters:
 *   loop_context    - where this function saves its own position.
 *   other_context   - the context to switch to on each iteration.
 *   i_from_iterator - location that receives the current loop counter. */
void loop(
    ucontext_t *loop_context,
    ucontext_t *other_context,
    int *i_from_iterator)
{
    int i;

    for (i=0; i < 1; ++i) {
        /* Write the loop counter into the iterator return location. */
        *i_from_iterator = i;

        printf("before in for loop: %d\n", i);

        /* Save the loop context (this point in the code) into loop_context,
         * and switch to other_context. */
        swapcontext(loop_context, other_context);

        printf("after in for loop: %d\n", i);
    }

    printf("exit for loop\n");

    /* Leave through the jump buffer; siglongjmp() does not return. */
    siglongjmp(env, 1);

    /* Not reached because of the siglongjmp() above. Without it, the
     * function would fall through and the context named by uc_link would
     * be resumed. */

    printf("exit loop context\n");
}

/* Demo driver: builds a context that runs loop() on a local stack buffer,
 * then switches into it up to five times. Before each switch it records a
 * sigsetjmp() point in `env`, which loop() jumps back to with siglongjmp()
 * when it is done. */
int main(void)
{
    /* The stack for the iterator function. */
    char iterator_stack[SIGSTKSZ];

    /* Flag indicating that the iterator has completed. */
    volatile int iterator_finished;

    /* makecontext() below requires a context initialised by getcontext(). */
    getcontext(&loop_context);

    /* Initialise the iterator context. uc_link points to main_context1, the
     * point to return to when the iterator finishes. */
    loop_context.uc_link          = &main_context1;
    loop_context.uc_stack.ss_sp   = iterator_stack;
    loop_context.uc_stack.ss_size = sizeof(iterator_stack);

    /* Fill in loop_context so that it makes swapcontext start loop. The
     * (void (*)(void)) typecast is to avoid a compiler warning but it is
     * not relevant to the behaviour of the function. */
    makecontext(&loop_context, (void (*)(void)) loop,
        3, &loop_context, &main_context2, &i_from_iterator);

    /* Clear the finished flag. */
    iterator_finished = 0;

    /* Save the current context into main_context1, the uc_link target of
     * loop_context.
     * NOTE(review): loop() as written leaves via siglongjmp() and never
     * returns, so the uc_link path does not appear to be taken. */
    getcontext(&main_context1);

    if (!iterator_finished) {
        /* Set iterator_finished so that if the previous getcontext is
         * returned to via uc_link, the above if condition is false and the
         * iterator is not restarted. */
        iterator_finished = 1;

        /* Five rounds of: record a jump point, then switch into loop. */
        int i = 0;
        while (i++ < 5) {
            /* sigsetjmp() returns 0 when called directly, and non-zero when
             * loop() comes back here through siglongjmp(env, 1); in the
             * latter case the switch below is skipped for this round. */
            if (!sigsetjmp(env, 0)) {
                printf("before sigsetjmp\n");
                /* Save this point into main_context2 and switch into the iterator.
                 * The first call will begin loop.  Subsequent calls will switch to
                 * the swapcontext in loop. */
                swapcontext(&main_context2, &loop_context);
                /* Reached when loop() switches back to main_context2. */
                printf("after sigsetjmp\n");
            }

            printf("i_from_iterator = %d\n", i_from_iterator);
        }
    }

    return 0;
}

qemu coroutine

#include <string.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <setjmp.h>
#include <stdint.h>
#include <ucontext.h>

#ifndef container_of
/*
 * Given a pointer to a member, recover a pointer to the structure that
 * contains it.
 *
 *   ptr    - pointer to the member
 *   type   - type of the enclosing structure
 *   member - name of the member inside `type`
 *
 * The temporary is declared with the member's own type so that passing a
 * pointer of an unrelated type is diagnosed by the compiler.
 *
 * __typeof__ is used rather than typeof because the plain spelling is not
 * recognised when compiling in an ISO mode (e.g. -std=c11 or -std=c++11),
 * whereas the double-underscore form is accepted in every mode. The
 * statement expression still requires GCC or a compatible compiler.
 */
#define container_of(ptr, type, member) ({                      \
        const __typeof__(((type *) 0)->member) *__mptr = (ptr); \
        (type *) ((char *) __mptr - offsetof(type, member));})
#endif

/* Convert from a base type to a parent type, with compile time checking.
 *
 *   type  - the enclosing (parent) structure type
 *   field - name of the base member inside `type`
 *   dev   - pointer to the base member
 *
 * With GCC-compatible compilers the macro additionally declares an unused
 * array whose size is -offsetof(type, field); as the array's name says, this
 * is meant to make the build fail unless `field` is at offset zero. Other
 * compilers get a plain container_of() without that check. */
#ifdef __GNUC__
#define DO_UPCAST(type, field, dev) ( __extension__ ( { \
    char __attribute__((unused)) offset_must_be_zero[ \
        -offsetof(type, field)]; \
    container_of(dev, type, field);}))
#else
#define DO_UPCAST(type, field, dev) container_of(dev, type, field)
#endif

/**
 * Mark a function that executes in coroutine context
 *
 * Functions that execute in coroutine context cannot be called directly from
 * normal functions.  In the future it would be nice to enable compiler or
 * static checker support for catching such errors.  This annotation might make
 * it possible and in the meantime it serves as documentation.
 *
 * The macro expands to nothing, so it does not change the declaration it is
 * attached to.
 *
 * For example:
 *
 *   static void coroutine_fn foo(void) {
 *       ....
 *   }
 */
#define coroutine_fn

typedef struct Coroutine Coroutine;

/**
 * Coroutine entry point
 *
 * When the coroutine is entered for the first time, opaque is passed in as an
 * argument.
 *
 * When this function returns, the coroutine is destroyed automatically and
 * execution continues in the caller who last entered the coroutine.
 */
typedef void coroutine_fn CoroutineEntry(void *opaque);

/**
 * Implementation-independent part of a coroutine.
 */
struct Coroutine {
    /* Function run by the coroutine; set by qemu_coroutine_create(). */
    CoroutineEntry *entry;
    /* Argument handed to entry; set by qemu_coroutine_enter(). During
     * qemu_coroutine_new() it temporarily holds a pointer to the creator's
     * sigjmp_buf. */
    void *entry_arg;
    /* Coroutine that entered this one; NULL while it is not entered
     * (cleared on yield and on deletion). */
    Coroutine *caller;
};

/**
 * Coroutine together with the state needed to switch to and from it.
 */
struct CoroutineUContext {
    /* Generic part; DO_UPCAST() converts a pointer to it back to the
     * enclosing CoroutineUContext. */
    Coroutine base;
    /* Stack memory allocated in qemu_coroutine_new() and released in
     * qemu_coroutine_delete(). */
    void *stack;
    /* Jump buffer recording where this coroutine last stopped executing. */
    sigjmp_buf env;
};

/**
 * Reason for a switch, passed as the siglongjmp() value in
 * qemu_coroutine_switch() and therefore seen as the sigsetjmp() return value
 * on the receiving side. All values are non-zero so they can be told apart
 * from the direct return of sigsetjmp().
 */
typedef enum {
    COROUTINE_YIELD = 1,     /* coroutine gave up control via qemu_coroutine_yield() */
    COROUTINE_TERMINATE = 2, /* coroutine entry function returned */
    COROUTINE_ENTER = 3,     /* a caller is entering the coroutine */
} CoroutineAction;

/**
 * Per-thread coroutine bookkeeping
 *
 * leader  - placeholder coroutine that qemu_coroutine_self() returns when no
 *           coroutine has been switched to on this thread yet.
 * current - the coroutine most recently switched to by
 *           qemu_coroutine_switch(); NULL until first use.
 */
static __thread CoroutineUContext leader;
static __thread Coroutine *current;

/*
 * va_args to makecontext() must be type 'int', so passing
 * the pointer we need may require several int args. This
 * union is a quick hack to let us do that
 */
union cc_arg {
    void *p;    /* the pointer being smuggled through makecontext() */
    int i[2];   /* the same bytes viewed as two int arguments */
};

/* Forward declaration: coroutine_trampoline() calls this function before its
 * definition appears. */
CoroutineAction qemu_coroutine_switch(Coroutine *from, Coroutine *to,
                                      CoroutineAction action);

/*
 * First function executed on a new coroutine stack; started by the
 * swapcontext() call in qemu_coroutine_new().
 *
 * i0 and i1 are the two int halves of a union cc_arg carrying the pointer to
 * the CoroutineUContext being set up.
 *
 * The first pass only records a jump point on the new stack in self->env and
 * jumps straight back to the creator. Later, when the coroutine is entered,
 * execution resumes after that sigsetjmp() and runs the entry function in a
 * loop; this function never returns.
 */
static void coroutine_trampoline(int i0, int i1)
{
    union cc_arg arg;
    CoroutineUContext *self;
    Coroutine *co;

    /* Reassemble the pointer that was split into two ints for makecontext(). */
    arg.i[0] = i0;
    arg.i[1] = i1;
    self = (CoroutineUContext *)arg.p; // CoroutineUContext newly created by qemu_coroutine_new
    co = &self->base;

    /* Initialize longjmp environment and switch back the caller */
    if (!sigsetjmp(self->env, 0)) {
        // at this point entry_arg still holds the creator's sigjmp_buf
        siglongjmp(*(sigjmp_buf *)co->entry_arg, 1); // jump back to qemu_coroutine_new
    }

    while (true) {
        co->entry(co->entry_arg); // may call qemu_coroutine_yield to switch back to the caller

        // entry returned: record our position in self->env and jump to the
        // caller's env, reporting COROUTINE_TERMINATE
        qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE);
    }
}

/**
 * Allocate a coroutine with its own stack and prepare it for its first entry.
 *
 * A ucontext is built on the new stack and switched to once, so that
 * coroutine_trampoline() can record a sigsetjmp() point on that stack and
 * immediately jump back here. After that, all switching is done with
 * sigsetjmp()/siglongjmp().
 *
 * Returns the generic Coroutine embedded in the new CoroutineUContext. The
 * entry function is not set here. The process is aborted if getcontext(),
 * a memory allocation, or swapcontext() fails.
 */
Coroutine *qemu_coroutine_new(void)
{
    const size_t stack_size = 1 << 20;
    CoroutineUContext *co;
    ucontext_t old_uc, uc;
    sigjmp_buf old_env;
    union cc_arg arg = {0};

    /* The ucontext functions preserve signal masks which incurs a
     * system call overhead.  sigsetjmp(buf, 0)/siglongjmp() does not
     * preserve signal masks but only works on the current stack.
     * Since we need a way to create and switch to a new stack, use
     * the ucontext functions for that but sigsetjmp()/siglongjmp() for
     * everything else.
     */

    if (getcontext(&uc) == -1) {
        abort();
    }

    /* calloc() hands back zero-filled memory, so every field starts out
     * cleared. Allocation failure is fatal, like the getcontext() failure
     * above, instead of being dereferenced. */
    co = (CoroutineUContext *)calloc(1, sizeof(*co));
    if (co == NULL) {
        abort();
    }
    co->stack = malloc(stack_size);
    if (co->stack == NULL) {
        abort();
    }
    co->base.entry_arg = &old_env; /* stash away our jmp_buf */

    uc.uc_link = &old_uc;
    uc.uc_stack.ss_sp = co->stack;
    uc.uc_stack.ss_size = stack_size;
    uc.uc_stack.ss_flags = 0;

    /* makecontext() only passes int arguments; split the pointer. */
    arg.p = co;

    makecontext(&uc, (void (*)(void))coroutine_trampoline,
                2, arg.i[0], arg.i[1]);

    /* swapcontext() in, siglongjmp() back out */
    if (!sigsetjmp(old_env, 0)) {
        /* On success control comes back through the siglongjmp() in
         * coroutine_trampoline(), not through this call. A -1 return means
         * the switch never happened and co->env was never initialised. */
        if (swapcontext(&old_uc, &uc) == -1) {
            abort();
        }
    }
    return &co->base;
}

/**
 * Release a coroutine created by qemu_coroutine_new().
 *
 * co_ points at the generic part; the enclosing CoroutineUContext and its
 * stack are both freed.
 */
void qemu_coroutine_delete(Coroutine *co_)
{
    CoroutineUContext *uctx = DO_UPCAST(CoroutineUContext, base, co_);
    void *stack_mem = uctx->stack;

    free(stack_mem);
    free(uctx);
}

/* Transfer control from coroutine from_ to coroutine to_.
 *
 * The current position is recorded in from_'s jump buffer, then execution
 * jumps to the position last recorded in to_'s jump buffer, delivering
 * `action` as the value seen there. The call returns only when some later
 * switch targets from_ again; the return value is the action passed to that
 * later switch.
 *
 * This function is marked noinline to prevent GCC from inlining it
 * into coroutine_trampoline(). If we allow it to do that then it
 * hoists the code to get the address of the TLS variable "current"
 * out of the while() loop. This is an invalid transformation because
 * the sigsetjmp() call may be called when running thread A but
 * return in thread B, and so we might be in a different thread
 * context each time round the loop.
 */
CoroutineAction __attribute__((noinline))
qemu_coroutine_switch(Coroutine *from_, Coroutine *to_,
                      CoroutineAction action)
{
    CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_);
    CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_);
    int ret;

    /* Record the target as the running coroutine before jumping to it. */
    current = to_;

    ret = sigsetjmp(from->env, 0);
    if (ret == 0) {
        // direct return from sigsetjmp: jump to wherever `to` last saved its
        // position (the trampoline on first entry, otherwise an earlier call
        // of this function); `action` becomes the sigsetjmp result there
        siglongjmp(to->env, action);
    }
    /* Non-zero: another switch jumped back to us with this action. */
    return (CoroutineAction)ret;
}

/**
 * Return the coroutine running on this thread.
 *
 * If nothing has been switched to yet, the thread's leader placeholder is
 * installed as the current coroutine and returned.
 */
Coroutine *qemu_coroutine_self(void)
{
    Coroutine *running = current;

    if (running == NULL) {
        running = &leader.base;
        current = running;
    }
    return running;
}

/**
 * Report whether the calling code runs inside an entered coroutine, i.e.
 * there is a current coroutine and it has a caller to go back to.
 */
bool qemu_in_coroutine(void)
{
    if (current == NULL) {
        return false;
    }
    return current->caller != NULL;
}

/**
 * Create a coroutine that will run `entry` once it is entered.
 *
 * The coroutine is only allocated and prepared here; it does not start
 * executing until qemu_coroutine_enter() is called on it.
 */
Coroutine *qemu_coroutine_create(CoroutineEntry *entry)
{
    Coroutine *created = qemu_coroutine_new();

    created->entry = entry;
    return created;
}

/* Dispose of a coroutine whose entry function has returned: clear its caller
 * link, then hand it to qemu_coroutine_delete() to free it. */
static void coroutine_delete(Coroutine *co)
{
    co->caller = NULL;

    qemu_coroutine_delete(co);
}

/**
 * Transfer control to coroutine `co`, passing `opaque` as its argument.
 *
 * The call returns when the coroutine yields or when its entry function
 * returns; in the latter case the coroutine is deleted before returning, so
 * `co` must not be used afterwards. Entering a coroutine that already has a
 * caller aborts the process.
 */
void qemu_coroutine_enter(Coroutine *co, void *opaque)
{
    Coroutine *entering_from = qemu_coroutine_self();

    if (co->caller != NULL) {
        fprintf(stderr, "Co-routine re-entered recursively\n");
        abort();
    }

    co->caller = entering_from;
    co->entry_arg = opaque;

    /* Comes back with the action the coroutine used to give up control. */
    CoroutineAction outcome =
        qemu_coroutine_switch(entering_from, co, COROUTINE_ENTER);

    if (outcome == COROUTINE_TERMINATE) {
        /* Entry function finished: the coroutine is no longer needed. */
        coroutine_delete(co);
        return;
    }
    if (outcome != COROUTINE_YIELD) {
        /* Nothing else is expected to hand control back here. */
        abort();
    }
}

/**
 * Give control back to whoever entered the running coroutine.
 *
 * The caller link is cleared first, so the coroutine can be entered again
 * later. Execution continues after this call on the next enter. Yielding
 * without a caller aborts the process.
 */
void coroutine_fn qemu_coroutine_yield(void)
{
    Coroutine *running = qemu_coroutine_self();
    Coroutine *resume_target = running->caller;

    if (resume_target == NULL) {
        fprintf(stderr, "Co-routine is yielding to no one\n");
        abort();
    }

    running->caller = NULL;

    /* Hand control to the caller, reporting a yield. */
    qemu_coroutine_switch(running, resume_target, COROUTINE_YIELD);
}


// -----------------------------------------------------------------------

/* Sample coroutine body: prints a message, yields to its caller once, and
 * prints a second message after being entered again. The argument p is not
 * used. */
void coroutine_fn test(void *p) {
    printf("before yield\n");
    qemu_coroutine_yield();
    printf("after yield\n");
}

/* Demo driver: creates a coroutine running test() and enters it twice. The
 * first enter runs test() up to its yield; the second resumes it after the
 * yield and lets it finish, at which point qemu_coroutine_enter() deletes
 * the coroutine. */
int main() {
    Coroutine *co = qemu_coroutine_create(test);
    qemu_coroutine_enter(co, NULL);
    qemu_coroutine_enter(co, NULL);

    return 0;
}

参考资料

ucontext-人人都可以实现的简单协程库

https://blog.csdn.net/qq910894904/article/details/41911175

qemu核心机制分析-协程coroutine

http://www.cnblogs.com/VincentXu/p/3350389.html

QEMU学习笔记——协程

https://www.binss.me/blog/qemu-note-of-coroutine/

QEMU中的协程---qemu-coroutine

http://royluo.org/2016/06/24/qemu-coroutine/

Qemu中coroutine机制的实现

https://blog.csdn.net/LPSTC123/article/details/45009819