diff --git a/lib/scheduler/manager.rb b/lib/scheduler/manager.rb index fda3c300e..e3c934f28 100644 --- a/lib/scheduler/manager.rb +++ b/lib/scheduler/manager.rb @@ -121,13 +121,15 @@ module Scheduler self.new(redis, true) end - def initialize(redis = nil, skip_runner = false) + def initialize(redis = nil, options=nil) @redis = $redis || redis @random_ratio = 0.1 - unless skip_runner + unless options && options[:skip_runner] @runner = Runner.new(self) self.class.current = self end + + @hostname = options && options[:hostname] @manager_id = SecureRandom.hex end @@ -139,6 +141,10 @@ module Scheduler @current = manager end + def hostname + @hostname ||= `hostname`.strip + end + def schedule_info(klass) ScheduleInfo.new(klass, self) end @@ -162,17 +168,22 @@ module Scheduler def reschedule_orphans! lock do - redis.zrange(Manager.queue_key, 0, -1).each do |key| - klass = get_klass(key) - next unless klass - info = schedule_info(klass) + reschedule_orphans_on! + reschedule_orphans_on!(hostname) + end + end - if ['QUEUED', 'RUNNING'].include?(info.prev_result) && - (info.current_owner.blank? || !redis.get(info.current_owner)) - info.prev_result = 'ORPHAN' - info.next_run = Time.now.to_i - info.write! - end + def reschedule_orphans_on!(hostname=nil) + redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key| + klass = get_klass(key) + next unless klass + info = schedule_info(klass) + + if ['QUEUED', 'RUNNING'].include?(info.prev_result) && + (info.current_owner.blank? || !redis.get(info.current_owner)) + info.prev_result = 'ORPHAN' + info.next_run = Time.now.to_i + info.write! end end end @@ -185,24 +196,30 @@ module Scheduler def tick lock do - (key, due), _ = redis.zrange Manager.queue_key, 0, 0, withscores: true - return unless key - if due.to_i <= Time.now.to_i - klass = get_klass(key) - unless klass - # corrupt key, nuke it (renamed job or something) - redis.zrem Manager.queue_key, key - return - end - info = schedule_info(klass) - info.prev_run = Time.now.to_i - info.prev_result = "QUEUED" - info.prev_duration = -1 - info.next_run = nil - info.current_owner = identity_key - info.schedule! - @runner.enq(klass) + schedule_next_job + schedule_next_job(hostname) + end + end + + def schedule_next_job(hostname=nil) + (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true + + return unless key + if due.to_i <= Time.now.to_i + klass = get_klass(key) + unless klass + # corrupt key, nuke it (renamed job or something) + redis.zrem Manager.queue_key(hostname), key + return end + info = schedule_info(klass) + info.prev_run = Time.now.to_i + info.prev_result = "QUEUED" + info.prev_duration = -1 + info.next_run = nil + info.current_owner = identity_key + info.schedule! + @runner.enq(klass) end end @@ -256,19 +273,27 @@ module Scheduler end def identity_key - @identity_key ||= "_scheduler_#{`hostname`}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" + @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" end def self.lock_key "_scheduler_lock_" end - def self.queue_key - "_scheduler_queue_" + def self.queue_key(hostname=nil) + if hostname + "_scheduler_queue_#{hostname}_" + else + "_scheduler_queue_" + end end - def self.schedule_key(klass) - "_scheduler_#{klass}" + def self.schedule_key(klass,hostname=nil) + if hostname + "_scheduler_#{klass}_#{hostname}" + else + "_scheduler_#{klass}" + end end end end diff --git a/lib/scheduler/schedule.rb b/lib/scheduler/schedule.rb index 822e06fa3..a00fe1330 100644 --- a/lib/scheduler/schedule.rb +++ b/lib/scheduler/schedule.rb @@ -17,6 +17,15 @@ module Scheduler::Schedule @every end + # schedule job indepndently on each host (looking at hostname) + def per_host + @per_host = true + end + + def is_per_host + @per_host + end + def schedule_info manager = Scheduler::Manager.without_runner manager.schedule_info self diff --git a/lib/scheduler/schedule_info.rb b/lib/scheduler/schedule_info.rb index 9416ffa3b..755bc621a 100644 --- a/lib/scheduler/schedule_info.rb +++ b/lib/scheduler/schedule_info.rb @@ -86,6 +86,7 @@ module Scheduler end def write! + clear! redis.set key, { next_run: @next_run, @@ -95,7 +96,7 @@ module Scheduler current_owner: @current_owner }.to_json - redis.zadd Manager.queue_key, @next_run , @klass + redis.zadd queue_key, @next_run , @klass end def del! @@ -104,7 +105,19 @@ module Scheduler end def key - Manager.schedule_key(@klass) + if @klass.is_per_host + Manager.schedule_key(@klass, @manager.hostname) + else + Manager.schedule_key(@klass) + end + end + + def queue_key + if @klass.is_per_host + Manager.queue_key(@manager.hostname) + else + Manager.queue_key + end end def redis @@ -114,7 +127,7 @@ module Scheduler private def clear! redis.del key - redis.zrem Manager.queue_key, @klass + redis.zrem queue_key, @klass end end diff --git a/spec/components/scheduler/manager_spec.rb b/spec/components/scheduler/manager_spec.rb index 52c92a998..3f7fa93c8 100644 --- a/spec/components/scheduler/manager_spec.rb +++ b/spec/components/scheduler/manager_spec.rb @@ -33,6 +33,25 @@ describe Scheduler::Manager do sleep 1000 end end + + class PerHostJob + extend ::Scheduler::Schedule + + per_host + every 10.minutes + + def self.runs=(val) + @runs = val + end + + def self.runs + @runs ||= 0 + end + + def perform + self.class.runs += 1 + end + end end let(:manager) { Scheduler::Manager.new(DiscourseRedis.new) } @@ -42,12 +61,43 @@ describe Scheduler::Manager do $redis.del manager.class.queue_key manager.remove(Testing::RandomJob) manager.remove(Testing::SuperLongJob) + manager.remove(Testing::PerHostJob) end after do manager.stop! manager.remove(Testing::RandomJob) manager.remove(Testing::SuperLongJob) + manager.remove(Testing::PerHostJob) + end + + describe 'per host jobs' do + it "correctly schedules on multiple hosts" do + Testing::PerHostJob.runs = 0 + + hosts = ['a','b','c'] + + hosts.map do |host| + + manager = Scheduler::Manager.new(DiscourseRedis.new, hostname: host) + manager.ensure_schedule!(Testing::PerHostJob) + + info = manager.schedule_info(Testing::PerHostJob) + info.next_run = Time.now.to_i - 1 + info.write! + + manager + + end.each do |manager| + + manager.blocking_tick + manager.stop! + + end + + expect(Testing::PerHostJob.runs).to eq(3) + + end end describe '#sync' do @@ -63,7 +113,6 @@ describe Scheduler::Manager do $redis.zadd Scheduler::Manager.queue_key, Time.now.to_i - 1000, "BLABLA" manager.tick expect($redis.zcard(Scheduler::Manager.queue_key)).to eq(0) - end it 'should recover from crashed manager' do