
0x00 前言



0x01 使用案例


import ctypes
import sys
import threading
import time

def gettid():
    """Return the operating-system level id of the calling thread.

    On Linux the id is read with a raw ``syscall``; on every other platform
    the Win32 ``GetCurrentThreadId`` API is used, so only Linux and Windows
    are supported.

    Returns:
        int: the thread id reported by the operating system.
    """
    # Python 2 reports 'linux2' while Python 3 reports 'linux'; accept both.
    if sys.platform.startswith('linux'):
        # NOTE(review): 186 is SYS_gettid on x86-64; other architectures
        # number this syscall differently -- confirm before porting.
        return ctypes.CDLL('').syscall(186)
    else:
        return ctypes.windll.kernel32.GetCurrentThreadId()

def thread_test(index):
    """Print this worker's identifiers repeatedly for roughly one second.

    Each line shows *index*, the ``ident`` of ``threading.current_thread()``
    and the value returned by ``gettid()``.
    """
    started_at = time.time()
    while True:
        if time.time() - started_at >= 1:
            break
        python_ident = threading.current_thread().ident
        os_tid = gettid()
        print("I'm thread %d: %d %d" % (index, python_ident, os_tid))

# Create and start two workers; each one prints its ids for about a second.
thread1 = threading.Thread(target=thread_test, args=(1,))
thread1.start()

thread2 = threading.Thread(target=thread_test, args=(2,))
thread2.start()

print('Main thread sleep')
# NOTE(review): the sleep length is an assumption; it only needs to outlast
# the one-second loops in thread_test so the workers finish before exit.
time.sleep(2)
print('Main thread exit')


I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
Main thread sleep
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
Main thread exit


from gevent import monkey


I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
Main thread sleep
Main thread exit


还有一点需要注意的地方:“Main thread sleep”这句输出在前后两次执行结果中出现的位置是不同的。

0x02 原理分析


def patch_thread(threading=True, _threading_local=True, Event=True, logging=True,
                 existing_locks=True,
                 _warnings=None):
    """
    Patch the ``thread`` module, and optionally ``threading`` and
    ``_threading_local``, with gevent's implementations.

    :keyword bool threading: When true, ``threading`` is patched as well and
        the ``join`` method of every thread in ``threading._active`` (other
        than the main native thread) is replaced by a polling version.
    :keyword bool _threading_local: When true, ``_threading_local.local`` is
        replaced by ``gevent.local.local``.
    :keyword bool Event: When true (and *threading* is true),
        ``threading.Event`` -- and ``threading._Event`` where it exists -- is
        replaced by ``gevent.event.Event``.
    :keyword bool logging: When true (and *threading* is true) and ``logging``
        is already imported, the module lock and every handler lock are
        replaced with new ``threading.RLock()`` objects created after patching.
    :keyword bool existing_locks: When true (and *threading* is true),
        ``_patch_existing_locks`` is run on the ``threading`` module.
    :keyword _warnings: Passed through to ``_patch_module`` and
        ``_queue_warning``.
    :raises TypeError: If a handler registered with ``logging`` has no
        ``lock`` attribute.
    """
    if threading:
        threading_mod = __import__('threading')
        # Capture the *real* current thread object before
        # we start returning DummyThread objects, for comparison
        # to the main thread.
        orig_current_thread = threading_mod.current_thread()
    else:
        threading_mod = None
        gevent_threading_mod = None
        orig_current_thread = None

    # The 'did' notifications are suppressed here and sent manually at the
    # end, once all the additional fix-ups below have been applied.
    gevent_thread_mod, thread_mod = _patch_module('thread',
                                                  _warnings=_warnings, _notify_did_subscribers=False)

    if threading:
        gevent_threading_mod, _ = _patch_module('threading',
                                                _warnings=_warnings, _notify_did_subscribers=False)

        if Event:
            from gevent.event import Event
            patch_item(threading_mod, 'Event', Event)
            # Python 2 had `Event` as a function returning
            # the private class `_Event`. Some code may be relying
            # on that.
            if hasattr(threading_mod, '_Event'):
                patch_item(threading_mod, '_Event', Event)

        if existing_locks:
            _patch_existing_locks(threading_mod)

        if logging and 'logging' in sys.modules:
            logging = __import__('logging')
            patch_item(logging, '_lock', threading_mod.RLock())
            for wr in logging._handlerList:
                # In py26, these are actual handlers, not weakrefs
                handler = wr() if callable(wr) else wr
                if handler is None:
                    # The weak reference is dead; nothing to patch.
                    continue
                if not hasattr(handler, 'lock'):
                    raise TypeError("Unknown/unsupported handler %r" % handler)
                handler.lock = threading_mod.RLock()

    if _threading_local:
        _threading_local = __import__('_threading_local')
        from gevent.local import local
        patch_item(_threading_local, 'local', local)

    def make_join_func(thread, thread_greenlet):
        # Build a replacement ``join`` for *thread* that polls ``is_alive()``
        # and sleeps through gevent between polls.
        from gevent.hub import sleep
        from time import time

        def join(timeout=None):
            end = None
            if threading_mod.current_thread() is thread:
                raise RuntimeError("Cannot join current thread")
            if thread_greenlet is not None and thread_greenlet.dead:
                return
            if not thread.is_alive():
                return

            if timeout:
                end = time() + timeout

            while thread.is_alive():
                if end is not None and time() > end:
                    return
                sleep(0.01)
        return join

    if threading:
        from gevent.threading import main_native_thread

        for thread in threading_mod._active.values():
            if thread == main_native_thread():
                continue
            thread.join = make_join_func(thread, None)

    # NOTE(review): this branch dereferences ``threading_mod`` without
    # checking *threading*, so it presumably assumes ``threading=True`` on
    # Python 3.4+ -- confirm.
    if sys.version_info[:2] >= (3, 4):

        # Issue 18808 changes the nature of Thread.join() to use
        # locks. This means that a greenlet spawned in the main thread
        # (which is already running) cannot wait for the main thread---it
        # hangs forever. We patch around this if possible. See also
        # gevent.threading.
        greenlet = __import__('greenlet')

        if orig_current_thread == threading_mod.main_thread():
            main_thread = threading_mod.main_thread()
            _greenlet = main_thread._greenlet = greenlet.getcurrent()

            main_thread.join = make_join_func(main_thread, _greenlet)

            # Patch up the ident of the main thread to match. This
            # matters if threading was imported before monkey-patching
            # thread
            oldid = main_thread.ident
            main_thread._ident = threading_mod.get_ident()
            if oldid in threading_mod._active:
                threading_mod._active[main_thread.ident] = threading_mod._active[oldid]
            if oldid != main_thread.ident:
                del threading_mod._active[oldid]
        else:
            _queue_warning("Monkey-patching not on the main thread; "
                           "threading.main_thread().join() will hang from a greenlet",
                           _warnings)

    from gevent import events
    _notify_patch(events.GeventDidPatchModuleEvent('thread', gevent_thread_mod, thread_mod))
    _notify_patch(events.GeventDidPatchModuleEvent('threading', gevent_threading_mod, threading_mod))



def __call_module_hook(gevent_module, name, module, items, _warnings):
    """
    Call the optional ``_gevent_<name>_monkey_patch`` hook of *gevent_module*.

    :param gevent_module: Object on which the hook function is looked up.
    :param str name: Middle part of the hook name; the visible callers pass
        ``'will'`` or ``'did'``.
    :param module: Passed to the hook as its first argument.
    :param items: Passed to the hook as its second argument.
    :param _warnings: Forwarded to ``_queue_warning`` whenever the hook calls
        the ``warn`` function it is given.
    """
    # This function can raise DoNotPatch on 'will'

    def warn(message):
        _queue_warning(message, _warnings)

    func_name = '_gevent_' + name + '_monkey_patch'
    try:
        func = getattr(gevent_module, func_name)
    except AttributeError:
        # The hook is optional: fall back to a do-nothing callable.
        func = lambda *args: None

    func(module, items, warn)

def patch_item(module, attr, newitem):
    """
    Set ``module.<attr>`` to *newitem*, remembering the value it replaces.

    When the attribute already exists, its current value is recorded in
    ``saved`` under the module's name; a value recorded earlier for the same
    attribute is kept rather than overwritten.
    """
    previous = getattr(module, attr, _NONE)
    if previous is not _NONE:
        per_module = saved.setdefault(module.__name__, {})
        per_module.setdefault(attr, previous)
    setattr(module, attr, newitem)

def patch_module(target_module, source_module, items=None,
                 _warnings=None,
                 _notify_did_subscribers=True):
    """
    patch_module(target_module, source_module, items=None)

    Replace attributes in *target_module* with the attributes of the
    same name in *source_module*.

    The *source_module* can provide some attributes to customize the process:

    * ``__implements__`` is a list of attribute names to copy; if not present,
      the *items* keyword argument is mandatory.
    * ``_gevent_will_monkey_patch(target_module, items, warn, **kwargs)``
    * ``_gevent_did_monkey_patch(target_module, items, warn, **kwargs)``
      These two functions in the *source_module* are called *if* they exist,
      before and after copying attributes, respectively. The "will" function
      may modify *items*. The value of *warn* is a function that should be called
      with a single string argument to issue a warning to the user. If the "will"
      function raises :exc:`gevent.events.DoNotPatch`, no patching will be done. These functions
      are called before any event subscribers or plugins.

    :keyword list items: A list of attribute names to replace. If
       not given, this will be taken from the *source_module* ``__implements__``
       attribute.
    :return: A true value if patching was done, a false value if patching was canceled.
    :raises AttributeError: If *items* is not given and *source_module* has
       no ``__implements__`` attribute.

    .. versionadded:: 1.3b1
    """
    from gevent import events

    if items is None:
        items = getattr(source_module, '__implements__', None)
        if items is None:
            raise AttributeError('%r does not have __implements__' % source_module)

    try:
        # Either the module's own hook or an event subscriber may veto the
        # patch by raising DoNotPatch.
        __call_module_hook(source_module, 'will', target_module, items, _warnings)
        _notify_patch(
            events.GeventWillPatchModuleEvent(target_module.__name__, source_module,
                                              target_module, items),
            _warnings)
    except events.DoNotPatch:
        return False

    for attr in items:
        patch_item(target_module, attr, getattr(source_module, attr))

    __call_module_hook(source_module, 'did', target_module, items, _warnings)

    if _notify_did_subscribers:
        # We allow turning off the broadcast of the 'did' event for the benefit
        # of our internal functions which need to do additional work (besides copying
        # attributes) before their patch can be considered complete.
        _notify_patch(
            events.GeventDidPatchModuleEvent(target_module.__name__, source_module,
                                             target_module))

    return True

def _patch_module(name, items=None, _warnings=None, _notify_did_subscribers=True):
    """
    Patch a standard module with the ``gevent.<name>`` module.

    The module to patch is named by the gevent module's ``__target__``
    attribute when it has one, and by *name* otherwise.

    :param str name: Name of the gevent submodule to import.
    :keyword items: Forwarded to ``patch_module``.
    :keyword _warnings: Forwarded to ``patch_module``.
    :keyword bool _notify_did_subscribers: Forwarded to ``patch_module``.
    :return: The tuple ``(gevent_module, target_module)``.
    """

    gevent_module = getattr(__import__('gevent.' + name), name)
    module_name = getattr(gevent_module, '__target__', name)
    target_module = __import__(module_name)

    patch_module(target_module, gevent_module, items=items,
                 _warnings=_warnings,
                 _notify_did_subscribers=_notify_did_subscribers)

    return gevent_module, target_module


  • 调用 __call_module_hook 执行 gevent 对应模块(即 source_module)中的 _gevent_will_monkey_patch 和 _gevent_did_monkey_patch 方法

  • 调用patch_item将原始模块中的指定方法替换为gevent对应模块中的同名方法


0x03 threading模块patch分析


import threading as __threading__

def _gevent_will_monkey_patch(native_module, items, warn): # pylint:disable=unused-argument
    """
    Hook run before ``threading`` is patched: re-register the main thread in
    ``threading._active`` under the ident returned by ``_get_ident()``.

    :param native_module: Unused.
    :param items: Unused.
    :param warn: Callable taking one message string; called when patching
        happens outside the main native thread.
    """
    # Make sure the MainThread can be found by our current greenlet ID,
    # otherwise we get a new DummyThread, which cannot be joined.
    # Fixes tests in test_threading_2 under PyPy.
    main_thread = main_native_thread()
    if __threading__.current_thread() != main_thread:
        warn("Monkey-patching outside the main native thread. Some APIs "
             "will not be available. Expect a KeyError to be printed at shutdown.")
        # Stop here: the current ident does not belong to the main thread,
        # so the main thread must not be re-registered under it.
        return

    if _get_ident() not in __threading__._active:
        main_id = main_thread.ident
        del __threading__._active[main_id]
        # ``_ident`` and ``_Thread__ident`` are both set; presumably these are
        # the Python 3 and Python 2 spellings of the private attribute.
        main_thread._ident = main_thread._Thread__ident = _get_ident()
        __threading__._active[_get_ident()] = main_thread


__implements__ = [


__implements__ = [


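  +------------------------+
  | threading.Thread.start |
  +------------------------+
              |
              v
+-----------------------------+
| threading._start_new_thread |
+-----------------------------+
              |
              v
  +-------------------------+
  | thread.start_new_thread |
  +-------------------------+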

上面的 threading._start_new_thread 和 thread.start_new_thread 都在 gevent 的 __implements__ 列表中。也就是说:这两个函数都被 gevent hook 了。

from gevent.thread import start_new_thread as _start_new_thread


def start_new_thread(function, args=(), kwargs=None):
    """
    Spawn a greenlet that runs *function* and return its identifier.

    :param function: The callable to run.
    :param args: Positional arguments for *function*.
    :param kwargs: Optional mapping of keyword arguments for *function*;
        nothing is passed by keyword when it is ``None``.
    :return: The result of ``get_ident(greenlet)`` for the spawned greenlet.
    """
    if kwargs is not None:
        greenlet = Greenlet.spawn(function, *args, **kwargs)
    else:
        greenlet = Greenlet.spawn(function, *args)
    return get_ident(greenlet)


0x04 总结

