diff --git a/lib/distributed_mutex.rb b/lib/distributed_mutex.rb new file mode 100644 index 000000000..5d85451c9 --- /dev/null +++ b/lib/distributed_mutex.rb @@ -0,0 +1,51 @@ +# Cross-process locking using Redis. +class DistributedMutex + attr_accessor :redis + attr_reader :got_lock + + def initialize(key, redis=nil) + @key = key + @redis = redis || $redis + @got_lock = false + end + + def try_to_get_lock + if redis.setnx @key, Time.now.to_i + 60 + redis.expire @key, 60 + @got_lock = true + else + begin + redis.watch @key + time = redis.get @key + if time && time.to_i < Time.now.to_i + @got_lock = redis.multi do + redis.set @key, Time.now.to_i + 60 + end + end + ensure + redis.unwatch + end + end + end + + def get_lock + return if @got_lock + + start = Time.now + while !@got_lock + try_to_get_lock + end + end + + def release_lock + redis.del @key + @got_lock = false + end + + def synchronize + get_lock + yield + ensure + release_lock + end +end diff --git a/lib/scheduler/manager.rb b/lib/scheduler/manager.rb index 74ef05658..9529e7c6b 100644 --- a/lib/scheduler/manager.rb +++ b/lib/scheduler/manager.rb @@ -4,6 +4,8 @@ # 2. No stats about previous runs or failures # 3. Dependency on ice_cube gem causes runaway CPU +require_dependency 'distributed_mutex' + module Scheduler class Manager attr_accessor :random_ratio, :redis @@ -220,33 +222,9 @@ module Scheduler end def lock - got_lock = false - lock_key = Manager.lock_key - - while(!got_lock) - begin - if redis.setnx lock_key, Time.now.to_i + 60 - redis.expire lock_key, 60 - got_lock = true - else - begin - redis.watch lock_key - time = redis.get Manager.lock_key - if time && time.to_i < Time.now.to_i - got_lock = redis.multi do - redis.set Manager.lock_key, Time.now.to_i + 60 - end - end - ensure - redis.unwatch - end - end - - end + DistributedMutex.new(Manager.lock_key).synchronize do + yield end - yield - ensure - redis.del Manager.lock_key end diff --git a/spec/components/distributed_mutex_spec.rb b/spec/components/distributed_mutex_spec.rb new file mode 100644 index 000000000..70a00ba8a --- /dev/null +++ b/spec/components/distributed_mutex_spec.rb @@ -0,0 +1,36 @@ +require 'spec_helper' +require_dependency 'distributed_mutex' + +describe DistributedMutex do + it "allows only one mutex object to have the lock at a time" do + m1 = DistributedMutex.new("test_mutex_key") + m2 = DistributedMutex.new("test_mutex_key") + + m1.get_lock + m2.got_lock.should be_false + + t = Thread.new do + m2.get_lock + end + + m1.release_lock + t.join + m2.got_lock.should == true + end + + it "synchronizes correctly" do + array = [] + t = Thread.new do + DistributedMutex.new("correct_sync").synchronize do + sleep 0.01 + array.push 1 + end + end + sleep 0.005 + DistributedMutex.new("correct_sync").synchronize do + array.push 2 + end + t.join + array.should == [1, 2] + end +end