""" A module for scheduling arbitrary callables to run at given times or intervals, modeled on the naviserver API. Scheduler runs in its own thread; callables run in this same thread, so if you have an unusually long callable to run you may wish to give it its own thread, for instance, schedule(3600, lambda: threading.Thread(target=longcallable).start()) Scheduler does not provide subsecond resolution. Public functions are threadsafe. """ import atexit, sys, threading, time, traceback logger = lambda s: sys.stderr.write('%s\n' % s) def debuglogger(s): import spyce spyce.DEBUG(s) def schedule(interval, callable, once=False): """ Schedules callable to be run every interval seconds. Returns the scheduled Task object. """ if interval < 1: raise Exception("Interval must be postitive") return _insertsorted(Task(time.time() + interval, interval, callable, once)) def _nearest_epoch(hours, minutes): L = list(time.localtime(time.time())) L[3] = hours L[4] = minutes L[5] = 0 epoch = time.mktime(L) if epoch < time.time(): epoch += 24 * 3600 return epoch def schedule_daily(hours, minutes, callable, once=False): """ Schedules callable to be run at hours:minutes every day. (Hours is a 24-hour format.) Returns the scheduled Task object. """ if hours > 23 or hours < 0: raise ValueError("Invalid hours %s" % hours) if minutes > 59 or minutes < 0: raise ValueError("Invalid minutes %s" % minutes) epoch = _nearest_epoch(hours, minutes) return _insertsorted(Task(epoch, 24 * 3600, callable, once)) def schedule_weekly(day, hours, minutes, callable, once=False): """ Schedules callable to be run at hours:minutes on the given zero-based day of the week. (Monday is 0.) Returns the scheduled Task object. """ if day > 6 or day < 0: raise ValueError("Invalid day %s" % day) if hours > 23 or hours < 0: raise ValueError("Invalid hours %s" % hours) if minutes > 59 or minutes < 0: raise ValueError("Invalid minutes %s" % minutes) epoch = _nearest_epoch(hours, minutes) while time.localtime(epoch)[6] != day: epoch += 24 * 3600 return _insertsorted(Task(epoch, 7 * 24 * 3600, callable, once)) def unschedule(task): """ Removes the given task from the scheduling queue. """ _qlock.acquire() try: _queue.remove(task) finally: _qlock.release() class Task: """ Instantiated by the schedule methods. Instance variables: nextrun: epoch seconds at which to run next interval: seconds before repeating callable: function to invoke last: if True, will be unscheduled after nextrun (Note that by manually setting last on a Task instance, you can cause it to run an arbitrary number of times.) """ def __init__(self, firstrun, interval, callable, once): self.nextrun = firstrun self.interval = interval self.callable = callable self.last = once def __repr__(self): return 'Task(nextrun=%r, interval=%d, callable=%s, last=%s)' \ % (time.asctime(time.localtime(self.nextrun)), self.interval, self.callable, self.last) _qlock = threading.RLock() # (we don't use a Queue object here since we need to do our own locking anyway) _queue = [] def _insertsorted(task): _qlock.acquire() i = len(_queue) while i > 0: if task.nextrun < _queue[i - 1].nextrun: break i -= 1 _queue.insert(i, task) _qlock.release() return task _keepgoing = True _paused = False def _process(): """True if a task was run""" try: _qlock.acquire() if not len(_queue): return False if _queue[-1].nextrun > time.time(): return False task = _queue.pop() finally: _qlock.release() debuglogger('running scheduled task %s' % task.callable) try: task.callable() except Exception: logger(traceback.format_exc()) if not task.last: task.nextrun = max(task.nextrun + task.interval, time.time()) _insertsorted(task) return True def _loop(): sleep = False while _keepgoing: if sleep: time.sleep(1) if _paused: sleep = True continue sleep = not _process() debuglogger("Scheduler thread has stopped") _thread = threading.Thread(target=_loop, name='spyce-scheduler') _thread.start() def pause(): """Temporarily suspend running scheduled tasks""" _paused = True def unpause(): """ Resume running scheduled tasks. If a task came due while it was paused, it will run immediately after unpausing. """ _paused = False def _cleanup(): debuglogger('Waiting for scheduler thread...') global _keepgoing _keepgoing = False _thread.join() atexit.register(_cleanup) # yes, I need more and better tests... you're welcome to add some :) if __name__ == "__main__": d = {} def foo(): d['test'] = 1 print schedule(1, lambda: threading.Thread(target=foo).start(), True) while not d: time.sleep(1) print d assert not _queue d = {} hours, minutes, _, day = time.localtime(time.time() + 61)[3:7] print schedule_weekly(day, hours, minutes, foo) print '(waiting... could be up to 60 seconds)' while not d: time.sleep(1) print d assert len(_queue) == 1