From f3c6144e3b66cadce9a0970c7d8437091b895117 Mon Sep 17 00:00:00 2001
From: Sam Saffron <sam.saffron@gmail.com>
Date: Fri, 8 Feb 2013 21:39:38 +1100
Subject: [PATCH] Something here is messed up Revert "message bus fixes and
 diagnostics"

This reverts commit 36d1aafe1eddf0f483cc77ec4e9217237fdd6d08.
---
 config/application.rb                         |   1 -
 lib/message_bus_diags.rb                      |  32 ---
 .../lib/message_bus/reliable_pub_sub.rb       | 200 ++++++++----------
 .../message_bus/spec/lib/message_bus_spec.rb  |   2 +
 .../spec/lib/multi_process_spec.rb            |  60 ------
 .../spec/lib/reliable_pub_sub_spec.rb         |   2 +-
 6 files changed, 93 insertions(+), 204 deletions(-)
 delete mode 100644 lib/message_bus_diags.rb
 delete mode 100644 vendor/gems/message_bus/spec/lib/multi_process_spec.rb

diff --git a/config/application.rb b/config/application.rb
index abfeaa717..85723bf21 100644
--- a/config/application.rb
+++ b/config/application.rb
@@ -20,7 +20,6 @@ module Discourse
     # -- all .rb files in that directory are automatically loaded.
 
     require 'discourse'
