Home

Tornado源码分析笔记(3)

今天我们来讲讲tornado的Future和coroutine。

Future

首先什么是future?按照我的理解,Future是一个用来保存异步处理的结果和回调的一个类。tornado的Future其实是实现了跟python concurrent这个包一样的api。在concurrent这个包里面是这样用Future

import concurrent.futures

def done_callback(future):
    print("I am done")
    print(future.result())

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    future.add_done_callback(done_callback)
    print(future.done())
    

ThreadPoolExecutor在线程池中执行pow,不等函数执行完就立即返回一个future作为一个桥梁,拿到这个future后程序就可以去做其他事情了,当pow执行完之后回调用done_callback。而主线程也可以通过future来获取运行情况或者进行取消等操作。

Future的源码在concurrent.py里面,很简单,这里也就不贴出来了。在tornado中,Future的使用是通过IOLoopadd_future方法起作用的。

ioloop.py:

    def add_future(self, future, callback):

        assert is_future(future)
        callback = stack_context.wrap(callback)
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))
            

我们可以看到,add_futurefuture添加了一个回调函数,当future完成的时候,也就是调用了future.set_resultfuture.set_exception后向IOLoop注册了callback函数,当IOLoop下一轮poll之前就会调用这个callback函数。下面是一个简单的使用例子,耗时任务使用另外的线程完成。

import time
import threading
import tornado.ioloop
from tornado.concurrent import Future

ioloop = tornado.ioloop.IOLoop.current()

def long_task(future, sec=5):
    print("long task start")
    time.sleep(sec)
    future.set_result("long task done in %s sec" % sec)

def after_task_done(future):
    print(future.result())

def test_future():
    future = Future()
    threading.Thread(target=long_task, args=(future,)).start()
    ioloop.add_future(future, after_task_done)

if __name__ == "__main__":
    ioloop.add_callback(test_future)
    ioloop.start()

coroutine

异步的事件驱动最令人诟病的一点就是不停的回调回调,给写程序的时候带来很大的困难。针对这个痛点,tornado利用了基于python的生成器的协程来实现用同步的方式写异步的代码的。就让我们来看一看其背后的黑魔法吧。

先从demo开始:

class GenAsyncHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        do_something_with_response(response)
        self.render("template.html")
 

get方法在gen.coroutine装饰器的下在执行异步方法时yield就可以可以将控制权还给IOLoop,当异步方法回获得结果时返回get方法,将结果赋值给response后继续执行。我们知道在一个函数中有yield的话这个函数的返回值就变成了一个生成器,当异步函数的结果到来时,必定会调用生成器的send方法将结果传入里面。我们先来看gen.coroutine

gen.py

def coroutine(func, replace_callback=True):
    return _make_coroutine_wrapper(func, replace_callback=True)

很简单,返回一个_make_coroutine_wrapper包装的函数。来看_make_coroutine_wrapper

def _make_coroutine_wrapper(func, replace_callback):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()

		# 有callback关键词的话为callback添加一个future,future完成时执行
        if replace_callback and 'callback' in kwargs:
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))
        
        # 调用func,根据结果的不同进行不同的处理
        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = getattr(e, 'value', None)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, types.GeneratorType):
                # Inline the first iteration of Runner.run.  This lets us
                # avoid the cost of creating a Runner when the coroutine
                # never actually yields, which in turn allows us to
                # use "optional" coroutines in critical path code without
                # performance penalty for the synchronous case.
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    
                    # 若result是一个生成器,则会调用一次next,在根据结果进行不同处理
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    future.set_result(getattr(e, 'value', None))
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    # Subtle memory optimization: if next() raised an exception,
                    # the future's exc_info contains a traceback which
                    # includes this stack frame.  This creates a cycle,
                    # which will be collected at the next full GC but has
                    # been shown to greatly increase memory usage of
                    # benchmarks (relative to the refcount-based scheme
                    # used in the absence of cycles).  We can avoid the
                    # cycle by clearing the local variable after we return it.
                    future = None
        future.set_result(result)
        return future
    return wrapper

