Tornado 剖析

The Analysis of Tornado

Posted by decaywood on 2016-12-21
- 错误校对

Tornado 是 FaceBook 开源的一款非阻塞的 web 服务器,具有极强的可拓展性,能够极大程度的利用计算机的CPU资源。与其他主流 Web 服务器框架不同的是,Tornado是非阻塞的。得益于框架底层基于epoll(linux)或者kqueue(macOS、freeBSD)的应用,Tornado 每秒能够同时处理数千个长连接,这也是 Tornado 的一个显著的特点。在实时消息推送场景中,每个用户与服务器都会保持一个服务器连接,故 Tornado 的这个特性非常适合应用于处理实时性消息推送问题。本文将以一个例子出发,深入源码内部,剖析 Tornado 高性能的根本原因。

Tornado 整体结构

Tornado 的事件循环实现一共有几个部分,其中核心继承关系为 Configurable、IOLoop、PollIOLoop,继承关系如下:

Configurable
     ^
     |
   IOLoop
     ^
     |
 PollIOLoop

Configurable 类主要用于实现类的一些配置,由 new 方法可以看出 Configurable 类的意图:

def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        init_kwargs = {}
        if cls is base:
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        else:
            impl = cls
        init_kwargs.update(kwargs)
        instance = super(Configurable, cls).__new__(impl)
        instance.initialize(*args, **init_kwargs)
        return instance

可以看见,new 方法会通过 configured_class 间接调用 IOLoop::configurable_default 方法获取需要实例化的类型,实例化后,Configurable 类会调用 initialize 方法,这个是为了兼容 AsyncHTTPClient 的缘故,对于 Configurable 的实现类,用 initialize 与 用 __init__ 进行初始化没任何区别。总之,Configurable 类的作用相当于一个工厂,用于实例化子类。

IOLoop 为 Tornado 事件循环的核心,是 Tornado 进行异步任务的基础组件,定义了异步事件的监听接口。为什么要异步呢?可以这样理解,如果一个服务器接收一个用户请求。然而,这个请求可能还需要请求其他资源,有可能是文件资源,也有可能是网络资源。这时,服务器请求这些资源时可能就有一定的 IO 延迟。如果服务器同步阻塞在那,服务器从接收请求到返回整个时间段都只能处理这一个请求。因此,吞吐量大大减少,而此时,CPU资源可能并没有被有效利用,时间消耗在了 IO 上面。为了解决这个问题,我们可以在 IO 请求时将代码挂起,继续接受其他请求,等到请求的资源就绪后再继续从挂起的代码处开始执行,由此提高整个服务器的吞吐量。

至于具体实现方式就和平台相关了,Tornado 异步的底层实现文章前面提到过,是分平台的,linux 系统实现了 epoll,而 macOS、freeBSD 实现了 kqueue,这两种实现功能一致。若 epoll、kqueue 在本机系统均没实现则退化为 select 实现。IOLoop::configurable_default 就是负责这个工作的,具体机制前面已经说过了。不过需要强调的是,具体实现类并没有在继承关系中,而是被 IOLoop 的子类 PollIOLoop 维护。以 Epoll 为例,PollIOLoop 的子类 EPollIOLoop 并没有具体实现 epoll,而是通过 initialize 将实现传给 PollIOLoop 来维护,而与平台无关的调度逻辑则由 PollIOLoop 和 IOLoop 负责了,这样就很好的实现了 IO 和调度的职责分离。

下一节将着重分析 Tornado 是如何对代码进行调度,实现代码跳转的。

Tornado 异步IO原理解析

首先看一个例子,模拟爬取一组URL,以下是 Tornado 异步版本的代码,同步版本就不赘述了:

import random
import time
from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def gen_url(url):
    wait_time = random.randint(4, 5)
    yield gen.sleep(wait_time)
    print('URL {} took {}s to get!'.format(url, wait_time))
    raise gen.Return((url, wait_time))

