[1-60].minute
[1-24].hour
[1-31].day
[1-12].month
jan feb mar apr may jun jul aug sep oct nov dec, and all of those full month names (case insensitive)
sunday, monday, tuesday, wednesday, thursday, friday, saturday, weekday, weekend (case insensitive)
[1].year
当然你也可以用原始的crontab的时间表示方法来表示,如下:
job = Job('demo', every='1,2 5,6 * * 3,4')
其中还可以指定一些特殊的值,如果指定这些值则at参数将会被忽略:
"yearly"   # Run once a year at midnight on the morning of January 1
"monthly"  # Run once a month at midnight on the morning of the first day of the month
"weekly"   # Run once a week at midnight on Sunday morning
"daily"    # Run once a day at midnight
"hourly"   # Run once an hour at the beginning of the hour
"reboot"   # Run at startup
at参数则用于指定任务在何时运行,具体示例如下:
job = Job('onejob', every='1.day', at='hour.12 minute.15 minute.45')
# or even better
job = Job('onejob', every='1.day', at='12:15 12:45')
class scheduler:
    """Run queued events in time order using caller-supplied clock functions.

    Events live in a heap-ordered list; each event unpacks as
    ``(time, priority, action, argument)``.
    """

    def __init__(self, timefunc, delayfunc):
        """Initialize a new instance, passing the time and delay functions.

        timefunc -- called with no arguments; returns the current time.
        delayfunc -- called with one argument, the amount of time to wait.
        """
        self._queue = []  # pending events, maintained as a heap via heapq
        self.timefunc = timefunc
        self.delayfunc = delayfunc

    def enterabs(self, time, priority, action, argument):
        """Enter a new event in the queue at an absolute time.

        Returns an ID for the event which can be used to remove it,
        if necessary.
        """
        event = Event(time, priority, action, argument)
        heapq.heappush(self._queue, event)
        return event  # The ID

    def enter(self, delay, priority, action, argument):
        """A variant that specifies the time as a relative time.

        This is actually the more commonly used interface.
        """
        time = self.timefunc() + delay
        return self.enterabs(time, priority, action, argument)

    def run(self):
        """Execute queued events until the queue is empty.

        The earliest event is inspected without removing it.  If it is not
        due yet, ``delayfunc`` is asked to wait for the remaining time;
        otherwise the event is popped and ``action(*argument)`` is called,
        followed by ``delayfunc(0)``.
        """
        # localize variable access to minimize overhead
        # and to improve thread safety
        q = self._queue
        delayfunc = self.delayfunc
        timefunc = self.timefunc
        pop = heapq.heappop
        while q:
            time, priority, action, argument = checked_event = q[0]
            now = timefunc()
            if now < time:
                # Plays the role of the sleep() inside an endless polling loop.
                delayfunc(time - now)
            else:
                event = pop(q)
                # Verify that the event was not removed or altered
                # by another thread after we last looked at q[0].
                if event is checked_event:
                    action(*argument)
                    delayfunc(0)  # Let other threads run
                else:
                    heapq.heappush(q, event)
def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
            misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
            next_run_time=undefined, jobstore='default', executor='default',
            replace_existing=False, **trigger_args):
    """Build a ``Job`` from the given options and register it with the scheduler.

    ``trigger`` and ``trigger_args`` are handed to ``self._create_trigger``.
    ``args`` and ``kwargs`` are copied into a tuple / dict (empty when
    ``None``).  Options still equal to the ``undefined`` sentinel are not
    passed to ``Job`` at all.

    While the scheduler is not running, the job is only queued in
    ``self._pending_jobs`` together with ``jobstore`` and
    ``replace_existing``; otherwise it is added via ``self._real_add_job``.

    NOTE(review): no ``return`` statement is visible in this excerpt --
    confirm against the full source whether the job should be returned.
    """
    job_kwargs = {
        'trigger': self._create_trigger(trigger, trigger_args),  # trigger options are resolved here
        'executor': executor,
        'func': func,
        'args': tuple(args) if args is not None else (),
        'kwargs': dict(kwargs) if kwargs is not None else {},
        'id': id,
        'name': name,
        'misfire_grace_time': misfire_grace_time,
        'coalesce': coalesce,
        'max_instances': max_instances,
        'next_run_time': next_run_time
    }
    # Drop every option the caller left at the ``undefined`` sentinel.
    job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs)
                      if value is not undefined)
    job = Job(self, **job_kwargs)

    # Don't really add jobs to job stores before the scheduler is up and running
    with self._jobstores_lock:
        if not self.running:
            self._pending_jobs.append((job, jobstore, replace_existing))
            self._logger.info('Adding job tentatively -- it will be properly scheduled when '
                              'the scheduler starts')
        else:
            self._real_add_job(job, jobstore, replace_existing, True)