gevent初探

0x00 前言

有很多Python语言的协程库,如:tornado、asyncio等。这些库在使用时需要使用特定的语法,如:async/await,对于非协程的代码需要改写才能以协程方式运行。

gevent是一个特殊的协程库,它可以将非协程代码以协程方式运行,从而起到提升性能的作用。本文尝试分析一下它的实现原理。

0x01 使用案例

先看以下这段代码:

import ctypes
import sys
import threading
import time

def gettid():
    """Return the operating-system ID of the calling thread.

    Unlike ``threading.current_thread().ident``, this is the kernel-level
    thread ID, so it reveals whether two "threads" really run on separate
    OS threads (which is what the gevent demo below relies on).
    """
    # Python 2 reports 'linux2' but Python 3.3+ reports 'linux'; an exact
    # comparison sent Python 3 on Linux into the Windows branch.
    if sys.platform.startswith('linux'):
        # 186 is SYS_gettid on x86_64 Linux; other architectures use a
        # different syscall number.
        return ctypes.CDLL('libc.so.6').syscall(186)
    return ctypes.windll.kernel32.GetCurrentThreadId()

def thread_test(index):
    """Report this worker's identifiers every 0.1 s for roughly one second.

    Each printed line carries the worker *index*, the ``threading`` ident of
    the current thread and the OS thread id returned by ``gettid()``.
    """
    started_at = time.time()
    while True:
        if time.time() - started_at >= 1:
            break
        print("I'm thread %d: %d %d" % (index, threading.current_thread().ident, gettid()))
        time.sleep(0.1)

# Start two worker threads; each one prints its identifiers for about one
# second (see thread_test).
thread1 = threading.Thread(target=thread_test, args=(1,))
thread1.start()

thread2 = threading.Thread(target=thread_test, args=(2,))
thread2.start()
# The main thread does not join the workers; it simply sleeps for 2 seconds,
# which is longer than the ~1 second each worker loops for.
print('Main thread sleep')
time.sleep(2)
print('Main thread exit')

输出内容如下:

I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
Main thread sleep
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
Main thread exit

在这段代码前面加上以下代码:

# Prepended to the demo script: replace the thread/threading primitives with
# gevent's greenlet-based versions before any thread is started.
from gevent import monkey
monkey.patch_thread()

输出如下:

I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
Main thread sleep
Main thread exit

可以看出,在加入gevent后,输出与之前有些不同,最大的区别是:两个线程的系统线程ID(即gettid()的返回值,每行最后一个数字)相同,都是31623,而threading.current_thread().ident的值仍然各不相同。也就是说,这两个“线程”其实是跑在同一个系统线程里的。

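还有一点需要注意的地方:Main thread sleep这句输出在前后两次执行中出现的位置不同。未patch时,Main thread sleep紧跟在两个线程各自的第一行输出之后出现,随后两个线程的输出交替出现;patch后,线程1的输出全部出现在线程2之前,而Main thread sleep要等两个“线程”都执行完才输出。这是因为patch_thread只把线程替换成了greenlet,并没有patch time.sleep。greenlet调用time.sleep时会阻塞整个系统线程,而不会切换到其它greenlet,所以每个“线程”一旦开始运行,就会一直执行到结束。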

0x02 原理分析

来看下patch_thread函数的实现:

def patch_thread(threading=True, _threading_local=True, Event=True, logging=True,
                 existing_locks=True,
                 _warnings=None):
    """
    Replace the ``thread`` / ``threading`` primitives with gevent's
    greenlet-based versions.

    :keyword bool threading: Also patch :mod:`threading`, give already-running
        non-main threads a cooperative ``join``, and (Python 3.4+) fix up the
        main thread's ``join`` and ident. When false, only the low-level
        ``thread`` module (and optionally ``_threading_local``) is patched.
    :keyword bool _threading_local: Replace ``_threading_local.local`` with
        :class:`gevent.local.local`.
    :keyword bool Event: Replace ``threading.Event`` (and ``threading._Event``
        if it exists) with :class:`gevent.event.Event`. Ignored unless
        *threading* is true.
    :keyword bool logging: If :mod:`logging` is already imported, give its
        module lock and every registered handler a new ``threading.RLock()``
        created after patching. Ignored unless *threading* is true.
    :keyword bool existing_locks: Call ``_patch_existing_locks`` on the
        threading module. Ignored unless *threading* is true.
    :keyword _warnings: Optional collector passed through to the patch hooks
        and event notifications.
    :raises TypeError: If a registered logging handler has no ``lock``.
    """
    if threading:
        threading_mod = __import__('threading')
        # Capture the *real* current thread object before
        # we start returning DummyThread objects, for comparison
        # to the main thread.
        orig_current_thread = threading_mod.current_thread()
    else:
        threading_mod = None
        gevent_threading_mod = None
        orig_current_thread = None

    gevent_thread_mod, thread_mod = _patch_module('thread',
                                                  _warnings=_warnings, _notify_did_subscribers=False)

    if threading:
        gevent_threading_mod, _ = _patch_module('threading',
                                                _warnings=_warnings, _notify_did_subscribers=False)

        if Event:
            from gevent.event import Event
            patch_item(threading_mod, 'Event', Event)
            # Python 2 had `Event` as a function returning
            # the private class `_Event`. Some code may be relying
            # on that.
            if hasattr(threading_mod, '_Event'):
                patch_item(threading_mod, '_Event', Event)

        if existing_locks:
            _patch_existing_locks(threading_mod)

        if logging and 'logging' in sys.modules:
            logging = __import__('logging')
            patch_item(logging, '_lock', threading_mod.RLock())
            for wr in logging._handlerList:
                # In py26, these are actual handlers, not weakrefs
                handler = wr() if callable(wr) else wr
                if handler is None:
                    # The weakly referenced handler no longer exists.
                    continue
                if not hasattr(handler, 'lock'):
                    raise TypeError("Unknown/unsupported handler %r" % handler)
                handler.lock = threading_mod.RLock()

    if _threading_local:
        _threading_local = __import__('_threading_local')
        from gevent.local import local
        patch_item(_threading_local, 'local', local)

    def make_join_func(thread, thread_greenlet):
        """Build a ``join`` replacement that waits for *thread* cooperatively.

        Rather than blocking on a native lock, the returned function polls
        ``thread.is_alive()`` with gevent's ``sleep(0.01)`` so other greenlets
        keep running while it waits.
        """
        from gevent.hub import sleep
        from time import time

        def join(timeout=None):
            end = None
            if threading_mod.current_thread() is thread:
                raise RuntimeError("Cannot join current thread")
            if thread_greenlet is not None and thread_greenlet.dead:
                return
            if not thread.is_alive():
                return

            # Compare against None so that join(0) means "do not wait"
            # (as in threading.Thread.join) instead of "wait forever".
            if timeout is not None:
                end = time() + timeout

            while thread.is_alive():
                if end is not None and time() > end:
                    return
                sleep(0.01)
        return join

    if threading:
        from gevent.threading import main_native_thread

        for thread in threading_mod._active.values():
            if thread == main_native_thread():
                continue
            thread.join = make_join_func(thread, None)

    # With threading=False, threading_mod is None and there is no
    # main_thread() to fix up, so this step only applies when threading
    # was patched.
    if threading and sys.version_info[:2] >= (3, 4):

        # Issue 18808 changes the nature of Thread.join() to use
        # locks. This means that a greenlet spawned in the main thread
        # (which is already running) cannot wait for the main thread---it
        # hangs forever. We patch around this if possible. See also
        # gevent.threading.
        greenlet = __import__('greenlet')

        if orig_current_thread == threading_mod.main_thread():
            main_thread = threading_mod.main_thread()
            _greenlet = main_thread._greenlet = greenlet.getcurrent()

            main_thread.join = make_join_func(main_thread, _greenlet)

            # Patch up the ident of the main thread to match. This
            # matters if threading was imported before monkey-patching
            # thread
            oldid = main_thread.ident
            main_thread._ident = threading_mod.get_ident()
            newid = main_thread.ident
            # Move the _active entry to the new key; skip it (instead of
            # raising KeyError) when the ident is unchanged or the old
            # entry is not registered.
            if oldid != newid and oldid in threading_mod._active:
                threading_mod._active[newid] = threading_mod._active.pop(oldid)
        else:
            _queue_warning("Monkey-patching not on the main thread; "
                           "threading.main_thread().join() will hang from a greenlet",
                           _warnings)

    from gevent import events
    _notify_patch(events.GeventDidPatchModuleEvent('thread', gevent_thread_mod, thread_mod),
                  _warnings)
    _notify_patch(events.GeventDidPatchModuleEvent('threading', gevent_threading_mod, threading_mod),
                  _warnings)

首先,orig_current_thread变量保存了patch之前的当前线程对象,然后调用_patch_module函数依次patch thread和threading模块。

_patch_module函数相关实现如下:

def __call_module_hook(gevent_module, name, module, items, _warnings):
    """
    Run ``gevent_module._gevent_<name>_monkey_patch(module, items, warn)``
    if the gevent module defines it; otherwise do nothing.

    ``warn`` forwards a single message to ``_queue_warning`` along with
    *_warnings*. Exceptions raised by the hook propagate to the caller
    (for the 'will' phase this is how DoNotPatch is signalled).
    """
    hook_name = ''.join(('_gevent_', name, '_monkey_patch'))
    try:
        hook = getattr(gevent_module, hook_name)
    except AttributeError:
        # No hook registered for this phase.
        return

    def _warn(message):
        _queue_warning(message, _warnings)

    hook(module, items, _warn)

def patch_item(module, attr, newitem):
    """Set ``module.<attr>`` to *newitem*, remembering the first original value.

    The original (if the attribute existed) is stored in
    ``saved[module.__name__][attr]``; later patches of the same attribute do
    not overwrite that first saved value.
    """
    previous = getattr(module, attr, _NONE)
    if previous is not _NONE:
        per_module = saved.setdefault(module.__name__, {})
        per_module.setdefault(attr, previous)
    setattr(module, attr, newitem)

def patch_module(target_module, source_module, items=None,
                 _warnings=None,
                 _notify_did_subscribers=True):
    """
    patch_module(target_module, source_module, items=None)

    Replace attributes in *target_module* with the attributes of the
    same name in *source_module*.

    The *source_module* can provide some attributes to customize the process:

    * ``__implements__`` is a list of attribute names to copy; if not present,
      the *items* keyword argument is mandatory.
    * ``_gevent_will_monkey_patch(target_module, items, warn, **kwargs)``
    * ``_gevent_did_monkey_patch(target_module, items, warn, **kwargs)``
      These two functions in the *source_module* are called *if* they exist,
      before and after copying attributes, respectively. The "will" function
      may modify *items*. The value of *warn* is a function that should be called
      with a single string argument to issue a warning to the user. If the "will"
      function raises :exc:`gevent.events.DoNotPatch`, no patching will be done. These functions
      are called before any event subscribers or plugins.

    :keyword list items: A list of attribute names to replace. If
       not given, this will be taken from the *source_module* ``__implements__``
       attribute.
    :keyword _warnings: Optional warning collector; passed to both hooks and
       to both the "will" and "did" event notifications.
    :keyword bool _notify_did_subscribers: If false, skip broadcasting the
       "did" event (the caller will do it after finishing extra work).
    :return: A true value if patching was done, a false value if patching was canceled.
    :raises AttributeError: If *items* is omitted and *source_module* has no
       ``__implements__``.

    .. versionadded:: 1.3b1
    """
    from gevent import events

    if items is None:
        items = getattr(source_module, '__implements__', None)
        if items is None:
            raise AttributeError('%r does not have __implements__' % source_module)

    try:
        __call_module_hook(source_module, 'will', target_module, items, _warnings)
        _notify_patch(
            events.GeventWillPatchModuleEvent(target_module.__name__, source_module,
                                              target_module, items),
            _warnings)
    except events.DoNotPatch:
        return False

    for attr in items:
        patch_item(target_module, attr, getattr(source_module, attr))

    __call_module_hook(source_module, 'did', target_module, items, _warnings)

    if _notify_did_subscribers:
        # We allow turning off the broadcast of the 'did' event for the benefit
        # of our internal functions which need to do additional work (besides copying
        # attributes) before their patch can be considered complete.
        # Pass _warnings here too, matching the 'will' notification above.
        _notify_patch(
            events.GeventDidPatchModuleEvent(target_module.__name__, source_module,
                                             target_module),
            _warnings)

    return True

def _patch_module(name, items=None, _warnings=None, _notify_did_subscribers=True):
    """
    Patch the standard module targeted by ``gevent.<name>`` and return both.

    The gevent module may name its target via ``__target__``; otherwise the
    target is the standard module called *name*. Patching is delegated to
    :func:`patch_module` (its True/False result is not propagated).

    :return: A ``(gevent_module, target_module)`` tuple.
    """
    import importlib

    # import_module returns the named (sub)module itself, so dotted names
    # work too; ``__import__('a.b')`` would return the top-level package ``a``.
    gevent_module = importlib.import_module('gevent.' + name)
    module_name = getattr(gevent_module, '__target__', name)
    target_module = importlib.import_module(module_name)

    patch_module(target_module, gevent_module, items=items,
                 _warnings=_warnings,
                 _notify_did_subscribers=_notify_did_subscribers)

    return gevent_module, target_module

这里比较关键的逻辑有两个:

  • 调用__call_module_hook执行gevent对应模块(source_module)中的_gevent_will_monkey_patch和_gevent_did_monkey_patch钩子函数(如果存在的话),二者分别在替换属性之前和之后调用

  • 调用patch_item将原始模块中的指定属性替换为gevent对应模块中的同名属性(被替换的原始值会保存在saved字典中)

可以看出,patch的关键逻辑就是由patch_item实现的,具体要patch的对象是由gevent模块中的__implements__列表指定的。

0x03 threading模块patch分析

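对于threading模块,对应的gevent模块是gevent.threading,其中实现了_gevent_will_monkey_patch函数,在patch之前做一些准备工作:如果当前greenlet的ID(_get_ident()的返回值)不在threading._active中,就把主线程对象在_active中的键改为这个ID,这样patch之后仍然能通过当前ID找到主线程。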

import threading as __threading__

def _gevent_will_monkey_patch(native_module, items, warn): # pylint:disable=unused-argument
    """
    Pre-patch hook for :mod:`threading`.

    When running on the native main thread, re-register that thread in
    ``threading._active`` under the current ``_get_ident()`` value (unless an
    entry for it already exists). On any other thread, only emit a warning
    through *warn*.
    """
    # Make sure the MainThread can be found by our current greenlet ID,
    # otherwise we get a new DummyThread, which cannot be joined.
    # Fixes tests in test_threading_2 under PyPy.
    native_main = main_native_thread()
    if __threading__.current_thread() != native_main:
        warn("Monkey-patching outside the main native thread. Some APIs "
             "will not be available. Expect a KeyError to be printed at shutdown.")
        return

    active = __threading__._active
    current_ident = _get_ident()
    if current_ident in active:
        # Already registered under the current ident; nothing to move.
        return

    del active[native_main.ident]
    native_main._ident = native_main._Thread__ident = current_ident
    active[current_ident] = native_main

gevent.threading模块中要patch的列表是:

# Attribute names that gevent.threading copies over the same-named
# attributes of the standard threading module when it is patched.
__implements__ = [
    'local', '_start_new_thread', '_allocate_lock', 'Lock',
    '_get_ident', '_sleep', '_DummyThread',
]

而在gevent.thread模块中则是:

# Attribute names that gevent.thread copies over the same-named attributes
# of its target (the low-level thread module) when it is patched.
__implements__ = [
    'allocate_lock', 'get_ident', 'exit', 'LockType',
    'stack_size', 'start_new_thread', '_local',
]

我们来看下threading模块在启动线程过程中做了哪些操作。

  +------------------------+
  |                        |
  | threading.Thread.start |
  |                        |
  +-----------+------------+
              |
              |
              v
+-------------+---------------+
|                             |
| threading._start_new_thread |
|                             |
+-------------+---------------+
              |
              |
              |
              v
  +-----------+-------------+
  |                         |
  | thread.start_new_thread |
  |                         |
  +-------------------------+

上面的threading._start_new_thread和thread.start_new_thread都在gevent的__implements__列表中。也就是说:这两个函数都被gevent hook了。gevent.threading中有如下导入:

from gevent.thread import start_new_thread as _start_new_thread

由这行导入可以看出,gevent.threading._start_new_thread其实就是gevent.thread.start_new_thread,两者是同一个函数,其实现如下:

def start_new_thread(function, args=(), kwargs=None):
    """
    gevent's replacement for ``thread.start_new_thread``.

    Runs ``function(*args, **kwargs)`` in a new greenlet (via
    ``Greenlet.spawn``) instead of a new OS thread, and returns the value of
    ``get_ident`` for that greenlet.
    """
    keyword_args = {} if kwargs is None else kwargs
    spawned = Greenlet.spawn(function, *args, **keyword_args)
    return get_ident(spawned)

start_new_thread函数可以看出,启动线程逻辑已经被替换成了greenlet的实现,这也是为什么使用了patch_thread后,真实线程id相同的原因。

0x04 总结

gevent通过在运行时动态patch标准库的方法,让非协程代码也能以协程方式运行。对于I/O密集型程序,它可以在几乎不修改代码的前提下提升性能。但是,从上面线程的例子中可以看出,patch后程序的行为可能会有所不同(例如两个“线程”的输出不再交替出现,而是一个执行完后另一个才开始),所以在使用上还是需要小心。

分享