diff --git a/config/application.rb b/config/application.rb
index 85723bf21..abfeaa717 100644
--- a/config/application.rb
+++ b/config/application.rb
@@ -20,6 +20,7 @@ module Discourse
     # -- all .rb files in that directory are automatically loaded.
 
     require 'discourse'
+    require 'message_bus_diags'
 
     # Custom directories with classes and modules you want to be autoloadable.
     config.autoload_paths += %W(#{config.root}/app/serializers)
diff --git a/lib/message_bus_diags.rb b/lib/message_bus_diags.rb
new file mode 100644
index 000000000..5d86bb61d
--- /dev/null
+++ b/lib/message_bus_diags.rb
@@ -0,0 +1,45 @@
+# Diagnostics helper that discovers which server processes are listening on the
+# message bus: each process answers a "/server-name" broadcast with its own id.
+class MessageBusDiags
+
+  # peer id => time the peer last answered a discovery request
+  @host_info = {}
+
+  # Identifier for this process, built from the machine's hostname and the pid.
+  def self.my_id
+    # backticks keep the trailing newline printed by `hostname`; strip it so the
+    # id (which is also embedded in a channel name) stays on a single line
+    @my_id ||= "#{`hostname`.strip}-#{Process.pid}"
+  end
+
+  # Records that the peer called +name+ was just heard from.
+  def self.seen_host(name)
+    @host_info[name] = DateTime.now
+  end
+
+  # Broadcasts a discovery request; peers reply on this process's private channel.
+  def self.establish_peer_names
+    MessageBus.publish "/server-name", {channel: "/server-name-reply/#{my_id}"}
+  end
+
+  # Hash of peer id => time of the last reply received from it.
+  def self.seen_hosts
+    @host_info
+  end
+
+  # runs while the class body is evaluated; the flag keeps a second evaluation
+  # of this file from registering the subscriptions twice
+  unless @subscribed
+
+    # replies addressed to this process: remember who answered
+    MessageBus.subscribe "/server-name-reply/#{my_id}" do |msg|
+      MessageBusDiags.seen_host(msg.data)
+    end
+
+    # discovery requests from any process: answer on the channel the requester named
+    MessageBus.subscribe "/server-name" do |msg|
+      MessageBus.publish msg.data["channel"], MessageBusDiags.my_id
+    end
+    @subscribed = true
+  end
+end
diff --git a/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb b/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb
index 6ea0726dc..73d7992c5 100644
--- a/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb
+++ b/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb
@@ -10,6 +10,31 @@ require 'redis'
 
 class MessageBus::ReliablePubSub
 
+  class NoMoreRetries < StandardError; end
+  class BackLogOutOfOrder < StandardError # signals a gap found while replaying the global backlog
+    attr_accessor :highest_id # global id the replay had reached when the gap was found
+
+    def initialize(highest_id)
+      @highest_id = highest_id
+    end
+  end
+
+  def max_publish_retries=(val)
+    @max_publish_retries = val # replaces the value the reader would otherwise default
+  end
+
+  def max_publish_retries
+    @max_publish_retries ||= 10 # falls back to 10 until a value is assigned
+  end
+  
+  def max_publish_wait=(ms)
+    @max_publish_wait = ms # NOTE(review): parameter name suggests milliseconds - confirm
+  end
+
+  def max_publish_wait
+    @max_publish_wait ||= 500 # falls back to 500 until a value is assigned
+  end
+
   # max_backlog_size is per multiplexed channel 
   def initialize(redis_config = {}, max_backlog_size = 1000)
     @redis_config = redis_config
@@ -42,14 +67,14 @@ class MessageBus::ReliablePubSub
     @pub_redis ||= new_redis_connection
   end
 
-  def offset_key(channel)
-    "__mb_offset_#{channel}"
-  end
-
   def backlog_key(channel)
     "__mb_backlog_#{channel}"
   end
 
+  def backlog_id_key(channel)
+    "__mb_backlog_id_#{channel}" # key of the channel's message-id counter
+  end
+
   def global_id_key
     "__mb_global_id"
   end
@@ -57,10 +82,6 @@ class MessageBus::ReliablePubSub
   def global_backlog_key
     "__mb_global_backlog"
   end
-  
-  def global_offset_key
-    "__mb_global_offset"
-  end
 
   # use with extreme care, will nuke all of the data
   def reset! 
@@ -71,74 +92,49 @@ class MessageBus::ReliablePubSub
 
   def publish(channel, data)
     redis = pub_redis 
-    offset_key = offset_key(channel)
+    backlog_id_key = backlog_id_key(channel)
     backlog_key = backlog_key(channel)
 
-    redis.watch(offset_key, backlog_key, global_id_key, global_backlog_key, global_offset_key) do
-      offset = redis.get(offset_key).to_i
-      backlog = redis.llen(backlog_key).to_i
+    global_id = nil
+    backlog_id = nil
 
-      global_offset = redis.get(global_offset_key).to_i
-      global_backlog = redis.llen(global_backlog_key).to_i
-
-      global_id = redis.get(global_id_key).to_i
-      global_id += 1
-
-      too_big = backlog + 1 > @max_backlog_size
-      global_too_big = global_backlog + 1 > @max_global_backlog_size
-
-      message_id = backlog + offset + 1 
-      redis.multi do 
-        if too_big
-          redis.ltrim backlog_key, (backlog+1) - @max_backlog_size, -1
-          offset += (backlog+1) - @max_backlog_size
-          redis.set(offset_key, offset)
-        end
-
-        if global_too_big
-          redis.ltrim global_backlog_key, (global_backlog+1) - @max_global_backlog_size, -1
-          global_offset += (global_backlog+1) - @max_global_backlog_size
-          redis.set(global_offset_key, global_offset)
-        end
-
-        msg = MessageBus::Message.new global_id, message_id, channel, data
-        payload = msg.encode
-
-        redis.set global_id_key, global_id
-        redis.rpush backlog_key, payload
-        redis.rpush global_backlog_key, message_id.to_s << "|" << channel
-        redis.publish redis_channel_name, payload
-      end
-
-      return message_id
+    redis.multi do |m|
+      global_id = m.incr(global_id_key)
+      backlog_id = m.incr(backlog_id_key)
     end
+    # incr was queued inside MULTI, so unwrap the replies now that it has executed
+    global_id = global_id.value
+    backlog_id = backlog_id.value
+
+    msg = MessageBus::Message.new global_id, backlog_id, channel, data
+    payload = msg.encode
+    # both backlogs are sorted sets scored by the id that was just allocated
+    redis.zadd backlog_key, backlog_id, payload
+    redis.zadd global_backlog_key, global_id, backlog_id.to_s << "|" << channel
+
+    redis.publish redis_channel_name, payload
+
+    if backlog_id > @max_backlog_size
+      redis.zremrangebyscore backlog_key, 1, backlog_id - @max_backlog_size
+    end
+    # the global backlog is scored by global_id, so trim it by global_id and its own limit
+    if global_id > @max_global_backlog_size
+      redis.zremrangebyscore global_backlog_key, 1, global_id - @max_global_backlog_size
+    end
+
+    backlog_id
   end
 
   def last_id(channel)
     redis = pub_redis 
-    offset_key = offset_key(channel)
-    backlog_key = backlog_key(channel)
-    
-    offset,len = nil
-    redis.watch offset_key, backlog_key do 
-      offset = redis.get(offset_key).to_i
-      len = redis.llen backlog_key
-    end
-    offset + len
+    # the channel's id counter holds the id of its newest message; to_i turns a nil reply into 0
+    redis.get(backlog_id_key(channel)).to_i
   end
 
   def backlog(channel, last_id = nil)
     redis = pub_redis 
-    offset_key = offset_key(channel)
     backlog_key = backlog_key(channel)
-
-    items = nil
-
-    redis.watch offset_key, backlog_key do 
-      offset = redis.get(offset_key).to_i
-      start_at = last_id.to_i - offset
-      items = redis.lrange backlog_key, start_at, -1
-    end
+    items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf"
 
     items.map do |i|
       MessageBus::Message.decode(i)
@@ -147,14 +143,9 @@ class MessageBus::ReliablePubSub
 
   def global_backlog(last_id = nil)
     last_id = last_id.to_i
-    items = nil
     redis = pub_redis
 
-    redis.watch global_backlog_key, global_offset_key do 
-      offset = redis.get(global_offset_key).to_i
-      start_at = last_id.to_i - offset
-      items = redis.lrange global_backlog_key, start_at, -1
-    end
+    items = redis.zrangebyscore global_backlog_key, last_id.to_i + 1, "+inf"
 
     items.map! do |i|
       pipe = i.index "|"
@@ -165,27 +156,19 @@ class MessageBus::ReliablePubSub
     end
 
     items.compact!
-
     items
   end
 
   def get_message(channel, message_id)
     redis = pub_redis 
-    offset_key = offset_key(channel)
     backlog_key = backlog_key(channel)
 
-    msg = nil
-    redis.watch(offset_key, backlog_key) do
-      offset = redis.get(offset_key).to_i
-      idx = (message_id-1) - offset
-      return nil if idx < 0 
-      msg = redis.lindex(backlog_key, idx)
+    # backlog entries are scored by message id, so ask for exactly that score
+    found = redis.zrangebyscore(backlog_key, message_id, message_id)
+    payload = found && found[0]
+    if payload
+      MessageBus::Message.decode(payload)
     end
-
-    if msg 
-      msg = MessageBus::Message.decode(msg)
-    end
-    msg
   end
 
   def subscribe(channel, last_id = nil)
@@ -193,22 +176,53 @@ class MessageBus::ReliablePubSub
     #   can cut down on connections if we only have one global subscriber 
     raise ArgumentError unless block_given?
 
+    if last_id
+      # we need to translate this to a global id, at least give it a shot
+      #   we are subscribing on global and global is always going to be bigger than local
+      #   so worst case is a replay of a few messages
+      message = get_message(channel, last_id)
+      if message
+        last_id = message.global_id
+      end
+    end
     global_subscribe(last_id) do |m|
       yield m if m.channel == channel
     end
   end
 
+  # Replays global backlog entries newer than highest_id through the block and
+  # returns the id of the last entry yielded (highest_id itself when none were).
+  # A gap in the global ids raises BackLogOutOfOrder while raise_error is set.
+  def process_global_backlog(highest_id, raise_error, &blk)
+    global_backlog(highest_id).each do |entry|
+      unless highest_id + 1 == entry.global_id
+        raise BackLogOutOfOrder.new(highest_id) if raise_error
+        # tolerated gap: still deliver the entry, but never move backwards
+        next unless entry.global_id > highest_id
+      end
+      yield entry
+      highest_id = entry.global_id
+    end
+    highest_id
+  end
+
   def global_subscribe(last_id=nil, &blk)
     raise ArgumentError unless block_given?
     highest_id = last_id
 
-    clear_backlog = lambda do 
-      global_backlog(highest_id).each do |old|
-        highest_id = old.global_id
-        yield old
+    clear_backlog = lambda do
+      retries = 4
+      begin 
+        highest_id = process_global_backlog(highest_id, retries > 0, &blk)
+      rescue BackLogOutOfOrder => e
+        highest_id = e.highest_id
+        retries -= 1
+        sleep(rand(50) / 1000.0)
+        retry
       end
     end
 
+
     begin
       redis = new_redis_connection
 
@@ -224,11 +238,18 @@ class MessageBus::ReliablePubSub
         end
         on.message do |c,m|
           m = MessageBus::Message.decode m
-          if highest_id && m.global_id != highest_id + 1
+
+          # we have 2 options
+          #
+          # 1. message came in the correct order GREAT, just deal with it
+          # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog 
+          
+          if highest_id.nil? || m.global_id == highest_id + 1
+            highest_id = m.global_id 
+            yield m
+          else 
             clear_backlog.call(&blk)
           end
-          yield m if highest_id.nil? || m.global_id > highest_id
-          highest_id = m.global_id
         end
       end
     rescue => error
@@ -238,5 +259,4 @@ class MessageBus::ReliablePubSub
     end
   end
 
-
 end
diff --git a/vendor/gems/message_bus/spec/lib/message_bus_spec.rb b/vendor/gems/message_bus/spec/lib/message_bus_spec.rb
index f4c6cbc58..de33fddb3 100644
--- a/vendor/gems/message_bus/spec/lib/message_bus_spec.rb
+++ b/vendor/gems/message_bus/spec/lib/message_bus_spec.rb
@@ -70,8 +70,6 @@ describe MessageBus do
 
     r = MessageBus.backlog("/chuck", id)
 
-    wait_for(1000) { r.length == 2 }
-
     r.map{|i| i.data}.to_a.should == ['foo', 'bar']
   end
 
diff --git a/vendor/gems/message_bus/spec/lib/multi_process_spec.rb b/vendor/gems/message_bus/spec/lib/multi_process_spec.rb
new file mode 100644
index 000000000..89cdef42e
--- /dev/null
+++ b/vendor/gems/message_bus/spec/lib/multi_process_spec.rb
@@ -0,0 +1,66 @@
+require 'spec_helper'
+require 'message_bus'
+
+describe MessageBus::ReliablePubSub do
+
+  # every bus in this spec is bound to redis db 10
+  def new_bus
+    MessageBus::ReliablePubSub.new(:db => 10)
+  end
+
+  # body of a child process: answer each "/echo" message with this pid on "/response"
+  def work_it
+    Signal.trap("HUP") { exit }
+
+    bus = new_bus
+    $stdout.reopen("/dev/null", "w")
+    $stderr.reopen("/dev/null", "w")
+    # subscribe blocks, so we need a new bus to transmit
+    new_bus.subscribe("/echo", 0) do |msg|
+      bus.publish("/response", Process.pid.to_s)
+    end
+  end
+
+  # forks a worker; in the parent this returns the child's pid
+  def spawn_child
+    r = fork
+    if r.nil?
+      work_it
+    else
+      r
+    end
+  end
+
+  it 'gets every response from child processes' do
+    pids = nil
+    subscriber = nil
+    Redis.new(:db => 10).flushall
+    begin
+      pids = (1..10).map{spawn_child}
+      responses = []
+      bus = new_bus
+      subscriber = Thread.new do
+        bus.subscribe("/response", 0) do |msg|
+          responses << msg if pids.include? msg.data.to_i
+        end
+      end
+      10.times{bus.publish("/echo", Process.pid.to_s)}
+      wait_for 4000 do
+        responses.count == 100
+      end
+
+      # p responses.group_by(&:data).map{|k,v|[k, v.count]}
+      # p responses.group_by(&:global_id).map{|k,v|[k, v.count]}
+      responses.count.should == 100
+    ensure
+      # subscribe blocks, so the listener thread has to be stopped explicitly
+      subscriber.kill if subscriber
+      if pids
+        pids.each do |child_pid|
+          Process.kill("HUP", child_pid)
+          Process.wait(child_pid)
+        end
+      end
+    end
+  end
+end
diff --git a/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb b/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb
index 4a89d58fc..5b7fbf6d7 100644
--- a/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb
+++ b/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb
@@ -101,8 +101,8 @@ describe MessageBus::ReliablePubSub do
     end
 
     t.kill
+  
     got.length.should == 3
-
     got.map{|m| m.data}.should == ["1","2","3"]
   end