gen.coroutine装饰的函数返回结果都会变成一个Future,若func不是一个生成器,则将结果或异常放入future中返回。若func返回一个生成器,调用next一次,如果还有后续,则交给Runner这个类来执行。结合demo上的来看,此时的yieldedhttp_client.fetch("http://example.com")执行都结果,必须是一个Future(或者YieldPoint,早期版本使用的,后来由Future代替,保留只为了兼容旧版本,这里不再讨论),后面我们可以看到。

来看Runner类, 也在gen.py里面,在Runner里面最关键的是runhandle_yield方法。

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.yield_point = None
        self.pending_callbacks = None
        self.results = None
        self.running = False
        self.finished = False
        self.had_exception = False
        self.io_loop = IOLoop.current()

        self.stack_context_deactivate = None
        
        # 若first_yielded的future以完成,则最少会run一次
        if self.handle_yield(first_yielded):
            self.run()
            
    # ...省略...
    
    def handle_yield(self, yielded):
        # Lists containing YieldPoints require stack contexts;
        # other lists are handled via multi_future in convert_yielded.
        if (isinstance(yielded, list) and
                any(isinstance(f, YieldPoint) for f in yielded)):
            yielded = Multi(yielded)
        elif (isinstance(yielded, dict) and
              any(isinstance(f, YieldPoint) for f in yielded.values())):
            yielded = Multi(yielded)

        # 处理旧版本的YieldPoints的兼容,可以忽略...
		
		# future未完成则向ioloop注册这个future,当future完成时调用run继续执行
        if not self.future.done() or self.future is moment:
            self.io_loop.add_future(
                self.future, lambda f: self.run())
            return False
        return True

    def run(self):
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None

                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()
					
					# 有异常则设置异常
                    if exc_info is not None:
                        yielded = self.gen.throw(*exc_info)
                        exc_info = None
                   	# 将结果send回去继续执行
                    else:
                        yielded = self.gen.send(value)

                    if stack_context._state.contexts is not orig_stack_contexts:
                        self.gen.throw(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    if self.pending_callbacks and not self.had_exception:
                        # If we ran cleanly without waiting on all callbacks
                        # raise an error (really more of a warning).  If we
                        # had an exception then some callbacks may have been
                        # orphaned, so skip the check in that case.
                        raise LeakedCallbackError(
                            "finished without waiting for callbacks %r" %
                            self.pending_callbacks)
                    self.result_future.set_result(getattr(e, 'value', None))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                except Exception:
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_exc_info(sys.exc_info())
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                # handle_yield检查yielded,若还未完成则继续向IOLoop注册future等待完成。
                if not self.handle_yield(yielded):
                    return
        finally:
            self.running = False

		# .......

Runnerhandle_yield帮我们向IOLoop注册了future,当future完成时也就是http_client.fetch的结果返回之后,IOLoop就好执行Runner.run()使我们从断点继续执行。在Runner.run()里将future的结果取出,发送进生成器里面继续执行。

当我们要实现自己的异步函数时,需要将耗时任务的执行放在主线程外的其它地方,返回一个Future对象,当耗时任务结束时设在Future的结果即可。配合gen.coroutine即可实现同步的控制流写异步的代码。下面是一个例子:

import json
import time
import threading
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web

from tornado import gen
from tornado.concurrent import Future
from tornado.options import define, options

define("port", default=8888, help="run on the given port", type=int)

@gen.coroutine
def logic_handler(pk):
    data = yield get_data(pk)
    
    # 在python3里可以在生成器直接return data
    raise gen.Return(data)

def get_data(pk):
    future = Future()
    threading.Thread(target=fetch_data_from_db, args=(future, pk)).start()
    return future

def fetch_data_from_db(future, pk):
    time.sleep(5)
    future.set_result({"pk": pk, "data": "Data from No.%s's" % pk})


class MainHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self, pk):
        print(pk)
        result = yield loginc_handler(pk)
        self.write(json.dumps(result))

def main():
    tornado.options.parse_command_line()
    application = tornado.web.Application([
        (r"/(\d+)", MainHandler),
    ])
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()

if __name__ == "__main__":
    main()

不过在实际应用中还是会有些地方需要注意。如当一个函数使用gen.coroutine装饰过后,其上层的调用者也要相应改成couroutine,如上面例子的logic_handler如果还有很多层级依赖于db的数据的话,它们也要加@gen.coroutine或者返回Future