2014-01-30 16:21:38 +11:00
# Initially we used sidetiq, this was a problem:
#
# 1. No mechanism to add "randomisation" into job execution
# 2. No stats about previous runs or failures
# 3. Dependency on ice_cube gem causes runaway CPU
2014-04-11 11:13:33 +05:30
require_dependency 'distributed_mutex'
2014-01-30 16:21:38 +11:00
module Scheduler
  # Schedules jobs through redis and hands due jobs to an in-process Runner.
  #
  # Accessors:
  #   random_ratio - initialised to 0.1; not read anywhere in this file.
  #   redis        - the redis client used for queues and keep-alive keys.
  #   enable_stats - when truthy the Runner records a SchedulerStat row per
  #                  job run and sets up / clears a DB connection around it.
  class Manager
    attr_accessor :random_ratio, :redis, :enable_stats

    # Owns the background threads of a Manager: one that executes queued jobs,
    # one that refreshes the manager's keep-alive key and one that reschedules
    # orphaned jobs. @mutex serialises the calls those threads make into the
    # manager and into schedule info writes.
    class Runner
      # manager - the Scheduler::Manager this runner works for.
      #
      # Starts the three background threads immediately.
      def initialize(manager)
        @stopped = false
        @mutex = Mutex.new
        @queue = Queue.new
        @manager = manager

        @reschedule_orphans_thread = Thread.new do
          until @stopped
            sleep 1.minute
            @mutex.synchronize { reschedule_orphans }
          end
        end

        @keep_alive_thread = Thread.new do
          until @stopped
            @mutex.synchronize { keep_alive }
            # refresh twice per expiry window so the key does not lapse
            sleep(@manager.keep_alive_duration / 2)
          end
        end

        @thread = Thread.new do
          until @stopped
            if @manager.enable_stats
              begin
                RailsMultisite::ConnectionManagement.establish_connection(db: "default")
                process_queue
              ensure
                ActiveRecord::Base.connection_handler.clear_active_connections!
              end
            else
              process_queue
            end
          end
        end
      end

      # Refreshes the manager's keep-alive key; errors are reported through
      # Discourse.handle_job_exception instead of being raised.
      def keep_alive
        @manager.keep_alive
      rescue => ex
        Discourse.handle_job_exception(ex, { message: "Scheduling manager keep-alive" })
      end

      # Asks the manager to reschedule orphans; errors are reported through
      # Discourse.handle_job_exception instead of being raised.
      def reschedule_orphans
        @manager.reschedule_orphans!
      rescue => ex
        Discourse.handle_job_exception(ex, { message: "Scheduling manager orphan rescheduler" })
      end

      # Memoised output of the `hostname` command (whitespace stripped, so it
      # matches Manager#hostname), or "unknown" when the command fails.
      def hostname
        @hostname ||= begin
          `hostname`.strip
        rescue
          "unknown"
        end
      end

      # Blocks until a job class is dequeued, runs it and records the outcome
      # on its schedule info (and on a SchedulerStat row when stats are on).
      # A nil queue entry (pushed by stop!) makes the method return at once.
      def process_queue
        klass = @queue.deq
        return unless klass

        # hack alert, I need to both deq and set @running atomically.
        @running = true
        failed = false
        # monotonic clock: the duration must not be skewed by wall-clock changes
        start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        info = @mutex.synchronize { @manager.schedule_info(klass) }
        stat = nil

        begin
          info.prev_result = "RUNNING"
          @mutex.synchronize { info.write! }

          if @manager.enable_stats
            stat = SchedulerStat.create!(
              name: klass.to_s,
              hostname: hostname,
              pid: Process.pid,
              started_at: Time.zone.now,
              live_slots_start: GC.stat[:heap_live_slots]
            )
          end

          klass.new.perform
        rescue Jobs::HandledExceptionWrapper
          # Discourse.handle_exception was already called, and we don't have any extra info to give
          failed = true
        rescue => e
          Discourse.handle_job_exception(e, { message: "Running a scheduled job", job: klass })
          failed = true
        end

        duration = ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) * 1000).to_i
        info.prev_duration = duration
        info.prev_result = failed ? "FAILED" : "OK"
        info.current_owner = nil

        if stat
          stat.update_columns(
            duration_ms: duration,
            live_slots_finish: GC.stat[:heap_live_slots],
            success: !failed
          )
        end

        attempts(3) do
          @mutex.synchronize { info.write! }
        end
      rescue => ex
        Discourse.handle_job_exception(ex, { message: "Processing scheduled job queue" })
      ensure
        @running = false
      end

      # Flags the runner as stopped, kills the helper threads, wakes the worker
      # with a nil entry and kills the worker thread after 5 seconds.
      def stop!
        @mutex.synchronize do
          @stopped = true

          @keep_alive_thread.kill
          @reschedule_orphans_thread.kill

          # nil unblocks a worker waiting in @queue.deq
          enq(nil)

          Thread.new do
            sleep 5
            @thread.kill
          end
        end
      end

      # Queues a job class for execution by the worker thread.
      def enq(klass)
        @queue << klass
      end

      # Polls until the queue is drained (or a consumer is waiting on it) and
      # no job is flagged as running.
      def wait_till_done
        while !@queue.empty? && !(@queue.num_waiting > 0)
          sleep 0.001
        end
        # this is a hack, but is only used for test anyway
        sleep 0.001
        while @running
          sleep 0.001
        end
      end

      # Yields up to n times, stopping at the first attempt that does not
      # raise. Failures are swallowed (best effort); a random sub-second pause
      # separates retries, with no pause after the final attempt.
      def attempts(n)
        n.times do |attempt|
          begin
            yield
            break
          rescue
            sleep Random.rand unless attempt == n - 1
          end
        end
      end
    end

    # Builds a manager that starts no Runner and is not registered as current.
    def self.without_runner(redis = nil)
      new(redis, skip_runner: true)
    end

    # redis   - redis client, used only when the $redis global is not set.
    # options - optional Hash:
    #           :skip_runner  - when truthy no Runner is started and
    #                           Manager.current is left untouched
    #           :hostname     - overrides the detected hostname
    #           :enable_stats - defaults to true when the key is absent
    def initialize(redis = nil, options = nil)
      # NOTE(review): the global wins over the explicit argument - confirm intended
      @redis = $redis || redis
      @random_ratio = 0.1

      unless options && options[:skip_runner]
        @runner = Runner.new(self)
        self.class.current = self
      end

      @hostname = options && options[:hostname]
      @manager_id = SecureRandom.hex

      if options && options.key?(:enable_stats)
        @enable_stats = options[:enable_stats]
      else
        @enable_stats = true
      end
    end

    # The manager most recently created with a runner, or nil after stop!.
    def self.current
      @current
    end

    def self.current=(manager)
      @current = manager
    end

    # Hostname given via options, else the stripped output of `hostname`.
    def hostname
      @hostname ||= `hostname`.strip
    end

    # Returns a fresh ScheduleInfo for klass bound to this manager.
    def schedule_info(klass)
      ScheduleInfo.new(klass, self)
    end

    def next_run(klass)
      schedule_info(klass).next_run
    end

    # Schedules klass while holding the distributed lock.
    def ensure_schedule!(klass)
      lock do
        schedule_info(klass).schedule!
      end
    end

    # Deletes the schedule for klass while holding the distributed lock.
    def remove(klass)
      lock do
        schedule_info(klass).del!
      end
    end

    # Reschedules orphans on both the global queue and this host's queue.
    def reschedule_orphans!
      lock do
        reschedule_orphans_on!
        reschedule_orphans_on!(hostname)
      end
    end

    # Marks as 'ORPHAN', and makes due now, every job in the given queue that
    # is QUEUED/RUNNING but whose owner key is blank or missing from redis.
    def reschedule_orphans_on!(hostname = nil)
      redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key|
        klass = get_klass(key)
        next unless klass

        info = schedule_info(klass)

        if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
            (info.current_owner.blank? || !redis.get(info.current_owner))
          info.prev_result = 'ORPHAN'
          info.next_run = Time.now.to_i
          info.write!
        end
      end
    end

    # Resolves a class name to its constant; nil when it cannot be resolved.
    def get_klass(name)
      name.constantize
    rescue NameError
      nil
    end

    # Under the distributed lock, queues the next due job (if any) from the
    # global queue and from this host's queue.
    def tick
      lock do
        schedule_next_job
        schedule_next_job(hostname)
      end
    end

    # Looks at the first entry of the given queue; when its score is not in
    # the future the job is marked QUEUED, owned by this manager, rescheduled
    # and handed to the runner.
    def schedule_next_job(hostname = nil)
      (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true
      return unless key
      return unless due.to_i <= Time.now.to_i

      klass = get_klass(key)
      unless klass
        # corrupt key, nuke it (renamed job or something)
        redis.zrem Manager.queue_key(hostname), key
        return
      end

      info = schedule_info(klass)
      info.prev_run = Time.now.to_i
      info.prev_result = "QUEUED"
      info.prev_duration = -1
      info.next_run = nil
      info.current_owner = identity_key
      info.schedule!
      @runner.enq(klass)
    end

    # tick, then wait for the runner to finish what was queued.
    def blocking_tick
      tick
      @runner.wait_till_done
    end

    # Stops the runner and clears Manager.current.
    def stop!
      @runner.stop!
      self.class.current = nil
    end

    # Expiry passed to redis setex for the keep-alive key.
    def keep_alive_duration
      60
    end

    # Writes this manager's identity key with an expiry; the orphan check
    # looks the key up to decide whether a job's owner still exists.
    def keep_alive
      redis.setex identity_key, keep_alive_duration, ""
    end

    # Runs the block while holding the scheduler-wide DistributedMutex.
    def lock
      DistributedMutex.new(Manager.lock_key).synchronize do
        yield
      end
    end

    # Collects every Scheduler::Schedule in object space that is scheduled?,
    # keeping one entry per to_s value.
    def self.discover_schedules
      # hack for development reloader is crazytown
      # multiple classes with same name can be in
      # object space
      unique = Set.new
      schedules = []
      ObjectSpace.each_object(Scheduler::Schedule) do |schedule|
        next unless schedule.scheduled?
        # Set#add? returns nil when the name was already recorded
        schedules << schedule if unique.add?(schedule.to_s)
      end
      schedules
    end

    @mutex = Mutex.new

    # Process-wide counter, incremented under a mutex; starts at 1.
    def self.seq
      @mutex.synchronize do
        @i ||= 0
        @i += 1
      end
    end

    # Memoised redis key identifying this manager instance.
    def identity_key
      @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
    end

    def self.lock_key
      "_scheduler_lock_"
    end

    # Sorted-set key of the global queue, or of a host's queue when given.
    def self.queue_key(hostname = nil)
      if hostname
        "_scheduler_queue_#{hostname}_"
      else
        "_scheduler_queue_"
      end
    end

    # Key for a job's schedule, optionally scoped to a host.
    def self.schedule_key(klass, hostname = nil)
      if hostname
        "_scheduler_#{klass}_#{hostname}"
      else
        "_scheduler_#{klass}"
      end
    end
  end
end