@gen.coroutine
def fetch_url():
    before = time.time()
    urls = [gen_url(url) for url in ['URL1', 'URL2', 'URL3']]
    result = yield urls
    after = time.time()
    print(result)
    print('total time: {} seconds'.format(after - before))

if __name__ == '__main__':
    print 'output:'
    IOLoop.current().run_sync(fetch_url)

# output:
# URL URL1 took 4s to get!
# URL URL3 took 4s to get!
# URL URL2 took 5s to get!
# [('URL1', 4), ('URL2', 5), ('URL3', 4)]
# total time: 5.00441908836 seconds    

可以看到,每个 URL 链接获取时间均在4到5秒之间,但实际上总耗时却只有5秒。如果是同步执行的话,总耗时一定是远大于5秒的,这就是 Tornado 的强大所在,异步化的IO处理可以极大提升整个 Web Server 的吞吐量,提高 CPU 运算资源的利用率。接下来,我们将以这段 demo 代码,深入分析 Tornado 的运行机制。

Demo 代码特点

从上一节的 demo 代码可以发现一些特点,代码风格完全是同步代码的写法,然而行为上却为异步形式的,乍一看着实有点神奇。此外,我们再看看 yield 关键字抛出的对象类型, 都是 Future 对象。也就是说,代码中的 gen.sleep(wait_time) 以及通过 @gen.coroutine 装饰器装饰的 gen_url 函数最终返回的都是 Future 类型的对象。那么我们来看看 Future 类有什么特点,为了节省文字,这里列出 Future 对象的两个关键方法:

  def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

  def set_result(self, result):
        self._result = result
        self._done = True
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                app_log.exception('Exception in callback %r for %r',
                                  cb, self)
        self._callbacks = None

Future 对象中有两个关键成员变量,一个为 _result,另一个为 _callbacks。初始化 Future 对象时,可以通过调用 add_done_callback 定义结果就绪时的回调函数。当请求的结果返回时,结果会被通过 set_result 方法存入 Future 中,同时调用回调函数。具体何时调用 add_done_callback 会在后续章节说明。总之,Future 是 Tornado 实现异步 IO 的关键节点之一,它的作用类似一个暂存器或者说是一个协程与异步 IO 调度之间的桥梁,支撑着 IO 就绪时的消息分发,这个后续会有更详细的介绍。

程序执行流程

首先程序从 IOLoop::run_sync 入口进入,run_sync 内部定义了一个回调函数 run,并通过 IOLoop::add_callback 传入IOLoop的回调队列,add_callback 方法会将函数以及参数一并存入一个数组中,供后续调用。

随后,调用 IOLoop::start 方法,启动事件循环。事件循环实际上做了三件事情:执行超时事件回调、执行监听事件回调、以及 IO 事件回调:

  • 超时事件回调队列从数据结构上来说并不是一个队列,而是一个小顶堆,数据槽中有时间以及 callback 组成的pair,以时间戳为排序依据,每次会循环取出堆中时间戳的最小值,与当前时间比对,如果超时时间小于当前时间,证明已超时,此时将数据槽出堆,加入一个临时数组中进行后续的 callback 调用。demo 代码中 gen.sleep(wait_time) 实际上会将一个回调函数添加到超时事件回调队列中:
  def sleep(duration):
        f = Future()
        IOLoop.current().call_later(duration, lambda: f.set_result(None))
        return 
  • 监听事件回调队列存放 IOLoop::add_callback 设置进去的回调函数,每次事件循环进行顺序调用。其中 run_sync 内部定义的回调函数 run 就是通过 IOLoop::add_callback 方法放入了监听事件回调队列中。

  • IO 事件回调队列每次会从 epoll(linux)或者 kqueue(macOS、freeBSD)中获取感兴趣的事件,事件监听通过 PollIOLoop::add_handler 来进行注册感兴趣的事件,同时维护一个根据文件描述符(fd)映射的 callback map,当 IO 就绪后,IOLoop 调用底层 epoll(linux)或者 kqueue(macOS、freeBSD)实现获取到相应的文件描述符(fd),通过描述符从 callback map中获取对应的 callback 并执行。

    这里需要提到一点,IOLoop 为了防止 IO 事件没有就绪造成的 epoll 阻塞事件循环,使得监听事件队列或者超时事件的回调得不到执行,因此在 IOLoop 初始化的时候注册了一个 READ IO 事件监听,由 self._waker = Waker() 维护。这种情况会在第一次开始事件循环以及所有 IO 完成时出现此种情况。因此,每次执行 PollIOLoop::add_callback 时,如果发现回调队列为空,代表第一次执行时间循环,此时会通过 Waker::wake 唤醒事件循环。调用 PollIOLoop::stop 时同样也会有唤醒事件循环的动作。

