tornado.py 1.74 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from __future__ import absolute_import

import sys
from concurrent.futures import ThreadPoolExecutor

from tornado.gen import convert_yielded

from apscheduler.executors.base import BaseExecutor, run_job

try:
    from apscheduler.executors.base_py3 import run_coroutine_job
    from apscheduler.util import iscoroutinefunction_partial
except ImportError:
    def iscoroutinefunction_partial(func):
        return False


class TornadoExecutor(BaseExecutor):
    """
    Runs jobs either in a thread pool or directly on the I/O loop.

    If the job function is a native coroutine function, it is scheduled to be run directly in the
    I/O loop as soon as possible. All other functions are run in a thread pool.

    Plugin alias: ``tornado``

    :param int max_workers: maximum number of worker threads in the thread pool
    """

    def __init__(self, max_workers=10):
        super(TornadoExecutor, self).__init__()
        self.executor = ThreadPoolExecutor(max_workers)

    def start(self, scheduler, alias):
        super(TornadoExecutor, self).start(scheduler, alias)
        self._ioloop = scheduler._ioloop

    def _do_submit_job(self, job, run_times):
        def callback(f):
            try:
                events = f.result()
            except BaseException:
                self._run_job_error(job.id, *sys.exc_info()[1:])
            else:
                self._run_job_success(job.id, events)

        if iscoroutinefunction_partial(job.func):
            f = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name)
        else:
            f = self.executor.submit(run_job, job, job._jobstore_alias, run_times,
                                     self._logger.name)

        f = convert_yielded(f)
        f.add_done_callback(callback)