From d3f911cc4c529a1e2a2f9ff8785792e083c04fd9 Mon Sep 17 00:00:00 2001
From: Sam Saffron
Date: Fri, 8 Feb 2013 22:03:45 +1100
Subject: [PATCH] Revert "Something here is messed up Revert "message bus fixes and diagnostics""

This reverts commit f3c6144e3b66cadce9a0970c7d8437091b895117.
---
Review notes (text between the "---" line and the first "diff --git" is not
part of the commit message and is skipped when the patch is applied):

- Line breaks, indentation and blank context lines were reconstructed from a
  whitespace-collapsed copy of this patch. Every hunk was re-counted against
  its @@ header, but exact whitespace should be confirmed against the tree
  before applying.
- Changed relative to the collapsed copy: in ReliablePubSub#publish the global
  backlog is now trimmed with `global_id - @max_global_backlog_size`. It used
  `backlog_id - @max_backlog_size`, i.e. a per-channel id and the per-channel
  limit applied to a sorted set that is scored by global id.
- NOTE(review): the removed and re-added `clear_backlog = lambda do` lines
  presumably differ only in whitespace; that difference could not be
  recovered.
- NOTE(review): MessageBusDiags.my_id interpolates the raw output of
  `hostname`, which presumably ends in a newline - confirm whether that is
  intended in the "/server-name-reply/..." channel name.

 config/application.rb                         |   1 +
 lib/message_bus_diags.rb                      |  32 +++
 .../lib/message_bus/reliable_pub_sub.rb       | 200 ++++++++++--------
 .../message_bus/spec/lib/message_bus_spec.rb  |   2 -
 .../spec/lib/multi_process_spec.rb            |  60 ++++++
 .../spec/lib/reliable_pub_sub_spec.rb         |   2 +-
 6 files changed, 204 insertions(+), 93 deletions(-)
 create mode 100644 lib/message_bus_diags.rb
 create mode 100644 vendor/gems/message_bus/spec/lib/multi_process_spec.rb

diff --git a/config/application.rb b/config/application.rb
index 85723bf21..abfeaa717 100644
--- a/config/application.rb
+++ b/config/application.rb
@@ -20,6 +20,7 @@ module Discourse
     # -- all .rb files in that directory are automatically loaded.
 
     require 'discourse'
