2014-01-30 16:21:38 +11:00
# Initially we used sidetiq, which was a problem for these reasons:
#
# 1. No mechanism to add "randomisation" into job execution
# 2. No stats about previous runs or failures
# 3. Dependency on ice_cube gem causes runaway CPU
module Scheduler
  # Drives scheduled jobs. Job class names live in a Redis sorted set
  # (Manager.queue_key) scored by their due time; #tick pops the head entry
  # when it is due, reschedules it through ScheduleInfo and hands the class
  # to a background Runner thread.
  #
  # NOTE(review): the string literals in this class (status values and Redis
  # keys) carry leading/trailing spaces. They are reproduced exactly as found;
  # they look like extraction residue -- confirm before normalising them.
  class Manager
    extend Sidekiq::ExceptionHandler

    # random_ratio defaults to 0.1 (how it is consumed is not visible here);
    # redis is the client used for all queue and lock operations.
    attr_accessor :random_ratio, :redis

    # Executes queued job classes one at a time on a dedicated thread and
    # records the outcome on the job's ScheduleInfo.
    class Runner
      # @param manager [Manager] used to look up the schedule info of each job
      def initialize(manager)
        @queue = Queue.new
        @manager = manager
        @thread = Thread.new do
          loop do
            klass = @queue.deq
            begin
              process(klass)
            rescue => e
              # Bookkeeping failures (schedule lookup, final write!) must not
              # kill the worker thread, otherwise no further job would run.
              Scheduler::Manager.handle_exception(e)
            end
          end
        end
      end

      # Kills the worker thread; a job in flight is interrupted.
      def stop!
        @thread.kill
      end

      # Queues a job class for execution.
      #
      # @param klass [Class] must respond to new.perform
      def enq(klass)
        @queue << klass
      end

      # Blocks until every queued job has finished, i.e. the queue is empty
      # AND the worker is parked in deq again. Returns immediately if the
      # worker thread is no longer alive so callers cannot hang on it.
      def wait_till_done
        sleep 0.001 while @thread.alive? && !idle?
      end

      private

      # True when nothing is queued and the worker is waiting for work.
      def idle?
        @queue.empty? && @queue.num_waiting > 0
      end

      # Runs a single job and stores its result and duration (milliseconds).
      def process(klass)
        failed = false
        start = Time.now.to_f
        info = @manager.schedule_info(klass)

        begin
          info.prev_result = " RUNNING "
          info.write!
          klass.new.perform
        rescue => e
          Scheduler::Manager.handle_exception(e)
          failed = true
        end

        info.prev_duration = ((Time.now.to_f - start) * 1000).to_i
        info.prev_result = failed ? " FAILED " : " OK "
        info.write!
      end
    end

    # Builds a manager that never starts a Runner thread and does not
    # register itself as Manager.current.
    #
    # @param redis [Object, nil] fallback client, see #initialize
    def self.without_runner(redis = nil)
      new(redis, true)
    end

    # @param redis [Object, nil] Redis client; only used when the global
    #   $redis is not set ($redis takes precedence over this argument)
    # @param skip_runner [Boolean] when true no Runner is created and
    #   Manager.current is left untouched
    def initialize(redis = nil, skip_runner = false)
      @redis = $redis || redis
      @random_ratio = 0.1

      unless skip_runner
        @runner = Runner.new(self)
        self.class.current = self
      end

      @manager_id = SecureRandom.hex
    end

    class << self
      # The manager most recently created with a runner (nil after #stop!).
      attr_accessor :current
    end

    # @return [ScheduleInfo] schedule bookkeeping for klass bound to this manager
    def schedule_info(klass)
      ScheduleInfo.new(klass, self)
    end

    # @return the next_run value stored for klass
    def next_run(klass)
      schedule_info(klass).next_run
    end

    # Schedules klass while holding the scheduler lock.
    def ensure_schedule!(klass)
      lock { schedule_info(klass).schedule! }
    end

    # Deletes the schedule of klass while holding the scheduler lock.
    def remove(klass)
      lock { schedule_info(klass).del! }
    end

    # Inspects the head of the queue under the scheduler lock. If that job is
    # due it is marked as queued, rescheduled and handed to the runner.
    # Does nothing when the queue is empty or the head is not yet due.
    def tick
      lock do
        (key, due), _ = redis.zrange Manager.queue_key, 0, 0, withscores: true
        return unless key
        return if due.to_i > Time.now.to_i

        klass = begin
          key.constantize
        rescue NameError
          nil
        end

        unless klass
          # The key no longer names a class (e.g. a renamed job). Drop it,
          # otherwise it stays at the head forever and blocks every other job.
          redis.zrem Manager.queue_key, key
          return
        end

        info = schedule_info(klass)
        info.prev_run = Time.now.to_i
        info.prev_result = " QUEUED "
        info.prev_duration = -1
        info.next_run = nil
        info.schedule!
        @runner.enq(klass)
      end
    end

    # Like #tick, but waits until the runner has finished the queued work.
    def blocking_tick
      tick
      @runner.wait_till_done
    end

    # Stops the runner thread and clears Manager.current.
    def stop!
      @runner.stop!
      self.class.current = nil
    end

    # Runs the given block while holding the Redis lock stored under
    # Manager.lock_key. The lock is taken with SETNX plus a 60 second expiry.
    # When it is already held, the stored timestamp is read and, if it lies in
    # the past, the lock is taken over inside a WATCH/MULTI transaction.
    # Spins until the lock is obtained.
    #
    # @yield the critical section
    # @return the value of the block
    def lock
      got_lock = false
      lock_key = Manager.lock_key

      until got_lock
        if redis.setnx lock_key, Time.now.to_i + 60
          redis.expire lock_key, 60
          got_lock = true
        else
          begin
            redis.watch lock_key
            time = redis.get lock_key
            if time && time.to_i < Time.now.to_i
              got_lock = redis.multi do
                redis.set lock_key, Time.now.to_i + 60
              end
            end
          ensure
            redis.unwatch
          end
        end
      end

      yield
    ensure
      # Only release a lock we actually obtained; if acquisition itself
      # raised, the key may belong to another process.
      redis.del lock_key if got_lock
    end

    # @return [Array] every live object that is a Scheduler::Schedule and
    #   answers true to scheduled?
    def self.discover_schedules
      ObjectSpace.each_object(Scheduler::Schedule).select(&:scheduled?)
    end

    # Redis key holding the scheduler lock.
    def self.lock_key
      " _scheduler_lock_ "
    end

    # Redis key of the sorted set of pending jobs.
    def self.queue_key
      " _scheduler_queue_ "
    end

    # Redis key for the schedule data of klass.
    def self.schedule_key(klass)
      " _scheduler_ #{klass} "
    end
  end
end