mirror of
https://github.com/codeninjasllc/discourse.git
synced 2024-11-23 23:58:31 -05:00
0efcb9cee9
causes erratic spec failure
180 lines
3.9 KiB
Ruby
180 lines
3.9 KiB
Ruby
# Initially we used sidetiq, this was a problem:
#
# 1. No mechanism to add "randomisation" into job execution
# 2. No stats about previous runs or failures
# 3. Dependency on ice_cube gem causes runaway CPU

module Scheduler
  # Drives scheduled jobs: it reads the next due job from a redis sorted set,
  # records run statistics through ScheduleInfo and hands the job class to a
  # background Runner thread for execution. Cross-process coordination is done
  # with a lock key stored in redis.
  class Manager
    extend Sidekiq::ExceptionHandler

    attr_accessor :random_ratio, :redis

    # Owns a single worker thread that executes queued job classes one at a
    # time and writes the outcome of each run to the job's schedule info.
    class Runner
      # manager - the Manager used to look up schedule info for each job.
      def initialize(manager)
        @queue = Queue.new
        @manager = manager
        @thread = Thread.new do
          loop do
            process(@queue.deq)
          end
        end
      end

      # Kills the worker thread; jobs still queued are never executed.
      def stop!
        @thread.kill
      end

      # Queues a job class for execution on the worker thread.
      def enq(klass)
        @queue << klass
      end

      # Blocks until every queued job has finished: the queue must be empty
      # AND the worker must be parked in Queue#deq again. Checking only the
      # queue is not enough, because the queue is already empty while the
      # last job is still running. Returns immediately when the worker thread
      # is no longer alive, since nothing would ever drain the queue.
      def wait_till_done
        until (@queue.empty? && @queue.num_waiting > 0) || !@thread.alive?
          sleep 0.001
        end
      end

      private

      # Runs one job and records RUNNING, then FAILED/OK plus the duration in
      # milliseconds. StandardErrors raised by the job are reported through
      # Manager.handle_exception and do not stop the worker.
      def process(klass)
        failed = false
        start = Time.now.to_f
        info = @manager.schedule_info(klass)

        begin
          info.prev_result = "RUNNING"
          info.write!
          klass.new.perform
        rescue => e
          Scheduler::Manager.handle_exception(e)
          failed = true
        end

        info.prev_duration = ((Time.now.to_f - start) * 1000).to_i
        info.prev_result = failed ? "FAILED" : "OK"
        info.write!
      end
    end

    # Builds a manager that has no Runner thread and is not registered as
    # Manager.current. Such a manager has @runner == nil, so tick,
    # blocking_tick and stop! must not be called on it.
    def self.without_runner(redis=nil)
      self.new(redis, true)
    end

    # redis       - redis client, used only when the global $redis is not set.
    # skip_runner - when true, no Runner is started and Manager.current is
    #               left untouched.
    def initialize(redis = nil, skip_runner = false)
      # NOTE(review): the global wins over the explicit argument here; this
      # looks inverted, but callers may depend on it - confirm before changing.
      @redis = $redis || redis
      @random_ratio = 0.1
      unless skip_runner
        @runner = Runner.new(self)
        self.class.current = self
      end
      @manager_id = SecureRandom.hex
    end

    # The manager most recently created with a runner (nil after stop!).
    def self.current
      @current
    end

    def self.current=(manager)
      @current = manager
    end

    # Returns a ScheduleInfo wrapper for the given job class.
    def schedule_info(klass)
      ScheduleInfo.new(klass, self)
    end

    # Next run time recorded in the schedule info of klass.
    def next_run(klass)
      schedule_info(klass).next_run
    end

    # Schedules klass while holding the scheduler lock.
    def ensure_schedule!(klass)
      lock do
        schedule_info(klass).schedule!
      end
    end

    # Deletes the schedule info of klass while holding the scheduler lock.
    def remove(klass)
      lock do
        schedule_info(klass).del!
      end
    end

    # Looks at the head of the queue and, when that job is due, marks it as
    # QUEUED, reschedules it and hands it to the runner. At most one job is
    # processed per call.
    def tick
      lock do
        (key, due), _ = redis.zrange Manager.queue_key, 0, 0, withscores: true
        return unless key
        return if due.to_i > Time.now.to_i

        klass = begin
          key.constantize
        rescue NameError
          nil
        end

        unless klass
          # The entry names a class that no longer exists (for example a
          # renamed job). Drop it, otherwise it stays at the head of the
          # queue and blocks every job behind it on each tick.
          redis.zrem Manager.queue_key, key
          return
        end

        info = schedule_info(klass)
        info.prev_run = Time.now.to_i
        info.prev_result = "QUEUED"
        info.prev_duration = -1
        info.next_run = nil
        info.schedule!
        @runner.enq(klass)
      end
    end

    # Same as tick, but waits for the runner to finish before returning.
    def blocking_tick
      tick
      @runner.wait_till_done
    end

    # Stops the runner thread and clears Manager.current.
    def stop!
      @runner.stop!
      self.class.current = nil
    end

    # Runs the given block while holding the redis-backed scheduler lock and
    # returns the block's value. The lock value is a timestamp 60 seconds in
    # the future; a lock whose timestamp has passed is taken over using
    # WATCH/MULTI. Spins (without sleeping) until the lock is acquired.
    def lock
      got_lock = false
      lock_key = Manager.lock_key

      until got_lock
        if redis.setnx lock_key, Time.now.to_i + 60
          redis.expire lock_key, 60
          got_lock = true
        else
          begin
            redis.watch lock_key
            time = redis.get lock_key
            if time && time.to_i < Time.now.to_i
              # multi yields a falsy result when the watched key changed, so
              # a lost race simply goes around the loop again.
              got_lock = redis.multi do
                redis.set lock_key, Time.now.to_i + 60
              end
            end
          ensure
            redis.unwatch
          end
        end
      end

      yield
    ensure
      # Release only a lock this call actually acquired; when acquisition
      # itself raised, the key belongs to somebody else.
      redis.del lock_key if got_lock
    end

    # Returns every live object that is a Scheduler::Schedule and answers
    # true to scheduled?.
    def self.discover_schedules
      ObjectSpace.each_object(Scheduler::Schedule).select(&:scheduled?)
    end

    # Redis key used for the scheduler lock.
    def self.lock_key
      "_scheduler_lock_"
    end

    # Redis key of the sorted set read by tick.
    def self.queue_key
      "_scheduler_queue_"
    end

    # Redis key name derived from a job class.
    def self.schedule_key(klass)
      "_scheduler_#{klass}"
    end
  end
end
|