diff --git a/lib/distributed_mutex.rb b/lib/distributed_mutex.rb new file mode 100644 index 000000000..5d85451c9 --- /dev/null +++ b/lib/distributed_mutex.rb @@ -0,0 +1,51 @@ +# Cross-process locking using Redis. +class DistributedMutex + attr_accessor :redis + attr_reader :got_lock + + def initialize(key, redis=nil) + @key = key + @redis = redis || $redis + @got_lock = false + end + + def try_to_get_lock + if redis.setnx @key, Time.now.to_i + 60 + redis.expire @key, 60 + @got_lock = true + else + begin + redis.watch @key + time = redis.get @key + if time && time.to_i < Time.now.to_i + @got_lock = redis.multi do + redis.set @key, Time.now.to_i + 60 + end + end + ensure + redis.unwatch + end + end + end + + def get_lock + return if @got_lock + + start = Time.now + while !@got_lock + try_to_get_lock + end + end + + def release_lock + redis.del @key + @got_lock = false + end + + def synchronize + get_lock + yield + ensure + release_lock + end +end diff --git a/lib/scheduler/manager.rb b/lib/scheduler/manager.rb index 74ef05658..9529e7c6b 100644 --- a/lib/scheduler/manager.rb +++ b/lib/scheduler/manager.rb @@ -4,6 +4,8 @@ # 2. No stats about previous runs or failures # 3. Dependency on ice_cube gem causes runaway CPU +require_dependency 'distributed_mutex' + module Scheduler class Manager attr_accessor :random_ratio, :redis @@ -220,33 +222,9 @@ module Scheduler end def lock - got_lock = false - lock_key = Manager.lock_key - - while(!got_lock) - begin - if redis.setnx lock_key, Time.now.to_i + 60 - redis.expire lock_key, 60 - got_lock = true - else - begin - redis.watch lock_key - time = redis.get Manager.lock_key - if time && time.to_i < Time.now.to_i - got_lock = redis.multi do - redis.set Manager.lock_key, Time.now.to_i + 60 - end - end - ensure - redis.unwatch - end - end - - end + DistributedMutex.new(Manager.lock_key).synchronize do + yield end - yield - ensure - redis.del Manager.lock_key end diff --git a/plugins/poll/poll.rb b/plugins/poll/poll.rb index 57d8632cf..2d6614469 100644 --- a/plugins/poll/poll.rb +++ b/plugins/poll/poll.rb @@ -143,15 +143,17 @@ module ::PollPlugin return if is_closed? # Get the user's current vote. - vote = get_vote(user) - vote = nil unless details.keys.include? vote + DistributedMutex.new(details_key).synchronize do + vote = get_vote(user) + vote = nil unless details.keys.include? vote - new_details = details.dup - new_details[vote] -= 1 if vote - new_details[option] += 1 + new_details = details.dup + new_details[vote] -= 1 if vote + new_details[option] += 1 - ::PluginStore.set("poll", vote_key(user), option) - set_details! new_details + ::PluginStore.set("poll", vote_key(user), option) + set_details! new_details + end end def serialize(user) diff --git a/spec/components/distributed_mutex_spec.rb b/spec/components/distributed_mutex_spec.rb new file mode 100644 index 000000000..70a00ba8a --- /dev/null +++ b/spec/components/distributed_mutex_spec.rb @@ -0,0 +1,36 @@ +require 'spec_helper' +require_dependency 'distributed_mutex' + +describe DistributedMutex do + it "allows only one mutex object to have the lock at a time" do + m1 = DistributedMutex.new("test_mutex_key") + m2 = DistributedMutex.new("test_mutex_key") + + m1.get_lock + m2.got_lock.should be_false + + t = Thread.new do + m2.get_lock + end + + m1.release_lock + t.join + m2.got_lock.should == true + end + + it "synchronizes correctly" do + array = [] + t = Thread.new do + DistributedMutex.new("correct_sync").synchronize do + sleep 0.01 + array.push 1 + end + end + sleep 0.005 + DistributedMutex.new("correct_sync").synchronize do + array.push 2 + end + t.join + array.should == [1, 2] + end +end