Commit 398a74a4 authored by Vladislav Rykov's avatar Vladislav Rykov
Browse files

periodic server maintanance task added. confirmed config messages and data...

periodic server maintanance task added. confirmed config messages and data files are removed during maintanance task
parent aab39a2e
from __future__ import absolute_import
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.base import BaseScheduler
try:
from gevent.event import Event
from gevent.lock import RLock
import gevent
except ImportError: # pragma: nocover
raise ImportError('GeventScheduler requires gevent installed')
class GeventScheduler(BlockingScheduler):
"""A scheduler that runs as a Gevent greenlet."""
_greenlet = None
def start(self, *args, **kwargs):
self._event = Event()
BaseScheduler.start(self, *args, **kwargs)
self._greenlet = gevent.spawn(self._main_loop)
return self._greenlet
def shutdown(self, *args, **kwargs):
super(GeventScheduler, self).shutdown(*args, **kwargs)
self._greenlet.join()
del self._greenlet
def _create_lock(self):
return RLock()
def _create_default_executor(self):
from apscheduler.executors.gevent import GeventExecutor
return GeventExecutor()
from __future__ import absolute_import
from apscheduler.schedulers.base import BaseScheduler
try:
from PyQt5.QtCore import QObject, QTimer
except (ImportError, RuntimeError): # pragma: nocover
try:
from PyQt4.QtCore import QObject, QTimer
except ImportError:
try:
from PySide.QtCore import QObject, QTimer # noqa
except ImportError:
raise ImportError('QtScheduler requires either PyQt5, PyQt4 or PySide installed')
class QtScheduler(BaseScheduler):
"""A scheduler that runs in a Qt event loop."""
_timer = None
def shutdown(self, *args, **kwargs):
super(QtScheduler, self).shutdown(*args, **kwargs)
self._stop_timer()
def _start_timer(self, wait_seconds):
self._stop_timer()
if wait_seconds is not None:
wait_time = min(wait_seconds * 1000, 2147483647)
self._timer = QTimer.singleShot(wait_time, self._process_jobs)
def _stop_timer(self):
if self._timer:
if self._timer.isActive():
self._timer.stop()
del self._timer
def wakeup(self):
self._start_timer(0)
def _process_jobs(self):
wait_seconds = super(QtScheduler, self)._process_jobs()
self._start_timer(wait_seconds)
from __future__ import absolute_import
from datetime import timedelta
from functools import wraps
from apscheduler.schedulers.base import BaseScheduler
from apscheduler.util import maybe_ref
try:
from tornado.ioloop import IOLoop
except ImportError: # pragma: nocover
raise ImportError('TornadoScheduler requires tornado installed')
def run_in_ioloop(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
self._ioloop.add_callback(func, self, *args, **kwargs)
return wrapper
class TornadoScheduler(BaseScheduler):
"""
A scheduler that runs on a Tornado IOLoop.
The default executor can run jobs based on native coroutines (``async def``).
=========== ===============================================================
``io_loop`` Tornado IOLoop instance to use (defaults to the global IO loop)
=========== ===============================================================
"""
_ioloop = None
_timeout = None
@run_in_ioloop
def shutdown(self, wait=True):
super(TornadoScheduler, self).shutdown(wait)
self._stop_timer()
def _configure(self, config):
self._ioloop = maybe_ref(config.pop('io_loop', None)) or IOLoop.current()
super(TornadoScheduler, self)._configure(config)
def _start_timer(self, wait_seconds):
self._stop_timer()
if wait_seconds is not None:
self._timeout = self._ioloop.add_timeout(timedelta(seconds=wait_seconds), self.wakeup)
def _stop_timer(self):
if self._timeout:
self._ioloop.remove_timeout(self._timeout)
del self._timeout
def _create_default_executor(self):
from apscheduler.executors.tornado import TornadoExecutor
return TornadoExecutor()
@run_in_ioloop
def wakeup(self):
self._stop_timer()
wait_seconds = self._process_jobs()
self._start_timer(wait_seconds)
from __future__ import absolute_import
from functools import wraps
from apscheduler.schedulers.base import BaseScheduler
from apscheduler.util import maybe_ref
try:
from twisted.internet import reactor as default_reactor
except ImportError: # pragma: nocover
raise ImportError('TwistedScheduler requires Twisted installed')
def run_in_reactor(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
self._reactor.callFromThread(func, self, *args, **kwargs)
return wrapper
class TwistedScheduler(BaseScheduler):
"""
A scheduler that runs on a Twisted reactor.
Extra options:
=========== ========================================================
``reactor`` Reactor instance to use (defaults to the global reactor)
=========== ========================================================
"""
_reactor = None
_delayedcall = None
def _configure(self, config):
self._reactor = maybe_ref(config.pop('reactor', default_reactor))
super(TwistedScheduler, self)._configure(config)
@run_in_reactor
def shutdown(self, wait=True):
super(TwistedScheduler, self).shutdown(wait)
self._stop_timer()
def _start_timer(self, wait_seconds):
self._stop_timer()
if wait_seconds is not None:
self._delayedcall = self._reactor.callLater(wait_seconds, self.wakeup)
def _stop_timer(self):
if self._delayedcall and self._delayedcall.active():
self._delayedcall.cancel()
del self._delayedcall
@run_in_reactor
def wakeup(self):
self._stop_timer()
wait_seconds = self._process_jobs()
self._start_timer(wait_seconds)
def _create_default_executor(self):
from apscheduler.executors.twisted import TwistedExecutor
return TwistedExecutor()
from abc import ABCMeta, abstractmethod
from datetime import timedelta
import random
import six
class BaseTrigger(six.with_metaclass(ABCMeta)):
"""Abstract base class that defines the interface that every trigger must implement."""
__slots__ = ()
@abstractmethod
def get_next_fire_time(self, previous_fire_time, now):
"""
Returns the next datetime to fire on, If no such datetime can be calculated, returns
``None``.
:param datetime.datetime previous_fire_time: the previous time the trigger was fired
:param datetime.datetime now: current datetime
"""
def _apply_jitter(self, next_fire_time, jitter, now):
"""
Randomize ``next_fire_time`` by adding or subtracting a random value (the jitter). If the
resulting datetime is in the past, returns the initial ``next_fire_time`` without jitter.
``next_fire_time - jitter <= result <= next_fire_time + jitter``
:param datetime.datetime|None next_fire_time: next fire time without jitter applied. If
``None``, returns ``None``.
:param int|None jitter: maximum number of seconds to add or subtract to
``next_fire_time``. If ``None`` or ``0``, returns ``next_fire_time``
:param datetime.datetime now: current datetime
:return datetime.datetime|None: next fire time with a jitter.
"""
if next_fire_time is None or not jitter:
return next_fire_time
next_fire_time_with_jitter = next_fire_time + timedelta(
seconds=random.uniform(-jitter, jitter))
if next_fire_time_with_jitter < now:
# Next fire time with jitter is in the past.
# Ignore jitter to avoid false misfire.
return next_fire_time
return next_fire_time_with_jitter
from apscheduler.triggers.base import BaseTrigger
from apscheduler.util import obj_to_ref, ref_to_obj
class BaseCombiningTrigger(BaseTrigger):
__slots__ = ('triggers', 'jitter')
def __init__(self, triggers, jitter=None):
self.triggers = triggers
self.jitter = jitter
def __getstate__(self):
return {
'version': 1,
'triggers': [(obj_to_ref(trigger.__class__), trigger.__getstate__())
for trigger in self.triggers],
'jitter': self.jitter
}
def __setstate__(self, state):
if state.get('version', 1) > 1:
raise ValueError(
'Got serialized data for version %s of %s, but only versions up to 1 can be '
'handled' % (state['version'], self.__class__.__name__))
self.jitter = state['jitter']
self.triggers = []
for clsref, state in state['triggers']:
cls = ref_to_obj(clsref)
trigger = cls.__new__(cls)
trigger.__setstate__(state)
self.triggers.append(trigger)
def __repr__(self):
return '<{}({}{})>'.format(self.__class__.__name__, self.triggers,
', jitter={}'.format(self.jitter) if self.jitter else '')
class AndTrigger(BaseCombiningTrigger):
"""
Always returns the earliest next fire time that all the given triggers can agree on.
The trigger is considered to be finished when any of the given triggers has finished its
schedule.
Trigger alias: ``and``
:param list triggers: triggers to combine
:param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
"""
__slots__ = ()
def get_next_fire_time(self, previous_fire_time, now):
while True:
fire_times = [trigger.get_next_fire_time(previous_fire_time, now)
for trigger in self.triggers]
if None in fire_times:
return None
elif min(fire_times) == max(fire_times):
return self._apply_jitter(fire_times[0], self.jitter, now)
else:
now = max(fire_times)
def __str__(self):
return 'and[{}]'.format(', '.join(str(trigger) for trigger in self.triggers))
class OrTrigger(BaseCombiningTrigger):
"""
Always returns the earliest next fire time produced by any of the given triggers.
The trigger is considered finished when all the given triggers have finished their schedules.
Trigger alias: ``or``
:param list triggers: triggers to combine
:param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
.. note:: Triggers that depends on the previous fire time, such as the interval trigger, may
seem to behave strangely since they are always passed the previous fire time produced by
any of the given triggers.
"""
__slots__ = ()
def get_next_fire_time(self, previous_fire_time, now):
fire_times = [trigger.get_next_fire_time(previous_fire_time, now)
for trigger in self.triggers]
fire_times = [fire_time for fire_time in fire_times if fire_time is not None]
if fire_times:
return self._apply_jitter(min(fire_times), self.jitter, now)
else:
return None
def __str__(self):
return 'or[{}]'.format(', '.join(str(trigger) for trigger in self.triggers))
from datetime import datetime, timedelta
from tzlocal import get_localzone
import six
from apscheduler.triggers.base import BaseTrigger
from apscheduler.triggers.cron.fields import (
BaseField, MonthField, WeekField, DayOfMonthField, DayOfWeekField, DEFAULT_VALUES)
from apscheduler.util import datetime_ceil, convert_to_datetime, datetime_repr, astimezone
class CronTrigger(BaseTrigger):
"""
Triggers when current time matches all specified time constraints,
similarly to how the UNIX cron scheduler works.
:param int|str year: 4-digit year
:param int|str month: month (1-12)
:param int|str day: day of the (1-31)
:param int|str week: ISO week (1-53)
:param int|str day_of_week: number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
:param int|str hour: hour (0-23)
:param int|str minute: minute (0-59)
:param int|str second: second (0-59)
:param datetime|str start_date: earliest possible date/time to trigger on (inclusive)
:param datetime|str end_date: latest possible date/time to trigger on (inclusive)
:param datetime.tzinfo|str timezone: time zone to use for the date/time calculations (defaults
to scheduler timezone)
:param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
.. note:: The first weekday is always **monday**.
"""
FIELD_NAMES = ('year', 'month', 'day', 'week', 'day_of_week', 'hour', 'minute', 'second')
FIELDS_MAP = {
'year': BaseField,
'month': MonthField,
'week': WeekField,
'day': DayOfMonthField,
'day_of_week': DayOfWeekField,
'hour': BaseField,
'minute': BaseField,
'second': BaseField
}
__slots__ = 'timezone', 'start_date', 'end_date', 'fields', 'jitter'
def __init__(self, year=None, month=None, day=None, week=None, day_of_week=None, hour=None,
minute=None, second=None, start_date=None, end_date=None, timezone=None,
jitter=None):
if timezone:
self.timezone = astimezone(timezone)
elif isinstance(start_date, datetime) and start_date.tzinfo:
self.timezone = start_date.tzinfo
elif isinstance(end_date, datetime) and end_date.tzinfo:
self.timezone = end_date.tzinfo
else:
self.timezone = get_localzone()
self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
self.jitter = jitter
values = dict((key, value) for (key, value) in six.iteritems(locals())
if key in self.FIELD_NAMES and value is not None)
self.fields = []
assign_defaults = False
for field_name in self.FIELD_NAMES:
if field_name in values:
exprs = values.pop(field_name)
is_default = False
assign_defaults = not values
elif assign_defaults:
exprs = DEFAULT_VALUES[field_name]
is_default = True
else:
exprs = '*'
is_default = True
field_class = self.FIELDS_MAP[field_name]
field = field_class(field_name, exprs, is_default)
self.fields.append(field)
@classmethod
def from_crontab(cls, expr, timezone=None):
"""
Create a :class:`~CronTrigger` from a standard crontab expression.
See https://en.wikipedia.org/wiki/Cron for more information on the format accepted here.
:param expr: minute, hour, day of month, month, day of week
:param datetime.tzinfo|str timezone: time zone to use for the date/time calculations (
defaults to scheduler timezone)
:return: a :class:`~CronTrigger` instance
"""
values = expr.split()
if len(values) != 5:
raise ValueError('Wrong number of fields; got {}, expected 5'.format(len(values)))
return cls(minute=values[0], hour=values[1], day=values[2], month=values[3],
day_of_week=values[4], timezone=timezone)
def _increment_field_value(self, dateval, fieldnum):
"""
Increments the designated field and resets all less significant fields to their minimum
values.
:type dateval: datetime
:type fieldnum: int
:return: a tuple containing the new date, and the number of the field that was actually
incremented
:rtype: tuple
"""
values = {}
i = 0
while i < len(self.fields):
field = self.fields[i]
if not field.REAL:
if i == fieldnum:
fieldnum -= 1
i -= 1
else:
i += 1
continue
if i < fieldnum:
values[field.name] = field.get_value(dateval)
i += 1
elif i > fieldnum:
values[field.name] = field.get_min(dateval)
i += 1
else:
value = field.get_value(dateval)
maxval = field.get_max(dateval)
if value == maxval:
fieldnum -= 1
i -= 1
else:
values[field.name] = value + 1
i += 1
difference = datetime(**values) - dateval.replace(tzinfo=None)
return self.timezone.normalize(dateval + difference), fieldnum
def _set_field_value(self, dateval, fieldnum, new_value):
values = {}
for i, field in enumerate(self.fields):
if field.REAL:
if i < fieldnum:
values[field.name] = field.get_value(dateval)
elif i > fieldnum:
values[field.name] = field.get_min(dateval)
else:
values[field.name] = new_value
return self.timezone.localize(datetime(**values))
def get_next_fire_time(self, previous_fire_time, now):
if previous_fire_time:
start_date = min(now, previous_fire_time + timedelta(microseconds=1))
if start_date == previous_fire_time:
start_date += timedelta(microseconds=1)
else:
start_date = max(now, self.start_date) if self.start_date else now
fieldnum = 0
next_date = datetime_ceil(start_date).astimezone(self.timezone)
while 0 <= fieldnum < len(self.fields):
field = self.fields[fieldnum]
curr_value = field.get_value(next_date)
next_value = field.get_next_value(next_date)
if next_value is None:
# No valid value was found
next_date, fieldnum = self._increment_field_value(next_date, fieldnum - 1)
elif next_value > curr_value:
# A valid, but higher than the starting value, was found
if field.REAL:
next_date = self._set_field_value(next_date, fieldnum, next_value)
fieldnum += 1
else:
next_date, fieldnum = self._increment_field_value(next_date, fieldnum)
else:
# A valid value was found, no changes necessary
fieldnum += 1
# Return if the date has rolled past the end date
if self.end_date and next_date > self.end_date:
return None
if fieldnum >= 0:
next_date = self._apply_jitter(next_date, self.jitter, now)
return min(next_date, self.end_date) if self.end_date else next_date
def __getstate__(self):
return {
'version': 2,
'timezone': self.timezone,
'start_date': self.start_date,
'end_date': self.end_date,
'fields': self.fields,
'jitter': self.jitter,
}
def __setstate__(self, state):
# This is for compatibility with APScheduler 3.0.x
if isinstance(state, tuple):
state = state[1]
if state.get('version', 1) > 2:
raise ValueError(
'Got serialized data for version %s of %s, but only versions up to 2 can be '
'handled' % (state['version'], self.__class__.__name__))
self.timezone = state['timezone']
self.start_date = state['start_date']
self.end_date = state['end_date']
self.fields = state['fields']
self.jitter = state.get('jitter')
def __str__(self):
options = ["%s='%s'" % (f.name, f) for f in self.fields if not f.is_default]
return 'cron[%s]' % (', '.join(options))
def __repr__(self):
options = ["%s='%s'" % (f.name, f) for f in self.fields if not f.is_default]
if self.start_date:
options.append("start_date=%r" % datetime_repr(self.start_date))
if self.end_date:
options.append("end_date=%r" % datetime_repr(self.end_date))
if self.jitter:
options.append('jitter=%s' % self.jitter)
return "<%s (%s, timezone='%s')>" % (
self.__class__.__name__, ', '.join(options), self.timezone)
"""This module contains the expressions applicable for CronTrigger's fields."""
from calendar import monthrange
import re
from apscheduler.util import asint
__all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression',
'WeekdayPositionExpression', 'LastDayOfMonthExpression')
WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
class AllExpression(object):
value_re = re.compile(r'\*(?:/(?P<step>\d+))?$')
def __init__(self, step=None):
self.step = asint(step)
if self.step == 0:
raise ValueError('Increment must be higher than 0')
def validate_range(self, field_name):
from apscheduler.triggers.cron.fields import MIN_VALUES, MAX_VALUES
value_range = MAX_VALUES[field_name] - MIN_VALUES[field_name]
if self.step and self.step > value_range:
raise ValueError('the step value ({}) is higher than the total range of the '
'expression ({})'.format(self.step, value_range))
def get_next_value(self, date, field):
start = field.get_value(date)
minval = field.get_min(date)
maxval = field.get_max(date)
start = max(start, minval)
if not self.step:
next = start
else:
distance_to_next = (self.step - (start - minval)) % self.step
next = start + distance_to_next
if next <= maxval:
return next
def __eq__(self, other):
return isinstance(other, self.__class__) and self.step == other.step
def __str__(self):
if self.step:
return '*/%d' % self.step
return '*'
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, self.step)
class RangeExpression(AllExpression):
value_re = re.compile(
r'(?P<first>\d+)(?:-(?P<last>\d+))?(?:/(?P<step>\d+))?$')
def __init__(self, first, last=None, step=None):
super(RangeExpression, self).__init__(step)
first = asint(first)
last = asint(last)
if last is None and step is None:
last = first
if last is not None and first > last:
raise ValueError('The minimum value in a range must not be higher than the maximum')
self.first = first
self.last = last
def validate_range(self, field_name):
from apscheduler.triggers.cron.fields import MIN_VALUES, MAX_VALUES
super(RangeExpression, self).validate_range(field_name)
if self.first < MIN_VALUES[field_name]:
raise ValueError('the first value ({}) is lower than the minimum value ({})'
.format(self.first, MIN_VALUES[field_name]))
if self.last is not None and self.last > MAX_VALUES[field_name]:
raise ValueError('the last value ({}) is higher than the maximum value ({})'
.format(self.last, MAX_VALUES[field_name]))
value_range = (self.last or MAX_VALUES[field_name]) - self.first
if self.step and self.step > value_range:
raise ValueError('the step value ({}) is higher than the total range of the '
'expression ({})'.format(self.step, value_range))
def get_next_value(self, date, field):
startval = field.get_value(date)
minval = field.get_min(date)
maxval = field.get_max(date)
# Apply range limits
minval = max(minval, self.first)
maxval = min(maxval, self.last) if self.last is not None else maxval
nextval = max(minval, startval)
# Apply the step if defined
if self.step:
distance_to_next = (self.step - (nextval - minval)) % self.step
nextval += distance_to_next
return nextval if nextval <= maxval else None
def __eq__(self, other):
return (isinstance(other, self.__class__) and self.first == other.first and
self.last == other.last)
def __str__(self):
if self.last != self.first and self.last is not None:
range = '%d-%d' % (self.first, self.last)
else:
range = str(self.first)
if self.step:
return '%s/%d' % (range, self.step)
return range
def __repr__(self):
args = [str(self.first)]
if self.last != self.first and self.last is not None or self.step:
args.append(str(self.last))
if self.step:
args.append(str(self.step))
return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
class MonthRangeExpression(RangeExpression):
value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?', re.IGNORECASE)
def __init__(self, first, last=None):
try:
first_num = MONTHS.index(first.lower()) + 1
except ValueError:
raise ValueError('Invalid month name "%s"' % first)
if last:
try:
last_num = MONTHS.index(last.lower()) + 1
except ValueError:
raise ValueError('Invalid month name "%s"' % last)
else:
last_num = None
super(MonthRangeExpression, self).__init__(first_num, last_num)
def __str__(self):
if self.last != self.first and self.last is not None:
return '%s-%s' % (MONTHS[self.first - 1], MONTHS[self.last - 1])
return MONTHS[self.first - 1]
def __repr__(self):
args = ["'%s'" % MONTHS[self.first]]
if self.last != self.first and self.last is not None:
args.append("'%s'" % MONTHS[self.last - 1])
return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
class WeekdayRangeExpression(RangeExpression):
value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?', re.IGNORECASE)
def __init__(self, first, last=None):
try:
first_num = WEEKDAYS.index(first.lower())
except ValueError:
raise ValueError('Invalid weekday name "%s"' % first)
if last:
try:
last_num = WEEKDAYS.index(last.lower())
except ValueError:
raise ValueError('Invalid weekday name "%s"' % last)
else:
last_num = None
super(WeekdayRangeExpression, self).__init__(first_num, last_num)
def __str__(self):
if self.last != self.first and self.last is not None:
return '%s-%s' % (WEEKDAYS[self.first], WEEKDAYS[self.last])
return WEEKDAYS[self.first]
def __repr__(self):
args = ["'%s'" % WEEKDAYS[self.first]]
if self.last != self.first and self.last is not None:
args.append("'%s'" % WEEKDAYS[self.last])
return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
class WeekdayPositionExpression(AllExpression):
options = ['1st', '2nd', '3rd', '4th', '5th', 'last']
value_re = re.compile(r'(?P<option_name>%s) +(?P<weekday_name>(?:\d+|\w+))' %
'|'.join(options), re.IGNORECASE)
def __init__(self, option_name, weekday_name):
super(WeekdayPositionExpression, self).__init__(None)
try:
self.option_num = self.options.index(option_name.lower())
except ValueError:
raise ValueError('Invalid weekday position "%s"' % option_name)
try:
self.weekday = WEEKDAYS.index(weekday_name.lower())
except ValueError:
raise ValueError('Invalid weekday name "%s"' % weekday_name)
def get_next_value(self, date, field):
# Figure out the weekday of the month's first day and the number of days in that month
first_day_wday, last_day = monthrange(date.year, date.month)
# Calculate which day of the month is the first of the target weekdays
first_hit_day = self.weekday - first_day_wday + 1
if first_hit_day <= 0:
first_hit_day += 7
# Calculate what day of the month the target weekday would be
if self.option_num < 5:
target_day = first_hit_day + self.option_num * 7
else:
target_day = first_hit_day + ((last_day - first_hit_day) // 7) * 7
if target_day <= last_day and target_day >= date.day:
return target_day
def __eq__(self, other):
return (super(WeekdayPositionExpression, self).__eq__(other) and
self.option_num == other.option_num and self.weekday == other.weekday)
def __str__(self):
return '%s %s' % (self.options[self.option_num], WEEKDAYS[self.weekday])
def __repr__(self):
return "%s('%s', '%s')" % (self.__class__.__name__, self.options[self.option_num],
WEEKDAYS[self.weekday])
class LastDayOfMonthExpression(AllExpression):
value_re = re.compile(r'last', re.IGNORECASE)
def __init__(self):
super(LastDayOfMonthExpression, self).__init__(None)
def get_next_value(self, date, field):
return monthrange(date.year, date.month)[1]
def __str__(self):
return 'last'
def __repr__(self):
return "%s()" % self.__class__.__name__
"""Fields represent CronTrigger options which map to :class:`~datetime.datetime` fields."""
from calendar import monthrange
import re
import six
from apscheduler.triggers.cron.expressions import (
AllExpression, RangeExpression, WeekdayPositionExpression, LastDayOfMonthExpression,
WeekdayRangeExpression, MonthRangeExpression)
__all__ = ('MIN_VALUES', 'MAX_VALUES', 'DEFAULT_VALUES', 'BaseField', 'WeekField',
'DayOfMonthField', 'DayOfWeekField')
MIN_VALUES = {'year': 1970, 'month': 1, 'day': 1, 'week': 1, 'day_of_week': 0, 'hour': 0,
'minute': 0, 'second': 0}
MAX_VALUES = {'year': 9999, 'month': 12, 'day': 31, 'week': 53, 'day_of_week': 6, 'hour': 23,
'minute': 59, 'second': 59}
DEFAULT_VALUES = {'year': '*', 'month': 1, 'day': 1, 'week': '*', 'day_of_week': '*', 'hour': 0,
'minute': 0, 'second': 0}
SEPARATOR = re.compile(' *, *')
class BaseField(object):
REAL = True
COMPILERS = [AllExpression, RangeExpression]
def __init__(self, name, exprs, is_default=False):
self.name = name
self.is_default = is_default
self.compile_expressions(exprs)
def get_min(self, dateval):
return MIN_VALUES[self.name]
def get_max(self, dateval):
return MAX_VALUES[self.name]
def get_value(self, dateval):
return getattr(dateval, self.name)
def get_next_value(self, dateval):
smallest = None
for expr in self.expressions:
value = expr.get_next_value(dateval, self)
if smallest is None or (value is not None and value < smallest):
smallest = value
return smallest
def compile_expressions(self, exprs):
self.expressions = []
# Split a comma-separated expression list, if any
for expr in SEPARATOR.split(str(exprs).strip()):
self.compile_expression(expr)
def compile_expression(self, expr):
for compiler in self.COMPILERS:
match = compiler.value_re.match(expr)
if match:
compiled_expr = compiler(**match.groupdict())
try:
compiled_expr.validate_range(self.name)
except ValueError as e:
exc = ValueError('Error validating expression {!r}: {}'.format(expr, e))
six.raise_from(exc, None)
self.expressions.append(compiled_expr)
return
raise ValueError('Unrecognized expression "%s" for field "%s"' % (expr, self.name))
def __eq__(self, other):
return isinstance(self, self.__class__) and self.expressions == other.expressions
def __str__(self):
expr_strings = (str(e) for e in self.expressions)
return ','.join(expr_strings)
def __repr__(self):
return "%s('%s', '%s')" % (self.__class__.__name__, self.name, self)
class WeekField(BaseField):
REAL = False
def get_value(self, dateval):
return dateval.isocalendar()[1]
class DayOfMonthField(BaseField):
COMPILERS = BaseField.COMPILERS + [WeekdayPositionExpression, LastDayOfMonthExpression]
def get_max(self, dateval):
return monthrange(dateval.year, dateval.month)[1]
class DayOfWeekField(BaseField):
REAL = False
COMPILERS = BaseField.COMPILERS + [WeekdayRangeExpression]
def get_value(self, dateval):
return dateval.weekday()
class MonthField(BaseField):
COMPILERS = BaseField.COMPILERS + [MonthRangeExpression]
from datetime import datetime
from tzlocal import get_localzone
from apscheduler.triggers.base import BaseTrigger
from apscheduler.util import convert_to_datetime, datetime_repr, astimezone
class DateTrigger(BaseTrigger):
"""
Triggers once on the given datetime. If ``run_date`` is left empty, current time is used.
:param datetime|str run_date: the date/time to run the job at
:param datetime.tzinfo|str timezone: time zone for ``run_date`` if it doesn't have one already
"""
__slots__ = 'run_date'
def __init__(self, run_date=None, timezone=None):
timezone = astimezone(timezone) or get_localzone()
if run_date is not None:
self.run_date = convert_to_datetime(run_date, timezone, 'run_date')
else:
self.run_date = datetime.now(timezone)
def get_next_fire_time(self, previous_fire_time, now):
return self.run_date if previous_fire_time is None else None
def __getstate__(self):
return {
'version': 1,
'run_date': self.run_date
}
def __setstate__(self, state):
# This is for compatibility with APScheduler 3.0.x
if isinstance(state, tuple):
state = state[1]
if state.get('version', 1) > 1:
raise ValueError(
'Got serialized data for version %s of %s, but only version 1 can be handled' %
(state['version'], self.__class__.__name__))
self.run_date = state['run_date']
def __str__(self):
return 'date[%s]' % datetime_repr(self.run_date)
def __repr__(self):
return "<%s (run_date='%s')>" % (self.__class__.__name__, datetime_repr(self.run_date))
from datetime import timedelta, datetime
from math import ceil
from tzlocal import get_localzone
from apscheduler.triggers.base import BaseTrigger
from apscheduler.util import convert_to_datetime, timedelta_seconds, datetime_repr, astimezone
class IntervalTrigger(BaseTrigger):
"""
Triggers on specified intervals, starting on ``start_date`` if specified, ``datetime.now()`` +
interval otherwise.
:param int weeks: number of weeks to wait
:param int days: number of days to wait
:param int hours: number of hours to wait
:param int minutes: number of minutes to wait
:param int seconds: number of seconds to wait
:param datetime|str start_date: starting point for the interval calculation
:param datetime|str end_date: latest possible date/time to trigger on
:param datetime.tzinfo|str timezone: time zone to use for the date/time calculations
:param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
"""
__slots__ = 'timezone', 'start_date', 'end_date', 'interval', 'interval_length', 'jitter'
def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None,
end_date=None, timezone=None, jitter=None):
self.interval = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes,
seconds=seconds)
self.interval_length = timedelta_seconds(self.interval)
if self.interval_length == 0:
self.interval = timedelta(seconds=1)
self.interval_length = 1
if timezone:
self.timezone = astimezone(timezone)
elif isinstance(start_date, datetime) and start_date.tzinfo:
self.timezone = start_date.tzinfo
elif isinstance(end_date, datetime) and end_date.tzinfo:
self.timezone = end_date.tzinfo
else:
self.timezone = get_localzone()
start_date = start_date or (datetime.now(self.timezone) + self.interval)
self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
self.jitter = jitter
def get_next_fire_time(self, previous_fire_time, now):
if previous_fire_time:
next_fire_time = previous_fire_time + self.interval
elif self.start_date > now:
next_fire_time = self.start_date
else:
timediff_seconds = timedelta_seconds(now - self.start_date)
next_interval_num = int(ceil(timediff_seconds / self.interval_length))
next_fire_time = self.start_date + self.interval * next_interval_num
if self.jitter is not None:
next_fire_time = self._apply_jitter(next_fire_time, self.jitter, now)
if not self.end_date or next_fire_time <= self.end_date:
return self.timezone.normalize(next_fire_time)
def __getstate__(self):
return {
'version': 2,
'timezone': self.timezone,
'start_date': self.start_date,
'end_date': self.end_date,
'interval': self.interval,
'jitter': self.jitter,
}
def __setstate__(self, state):
# This is for compatibility with APScheduler 3.0.x
if isinstance(state, tuple):
state = state[1]
if state.get('version', 1) > 2:
raise ValueError(
'Got serialized data for version %s of %s, but only versions up to 2 can be '
'handled' % (state['version'], self.__class__.__name__))
self.timezone = state['timezone']
self.start_date = state['start_date']
self.end_date = state['end_date']
self.interval = state['interval']
self.interval_length = timedelta_seconds(self.interval)
self.jitter = state.get('jitter')
def __str__(self):
return 'interval[%s]' % str(self.interval)
def __repr__(self):
options = ['interval=%r' % self.interval, 'start_date=%r' % datetime_repr(self.start_date)]
if self.end_date:
options.append("end_date=%r" % datetime_repr(self.end_date))
if self.jitter:
options.append('jitter=%s' % self.jitter)
return "<%s (%s, timezone='%s')>" % (
self.__class__.__name__, ', '.join(options), self.timezone)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment