Home

Tornado源码分析笔记(2)

IOLoop就是 tornado 的事件循环,是一个全局的单例,通过IOLoop.current()或IOLoop.instance()访问。IOLoop.current()内部调用的也是instance()方法,所以我们来看instance():

class IOLoop(Configurable):
    """Excerpt of the IOLoop class showing only how the global instance is created.

    Everything else in the class is elided with ``...``.
    """

    ...

    # Global lock for creating global IOLoop instance
    _instance_lock = threading.Lock()

    # Thread-local storage; ``start()`` records the running loop on it.
    _current = threading.local()

    @staticmethod
    def instance():
        """Return the process-wide IOLoop, creating it on first use.

        Uses double-checked locking: the ``_instance`` attribute is tested
        once without the lock (fast path) and again while holding
        ``_instance_lock``, so that two threads racing through the first
        check cannot both construct an instance.
        """
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                if not hasattr(IOLoop, "_instance"):
                    # New instance after double check
                    IOLoop._instance = IOLoop()
        return IOLoop._instance

IOLoop没有__init__()函数,所以我们要去看它的父类Configurable:

class Configurable(object):
    """Base class whose constructor can hand back a configured implementation.

    Instantiating the class returned by ``configurable_base()`` yields an
    instance of whatever ``configured_class()`` returns; instantiating any
    other subclass yields that subclass. Either way the new object is set up
    through ``initialize()``.
    """

    # Both start out as None; nothing in this excerpt assigns __impl_kwargs.
    __impl_class = None
    __impl_kwargs = None

    def __new__(cls, *args, **kwargs):
        """Create the instance, pick the implementation class, and initialize it.

        Keyword arguments passed by the caller override the stored
        ``__impl_kwargs`` defaults, which are only applied when the base
        class itself is being instantiated.
        """
        root = cls.configurable_base()
        if cls is not root:
            # A specific subclass was requested: build exactly that class.
            target = cls
            merged = {}
        else:
            target = cls.configured_class()
            defaults = root.__impl_kwargs
            merged = dict(defaults) if defaults else {}
        merged.update(kwargs)
        obj = super(Configurable, cls).__new__(target)

        obj.initialize(*args, **merged)
        return obj

IOLoop.configurable_base()返回的就是IOLoop自身,所以进入configured_class():

@classmethod
def configured_class(cls):
    """Return the implementation class currently configured.

    If none has been chosen yet, the result of ``configurable_default()``
    is stored on the configurable base class and returned.
    """
    root = cls.configurable_base()
    if cls.__impl_class is not None:
        return root.__impl_class
    # Nothing configured yet: fall back to the default and remember it.
    root.__impl_class = cls.configurable_default()
    return root.__impl_class

IOLoop.configured_class()返回的是base.__impl_class;第一次调用时它由IOLoop.configurable_default()决定,是IOLoop的一个具体实现类:

@classmethod
def configurable_default(cls):
    """Pick the IOLoop implementation class for this platform.

    The choice depends on which attributes the ``select`` module exposes:
    ``epoll`` is preferred, then ``kqueue``, and the plain ``select``-based
    loop is the fallback. Only the chosen module is imported.
    """
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop as chosen
    elif hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop as chosen
    else:
        from tornado.platform.select import SelectIOLoop as chosen
    return chosen

在这里我们看到,通过判断select模块有无“epoll”、“kqueue”属性,在不同平台上实例化不同的IOLoop实现。EPollIOLoop、KQueueIOLoop、SelectIOLoop都继承于PollIOLoop并封装了统一的接口。在PollIOLoop中可以看到:

def add_handler(self, fd, handler, events):
    """Register ``handler`` for ``events`` on ``fd``.

    ``fd`` is first passed through ``split_fd``; the handler is wrapped with
    ``stack_context.wrap`` and stored next to the second element returned by
    ``split_fd``. ``self.ERROR`` is always OR-ed into the registered events.
    """
    raw_fd, fileobj = self.split_fd(fd)
    self._handlers[raw_fd] = (fileobj, stack_context.wrap(handler))
    self._impl.register(raw_fd, events | self.ERROR)

def update_handler(self, fd, events):
    """Change the event mask for ``fd``; ``self.ERROR`` is always included."""
    raw_fd, _fileobj = self.split_fd(fd)
    self._impl.modify(raw_fd, events | self.ERROR)

def remove_handler(self, fd):
    """Forget ``fd``: drop its handler and pending events, then unregister it.

    A failure while unregistering from the underlying poller is logged at
    debug level and otherwise ignored.
    """
    raw_fd, _fileobj = self.split_fd(fd)
    # Missing entries are fine: pop() is given a default of None.
    self._handlers.pop(raw_fd, None)
    self._events.pop(raw_fd, None)
    try:
        self._impl.unregister(raw_fd)
    except Exception:
        gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

如果要实现自己的 tornado 异步函数,可以调用上面三个方法,在IOLoop中注册 fd 和回调即可。回到__new__函数,接下来就是创建实例instance然后初始化。接下来我们到了重头戏——start()

start方法的实现在PollIOLoop中,有近200行,不过可以分为几个部分:

