From ed45a1dce363e734642b78e238f88d9874be7680 Mon Sep 17 00:00:00 2001 From: Sam Date: Thu, 30 Jan 2014 16:21:38 +1100 Subject: [PATCH] FEATURE: new scheduler so we can deprecate sidetiq This is a work in progress, should have it finished tomorrow. --- lib/scheduler/manager.rb | 144 ++++++++++++++++++ lib/scheduler/schedule.rb | 5 + lib/scheduler/schedule_info.rb | 86 +++++++++++ lib/scheduler/scheduler.rb | 6 + spec/components/scheduler/manager_spec.rb | 74 +++++++++ .../scheduler/schedule_info_spec.rb | 51 +++++++ 6 files changed, 366 insertions(+) create mode 100644 lib/scheduler/manager.rb create mode 100644 lib/scheduler/schedule.rb create mode 100644 lib/scheduler/schedule_info.rb create mode 100644 lib/scheduler/scheduler.rb create mode 100644 spec/components/scheduler/manager_spec.rb create mode 100644 spec/components/scheduler/schedule_info_spec.rb diff --git a/lib/scheduler/manager.rb b/lib/scheduler/manager.rb new file mode 100644 index 000000000..735d71b27 --- /dev/null +++ b/lib/scheduler/manager.rb @@ -0,0 +1,144 @@ +# Initially we used sidetiq, this was a problem: +# +# 1. No mechnism to add "randomisation" into job execution +# 2. No stats about previous runs or failures +# 3. Dependency on ice_cube gem causes runaway CPU + +module Scheduler + class Manager + attr_accessor :random_ratio, :redis + + + class Runner + def initialize(manager) + @queue = Queue.new + @manager = manager + @thread = Thread.new do + while true + klass = @queue.deq + failed = false + start = Time.now.to_f + begin + klass.new.perform + rescue + failed = true + end + duration = ((Time.now.to_f - start) * 1000).to_i + info = @manager.schedule_info(klass) + info.prev_duration = duration + info.prev_result = failed ? "FAILED" : "OK" + info.write! + end + end + end + + def stop! + @thread.kill + end + + def enq(klass) + @queue << klass + end + + def wait_till_done + while !@queue.empty? && !@queue.num_waiting == 1 + sleep 0.001 + end + end + end + + def initialize(redis = nil) + @redis = $redis || redis + @random_ratio = 0.1 + @runner = Runner.new(self) + @manager_id = SecureRandom.hex + end + + def schedule_info(klass) + ScheduleInfo.new(klass, self) + end + + def next_run(klass) + schedule_info(klass).next_run + end + + def ensure_schedule!(klass) + lock do + schedule_info(klass).schedule! + end + + end + + def remove(klass) + lock do + schedule_info(klass).del! + end + end + + def tick + lock do + (key, due), _ = redis.zrange Manager.queue_key, 0, 0, withscores: true + if due.to_i <= Time.now.to_i + klass = key.constantize + info = schedule_info(klass) + info.prev_run = Time.now.to_i + info.next_run = nil + info.schedule! + @runner.enq(klass) + end + end + end + + def blocking_tick + tick + @runner.wait_till_done + end + + def stop! + @runner.stop! + end + + + def lock + got_lock = false + lock_key = Manager.lock_key + + while(!got_lock) + begin + if redis.setnx lock_key, Time.now.to_i + 60 + redis.expire lock_key, 60 + got_lock = true + else + begin + redis.watch lock_key + time = redis.get Manager.lock_key + if time && time.to_i < Time.now.to_i + got_lock = redis.multi do + redis.set Manager.lock_key, Time.now.to_i + 60 + end + end + ensure + redis.unwatch + end + end + + end + end + yield + ensure + redis.del Manager.lock_key + end + + def self.lock_key + "_scheduler_lock_" + end + + def self.queue_key + "_scheduler_queue_" + end + + def self.schedule_key(klass) + "_scheduler_#{klass}" + end + end +end diff --git a/lib/scheduler/schedule.rb b/lib/scheduler/schedule.rb new file mode 100644 index 000000000..17f2d82e4 --- /dev/null +++ b/lib/scheduler/schedule.rb @@ -0,0 +1,5 @@ +module Scheduler::Schedule + def every(duration=nil) + @every ||= duration + end +end diff --git a/lib/scheduler/schedule_info.rb b/lib/scheduler/schedule_info.rb new file mode 100644 index 000000000..d8aeaa71f --- /dev/null +++ b/lib/scheduler/schedule_info.rb @@ -0,0 +1,86 @@ +module Scheduler + class ScheduleInfo + attr_accessor :next_run, + :prev_run, + :prev_duration, + :prev_result + + def initialize(klass, manager) + @klass = klass + @manager = manager + + key = Manager.schedule_key(klass) + data = nil + + if data = $redis.get(key) + data = JSON.parse(data) + end + + if data + @next_run = data["next_run"] + @prev_run = data["prev_run"] + @prev_result = data["prev_result"] + @prev_duration = data["prev_duration"] + end + rescue + # corrupt redis + @next_run = @prev_run = @prev_result = @prev_duration = nil + end + + def valid? + return false unless @next_run + (!@prev_run && @next_run < Time.now.to_i + 5.minutes) || + ( @prev_run && + @prev_run <= Time.now.to_i && + @next_run < @prev_run + @klass.every * (1 + @manager.random_ratio) + ) + end + + def schedule! + if !valid? && @prev_run + mixup = @klass.every * @manager.random_ratio + mixup = (mixup * Random.rand - mixup / 2).to_i + @next_run = @prev_run + mixup + @klass.every + end + + if !valid? + @next_run = Time.now.to_i + 5.minutes * Random.rand + end + + write! + end + + def write! + key = Manager.schedule_key(@klass) + clear! + redis.set key, { + next_run: @next_run, + prev_run: @prev_run, + prev_duration: @prev_duration, + prev_result: @prev_result + }.to_json + redis.zadd Manager.queue_key, @next_run , @klass + end + + def del! + clear! + @next_run = @prev_run = @prev_result = @prev_duration = nil + end + + def key + Manager.schedule_key(@klass) + end + + def redis + @manager.redis + end + + private + def clear! + key = Manager.schedule_key(@klass) + redis.del key + redis.zrem Manager.queue_key, key + end + + end +end diff --git a/lib/scheduler/scheduler.rb b/lib/scheduler/scheduler.rb new file mode 100644 index 000000000..a69b1f7a5 --- /dev/null +++ b/lib/scheduler/scheduler.rb @@ -0,0 +1,6 @@ +module Scheduler +end + +require_dependency 'scheduler/schedule' +require_dependency 'scheduler/schedule_info' +require_dependency 'scheduler/manager' diff --git a/spec/components/scheduler/manager_spec.rb b/spec/components/scheduler/manager_spec.rb new file mode 100644 index 000000000..10ee5015c --- /dev/null +++ b/spec/components/scheduler/manager_spec.rb @@ -0,0 +1,74 @@ +# encoding: utf-8 +require 'spec_helper' +require 'scheduler/scheduler' + +describe Scheduler::Manager do + + module Testing + class RandomJob + extend ::Scheduler::Schedule + + def self.runs=(val) + @runs = val + end + + def self.runs + @runs ||= 0 + end + + every 5.minutes + + def perform + self.class.runs+=1 + sleep 0.001 + end + end + end + + let(:manager) { Scheduler::Manager.new } + + before do + $redis.del manager.class.queue_key + end + + after do + manager.stop! + end + + describe '#tick' do + it 'should only run pending job once' do + + Testing::RandomJob.runs = 0 + + info = manager.schedule_info(Testing::RandomJob) + info.next_run = Time.now.to_i - 1 + info.write! + + (0..5).map do + Thread.new do + manager = Scheduler::Manager.new(Redis.new) + manager.blocking_tick + end + end.map(&:join) + + Testing::RandomJob.runs.should == 1 + + info = manager.schedule_info(Testing::RandomJob) + info.prev_run.should be <= Time.now.to_i + info.prev_duration.should be > 0 + info.prev_result.should == "OK" + end + + end + + describe '#next_run' do + it 'should be within the next 5 mins if it never ran' do + + manager.remove(Testing::RandomJob) + manager.ensure_schedule!(Testing::RandomJob) + + manager.next_run(Testing::RandomJob) + .should be_within(5.minutes.to_i).of(Time.now.to_i + 5.minutes) + end + end +end diff --git a/spec/components/scheduler/schedule_info_spec.rb b/spec/components/scheduler/schedule_info_spec.rb new file mode 100644 index 000000000..db3234afa --- /dev/null +++ b/spec/components/scheduler/schedule_info_spec.rb @@ -0,0 +1,51 @@ +# encoding: utf-8 +require 'spec_helper' +require 'scheduler/scheduler' + +describe Scheduler::ScheduleInfo do + + class RandomJob + extend ::Scheduler::Schedule + + every 1.hour + + def perform + # work_it + end + end + + let(:manager){ Scheduler::Manager.new } + + before do + @info = manager.schedule_info(RandomJob) + @info.del! + $redis.del manager.class.queue_key + end + + after do + manager.stop! + end + + it 'starts off invalid' do + @info.valid?.should be_false + end + + it 'will have a due date in the next 5 minutes if it was blank' do + @info.schedule! + @info.valid?.should be_true + @info.next_run.should be_within(5.minutes).of(Time.now.to_i) + end + + it 'will have a due date within the next hour if it just ran' do + @info.prev_run = Time.now.to_i + @info.schedule! + @info.valid?.should be_true + @info.next_run.should be_within(1.hour * manager.random_ratio).of(Time.now.to_i + 1.hour) + end + + it 'is invalid if way in the future' do + @info.next_run = Time.now.to_i + 1.year + @info.valid?.should be_false + end + +end