memory.py 3.57 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from __future__ import absolute_import

from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
from apscheduler.util import datetime_to_utc_timestamp


class MemoryJobStore(BaseJobStore):
    """
    Keeps jobs in an in-memory list; nothing is persisted anywhere.

    Plugin alias: ``memory``
    """

    def __init__(self):
        super(MemoryJobStore, self).__init__()
        # (job, timestamp) pairs kept in ascending (next run time, job id) order;
        # pairs whose timestamp is None are ordered after all the others
        self._jobs = []
        # job id -> (job, timestamp), for lookups that avoid scanning the list
        self._jobs_index = {}

    def lookup_job(self, job_id):
        """Return the job stored under ``job_id``, or ``None`` if there is none."""
        entry = self._jobs_index.get(job_id)
        if entry is None:
            return None
        return entry[0]

    def get_due_jobs(self, now):
        """
        Return the jobs whose stored timestamp is not later than ``now``.

        The list is sorted, so counting stops at the first entry that either has no
        timestamp or has one later than ``now``.
        """
        cutoff = datetime_to_utc_timestamp(now)
        due_count = 0
        for _, run_at in self._jobs:
            if run_at is None or run_at > cutoff:
                break
            due_count += 1

        return [job for job, _ in self._jobs[:due_count]]

    def get_next_run_time(self):
        """Return ``next_run_time`` of the first job in the list, or ``None`` if empty."""
        if not self._jobs:
            return None

        first_job, _ = self._jobs[0]
        return first_job.next_run_time

    def get_all_jobs(self):
        """Return a new list holding every stored job, in list order."""
        return [job for job, _ in self._jobs]

    def add_job(self, job):
        """
        Insert ``job`` at its sorted position and register it in the index.

        :raises ConflictingIdError: if a job with the same id is already stored
        """
        job_id = job.id
        if job_id in self._jobs_index:
            raise ConflictingIdError(job_id)

        entry = (job, datetime_to_utc_timestamp(job.next_run_time))
        self._jobs.insert(self._get_job_index(entry[1], job_id), entry)
        self._jobs_index[job_id] = entry

    def update_job(self, job):
        """
        Replace the stored job that has the same id as ``job``.

        :raises JobLookupError: if no job with that id is stored
        """
        current = self._jobs_index.get(job.id)
        if current is None:
            raise JobLookupError(job.id)

        previous_job, previous_timestamp = current
        position = self._get_job_index(previous_timestamp, previous_job.id)
        replacement = (job, datetime_to_utc_timestamp(job.next_run_time))

        if replacement[1] != previous_timestamp:
            # The run time moved, so the entry has to be taken out and put back at
            # the position matching its new timestamp to keep the list sorted.
            del self._jobs[position]
            position = self._get_job_index(replacement[1], job.id)
            self._jobs.insert(position, replacement)
        else:
            # Same run time: the entry can be overwritten where it already sits.
            self._jobs[position] = replacement

        self._jobs_index[previous_job.id] = replacement

    def remove_job(self, job_id):
        """
        Drop the job stored under ``job_id`` from both the list and the index.

        :raises JobLookupError: if no job with that id is stored
        """
        entry = self._jobs_index.get(job_id)
        if entry is None:
            raise JobLookupError(job_id)

        job, run_at = entry
        del self._jobs[self._get_job_index(run_at, job_id)]
        del self._jobs_index[job.id]

    def remove_all_jobs(self):
        """Discard every job by rebinding the list and the index to empty containers."""
        self._jobs, self._jobs_index = [], {}

    def shutdown(self):
        """Empty the store; all jobs held in memory are dropped."""
        self.remove_all_jobs()

    def _get_job_index(self, timestamp, job_id):
        """
        Binary-search the sorted job list for the entry matching the given timestamp and id.

        Returns the index of that entry, or if it's not present, the index at which such an
        entry would have to be inserted to keep the list sorted.

        :param timestamp: timestamp of the job's next run time, or ``None``
        :param job_id: id of the job
        :type job_id: str

        """
        # A missing timestamp is ranked as positive infinity so that such entries
        # come after every entry that has a concrete run time.
        never = float('inf')
        target = (never if timestamp is None else timestamp, job_id)

        low, high = 0, len(self._jobs)
        while low < high:
            middle = (low + high) // 2
            candidate_job, candidate_timestamp = self._jobs[middle]
            candidate = (never if candidate_timestamp is None else candidate_timestamp,
                         candidate_job.id)
            # Tuples compare by timestamp first and fall back to the job id on a tie.
            if candidate == target:
                return middle
            if candidate < target:
                low = middle + 1
            else:
                high = middle

        return low