`IOLoop` 就是 tornado 的事件循环,通常作为全局单例使用,可以通过 `IOLoop.current()` 或 `IOLoop.instance()` 访问。当前线程没有正在运行的 IOLoop 时,`IOLoop.current()` 会回退调用 `instance()` 方法,所以我们先来看 `instance()`:
# Excerpt of tornado's IOLoop class ("..." marks code elided by the article).
class IOLoop(Configurable):
...
# Global lock for creating global IOLoop instance
_instance_lock = threading.Lock()
# Thread-local storage; start() stores the running loop in _current.instance.
_current = threading.local()
# instance(): return the process-wide IOLoop singleton, creating it on first
# use. The hasattr() test runs once without the lock (cheap fast path once the
# instance exists) and again while holding _instance_lock, so two threads
# racing on the first call cannot both create an instance (double-checked
# locking). Note that IOLoop() goes through Configurable.__new__, so the
# object created is an instance of the configured implementation class.
@staticmethod
def instance():
if not hasattr(IOLoop, "_instance"):
with IOLoop._instance_lock:
if not hasattr(IOLoop, "_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
return IOLoop._instance
`IOLoop` 自身没有定义 `__init__()`,创建实例的逻辑在父类 `Configurable` 的 `__new__()` 中,所以我们要去看它的父类 `Configurable`:
# Excerpt of tornado's Configurable base class. Instantiating a configurable
# base class (e.g. IOLoop()) goes through __new__ below, which may build an
# instance of a different implementation subclass than the one called.
class Configurable(object):
# Class-level defaults; the double-underscore names are name-mangled by
# Python (to _Configurable__impl_class / _Configurable__impl_kwargs).
__impl_class = None
__impl_kwargs = None
def __new__(cls, *args, **kwargs):
base = cls.configurable_base()
init_kwargs = {}
if cls is base:
# Called on the configurable base itself (e.g. IOLoop()): look up the
# configured implementation class and start from any stored kwargs.
impl = cls.configured_class()
if base.__impl_kwargs:
init_kwargs.update(base.__impl_kwargs)
else:
# Called on a concrete subclass: instantiate that subclass directly.
impl = cls
# Caller-supplied kwargs are applied last, so they override stored ones.
init_kwargs.update(kwargs)
instance = super(Configurable, cls).__new__(impl)
# Initialization is done by an explicit initialize() call here.
instance.initialize(*args, **init_kwargs)
return instance
`IOLoop.configurable_base()` 返回的就是 `IOLoop` 自身,即 `cls is base` 成立,所以会进入 `configured_class()`:
# Configurable.configured_class: if no implementation class has been set yet,
# ask configurable_default() for one and cache it on the base class, so later
# calls return the same class without re-running the selection.
# Note: the check reads cls.__impl_class while the result is stored on base.
@classmethod
def configured_class(cls):
"""Returns the currently configured class."""
base = cls.configurable_base()
if cls.__impl_class is None:
base.__impl_class = cls.configurable_default()
return base.__impl_class
在尚未配置实现类时,`IOLoop.configured_class()` 会调用 `configurable_default()`,由它决定实际使用哪个 `IOLoop` 子类(返回的是这个子类,而不是 `IOLoop` 本身):
# IOLoop.configurable_default: pick the IOLoop implementation class based on
# what the `select` module offers on this platform. The checks run in order:
# epoll first, then kqueue, and finally plain select as the fallback. The
# implementation modules are imported lazily, only for the branch taken.
# Returns a class (not an instance); Configurable.__new__ instantiates it.
@classmethod
def configurable_default(cls):
if hasattr(select, "epoll"):
# epoll available (presumably Linux)
from tornado.platform.epoll import EPollIOLoop
return EPollIOLoop
if hasattr(select, "kqueue"):
# Python 2.6+ on BSD or Mac
from tornado.platform.kqueue import KQueueIOLoop
return KQueueIOLoop
from tornado.platform.select import SelectIOLoop
return SelectIOLoop
在这里我们看到,通过判断 `select` 模块是否有 “epoll”、“kqueue” 属性,在不同平台上选用不同的 `IOLoop` 实现类(随后由 `__new__` 负责实例化)。`EPollIOLoop`、`KQueueIOLoop` 和 `SelectIOLoop` 都继承自 `PollIOLoop`,并对外提供统一的接口。在 `PollIOLoop` 中可以看到:
# PollIOLoop.add_handler: start watching `fd` for `events`.
# split_fd() returns (fd, obj) — presumably the raw descriptor plus the
# original object passed in; verify against split_fd. The handler is wrapped
# with stack_context.wrap and stored under fd; the event loop later calls it
# as handler(obj, events). ERROR is always OR-ed into the registered mask.
def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)
# PollIOLoop.update_handler: change the event mask of an already-registered
# fd (ERROR is always included). The stored handler is left untouched.
def update_handler(self, fd, events):
fd, obj = self.split_fd(fd)
self._impl.modify(fd, events | self.ERROR)
# PollIOLoop.remove_handler: stop watching fd. Drops its handler and any
# events already polled for it but not yet dispatched (self._events), then
# unregisters it from the poller. Unregister failures are only logged at
# debug level, so removing an fd the poller no longer knows about is harmless.
def remove_handler(self, fd):
fd, obj = self.split_fd(fd)
self._handlers.pop(fd, None)
self._events.pop(fd, None)
try:
self._impl.unregister(fd)
except Exception:
gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
如果要实现自己的 tornado 异步函数,可以调用上面三个方法,在 `IOLoop` 中注册(修改、移除)fd 及其回调即可。
回到 `__new__()` 函数,接下来就是通过 `super(Configurable, cls).__new__(impl)` 创建实例 `instance`,然后调用 `instance.initialize()` 进行初始化。
接下来我们到了重头戏——`start()`。`start()` 方法实现在 `PollIOLoop` 中,有近 200 行,不过可以分为几个部分:
# PollIOLoop.start, first part of the excerpt ("..." marks code elided by the
# article; the try below is closed in the continuation further down).
# Runs the event loop in the calling thread.
def start(self):
# Initialization
if self._running:
raise RuntimeError("IOLoop is already running")
self._setup_logging()
# If _stopped is already set (presumably by a stop() call made before
# start()), consume the flag and return without running the loop.
if self._stopped:
self._stopped = False
return
# Make this loop the thread-local "current" IOLoop, remembering the
# previous one in old_current so it can be restored when start() exits.
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
# Record the identity of the thread running the loop.
self._thread_ident = thread.get_ident()
self._running = True
...
try:
# The event loop itself
while True:
# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
with self._callback_lock:
callbacks = self._callbacks
self._callbacks = []
# Collect the timeouts whose deadline has passed. _timeouts is a
# heap, so only its head needs checking; cancelled entries
# (callback is None) at the head are discarded along the way.
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[0].callback is None:
# The timeout was cancelled. Note that the
# cancellation check is repeated below for timeouts
# that are cancelled by another timeout or callback.
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
if (self._cancellations > 512
and self._cancellations > (len(self._timeouts) >> 1)):
# Clean up the timeout queue when it gets large and it's
# more than half cancellations.
self._cancellations = 0
self._timeouts = [x for x in self._timeouts
if x.callback is not None]
heapq.heapify(self._timeouts)
# Run each queued callback via self._run_callback
for callback in callbacks:
self._run_callback(callback)
# Run every timeout that has come due (skipping any cancelled by
# one of the callbacks above)
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
# Closures may be holding on to a lot of memory, so allow
# them to be freed before we go into our poll wait.
callbacks = callback = due_timeouts = timeout = None
...
在每轮循环开始时,先执行 `self._callbacks` 列表中的回调函数,以及已经到达 `deadline` 的 timeout 的回调。timeout 对象保存了 `deadline` 和 `callback`,并重载了 `__lt__` 和 `__le__` 方法,因此 `IOLoop._timeouts` 可以用 `heapq` 维护成一个最小堆(优先队列)。堆顶总是最早到期的 timeout,从而能快速找出所有到期的 timeout 并执行它们的回调函数。被取消的 timeout 的 `callback` 为 `None`,会被直接弹出丢弃。
回到事件循环,执行完所有 `callbacks` 和 `due_timeouts` 后,`poll` 终于出场了:
...
# (continuation of the start() excerpt above)
# Decide how long poll may block.
if self._callbacks:
# New callbacks were queued after the swap above (e.g. by the
# callbacks just run), so poll must not block: timeout 0.
poll_timeout = 0.0
# Otherwise, if timeouts are pending, wait until the earliest deadline,
# clamped to the range [0, _POLL_TIMEOUT].
elif self._timeouts:
poll_timeout = self._timeouts[0].deadline - self.time()
poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
else:
# No timeouts and no callbacks, so use the default.
poll_timeout = _POLL_TIMEOUT
# Exit the loop once _running has been cleared (presumably by stop()).
if not self._running:
break
if self._blocking_signal_threshold is not None:
# clear alarm so it doesn't fire while poll is waiting for
# events.
signal.setitimer(signal.ITIMER_REAL, 0, 0)
try:
# Finally, poll for I/O events
event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
# A poll interrupted by a signal (EINTR) just restarts the loop
# iteration; any other error propagates out of start().
if errno_from_exception(e) == errno.EINTR:
continue
else:
raise
# Re-arm the blocking-signal timer now that poll has returned.
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL,
self._blocking_signal_threshold, 0)
# Merge the newly reported (fd, events) pairs into self._events and
# dispatch them one at a time. remove_handler() also pops from
# self._events, so an fd removed by an earlier handler in this batch
# is not dispatched.
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
# Look up the handler registered for this fd and run it
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
# Any other handler error (including a KeyError when fd has no
# registered handler) is reported rather than stopping the loop.
self.handle_callback_exception(self._handlers.get(fd))
# Drop references to the handler before the next iteration.
fd_obj = handler_func = None
finally:
# Runs however the loop exits: undo start()'s global state changes.
# reset the stopped flag so another start/stop pair can be issued
self._stopped = False
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
IOLoop._current.instance = old_current
# NOTE: old_wakeup_fd is assigned in a part of start() elided from this
# excerpt (not shown here).
if old_wakeup_fd is not None:
signal.set_wakeup_fd(old_wakeup_fd)
总结一下,tornado 的 `IOLoop` 是基于 `poll`(epoll/kqueue/select)的事件驱动模型。`IOLoop` 是单线程的,避免了多进程/多线程模型中上下文切换的损耗,在 IO 密集的场景下有很好的性能。但如果 `IOLoop` 循环中某个 callback 执行时间过长,就会阻塞整个 `IOLoop`,造成性能急剧下降,所以在实践中要十分注意。