discourse/vendor/gems/message_bus/lib/message_bus.rb
2013-02-05 14:16:51 -05:00

240 lines
5 KiB
Ruby

# require 'thin'
# require 'eventmachine'
# require 'rack'
# require 'redis'
require "message_bus/version"
require "message_bus/message"
require "message_bus/reliable_pub_sub"
require "message_bus/client"
require "message_bus/connection_manager"
require "message_bus/message_handler"
require "message_bus/rack/middleware"
# we still need to take care of the logger
if defined?(::Rails)
require 'message_bus/rails/railtie'
end
module MessageBus; end
module MessageBus::Implementation
def logger=(logger)
@logger = logger
end
def logger
return @logger if @logger
require 'logger'
@logger = Logger.new(STDOUT)
end
def sockets_enabled?
@sockets_enabled == false ? false : true
end
def sockets_enabled=(val)
@sockets_enabled = val
end
def long_polling_enabled?
@long_polling_enabled == false ? false : true
end
def long_polling_enabled=(val)
@long_polling_enabled = val
end
def long_polling_interval=(millisecs)
@long_polling_interval = millisecs
end
def long_polling_interval
@long_polling_interval || 30 * 1000
end
def off
@off = true
end
def on
@off = false
end
# Allow us to inject a redis db
def redis_config=(config)
@redis_config = config
end
def redis_config
@redis_config ||= {}
end
def site_id_lookup(&blk)
@site_id_lookup ||= blk
end
def user_id_lookup(&blk)
@user_id_lookup ||= blk
end
def on_connect(&blk)
@on_connect ||= blk
end
def on_disconnect(&blk)
@on_disconnect ||= blk
end
def allow_broadcast=(val)
@allow_broadcast = val
end
def allow_broadcast?
@allow_broadcast ||=
if defined? ::Rails
::Rails.env.test? || ::Rails.env.development?
else
false
end
end
def reliable_pub_sub
@reliable_pub_sub ||= MessageBus::ReliablePubSub.new redis_config
end
def publish(channel, data, opts = nil)
return if @off
user_ids = nil
if opts
user_ids = opts[:user_ids] if opts
end
encoded_data = JSON.dump({
data: data,
user_ids: user_ids
})
reliable_pub_sub.publish(encode_channel_name(channel), encoded_data)
end
def blocking_subscribe(channel=nil, &blk)
if channel
reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
else
reliable_pub_sub.global_subscribe(&blk)
end
end
ENCODE_SITE_TOKEN = "$|$"
# encode channel name to include site
def encode_channel_name(channel)
if MessageBus.site_id_lookup
raise ArgumentError.new channel if channel.include? ENCODE_SITE_TOKEN
"#{channel}#{ENCODE_SITE_TOKEN}#{MessageBus.site_id_lookup.call}"
else
channel
end
end
def decode_channel_name(channel)
channel.split(ENCODE_SITE_TOKEN)
end
def subscribe(channel=nil, &blk)
subscribe_impl(channel, nil, &blk)
end
# subscribe only on current site
def local_subscribe(channel=nil, &blk)
site_id = MessageBus.site_id_lookup.call if MessageBus.site_id_lookup
subscribe_impl(channel, site_id, &blk)
end
def backlog(channel=nil, last_id)
old =
if channel
reliable_pub_sub.backlog(encode_channel_name(channel), last_id)
else
reliable_pub_sub.global_backlog(encode_channel_name(channel), last_id)
end
old.each{ |m|
decode_message!(m)
}
old
end
def last_id(channel)
reliable_pub_sub.last_id(encode_channel_name(channel))
end
protected
def decode_message!(msg)
channel, site_id = decode_channel_name(msg.channel)
msg.channel = channel
msg.site_id = site_id
parsed = JSON.parse(msg.data)
msg.data = parsed["data"]
msg.user_ids = parsed["user_ids"]
end
def subscribe_impl(channel, site_id, &blk)
@subscriptions ||= {}
@subscriptions[site_id] ||= {}
@subscriptions[site_id][channel] ||= []
@subscriptions[site_id][channel] << blk
ensure_subscriber_thread
end
def ensure_subscriber_thread
@mutex ||= Mutex.new
@mutex.synchronize do
return if @subscriber_thread
@subscriber_thread = Thread.new do
reliable_pub_sub.global_subscribe do |msg|
begin
decode_message!(msg)
globals = @subscriptions[nil]
locals = @subscriptions[msg.site_id] if msg.site_id
global_globals = globals[nil] if globals
local_globals = locals[nil] if locals
globals = globals[msg.channel] if globals
locals = locals[msg.channel] if locals
multi_each(globals,locals, global_globals, local_globals) do |c|
c.call msg
end
rescue => e
MessageBus.logger.warn "failed to process message #{msg.inspect}\n ex: #{e} backtrace: #{e.backtrace}"
end
end
end
end
end
def multi_each(*args,&block)
args.each do |a|
a.each(&block) if a
end
end
end
module MessageBus
extend MessageBus::Implementation
end
# allows for multiple buses per app
class MessageBus::Instance
include MessageBus::Implementation
end