+    require 'message_bus_diags'
 
     # Custom directories with classes and modules you want to be autoloadable.
     config.autoload_paths += %W(#{config.root}/app/serializers)
diff --git a/lib/message_bus_diags.rb b/lib/message_bus_diags.rb
new file mode 100644
index 000000000..5d86bb61d
--- /dev/null
+++ b/lib/message_bus_diags.rb
@@ -0,0 +1,32 @@
+class MessageBusDiags
+
+  @host_info = {}
+
+  def self.my_id
+    @my_id ||= "#{`hostname`}-#{Process.pid}"
+  end
+
+  def self.seen_host(name)
+    @host_info[name] = DateTime.now
+  end
+
+  def self.establish_peer_names
+    MessageBus.publish "/server-name", {channel: "/server-name-reply/#{my_id}"}
+  end
+
+  def self.seen_hosts
+    @host_info
+  end
+
+  unless @subscribed
+
+    MessageBus.subscribe "/server-name-reply/#{my_id}" do |msg|
+      MessageBusDiags.seen_host(msg.data)
+    end
+
+    MessageBus.subscribe "/server-name" do |msg|
+      MessageBus.publish msg.data["channel"], MessageBusDiags.my_id
+    end
+    @subscribed = true
+  end
+end
diff --git a/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb b/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb
index 6ea0726dc..73d7992c5 100644
--- a/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb
+++ b/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb
@@ -10,6 +10,31 @@ require 'redis'
 
 class MessageBus::ReliablePubSub
 
+  class NoMoreRetries < StandardError; end
+  class BackLogOutOfOrder < StandardError
+    attr_accessor :highest_id
+
+    def initialize(highest_id)
+      @highest_id = highest_id
+    end
+  end
+
+  def max_publish_retries=(val)
+    @max_publish_retries = val
+  end
+
+  def max_publish_retries
+    @max_publish_retries ||= 10
+  end
+
+  def max_publish_wait=(ms)
+    @max_publish_wait = ms
+  end
+
+  def max_publish_wait
+    @max_publish_wait ||= 500
+  end
+
   # max_backlog_size is per multiplexed channel
   def initialize(redis_config = {}, max_backlog_size = 1000)
     @redis_config = redis_config
@@ -42,14 +67,14 @@ class MessageBus::ReliablePubSub
     @pub_redis ||= new_redis_connection
   end
 
-  def offset_key(channel)
-    "__mb_offset_#{channel}"
-  end
-
   def backlog_key(channel)
     "__mb_backlog_#{channel}"
   end
 
+  def backlog_id_key(channel)
+    "__mb_backlog_id_#{channel}"
+  end
+
   def global_id_key
     "__mb_global_id"
   end
@@ -57,10 +82,6 @@ class MessageBus::ReliablePubSub
   def global_backlog_key
     "__mb_global_backlog"
   end
-
-  def global_offset_key
-    "__mb_global_offset"
-  end
 
   # use with extreme care, will nuke all of the data
   def reset!
@@ -71,74 +92,49 @@ class MessageBus::ReliablePubSub
 
   def publish(channel, data)
     redis = pub_redis
-    offset_key = offset_key(channel)
+    backlog_id_key = backlog_id_key(channel)
     backlog_key = backlog_key(channel)
 
-    redis.watch(offset_key, backlog_key, global_id_key, global_backlog_key, global_offset_key) do
-      offset = redis.get(offset_key).to_i
-      backlog = redis.llen(backlog_key).to_i
+    global_id = nil
+    backlog_id = nil
 
-      global_offset = redis.get(global_offset_key).to_i
-      global_backlog = redis.llen(global_backlog_key).to_i
-
-      global_id = redis.get(global_id_key).to_i
-      global_id += 1
-
-      too_big = backlog + 1 > @max_backlog_size
-      global_too_big = global_backlog + 1 > @max_global_backlog_size
-
-      message_id = backlog + offset + 1
-      redis.multi do
-        if too_big
-          redis.ltrim backlog_key, (backlog+1) - @max_backlog_size, -1
-          offset += (backlog+1) - @max_backlog_size
-          redis.set(offset_key, offset)
-        end
-
-        if global_too_big
-          redis.ltrim global_backlog_key, (global_backlog+1) - @max_global_backlog_size, -1
-          global_offset += (global_backlog+1) - @max_global_backlog_size
-          redis.set(global_offset_key, global_offset)
-        end
-
-        msg = MessageBus::Message.new global_id, message_id, channel, data
-        payload = msg.encode
-
-        redis.set global_id_key, global_id
-        redis.rpush backlog_key, payload
-        redis.rpush global_backlog_key, message_id.to_s << "|" << channel
-        redis.publish redis_channel_name, payload
-      end
-
-      return message_id
+    redis.multi do |m|
+      global_id = m.incr(global_id_key)
+      backlog_id = m.incr(backlog_id_key)
     end
+
+    global_id = global_id.value
+    backlog_id = backlog_id.value
+
+    msg = MessageBus::Message.new global_id, backlog_id, channel, data
+    payload = msg.encode
+
+    redis.zadd backlog_key, backlog_id, payload
+    redis.zadd global_backlog_key, global_id, backlog_id.to_s << "|" << channel
+
+    redis.publish redis_channel_name, payload
+
+    if backlog_id > @max_backlog_size
+      redis.zremrangebyscore backlog_key, 1, backlog_id - @max_backlog_size
+    end
+
+    if global_id > @max_global_backlog_size
+      redis.zremrangebyscore global_backlog_key, 1, global_id - @max_global_backlog_size
+    end
+
+    backlog_id
   end
 
   def last_id(channel)
     redis = pub_redis
-    offset_key = offset_key(channel)
-    backlog_key = backlog_key(channel)
-
-    offset,len = nil
-    redis.watch offset_key, backlog_key do
-      offset = redis.get(offset_key).to_i
-      len = redis.llen backlog_key
-    end
-    offset + len
+    backlog_id_key = backlog_id_key(channel)
+    redis.get(backlog_id_key).to_i
   end
 
   def backlog(channel, last_id = nil)
     redis = pub_redis
-    offset_key = offset_key(channel)
     backlog_key = backlog_key(channel)
-
-    items = nil
-
-    redis.watch offset_key, backlog_key do
-      offset = redis.get(offset_key).to_i
-      start_at = last_id.to_i - offset
-      items = redis.lrange backlog_key, start_at, -1
-    end
+    items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf"
 
     items.map do |i|
       MessageBus::Message.decode(i)
@@ -147,14 +143,9 @@ class MessageBus::ReliablePubSub
 
   def global_backlog(last_id = nil)
     last_id = last_id.to_i
-    items = nil
     redis = pub_redis
 
-    redis.watch global_backlog_key, global_offset_key do
-      offset = redis.get(global_offset_key).to_i
-      start_at = last_id.to_i - offset
-      items = redis.lrange global_backlog_key, start_at, -1
-    end
+    items = redis.zrangebyscore global_backlog_key, last_id.to_i + 1, "+inf"
 
     items.map! do |i|
       pipe = i.index "|"
@@ -165,27 +156,19 @@ class MessageBus::ReliablePubSub
     end
 
     items.compact!
-
     items
   end
 
   def get_message(channel, message_id)
     redis = pub_redis
-    offset_key = offset_key(channel)
     backlog_key = backlog_key(channel)
 
-    msg = nil
-    redis.watch(offset_key, backlog_key) do
-      offset = redis.get(offset_key).to_i
-      idx = (message_id-1) - offset
-      return nil if idx < 0
-      msg = redis.lindex(backlog_key, idx)
+    items = redis.zrangebyscore backlog_key, message_id, message_id
+    if items && items[0]
+      MessageBus::Message.decode(items[0])
+    else
+      nil
     end
-
-    if msg
-      msg = MessageBus::Message.decode(msg)
-    end
-    msg
   end
 
   def subscribe(channel, last_id = nil)
@@ -193,22 +176,53 @@ class MessageBus::ReliablePubSub
     # can cut down on connections if we only have one global subscriber
     raise ArgumentError unless block_given?
 
+    if last_id
+      # we need to translate this to a global id, at least give it a shot
+      # we are subscribing on global and global is always going to be bigger than local
+      # so worst case is a replay of a few messages
+      message = get_message(channel, last_id)
+      if message
+        last_id = message.global_id
+      end
+    end
     global_subscribe(last_id) do |m|
       yield m if m.channel == channel
     end
   end
 
+  def process_global_backlog(highest_id, raise_error, &blk)
+    global_backlog(highest_id).each do |old|
+      if highest_id + 1 == old.global_id
+        yield old
+        highest_id = old.global_id
+      else
+        raise BackLogOutOfOrder.new(highest_id) if raise_error
+        if old.global_id > highest_id
+          yield old
+          highest_id = old.global_id
+        end
+      end
+    end
+    highest_id
+  end
+
   def global_subscribe(last_id=nil, &blk)
     raise ArgumentError unless block_given?
     highest_id = last_id
 
-    clear_backlog = lambda do
-      global_backlog(highest_id).each do |old|
-        highest_id = old.global_id
-        yield old
+    clear_backlog = lambda do
+      retries = 4
+      begin
+        highest_id = process_global_backlog(highest_id, retries > 0, &blk)
+      rescue BackLogOutOfOrder => e
+        highest_id = e.highest_id
+        retries -= 1
+        sleep(rand(50) / 1000.0)
+        retry
       end
     end
 
+
     begin
       redis = new_redis_connection
 
@@ -224,11 +238,18 @@ class MessageBus::ReliablePubSub
         end
         on.message do |c,m|
           m = MessageBus::Message.decode m
-          if highest_id && m.global_id != highest_id + 1
+
+          # we have 2 options
+          #
+          # 1. message came in the correct order GREAT, just deal with it
+          # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
+
+          if highest_id.nil? || m.global_id == highest_id + 1
+            highest_id = m.global_id
+            yield m
+          else
             clear_backlog.call(&blk)
           end
-          yield m if highest_id.nil? || m.global_id > highest_id
-          highest_id = m.global_id
         end
       end
     rescue => error
@@ -238,5 +259,4 @@ class MessageBus::ReliablePubSub
     end
   end
 
-
 end
diff --git a/vendor/gems/message_bus/spec/lib/message_bus_spec.rb b/vendor/gems/message_bus/spec/lib/message_bus_spec.rb
index f4c6cbc58..de33fddb3 100644
--- a/vendor/gems/message_bus/spec/lib/message_bus_spec.rb
+++ b/vendor/gems/message_bus/spec/lib/message_bus_spec.rb
@@ -70,8 +70,6 @@ describe MessageBus do
 
     r = MessageBus.backlog("/chuck", id)
 
-    wait_for(1000) { r.length == 2 }
-
     r.map{|i| i.data}.to_a.should == ['foo', 'bar']
   end
 
diff --git a/vendor/gems/message_bus/spec/lib/multi_process_spec.rb b/vendor/gems/message_bus/spec/lib/multi_process_spec.rb
new file mode 100644
index 000000000..89cdef42e
--- /dev/null
+++ b/vendor/gems/message_bus/spec/lib/multi_process_spec.rb
@@ -0,0 +1,60 @@
+require 'spec_helper'
+require 'message_bus'
+
+describe MessageBus::ReliablePubSub do
+
+  def new_bus
+    MessageBus::ReliablePubSub.new(:db => 10)
+  end
+
+  def work_it
+    Signal.trap("HUP") { exit }
+
+    bus = new_bus
+    $stdout.reopen("/dev/null", "w")
+    $stderr.reopen("/dev/null", "w")
+    # subscribe blocks, so we need a new bus to transmit
+    new_bus.subscribe("/echo", 0) do |msg|
+      bus.publish("/response", Process.pid.to_s)
+    end
+  end
+
+  def spawn_child
+    r = fork
+    if r.nil?
+      work_it
+    else
+      r
+    end
+  end
+
+  it 'gets every response from child processes' do
+    pid = nil
+    Redis.new(:db => 10).flushall
+    begin
+      pids = (1..10).map{spawn_child}
+      responses = []
+      bus = MessageBus::ReliablePubSub.new(:db => 10)
+      Thread.new do
+        bus.subscribe("/response", 0) do |msg|
+          responses << msg if pids.include? msg.data.to_i
+        end
+      end
+      10.times{bus.publish("/echo", Process.pid.to_s)}
+      wait_for 4000 do
+        responses.count == 100
+      end
+
+      # p responses.group_by(&:data).map{|k,v|[k, v.count]}
+      # p responses.group_by(&:global_id).map{|k,v|[k, v.count]}
+      responses.count.should == 100
+    ensure
+      if pids
+        pids.each do |pid|
+          Process.kill("HUP", pid)
+          Process.wait(pid)
+        end
+      end
+    end
+  end
+end
diff --git a/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb b/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb
index 4a89d58fc..5b7fbf6d7 100644
--- a/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb
+++ b/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb
@@ -101,8 +101,8 @@ describe MessageBus::ReliablePubSub do
     end
 
     t.kill
+
     got.length.should == 3
-
     got.map{|m| m.data}.should == ["1","2","3"]
   end
 