今天我们来讲讲tornado的Future
和coroutine。
首先什么是future
?按照我的理解,Future
是一个用来保存异步处理的结果和回调的一个类。tornado的Future
其实是实现了跟python concurrent这个包一样的api。在concurrent
这个包里面是这样用Future
的
import concurrent.futures
def done_callback(future):
print("I am done")
print(future.result())
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
future.add_done_callback(done_callback)
print(future.done())
ThreadPoolExecutor
在线程池中执行pow
,不等函数执行完就立即返回一个future
作为一个桥梁,拿到这个future
后程序就可以去做其他事情了,当pow
执行完之后回调用done_callback
。而主线程也可以通过future
来获取运行情况或者进行取消等操作。
Future
的源码在concurrent.py里面,很简单,这里也就不贴出来了。在tornado中,Future
的使用是通过IOLoop
的add_future
方法起作用的。
ioloop.py:
def add_future(self, future, callback):
assert is_future(future)
callback = stack_context.wrap(callback)
future.add_done_callback(
lambda future: self.add_callback(callback, future))
我们可以看到,add_future
为future
添加了一个回调函数,当future
完成的时候,也就是调用了future.set_result
或future.set_exception
后向IOLoop
注册了callback
函数,当IOLoop
下一轮poll之前就会调用这个callback
函数。下面是一个简单的使用例子,耗时任务使用另外的线程完成。
import time
import threading
import tornado.ioloop
from tornado.concurrent import Future
ioloop = tornado.ioloop.IOLoop.current()
def long_task(future, sec=5):
print("long task start")
time.sleep(sec)
future.set_result("long task done in %s sec" % sec)
def after_task_done(future):
print(future.result())
def test_future():
future = Future()
threading.Thread(target=long_task, args=(future,)).start()
ioloop.add_future(future, after_task_done)
if __name__ == "__main__":
ioloop.add_callback(test_future)
ioloop.start()
异步的事件驱动最令人诟病的一点就是不停的回调回调,给写程序的时候带来很大的困难。针对这个痛点,tornado利用了基于python的生成器的协程来实现用同步的方式写异步的代码的。就让我们来看一看其背后的黑魔法吧。
先从demo开始:
class GenAsyncHandler(RequestHandler):
@gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch("http://example.com")
do_something_with_response(response)
self.render("template.html")
get
方法在gen.coroutine
装饰器的下在执行异步方法时yield就可以可以将控制权还给IOLoop,当异步方法回获得结果时返回get
方法,将结果赋值给response
后继续执行。我们知道在一个函数中有yield的话这个函数的返回值就变成了一个生成器,当异步函数的结果到来时,必定会调用生成器的send
方法将结果传入里面。我们先来看gen.coroutine
。
gen.py
def coroutine(func, replace_callback=True):
return _make_coroutine_wrapper(func, replace_callback=True)
很简单,返回一个_make_coroutine_wrapper
包装的函数。来看_make_coroutine_wrapper
:
def _make_coroutine_wrapper(func, replace_callback):
@functools.wraps(func)
def wrapper(*args, **kwargs):
future = TracebackFuture()
# 有callback关键词的话为callback添加一个future,future完成时执行
if replace_callback and 'callback' in kwargs:
callback = kwargs.pop('callback')
IOLoop.current().add_future(
future, lambda future: callback(future.result()))
# 调用func,根据结果的不同进行不同的处理
try:
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
result = getattr(e, 'value', None)
except Exception:
future.set_exc_info(sys.exc_info())
return future
else:
if isinstance(result, types.GeneratorType):
# Inline the first iteration of Runner.run. This lets us
# avoid the cost of creating a Runner when the coroutine
# never actually yields, which in turn allows us to
# use "optional" coroutines in critical path code without
# performance penalty for the synchronous case.
try:
orig_stack_contexts = stack_context._state.contexts
# 若result是一个生成器,则会调用一次next,在根据结果进行不同处理
yielded = next(result)
if stack_context._state.contexts is not orig_stack_contexts:
yielded = TracebackFuture()
yielded.set_exception(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
future.set_result(getattr(e, 'value', None))
except Exception:
future.set_exc_info(sys.exc_info())
else:
Runner(result, future, yielded)
try:
return future
finally:
# Subtle memory optimization: if next() raised an exception,
# the future's exc_info contains a traceback which
# includes this stack frame. This creates a cycle,
# which will be collected at the next full GC but has
# been shown to greatly increase memory usage of
# benchmarks (relative to the refcount-based scheme
# used in the absence of cycles). We can avoid the
# cycle by clearing the local variable after we return it.
future = None
future.set_result(result)
return future
return wrapper
被gen.coroutine
装饰的函数返回结果都会变成一个Future
,若func
不是一个生成器,则将结果或异常放入future
中返回。若func
返回一个生成器,调用next
一次,如果还有后续,则交给Runner
这个类来执行。结合demo上的来看,此时的yielded
是http_client.fetch("http://example.com")
执行都结果,必须是一个Future
(或者YieldPoint
,早期版本使用的,后来由Future
代替,保留只为了兼容旧版本,这里不再讨论),后面我们可以看到。
来看Runner
类, 也在gen.py里面,在Runner
里面最关键的是run
和handle_yield
方法。
class Runner(object):
def __init__(self, gen, result_future, first_yielded):
self.gen = gen
self.result_future = result_future
self.future = _null_future
self.yield_point = None
self.pending_callbacks = None
self.results = None
self.running = False
self.finished = False
self.had_exception = False
self.io_loop = IOLoop.current()
self.stack_context_deactivate = None
# 若first_yielded的future以完成,则最少会run一次
if self.handle_yield(first_yielded):
self.run()
# ...省略...
def handle_yield(self, yielded):
# Lists containing YieldPoints require stack contexts;
# other lists are handled via multi_future in convert_yielded.
if (isinstance(yielded, list) and
any(isinstance(f, YieldPoint) for f in yielded)):
yielded = Multi(yielded)
elif (isinstance(yielded, dict) and
any(isinstance(f, YieldPoint) for f in yielded.values())):
yielded = Multi(yielded)
# 处理旧版本的YieldPoints的兼容,可以忽略...
# future未完成则向ioloop注册这个future,当future完成时调用run继续执行
if not self.future.done() or self.future is moment:
self.io_loop.add_future(
self.future, lambda f: self.run())
return False
return True
def run(self):
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done():
return
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
exc_info = None
try:
value = future.result()
except Exception:
self.had_exception = True
exc_info = sys.exc_info()
# 有异常则设置异常
if exc_info is not None:
yielded = self.gen.throw(*exc_info)
exc_info = None
# 将结果send回去继续执行
else:
yielded = self.gen.send(value)
if stack_context._state.contexts is not orig_stack_contexts:
self.gen.throw(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
if self.pending_callbacks and not self.had_exception:
# If we ran cleanly without waiting on all callbacks
# raise an error (really more of a warning). If we
# had an exception then some callbacks may have been
# orphaned, so skip the check in that case.
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
self.result_future.set_result(getattr(e, 'value', None))
self.result_future = None
self._deactivate_stack_context()
return
except Exception:
self.finished = True
self.future = _null_future
self.result_future.set_exc_info(sys.exc_info())
self.result_future = None
self._deactivate_stack_context()
return
# handle_yield检查yielded,若还未完成则继续向IOLoop注册future等待完成。
if not self.handle_yield(yielded):
return
finally:
self.running = False
# .......
在Runner
中handle_yield
帮我们向IOLoop注册了future
,当future
完成时也就是http_client.fetch
的结果返回之后,IOLoop就好执行Runner.run()
使我们从断点继续执行。在Runner.run()
里将future
的结果取出,发送进生成器里面继续执行。
当我们要实现自己的异步函数时,需要将耗时任务的执行放在主线程外的其它地方,返回一个Future
对象,当耗时任务结束时设在Future
的结果即可。配合gen.coroutine
即可实现同步的控制流写异步的代码。下面是一个例子:
import json
import time
import threading
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado import gen
from tornado.concurrent import Future
from tornado.options import define, options
define("port", default=8888, help="run on the given port", type=int)
@gen.coroutine
def logic_handler(pk):
data = yield get_data(pk)
# 在python3里可以在生成器直接return data
raise gen.Return(data)
def get_data(pk):
future = Future()
threading.Thread(target=fetch_data_from_db, args=(future, pk)).start()
return future
def fetch_data_from_db(future, pk):
time.sleep(5)
future.set_result({"pk": pk, "data": "Data from No.%s's" % pk})
class MainHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self, pk):
print(pk)
result = yield loginc_handler(pk)
self.write(json.dumps(result))
def main():
tornado.options.parse_command_line()
application = tornado.web.Application([
(r"/(\d+)", MainHandler),
])
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(options.port)
tornado.ioloop.IOLoop.current().start()
if __name__ == "__main__":
main()
不过在实际应用中还是会有些地方需要注意。如当一个函数使用gen.coroutine
装饰过后,其上层的调用者也要相应改成couroutine,如上面例子的logic_handler
如果还有很多层级依赖于db的数据的话,它们也要加@gen.coroutine
或者返回Future
。