此时,三个监听队列中只有 run_sync 内部定义的回调函数 run,因此执行 run 函数。run 函数中会初始化外层的生成器,也就是demo中的 fetch_url。由于有 @gen.coroutine装饰,此时先执行 gen 中的 _make_coroutine_wrapper 函数,函数内部会先构造一个 future 对象,然后初始化生成器并获取到 yield 出来的结果,暂称之为 result,这一点上面章节已经提到过。之后,future 对象以及 result 会被传入 Runner 对象的构造器中,生成 Runner 后,future被返回。实际上,在 demo 中,对于 fetch_url,这里的 result 就是一组 future,对于 gen_url 这里的 result 则是 yield gen.sleep(wait_time) 返回的 future 对象。在此可以得出一个结论,Tornado 通过 yield future 对象来实现异步代码写法的同步化,具体如何实现的,需要进一步分析 Runner了。

Runner 实际上类似于 IOLoop,内部也是一个循环,这个循环在初始化时就开始了。与 IOLoop 不一样,Runner 的循环是针对生成器 yield 出来的 future 对象,IOLoop 是针对事件而已。故 Runner 是 Tornado 进行代码跳转的关键所在。Runner 简化版代码如下:

class Runner(object):

    def __init__(self, gen, result_future, first_yielded): 
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        if self.handle_yield(first_yielded):
            self.run()

    def run(self):
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:    
                    value = future.result()  
                    yielded = self.gen.send(value)

                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                if not self.handle_yield(yielded):
                    return
        finally:
            self.running = False

    def handle_yield(self, yielded):
        if _contains_yieldpoint(yielded):
            yielded = multi(yielded)
            self.future = convert_yielded(yielded)
        if not self.future.done() or self.future is moment:
            self.io_loop.add_future(
                self.future, lambda f: self.run())
            return False
        return True

当 Runner 初始化后,构造器有三个参数,分别为 gen, result_future, first_yielded。对应为用户代码使用 @gen.coroutine 修饰的生成器、存取生成器最终结果的 Future 对象、以及生成器 yield 返回过来的 Future 对象。初始化后,Runner 会马上调用 handle_yield 方法,通过 IOLoop::add_future 来给 first_yielded 添加 done_callback,这就回答了文章开头 done_callback 调用时机的疑问。

那么,传入的 callback 回调时机是什么时候呢,有可能通过超时回调(比如 demo 中的 yield gen.sleep(wait_time))也有可能为 epoll(linux)或者kqueue(macOS、freeBSD)就绪描述符对应的 callback等等,也可能为 Return Exception(用户代码中调用 raise gen.Return)。这些动作会调用 future 中的 Future::set_result 进而触发 done_callback,从而触发 Runner 的下一次循环,继续调用 yielded = self.gen.send(value),这里的 value 就是 future 对象的 result。

这种机制所表现出来的行为就是 yield 了一个 future 对象后,控制权由生成器移交给了 IOLoop,一旦 IOLoop 通过某种方式(比如 fd 就绪)拿到了结果并设值到 future 对象中,经过回调之前的 Runner::run 控制权将重新移交给生成器。而整个程序结束的时机就是在最外层的生成器(IOLoop::run_sync 传入的) Return 回来后,对应的 Future 对象触发 done_callback 调用 IOLoop::stop 完成整个 IOLoop,至此整个程序运行结束。