-    require 'message_bus_diags'
 
     # Custom directories with classes and modules you want to be autoloadable.
     config.autoload_paths += %W(#{config.root}/app/serializers)
diff --git a/lib/message_bus_diags.rb b/lib/message_bus_diags.rb
deleted file mode 100644
index 5d86bb61d..000000000
--- a/lib/message_bus_diags.rb
+++ /dev/null
@@ -1,32 +0,0 @@
-class MessageBusDiags
-
-  @host_info = {}
-
-  def self.my_id
-    @my_id ||= "#{`hostname`}-#{Process.pid}"
-  end
-
-  def self.seen_host(name)
-    @host_info[name] = DateTime.now
-  end
-
-  def self.establish_peer_names
-    MessageBus.publish "/server-name", {channel: "/server-name-reply/#{my_id}"}
-  end
-
-  def self.seen_hosts
-    @host_info
-  end
-  
-  unless @subscribed
-
-    MessageBus.subscribe "/server-name-reply/#{my_id}" do |msg|
-      MessageBusDiags.seen_host(msg.data)
-    end
-
-    MessageBus.subscribe "/server-name" do |msg|
-      MessageBus.publish msg.data["channel"], MessageBusDiags.my_id
-    end
-    @subscribed = true
-  end
-end
diff --git a/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb b/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb
index 73d7992c5..6ea0726dc 100644
--- a/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb
+++ b/vendor/gems/message_bus/lib/message_bus/reliable_pub_sub.rb
@@ -10,31 +10,6 @@ require 'redis'
 
 class MessageBus::ReliablePubSub
 
-  class NoMoreRetries < StandardError; end
-  class BackLogOutOfOrder < StandardError 
-    attr_accessor :highest_id
-
-    def initialize(highest_id)
-      @highest_id = highest_id
-    end
-  end
-
-  def max_publish_retries=(val)
-    @max_publish_retries = val
-  end
-
-  def max_publish_retries
-    @max_publish_retries ||= 10
-  end
-  
-  def max_publish_wait=(ms)
-    @max_publish_wait = ms
-  end
-
-  def max_publish_wait
-    @max_publish_wait ||= 500
-  end
-
   # max_backlog_size is per multiplexed channel 
   def initialize(redis_config = {}, max_backlog_size = 1000)
     @redis_config = redis_config
@@ -67,12 +42,12 @@ class MessageBus::ReliablePubSub
     @pub_redis ||= new_redis_connection
   end
 
-  def backlog_key(channel)
-    "__mb_backlog_#{channel}"
+  def offset_key(channel)
+    "__mb_offset_#{channel}"
   end
 
-  def backlog_id_key(channel)
-    "__mb_backlog_id_#{channel}"
+  def backlog_key(channel)
+    "__mb_backlog_#{channel}"
   end
 
   def global_id_key
@@ -82,6 +57,10 @@ class MessageBus::ReliablePubSub
   def global_backlog_key
     "__mb_global_backlog"
   end
+  
+  def global_offset_key
+    "__mb_global_offset"
+  end
 
   # use with extreme care, will nuke all of the data
   def reset! 
@@ -92,49 +71,74 @@ class MessageBus::ReliablePubSub
 
   def publish(channel, data)
     redis = pub_redis 
-    backlog_id_key = backlog_id_key(channel)
+    offset_key = offset_key(channel)
     backlog_key = backlog_key(channel)
 
-    global_id = nil
-    backlog_id = nil
+    redis.watch(offset_key, backlog_key, global_id_key, global_backlog_key, global_offset_key) do
+      offset = redis.get(offset_key).to_i
+      backlog = redis.llen(backlog_key).to_i
 
-    redis.multi do |m|
-      global_id = m.incr(global_id_key)
-      backlog_id = m.incr(backlog_id_key)
+      global_offset = redis.get(global_offset_key).to_i
+      global_backlog = redis.llen(global_backlog_key).to_i
+
+      global_id = redis.get(global_id_key).to_i
+      global_id += 1
+
+      too_big = backlog + 1 > @max_backlog_size
+      global_too_big = global_backlog + 1 > @max_global_backlog_size
+
+      message_id = backlog + offset + 1 
+      redis.multi do 
+        if too_big
+          redis.ltrim backlog_key, (backlog+1) - @max_backlog_size, -1
+          offset += (backlog+1) - @max_backlog_size
+          redis.set(offset_key, offset)
+        end
+
+        if global_too_big
+          redis.ltrim global_backlog_key, (global_backlog+1) - @max_global_backlog_size, -1
+          global_offset += (global_backlog+1) - @max_global_backlog_size
+          redis.set(global_offset_key, global_offset)
+        end
+
+        msg = MessageBus::Message.new global_id, message_id, channel, data
+        payload = msg.encode
+
+        redis.set global_id_key, global_id
+        redis.rpush backlog_key, payload
+        redis.rpush global_backlog_key, message_id.to_s << "|" << channel
+        redis.publish redis_channel_name, payload
+      end
+
+      return message_id
     end
-
-    global_id = global_id.value
-    backlog_id = backlog_id.value
-
-    msg = MessageBus::Message.new global_id, backlog_id, channel, data
-    payload = msg.encode
-
-    redis.zadd backlog_key, backlog_id, payload
-    redis.zadd global_backlog_key, global_id, backlog_id.to_s << "|" << channel
-
-    redis.publish redis_channel_name, payload
-
-    if backlog_id > @max_backlog_size
-      redis.zremrangebyscore backlog_key, 1, backlog_id - @max_backlog_size
-    end
-
-    if global_id > @max_global_backlog_size
-      redis.zremrangebyscore global_backlog_key, 1, backlog_id - @max_backlog_size
-    end
-
-    backlog_id
   end
 
   def last_id(channel)
     redis = pub_redis 
-    backlog_id_key = backlog_id_key(channel)
-    redis.get(backlog_id_key).to_i
+    offset_key = offset_key(channel)
+    backlog_key = backlog_key(channel)
+    
+    offset,len = nil
+    redis.watch offset_key, backlog_key do 
+      offset = redis.get(offset_key).to_i
+      len = redis.llen backlog_key
+    end
+    offset + len
   end
 
   def backlog(channel, last_id = nil)
     redis = pub_redis 
+    offset_key = offset_key(channel)
     backlog_key = backlog_key(channel)
-    items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf"
+
+    items = nil
+
+    redis.watch offset_key, backlog_key do 
+      offset = redis.get(offset_key).to_i
+      start_at = last_id.to_i - offset
+      items = redis.lrange backlog_key, start_at, -1
+    end
 
     items.map do |i|
       MessageBus::Message.decode(i)
@@ -143,9 +147,14 @@ class MessageBus::ReliablePubSub
 
   def global_backlog(last_id = nil)
     last_id = last_id.to_i
+    items = nil
     redis = pub_redis
 
-    items = redis.zrangebyscore global_backlog_key, last_id.to_i + 1, "+inf"
+    redis.watch global_backlog_key, global_offset_key do 
+      offset = redis.get(global_offset_key).to_i
+      start_at = last_id.to_i - offset
+      items = redis.lrange global_backlog_key, start_at, -1
+    end
 
     items.map! do |i|
       pipe = i.index "|"
@@ -156,19 +165,27 @@ class MessageBus::ReliablePubSub
     end
 
     items.compact!
+
     items
   end
 
   def get_message(channel, message_id)
     redis = pub_redis 
+    offset_key = offset_key(channel)
     backlog_key = backlog_key(channel)
 
-    items = redis.zrangebyscore backlog_key, message_id, message_id
-    if items && items[0] 
-      MessageBus::Message.decode(items[0])
-    else
-      nil
+    msg = nil
+    redis.watch(offset_key, backlog_key) do
+      offset = redis.get(offset_key).to_i
+      idx = (message_id-1) - offset
+      return nil if idx < 0 
+      msg = redis.lindex(backlog_key, idx)
     end
+
+    if msg 
+      msg = MessageBus::Message.decode(msg)
+    end
+    msg
   end
 
   def subscribe(channel, last_id = nil)
@@ -176,53 +193,22 @@ class MessageBus::ReliablePubSub
     #   can cut down on connections if we only have one global subscriber 
     raise ArgumentError unless block_given?
 
-    if last_id
-      # we need to translate this to a global id, at least give it a shot
-      #   we are subscribing on global and global is always going to be bigger than local
-      #   so worst case is a replay of a few messages
-      message = get_message(channel, last_id)
-      if message
-        last_id = message.global_id
-      end
-    end
     global_subscribe(last_id) do |m|
       yield m if m.channel == channel
     end
   end
 
-  def process_global_backlog(highest_id, raise_error, &blk)
-    global_backlog(highest_id).each do |old|
-      if highest_id + 1 == old.global_id
-        yield old
-        highest_id = old.global_id
-      else
-        raise BackLogOutOfOrder.new(highest_id) if raise_error
-        if old.global_id > highest_id
-          yield old
-          highest_id = old.global_id
-        end
-      end
-    end
-    highest_id
-  end
-
   def global_subscribe(last_id=nil, &blk)
     raise ArgumentError unless block_given?
     highest_id = last_id
 
-    clear_backlog = lambda do
-      retries = 4
-      begin 
-        highest_id = process_global_backlog(highest_id, retries > 0, &blk)
-      rescue BackLogOutOfOrder => e
-        highest_id = e.highest_id
-        retries -= 1
-        sleep(rand(50) / 1000.0)
-        retry
+    clear_backlog = lambda do 
+      global_backlog(highest_id).each do |old|
+        highest_id = old.global_id
+        yield old
       end
     end
 
-
     begin
       redis = new_redis_connection
 
@@ -238,18 +224,11 @@ class MessageBus::ReliablePubSub
         end
         on.message do |c,m|
           m = MessageBus::Message.decode m
-
-          # we have 2 options
-          #
-          # 1. message came in the correct order GREAT, just deal with it
-          # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog 
-          
-          if highest_id.nil? || m.global_id == highest_id + 1
-            highest_id = m.global_id 
-            yield m
-          else 
+          if highest_id && m.global_id != highest_id + 1
             clear_backlog.call(&blk)
           end
+          yield m if highest_id.nil? || m.global_id > highest_id
+          highest_id = m.global_id
         end
       end
     rescue => error
@@ -259,4 +238,5 @@ class MessageBus::ReliablePubSub
     end
   end
 
+
 end
diff --git a/vendor/gems/message_bus/spec/lib/message_bus_spec.rb b/vendor/gems/message_bus/spec/lib/message_bus_spec.rb
index de33fddb3..f4c6cbc58 100644
--- a/vendor/gems/message_bus/spec/lib/message_bus_spec.rb
+++ b/vendor/gems/message_bus/spec/lib/message_bus_spec.rb
@@ -70,6 +70,8 @@ describe MessageBus do
 
     r = MessageBus.backlog("/chuck", id)
 
+    wait_for(1000) { r.length == 2 }
+
     r.map{|i| i.data}.to_a.should == ['foo', 'bar']
   end
 
diff --git a/vendor/gems/message_bus/spec/lib/multi_process_spec.rb b/vendor/gems/message_bus/spec/lib/multi_process_spec.rb
deleted file mode 100644
index 89cdef42e..000000000
--- a/vendor/gems/message_bus/spec/lib/multi_process_spec.rb
+++ /dev/null
@@ -1,60 +0,0 @@
-require 'spec_helper'
-require 'message_bus'
-
-describe MessageBus::ReliablePubSub do
-
-  def new_bus
-    MessageBus::ReliablePubSub.new(:db => 10)
-  end
-
-  def work_it
-    Signal.trap("HUP") { exit }
-
-    bus = new_bus
-    $stdout.reopen("/dev/null", "w")
-    $stderr.reopen("/dev/null", "w")
-    # subscribe blocks, so we need a new bus to transmit
-    new_bus.subscribe("/echo", 0) do |msg|
-      bus.publish("/response", Process.pid.to_s)
-    end
-  end
-
-  def spawn_child
-    r = fork
-    if r.nil?
-      work_it
-    else
-      r
-    end
-  end
-
-  it 'gets every response from child processes' do 
-    pid = nil
-    Redis.new(:db => 10).flushall
-    begin
-      pids = (1..10).map{spawn_child} 
-      responses = [] 
-      bus = MessageBus::ReliablePubSub.new(:db => 10)
-      Thread.new do
-        bus.subscribe("/response", 0) do |msg|
-          responses << msg if pids.include? msg.data.to_i
-        end
-      end
-      10.times{bus.publish("/echo", Process.pid.to_s)}
-      wait_for 4000 do 
-        responses.count == 100
-      end
-
-      # p responses.group_by(&:data).map{|k,v|[k, v.count]}
-      # p responses.group_by(&:global_id).map{|k,v|[k, v.count]}
-      responses.count.should == 100
-    ensure
-      if pids
-        pids.each do |pid| 
-          Process.kill("HUP", pid)
-          Process.wait(pid)
-        end
-      end
-    end
-  end
-end
diff --git a/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb b/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb
index 5b7fbf6d7..4a89d58fc 100644
--- a/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb
+++ b/vendor/gems/message_bus/spec/lib/reliable_pub_sub_spec.rb
@@ -101,8 +101,8 @@ describe MessageBus::ReliablePubSub do
     end
 
     t.kill
-  
     got.length.should == 3
+
     got.map{|m| m.data}.should == ["1","2","3"]
   end