def start(self):
    """Run the event loop (first half of the excerpt; parts are elided with ``...``).

    Raises RuntimeError if ``self._running`` is already set. If
    ``self._stopped`` is set on entry, the flag is cleared and the method
    returns without looping. Otherwise this loop is recorded as
    ``IOLoop._current.instance`` and each iteration runs the queued
    callbacks and every timeout whose deadline has passed.
    """
    # Initialization
    if self._running:
        raise RuntimeError("IOLoop is already running")
    self._setup_logging()
    if self._stopped:
        self._stopped = False
        return
    # Remember the previous per-thread loop so it can be restored on exit.
    old_current = getattr(IOLoop._current, "instance", None)
    IOLoop._current.instance = self
    self._thread_ident = thread.get_ident()
    self._running = True

    ...

    try:
        # The event loop proper starts here
        while True:
            # Prevent IO event starvation by delaying new callbacks
            # to the next iteration of the event loop.
            # (The list is swapped out under the lock, so callbacks added
            # from now on land in the fresh list.)
            with self._callback_lock:
                callbacks = self._callbacks
                self._callbacks = []

            # Add any timeouts that have come due to the callback
            due_timeouts = []
            if self._timeouts:
                now = self.time()
                # self._timeouts is a heap: index 0 is the smallest entry.
                while self._timeouts:
                    if self._timeouts[0].callback is None:
                        # The timeout was cancelled.  Note that the
                        # cancellation check is repeated below for timeouts
                        # that are cancelled by another timeout or callback.
                        heapq.heappop(self._timeouts)
                        self._cancellations -= 1
                    elif self._timeouts[0].deadline <= now:
                        due_timeouts.append(heapq.heappop(self._timeouts))
                    else:
                        break
                if (self._cancellations > 512
                        and self._cancellations > (len(self._timeouts) >> 1)):
                    # Clean up the timeout queue when it gets large and it's
                    # more than half cancellations.
                    self._cancellations = 0
                    self._timeouts = [x for x in self._timeouts
                                      if x.callback is not None]
                    heapq.heapify(self._timeouts)

            # Run each pending callback through self._run_callback
            for callback in callbacks:
                self._run_callback(callback)
            # Run every timeout that has come due
            for timeout in due_timeouts:
                if timeout.callback is not None:
                    self._run_callback(timeout.callback)
            # Closures may be holding on to a lot of memory, so allow
            # them to be freed before we go into our poll wait.
            callbacks = callback = due_timeouts = timeout = None

        ... 

在循环的开始,先执行self._callbacks列表里面的回调函数,还有到达deadline的timeout。timeout类保存了self.deadline和self.callback,并重载了__lt__和__le__方法,所以IOLoop._timeouts能使用优先队列heap来对它们排序,从而找出到期的timeout并执行它们的回调函数。回到事件循环,执行完所有callbacks和due_timeouts后,poll终于出场了:

...
# Continued from the excerpt above (still inside the ``while True`` loop of start())

        if self._callbacks:
            # Callbacks were queued in self._callbacks while the ones above
            # ran, so poll must not block: use a zero timeout.
            poll_timeout = 0.0
            # Otherwise, if timeouts are pending in self._timeouts, wait no
            # longer than the nearest timeout.deadline (capped at
            # _POLL_TIMEOUT and never negative).
        elif self._timeouts:
            poll_timeout = self._timeouts[0].deadline - self.time()
            poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
        else:
            # No timeouts and no callbacks, so use the default.
            poll_timeout = _POLL_TIMEOUT

        # Leave the loop once the running flag has been cleared.
        if not self._running:
            break

        if self._blocking_signal_threshold is not None:
            # clear alarm so it doesn't fire while poll is waiting for
            # events.
            signal.setitimer(signal.ITIMER_REAL, 0, 0)

        try:
            # Finally, the actual poll
            event_pairs = self._impl.poll(poll_timeout)
        except Exception as e:
            # An interrupted poll (EINTR) just restarts the loop iteration;
            # anything else propagates.
            if errno_from_exception(e) == errno.EINTR:
                continue
            else:
                raise

        if self._blocking_signal_threshold is not None:
            # Re-arm the timer that was cleared before polling.
            signal.setitimer(signal.ITIMER_REAL,
                             self._blocking_signal_threshold, 0)

        # Merge the new events into self._events and drain it one fd at a
        # time; remove_handler() also pops from self._events, so a handler
        # may discard events that have not been dispatched yet.
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                # Look up the handler registered for this fd and invoke it
                fd_obj, handler_func = self._handlers[fd]
                handler_func(fd_obj, events)
            except (OSError, IOError) as e:
                if errno_from_exception(e) == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    self.handle_callback_exception(self._handlers.get(fd))
            except Exception:
                self.handle_callback_exception(self._handlers.get(fd))
        # Drop the references so the objects can be freed.
        fd_obj = handler_func = None

finally:
    # reset the stopped flag so another start/stop pair can be issued
    self._stopped = False
    if self._blocking_signal_threshold is not None:
        signal.setitimer(signal.ITIMER_REAL, 0, 0)
    # Restore the loop that was current before start() was called.
    IOLoop._current.instance = old_current
    # NOTE(review): old_wakeup_fd is assigned in a part of start() that this
    # excerpt elides.
    if old_wakeup_fd is not None:
        signal.set_wakeup_fd(old_wakeup_fd)

总结一下,tornado 的IOLoop是基于poll的事件驱动模型。IOLoop是单线程的,避免了多进程/多线程模型上下文切换的损耗,在 IO 密集的情况下有很好的性能。但如果在IOLoop循环中 callback 的执行时间太长,会阻塞掉整个IOLoop,造成性能的急剧下降,所以在实践中要十分注意。