0x00 前言
有很多Python语言的协程库,如:tornado、asyncio等。这些库在使用时需要使用特定的语法,如:async/await,对于非协程的代码需要改写才能以协程方式运行。
gevent是一个特殊的协程库:它通过monkey patch(猴子补丁)替换标准库中的阻塞实现,使原本的非协程代码无需改写就能以协程方式运行,从而提升I/O密集型程序的并发性能。本文尝试分析一下它的实现原理。
0x01 使用案例
先看以下这段代码:
import ctypes
import sys
import threading
import time
def gettid():
    """Return the operating-system id of the calling thread.

    The id is fetched through ctypes rather than through the ``threading``
    module, so it keeps reporting the real kernel thread even after gevent
    has monkey-patched ``threading``.
    """
    # Python 3 reports sys.platform as 'linux' (Python 2 used 'linux2'),
    # so match on the prefix instead of the exact string.
    if sys.platform.startswith('linux'):
        # 186 is the gettid syscall number on x86_64 Linux; other
        # architectures use different numbers.
        return ctypes.CDLL('libc.so.6').syscall(186)
    return ctypes.windll.kernel32.GetCurrentThreadId()
def thread_test(index):
    """Report this worker's identities every 0.1 s for roughly one second.

    Each printed line carries the worker *index*, the ident reported by
    ``threading.current_thread()`` and the OS thread id from ``gettid()``.
    """
    started = time.time()
    while True:
        if time.time() - started >= 1:
            break
        ident = threading.current_thread().ident
        print('I\'m thread %d: %d %d' % (index, ident, gettid()))
        time.sleep(0.1)
# Start two threads that both run the demo worker.
thread1 = threading.Thread(target=thread_test, args=(1,))
thread1.start()
thread2 = threading.Thread(target=thread_test, args=(2,))
thread2.start()
# The main thread does not join(); it sleeps 2 s, which outlasts the ~1 s
# workers. Where these two prints land in the output shows when the main
# thread actually gets to run.
print('Main thread sleep')
time.sleep(2)
print('Main thread exit')
输出内容如下:
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
Main thread sleep
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
Main thread exit
在这段代码前面加上以下代码:
# Put this before the demo code. It patches only the thread/threading
# modules; the time module (time.sleep) is left untouched.
from gevent import monkey
monkey.patch_thread()
输出如下:
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
Main thread sleep
Main thread exit
可以看出,加入gevent后的输出与之前有些不同。最大的区别是:两个线程的系统线程ID(每行最后一个数字,即`gettid()`的返回值)相同,都是31623,而`threading`给出的线程ident仍然各不相同。也就是说,这两个“线程”其实是跑在同一个系统线程里的。
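还有一点需要注意:`Main thread sleep`这句输出在前后两次执行中出现的位置不同。打补丁前,它夹在两个线程的输出之间;打补丁后,它要等两个“线程”都执行完才输出,而且线程1和线程2的输出也不再交替出现。原因有两点。第一,`Thread.start()`会等待线程的启动事件,而`Event`已被替换为gevent的实现,所以主线程在这里让出执行权,新“线程”(greenlet)开始运行。第二,这里只调用了`patch_thread`,`time.sleep`没有被替换,仍会阻塞整个系统线程,greenlet之间没有机会切换。因此每个“线程”一旦开始运行,就会一直执行到结束。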
0x02 原理分析
来看下`gevent.monkey`中`patch_thread`函数的实现:
def patch_thread(threading=True, _threading_local=True, Event=True, logging=True,
                 existing_locks=True,
                 _warnings=None):
    """Replace thread/threading primitives with gevent's greenlet-based ones.

    Excerpt of ``gevent.monkey.patch_thread`` (indentation restored).
    The boolean flags select the extra pieces that get patched:
    *threading* (the ``threading`` module itself), *Event*
    (``threading.Event``), *existing_locks* (locks created before patching),
    *logging* (locks of an already-imported ``logging`` module) and
    *_threading_local* (``_threading_local.local``). The low-level thread
    module is always patched.
    """
    if threading:
        threading_mod = __import__('threading')
        # Capture the *real* current thread object before
        # we start returning DummyThread objects, for comparison
        # to the main thread.
        orig_current_thread = threading_mod.current_thread()
    else:
        threading_mod = None
        gevent_threading_mod = None
        orig_current_thread = None
    # The "did patch" events are suppressed here and broadcast at the very
    # end, once the extra fix-ups below have been applied.
    gevent_thread_mod, thread_mod = _patch_module('thread',
                                                  _warnings=_warnings, _notify_did_subscribers=False)
    if threading:
        gevent_threading_mod, _ = _patch_module('threading',
                                                _warnings=_warnings, _notify_did_subscribers=False)
        if Event:
            from gevent.event import Event
            patch_item(threading_mod, 'Event', Event)
            # Python 2 had `Event` as a function returning
            # the private class `_Event`. Some code may be relying
            # on that.
            if hasattr(threading_mod, '_Event'):
                patch_item(threading_mod, '_Event', Event)
        if existing_locks:
            _patch_existing_locks(threading_mod)
        if logging and 'logging' in sys.modules:
            # logging was imported before patching, so its module lock and
            # handler locks were presumably created by the unpatched
            # threading module; replace them with RLocks from the patched one.
            logging = __import__('logging')
            patch_item(logging, '_lock', threading_mod.RLock())
            for wr in logging._handlerList:
                # In py26, these are actual handlers, not weakrefs
                handler = wr() if callable(wr) else wr
                if handler is None:
                    continue
                if not hasattr(handler, 'lock'):
                    raise TypeError("Unknown/unsupported handler %r" % handler)
                handler.lock = threading_mod.RLock()
    if _threading_local:
        _threading_local = __import__('_threading_local')
        from gevent.local import local
        patch_item(_threading_local, 'local', local)

    def make_join_func(thread, thread_greenlet):
        """Build a replacement ``join`` for *thread* that cooperatively polls.

        Instead of blocking on a lock, the returned function checks
        ``thread.is_alive()`` and yields to other greenlets with gevent's
        ``sleep`` between checks.
        """
        from gevent.hub import sleep
        from time import time

        def join(timeout=None):
            end = None
            if threading_mod.current_thread() is thread:
                raise RuntimeError("Cannot join current thread")
            # A thread whose greenlet has already finished counts as joined.
            if thread_greenlet is not None and thread_greenlet.dead:
                return
            if not thread.is_alive():
                return
            # NOTE(review): timeout=0 is falsy, so it is treated like
            # timeout=None (no deadline).
            if timeout:
                end = time() + timeout
            # Poll every 10 ms, letting other greenlets run, until the
            # thread dies or the deadline passes.
            while thread.is_alive():
                if end is not None and time() > end:
                    return
                sleep(0.01)
        return join

    if threading:
        from gevent.threading import main_native_thread
        # Give every already-running thread except the main one a
        # cooperative join().
        for thread in threading_mod._active.values():
            if thread == main_native_thread():
                continue
            thread.join = make_join_func(thread, None)
        if sys.version_info[:2] >= (3, 4):
            # Issue 18808 changes the nature of Thread.join() to use
            # locks. This means that a greenlet spawned in the main thread
            # (which is already running) cannot wait for the main thread---it
            # hangs forever. We patch around this if possible. See also
            # gevent.threading.
            greenlet = __import__('greenlet')
            if orig_current_thread == threading_mod.main_thread():
                main_thread = threading_mod.main_thread()
                # Remember the greenlet running right now (the main
                # thread's) so join() can tell when it has finished.
                _greenlet = main_thread._greenlet = greenlet.getcurrent()
                main_thread.join = make_join_func(main_thread, _greenlet)
                # Patch up the ident of the main thread to match. This
                # matters if threading was imported before monkey-patching
                # thread
                oldid = main_thread.ident
                main_thread._ident = threading_mod.get_ident()
                if oldid in threading_mod._active:
                    threading_mod._active[main_thread.ident] = threading_mod._active[oldid]
                # NOTE(review): if oldid were absent from _active but differed
                # from the new ident, this del would raise KeyError.
                if oldid != main_thread.ident:
                    del threading_mod._active[oldid]
            else:
                _queue_warning("Monkey-patching not on the main thread; "
                               "threading.main_thread().join() will hang from a greenlet",
                               _warnings)
    # Now that all fix-ups are done, broadcast the deferred "did patch" events.
    from gevent import events
    _notify_patch(events.GeventDidPatchModuleEvent('thread', gevent_thread_mod, thread_mod))
    _notify_patch(events.GeventDidPatchModuleEvent('threading', gevent_threading_mod, threading_mod))
首先,`orig_current_thread`变量保存了打补丁前真实的当前线程对象,然后调用`_patch_module`函数对`thread`和`threading`模块打补丁(patch)。
`_patch_module`函数的相关实现如下:
def __call_module_hook(gevent_module, name, module, items, _warnings):
    # Call ``_gevent_<name>_monkey_patch(module, items, warn)`` defined on the
    # *gevent* module (e.g. gevent.threading), passing the target module; if
    # the gevent module defines no such hook, this is a no-op.
    # This function can raise DoNotPatch on 'will'
    def warn(message):
        # Hooks report problems through this callback; messages are queued
        # via _queue_warning together with the caller's _warnings.
        _queue_warning(message, _warnings)
    func_name = '_gevent_' + name + '_monkey_patch'
    try:
        func = getattr(gevent_module, func_name)
    except AttributeError:
        func = lambda *args: None
    func(module, items, warn)
def patch_item(module, attr, newitem):
    # Set ``module.attr = newitem``. If the attribute already existed, its
    # value is recorded in ``saved[module.__name__][attr]``; setdefault keeps
    # only the first (original) value when the same attribute is patched again.
    olditem = getattr(module, attr, _NONE)
    if olditem is not _NONE:
        saved.setdefault(module.__name__, {}).setdefault(attr, olditem)
    setattr(module, attr, newitem)
def patch_module(target_module, source_module, items=None,
                 _warnings=None,
                 _notify_did_subscribers=True):
    """
    patch_module(target_module, source_module, items=None)

    Replace attributes in *target_module* with the attributes of the
    same name in *source_module*.

    The *source_module* can provide some attributes to customize the process:

    * ``__implements__`` is a list of attribute names to copy; if not present,
      the *items* keyword argument is mandatory.
    * ``_gevent_will_monkey_patch(target_module, items, warn, **kwargs)``
    * ``_gevent_did_monkey_patch(target_module, items, warn, **kwargs)``
      These two functions in the *source_module* are called *if* they exist,
      before and after copying attributes, respectively. The "will" function
      may modify *items*. The value of *warn* is a function that should be called
      with a single string argument to issue a warning to the user. If the "will"
      function raises :exc:`gevent.events.DoNotPatch`, no patching will be done. These functions
      are called before any event subscribers or plugins.

    :keyword list items: A list of attribute names to replace. If
       not given, this will be taken from the *source_module* ``__implements__``
       attribute.
    :return: A true value if patching was done, a false value if patching was canceled.

    .. versionadded:: 1.3b1
    """
    from gevent import events

    if items is None:
        items = getattr(source_module, '__implements__', None)
        if items is None:
            raise AttributeError('%r does not have __implements__' % source_module)

    # Both the module's own "will" hook and "will patch" subscribers may veto
    # the patch by raising DoNotPatch.
    try:
        __call_module_hook(source_module, 'will', target_module, items, _warnings)
        _notify_patch(
            events.GeventWillPatchModuleEvent(target_module.__name__, source_module,
                                              target_module, items),
            _warnings)
    except events.DoNotPatch:
        return False

    # The actual patch: copy each listed attribute onto the target module.
    for attr in items:
        patch_item(target_module, attr, getattr(source_module, attr))

    __call_module_hook(source_module, 'did', target_module, items, _warnings)

    if _notify_did_subscribers:
        # We allow turning off the broadcast of the 'did' event for the benefit
        # of our internal functions which need to do additional work (besides copying
        # attributes) before their patch can be considered complete.
        _notify_patch(
            events.GeventDidPatchModuleEvent(target_module.__name__, source_module,
                                             target_module)
        )

    return True
def _patch_module(name, items=None, _warnings=None, _notify_did_subscribers=True):
    # Patch the stdlib module that corresponds to ``gevent.<name>`` and return
    # ``(gevent_module, target_module)``. The boolean result of patch_module
    # (False when patching was vetoed) is not propagated.
    # __import__('gevent.<name>') returns the top-level gevent package, so the
    # submodule is fetched as an attribute of it.
    gevent_module = getattr(__import__('gevent.' + name), name)
    # A gevent module may name a different target via __target__; otherwise
    # the stdlib module with the same name is patched.
    module_name = getattr(gevent_module, '__target__', name)
    target_module = __import__(module_name)

    patch_module(target_module, gevent_module, items=items,
                 _warnings=_warnings,
                 _notify_did_subscribers=_notify_did_subscribers)

    return gevent_module, target_module
这里比较关键的逻辑有两个:
1. 调用`__call_module_hook`,在替换属性前后分别执行gevent对应模块(例如`gevent.threading`,而不是被patch的目标模块)中的`_gevent_will_monkey_patch`和`_gevent_did_monkey_patch`函数(如果定义了的话);
2. 调用`patch_item`,将原始模块中的指定属性替换为gevent对应模块中的同名属性,被替换掉的原始值会保存在`saved`中。
可以看出,patch的关键逻辑就是由`patch_item`实现的,而具体要patch哪些属性,则由gevent模块中的`__implements__`列表指定。
0x03 threading模块patch分析
对于`threading`模块,对应的gevent模块是`gevent.threading`。其中实现了`_gevent_will_monkey_patch`函数,在替换属性之前做一些准备工作:确保主线程对象能以当前greenlet的ID在`threading._active`中找到。
import threading as __threading__
def _gevent_will_monkey_patch(native_module, items, warn): # pylint:disable=unused-argument
    # "will" hook run by patch_module before threading's attributes are
    # replaced: re-key the main thread in threading._active under the
    # gevent ident returned by _get_ident().
    # Make sure the MainThread can be found by our current greenlet ID,
    # otherwise we get a new DummyThread, which cannot be joined.
    # Fixes tests in test_threading_2 under PyPy.
    main_thread = main_native_thread()
    if __threading__.current_thread() != main_thread:
        warn("Monkey-patching outside the main native thread. Some APIs "
             "will not be available. Expect a KeyError to be printed at shutdown.")
        return

    if _get_ident() not in __threading__._active:
        main_id = main_thread.ident
        del __threading__._active[main_id]
        # Set both spellings of the ident attribute; _Thread__ident is the
        # name-mangled form, presumably for Python 2's Thread class.
        main_thread._ident = main_thread._Thread__ident = _get_ident()
        __threading__._active[_get_ident()] = main_thread
`gevent.threading`模块中要patch的属性列表是:
# Names in the stdlib threading module that gevent.threading replaces.
__implements__ = [
    'local',
    '_start_new_thread',
    '_allocate_lock',
    'Lock',
    '_get_ident',
    '_sleep',
    '_DummyThread',
]
而`gevent.thread`模块中的列表则是(在Python 3中,对应的标准库模块名为`_thread`):
# Names in the stdlib low-level thread module that gevent.thread replaces.
__implements__ = [
    'allocate_lock',
    'get_ident',
    'exit',
    'LockType',
    'stack_size',
    'start_new_thread',
    '_local'
]
我们来看下`threading`模块在启动线程的过程中做了哪些操作:
+------------------------+
| |
| threading.Thread.start |
| |
+-----------+------------+
|
|
v
+-------------+---------------+
| |
| threading._start_new_thread |
| |
+-------------+---------------+
|
|
|
v
+-----------+-------------+
| |
| thread.start_new_thread |
| |
+-------------------------+
上图中的`threading._start_new_thread`和`thread.start_new_thread`都在gevent的`__implements__`列表中,也就是说,这两个函数都被gevent替换(hook)了。`gevent.threading`中的`_start_new_thread`是这样导入的:
from gevent.thread import start_new_thread as _start_new_thread
由于`gevent.threading._start_new_thread`其实就是`gevent.thread.start_new_thread`,所以它们是同一个函数,其实现如下:
def start_new_thread(function, args=(), kwargs=None):
    # gevent's replacement for start_new_thread: instead of creating an OS
    # thread, spawn a Greenlet that runs function(*args, **kwargs).
    if kwargs is not None:
        greenlet = Greenlet.spawn(function, *args, **kwargs)
    else:
        greenlet = Greenlet.spawn(function, *args)
    # The greenlet's ident stands in for the thread id callers expect.
    return get_ident(greenlet)
从`start_new_thread`函数可以看出,启动线程的逻辑已经被替换成了`Greenlet.spawn`。它并没有创建新的系统线程,而是创建了一个greenlet,并把`get_ident(greenlet)`作为“线程ID”返回。这就是使用`patch_thread`后,两个线程的真实(系统)线程ID相同、而`threading`看到的ident不同的原因。
0x04 总结
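gevent使用动态patch的方法,把标准库中的阻塞实现替换为基于协程的实现。这样就能在几乎不修改代码的前提下,让非协程代码以协程方式运行,提升I/O密集型程序的并发性能。但从上面线程的例子可以看出,patch后程序的行为可能与之前有差异。例如,“线程”不再由操作系统调度;只要还有阻塞调用没有被替换(如本例中的`time.sleep`),协程之间就无法切换。所以在使用时还是需要小心,通常应在程序启动时尽早调用`monkey.patch_all()`,对所有相关模块统一打补丁。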