Don't blow up if Redis switches to READONLY

This commit is contained in:
Robin Ward 2015-04-24 13:10:43 -04:00
parent 8129cce104
commit 5b3f99aa50
56 changed files with 194 additions and 147 deletions

View file

@ -54,7 +54,7 @@ class Admin::UsersController < Admin::AdminController
@user.suspended_at = DateTime.now @user.suspended_at = DateTime.now
@user.save! @user.save!
StaffActionLogger.new(current_user).log_user_suspend(@user, params[:reason]) StaffActionLogger.new(current_user).log_user_suspend(@user, params[:reason])
MessageBus.publish "/logout", @user.id, user_ids: [@user.id] DiscourseBus.publish "/logout", @user.id, user_ids: [@user.id]
render nothing: true render nothing: true
end end
@ -71,7 +71,7 @@ class Admin::UsersController < Admin::AdminController
if @user if @user
@user.auth_token = nil @user.auth_token = nil
@user.save! @user.save!
MessageBus.publish "/logout", @user.id, user_ids: [@user.id] DiscourseBus.publish "/logout", @user.id, user_ids: [@user.id]
render json: success_json render json: success_json
else else
render json: {error: I18n.t('admin_js.admin.users.id_not_found')}, status: 404 render json: {error: I18n.t('admin_js.admin.users.id_not_found')}, status: 404
@ -350,7 +350,7 @@ class Admin::UsersController < Admin::AdminController
end end
def refresh_browser(user) def refresh_browser(user)
MessageBus.publish "/file-change", ["refresh"], user_ids: [user.id] DiscourseBus.publish "/file-change", ["refresh"], user_ids: [user.id]
end end
end end

View file

@ -222,7 +222,7 @@ SQL
end end
def publish_categories_list def publish_categories_list
MessageBus.publish('/categories', {categories: ActiveModel::ArraySerializer.new(Category.latest).as_json}) DiscourseBus.publish('/categories', {categories: ActiveModel::ArraySerializer.new(Category.latest).as_json})
end end
def parent_category_validator def parent_category_validator

View file

@ -97,7 +97,7 @@ class ColorScheme < ActiveRecord::Base
end end
def publish_discourse_stylesheet def publish_discourse_stylesheet
MessageBus.publish("/discourse_stylesheet", self.name) DiscourseBus.publish("/discourse_stylesheet", self.name)
DiscourseStylesheets.cache.clear DiscourseStylesheets.cache.clear
end end

View file

@ -97,7 +97,7 @@ class Post < ActiveRecord::Base
# special failsafe for posts missing topics # special failsafe for posts missing topics
# consistency checks should fix, but message # consistency checks should fix, but message
# is safe to skip # is safe to skip
MessageBus.publish("/topic/#{topic_id}", { DiscourseBus.publish("/topic/#{topic_id}", {
id: id, id: id,
post_number: post_number, post_number: post_number,
updated_at: Time.now, updated_at: Time.now,

View file

@ -56,7 +56,7 @@ class PostAction < ActiveRecord::Base
$redis.set('posts_flagged_count', posts_flagged_count) $redis.set('posts_flagged_count', posts_flagged_count)
user_ids = User.staff.pluck(:id) user_ids = User.staff.pluck(:id)
MessageBus.publish('/flagged_counts', { total: posts_flagged_count }, { user_ids: user_ids }) DiscourseBus.publish('/flagged_counts', { total: posts_flagged_count }, { user_ids: user_ids })
end end
def self.flagged_posts_count def self.flagged_posts_count

View file

@ -30,7 +30,7 @@ class QueuedPost < ActiveRecord::Base
def self.broadcast_new! def self.broadcast_new!
msg = { post_queue_new_count: QueuedPost.new_count } msg = { post_queue_new_count: QueuedPost.new_count }
MessageBus.publish('/queue_counts', msg, user_ids: User.staff.pluck(:id)) DiscourseBus.publish('/queue_counts', msg, user_ids: User.staff.pluck(:id))
end end
def reject!(rejected_by) def reject!(rejected_by)

View file

@ -34,11 +34,11 @@ class SiteCustomization < ActiveRecord::Base
after_save do after_save do
remove_from_cache! remove_from_cache!
if stylesheet_changed? || mobile_stylesheet_changed? if stylesheet_changed? || mobile_stylesheet_changed?
MessageBus.publish "/file-change/#{key}", SecureRandom.hex DiscourseBus.publish "/file-change/#{key}", SecureRandom.hex
MessageBus.publish "/file-change/#{SiteCustomization::ENABLED_KEY}", SecureRandom.hex DiscourseBus.publish "/file-change/#{SiteCustomization::ENABLED_KEY}", SecureRandom.hex
end end
MessageBus.publish "/header-change/#{key}", header if header_changed? DiscourseBus.publish "/header-change/#{key}", header if header_changed?
MessageBus.publish "/footer-change/#{key}", footer if footer_changed? DiscourseBus.publish "/footer-change/#{key}", footer if footer_changed?
DiscourseStylesheets.cache.clear DiscourseStylesheets.cache.clear
end end
@ -109,7 +109,7 @@ class SiteCustomization < ActiveRecord::Base
end end
def self.remove_from_cache!(key, broadcast = true) def self.remove_from_cache!(key, broadcast = true)
MessageBus.publish('/site_customization', key: key) if broadcast DiscourseBus.publish('/site_customization', key: key) if broadcast
clear_cache! clear_cache!
end end

View file

@ -636,7 +636,7 @@ class Topic < ActiveRecord::Base
self.add_moderator_post(user, I18n.t("archetypes.banner.message.make")) self.add_moderator_post(user, I18n.t("archetypes.banner.message.make"))
self.save self.save
MessageBus.publish('/site/banner', banner) DiscourseBus.publish('/site/banner', banner)
end end
def remove_banner!(user) def remove_banner!(user)
@ -644,7 +644,7 @@ class Topic < ActiveRecord::Base
self.add_moderator_post(user, I18n.t("archetypes.banner.message.remove")) self.add_moderator_post(user, I18n.t("archetypes.banner.message.remove"))
self.save self.save
MessageBus.publish('/site/banner', nil) DiscourseBus.publish('/site/banner', nil)
end end
def banner def banner

View file

@ -33,7 +33,7 @@ class TopicTrackingState
group_ids = topic.category && topic.category.secure_group_ids group_ids = topic.category && topic.category.secure_group_ids
MessageBus.publish("/new", message.as_json, group_ids: group_ids) DiscourseBus.publish("/new", message.as_json, group_ids: group_ids)
publish_read(topic.id, 1, topic.user_id) publish_read(topic.id, 1, topic.user_id)
end end
@ -51,7 +51,7 @@ class TopicTrackingState
} }
group_ids = topic.category && topic.category.secure_group_ids group_ids = topic.category && topic.category.secure_group_ids
MessageBus.publish("/latest", message.as_json, group_ids: group_ids) DiscourseBus.publish("/latest", message.as_json, group_ids: group_ids)
end end
def self.publish_unread(post) def self.publish_unread(post)
@ -77,7 +77,7 @@ class TopicTrackingState
} }
} }
MessageBus.publish("/unread/#{tu.user_id}", message.as_json, group_ids: group_ids) DiscourseBus.publish("/unread/#{tu.user_id}", message.as_json, group_ids: group_ids)
end end
end end
@ -97,7 +97,7 @@ class TopicTrackingState
} }
} }
MessageBus.publish("/unread/#{user_id}", message.as_json, user_ids: [user_id]) DiscourseBus.publish("/unread/#{user_id}", message.as_json, user_ids: [user_id])
end end

View file

@ -40,7 +40,7 @@ class TopicUser < ActiveRecord::Base
notifications_reason_id: reason notifications_reason_id: reason
) )
MessageBus.publish("/topic/#{topic_id}", { DiscourseBus.publish("/topic/#{topic_id}", {
notification_level_change: notification_levels[:tracking], notification_level_change: notification_levels[:tracking],
notifications_reason_id: reason notifications_reason_id: reason
}, user_ids: [user_id]) }, user_ids: [user_id])
@ -113,7 +113,7 @@ class TopicUser < ActiveRecord::Base
end end
if attrs[:notification_level] if attrs[:notification_level]
MessageBus.publish("/topic/#{topic_id}", DiscourseBus.publish("/topic/#{topic_id}",
{notification_level_change: attrs[:notification_level]}, user_ids: [user_id]) {notification_level_change: attrs[:notification_level]}, user_ids: [user_id])
end end
@ -196,7 +196,7 @@ class TopicUser < ActiveRecord::Base
end end
if before != after if before != after
MessageBus.publish("/topic/#{topic_id}", {notification_level_change: after}, user_ids: [user.id]) DiscourseBus.publish("/topic/#{topic_id}", {notification_level_change: after}, user_ids: [user.id])
end end
end end
@ -219,7 +219,7 @@ class TopicUser < ActiveRecord::Base
WHERE ftu.user_id = :user_id and ftu.topic_id = :topic_id)", WHERE ftu.user_id = :user_id and ftu.topic_id = :topic_id)",
args) args)
MessageBus.publish("/topic/#{topic_id}", {notification_level_change: args[:new_status]}, user_ids: [user.id]) DiscourseBus.publish("/topic/#{topic_id}", {notification_level_change: args[:new_status]}, user_ids: [user.id])
end end
end end

View file

@ -206,7 +206,7 @@ class User < ActiveRecord::Base
# tricky, we need our bus to be subscribed from the right spot # tricky, we need our bus to be subscribed from the right spot
def sync_notification_channel_position def sync_notification_channel_position
@unread_notifications_by_type = nil @unread_notifications_by_type = nil
self.notification_channel_position = MessageBus.last_id("/notification/#{id}") self.notification_channel_position = DiscourseBus.last_id("/notification/#{id}")
end end
def invited_by def invited_by
@ -298,7 +298,7 @@ class User < ActiveRecord::Base
end end
def publish_notifications_state def publish_notifications_state
MessageBus.publish("/notification/#{id}", DiscourseBus.publish("/notification/#{id}",
{unread_notifications: unread_notifications, {unread_notifications: unread_notifications,
unread_private_messages: unread_private_messages, unread_private_messages: unread_private_messages,
total_unread_notifications: total_unread_notifications}, total_unread_notifications: total_unread_notifications},

View file

@ -224,7 +224,7 @@ SQL
end end
if action.user if action.user
MessageBus.publish("/users/#{action.user.username.downcase}", action.id, user_ids: [user_id], group_ids: group_ids) DiscourseBus.publish("/users/#{action.user.username.downcase}", action.id, user_ids: [user_id], group_ids: group_ids)
end end
action action
@ -240,7 +240,7 @@ SQL
require_parameters(hash, :action_type, :user_id, :acting_user_id, :target_topic_id, :target_post_id) require_parameters(hash, :action_type, :user_id, :acting_user_id, :target_topic_id, :target_post_id)
if action = UserAction.find_by(hash.except(:created_at)) if action = UserAction.find_by(hash.except(:created_at))
action.destroy action.destroy
MessageBus.publish("/user/#{hash[:user_id]}", {user_action_id: action.id, remove: true}) DiscourseBus.publish("/user/#{hash[:user_id]}", {user_action_id: action.id, remove: true})
end end
update_like_count(hash[:user_id], hash[:action_type], -1) update_like_count(hash[:user_id], hash[:action_type], -1)

View file

@ -77,7 +77,7 @@ class UserDestroyer
end end
StaffActionLogger.new(@actor == user ? Discourse.system_user : @actor).log_user_deletion(user, opts.slice(:context)) StaffActionLogger.new(@actor == user ? Discourse.system_user : @actor).log_user_deletion(user, opts.slice(:context))
MessageBus.publish "/file-change", ["refresh"], user_ids: [user.id] DiscourseBus.publish "/file-change", ["refresh"], user_ids: [user.id]
end end
end end
end end

View file

@ -0,0 +1,3 @@
require_dependency 'discourse_bus'
DiscourseBus.start!

View file

@ -1,47 +0,0 @@
MessageBus.site_id_lookup do
RailsMultisite::ConnectionManagement.current_db
end
MessageBus.extra_response_headers_lookup do |env|
{
"Access-Control-Allow-Origin" => Discourse.base_url_no_prefix,
"Access-Control-Allow-Methods" => "GET, POST",
"Access-Control-Allow-Headers" => "X-SILENCE-LOGGER, X-Shared-Session-Key"
}
end
MessageBus.user_id_lookup do |env|
user = CurrentUser.lookup_from_env(env)
user.id if user
end
MessageBus.group_ids_lookup do |env|
user = CurrentUser.lookup_from_env(env)
user.groups.select('groups.id').map{|g| g.id} if user
end
MessageBus.on_connect do |site_id|
RailsMultisite::ConnectionManagement.establish_connection(db: site_id)
end
MessageBus.on_disconnect do |site_id|
ActiveRecord::Base.connection_handler.clear_active_connections!
end
# Point at our redis
MessageBus.redis_config = YAML.load(ERB.new(File.new("#{Rails.root}/config/redis.yml").read).result)[Rails.env].symbolize_keys
MessageBus.long_polling_enabled = SiteSetting.enable_long_polling
MessageBus.long_polling_interval = SiteSetting.long_polling_interval
MessageBus.is_admin_lookup do |env|
user = CurrentUser.lookup_from_env(env)
if user && user.admin
true
else
false
end
end
MessageBus.cache_assets = !Rails.env.development?
MessageBus.enable_diagnostics

View file

@ -85,7 +85,7 @@ class Auth::DefaultCurrentUserProvider
if SiteSetting.log_out_strict && (user = current_user) if SiteSetting.log_out_strict && (user = current_user)
user.auth_token = nil user.auth_token = nil
user.save! user.save!
MessageBus.publish "/logout", user.id, user_ids: [user.id] DiscourseBus.publish "/logout", user.id, user_ids: [user.id]
end end
cookies[TOKEN_COOKIE] = nil cookies[TOKEN_COOKIE] = nil
end end

View file

@ -62,7 +62,7 @@ module BackupRestore
def self.logs def self.logs
id = start_logs_message_id id = start_logs_message_id
MessageBus.backlog(LOGS_CHANNEL, id).map { |m| m.data } DiscourseBus.backlog(LOGS_CHANNEL, id).map { |m| m.data }
end end
def self.current_version def self.current_version
@ -142,7 +142,7 @@ module BackupRestore
end end
def self.save_start_logs_message_id def self.save_start_logs_message_id
id = MessageBus.last_id(LOGS_CHANNEL) id = DiscourseBus.last_id(LOGS_CHANNEL)
$redis.set(start_logs_message_id_key, id) $redis.set(start_logs_message_id_key, id)
end end

View file

@ -336,7 +336,7 @@ module BackupRestore
def publish_log(message, timestamp) def publish_log(message, timestamp)
return unless @publish_to_message_bus return unless @publish_to_message_bus
data = { timestamp: timestamp, operation: "backup", message: message } data = { timestamp: timestamp, operation: "backup", message: message }
MessageBus.publish(BackupRestore::LOGS_CHANNEL, data, user_ids: [@user_id]) DiscourseBus.publish(BackupRestore::LOGS_CHANNEL, data, user_ids: [@user_id])
end end
def save_log(message, timestamp) def save_log(message, timestamp)

View file

@ -354,7 +354,7 @@ module BackupRestore
def publish_log(message, timestamp) def publish_log(message, timestamp)
return unless @publish_to_message_bus return unless @publish_to_message_bus
data = { timestamp: timestamp, operation: "restore", message: message } data = { timestamp: timestamp, operation: "restore", message: message }
MessageBus.publish(BackupRestore::LOGS_CHANNEL, data, user_ids: [@user_id]) DiscourseBus.publish(BackupRestore::LOGS_CHANNEL, data, user_ids: [@user_id])
end end
def save_log(message, timestamp) def save_log(message, timestamp)

View file

@ -95,10 +95,10 @@ module Discourse
digest = Digest::MD5.hexdigest(ActionView::Base.assets_manifest.assets.values.sort.join) digest = Digest::MD5.hexdigest(ActionView::Base.assets_manifest.assets.values.sort.join)
channel = "/global/asset-version" channel = "/global/asset-version"
message = MessageBus.last_message(channel) message = DiscourseBus.last_message(channel)
unless message && message.data == digest unless message && message.data == digest
MessageBus.publish channel, digest DiscourseBus.publish channel, digest
end end
digest digest
end end
@ -169,7 +169,7 @@ module Discourse
def self.enable_readonly_mode def self.enable_readonly_mode
$redis.set(readonly_mode_key, 1) $redis.set(readonly_mode_key, 1)
MessageBus.publish(readonly_channel, true) DiscourseBus.publish(readonly_channel, true)
keep_readonly_mode keep_readonly_mode
true true
end end
@ -186,7 +186,7 @@ module Discourse
def self.disable_readonly_mode def self.disable_readonly_mode
$redis.del(readonly_mode_key) $redis.del(readonly_mode_key)
MessageBus.publish(readonly_channel, false) DiscourseBus.publish(readonly_channel, false)
true true
end end
@ -197,9 +197,9 @@ module Discourse
def self.request_refresh! def self.request_refresh!
# Causes refresh on next click for all clients # Causes refresh on next click for all clients
# #
# This is better than `MessageBus.publish "/file-change", ["refresh"]` because # This is better than `DiscourseBus.publish "/file-change", ["refresh"]` because
# it spreads the refreshes out over a time period # it spreads the refreshes out over a time period
MessageBus.publish '/global/asset-version', 'clobber' DiscourseBus.publish '/global/asset-version', 'clobber'
end end
def self.git_version def self.git_version
@ -274,7 +274,7 @@ module Discourse
def self.after_fork def self.after_fork
current_db = RailsMultisite::ConnectionManagement.current_db current_db = RailsMultisite::ConnectionManagement.current_db
RailsMultisite::ConnectionManagement.establish_connection(db: current_db) RailsMultisite::ConnectionManagement.establish_connection(db: current_db)
MessageBus.after_fork DiscourseBus.after_fork
SiteSetting.after_fork SiteSetting.after_fork
$redis.client.reconnect $redis.client.reconnect
Rails.cache.reconnect Rails.cache.reconnect

73
lib/discourse_bus.rb Normal file
View file

@ -0,0 +1,73 @@
# Proxies MessageBus and noops READONLY errors.
module DiscourseBus
def self.start!
MessageBus.site_id_lookup do
RailsMultisite::ConnectionManagement.current_db
end
MessageBus.extra_response_headers_lookup do |env|
{
"Access-Control-Allow-Origin" => Discourse.base_url_no_prefix,
"Access-Control-Allow-Methods" => "GET, POST",
"Access-Control-Allow-Headers" => "X-SILENCE-LOGGER, X-Shared-Session-Key"
}
end
MessageBus.user_id_lookup do |env|
user = CurrentUser.lookup_from_env(env)
user.id if user
end
MessageBus.group_ids_lookup do |env|
user = CurrentUser.lookup_from_env(env)
user.groups.select('groups.id').map{|g| g.id} if user
end
MessageBus.on_connect do |site_id|
RailsMultisite::ConnectionManagement.establish_connection(db: site_id)
end
MessageBus.on_disconnect do |site_id|
ActiveRecord::Base.connection_handler.clear_active_connections!
end
# Point at our redis
MessageBus.redis_config = YAML.load(ERB.new(File.new("#{Rails.root}/config/redis.yml").read).result)[Rails.env].symbolize_keys
MessageBus.long_polling_enabled = SiteSetting.enable_long_polling
MessageBus.long_polling_interval = SiteSetting.long_polling_interval
MessageBus.is_admin_lookup do |env|
user = CurrentUser.lookup_from_env(env)
if user && user.admin
true
else
false
end
end
MessageBus.cache_assets = !Rails.env.development?
MessageBus.enable_diagnostics
end
def self.proxy_readonly(*methods)
methods.each do |name|
define_singleton_method(name) do |*args, &block|
DiscourseRedis.ignore_readonly { MessageBus.send(name, *args, &block) }
end
end
end
proxy_readonly :publish,
:subscribe,
:unsubscribe,
:last_id,
:last_message,
:backlog,
:after_fork,
:on_connect,
:on_disconnect,
:track_publish
end

View file

@ -20,8 +20,8 @@ class DiscourseRedis
"redis://#{(':' + config['password'] + '@') if config['password']}#{config['host']}:#{config['port']}/#{config['db']}" "redis://#{(':' + config['password'] + '@') if config['password']}#{config['host']}:#{config['port']}/#{config['db']}"
end end
def initialize def initialize(config=nil)
@config = DiscourseRedis.config @config = config || DiscourseRedis.config
@redis = DiscourseRedis.raw_connection(@config) @redis = DiscourseRedis.raw_connection(@config)
end end
@ -34,10 +34,20 @@ class DiscourseRedis
self.class.url(@config) self.class.url(@config)
end end
def self.ignore_readonly
yield
rescue Redis::CommandError => ex
if ex.message =~ /READONLY/
STDERR.puts "WARN: Redis is in a readonly state. Performed a noop"
else
raise ex
end
end
# prefix the key with the namespace # prefix the key with the namespace
def method_missing(meth, *args, &block) def method_missing(meth, *args, &block)
if @redis.respond_to?(meth) if @redis.respond_to?(meth)
@redis.send(meth, *args, &block) DiscourseRedis.ignore_readonly { @redis.send(meth, *args, &block) }
else else
super super
end end
@ -54,29 +64,37 @@ class DiscourseRedis
:zremrangebyscore, :zrevrange, :zrevrangebyscore, :zrevrank, :zrangebyscore].each do |m| :zremrangebyscore, :zrevrange, :zrevrangebyscore, :zrevrank, :zrangebyscore].each do |m|
define_method m do |*args| define_method m do |*args|
args[0] = "#{DiscourseRedis.namespace}:#{args[0]}" args[0] = "#{DiscourseRedis.namespace}:#{args[0]}"
@redis.send(m, *args) DiscourseRedis.ignore_readonly { @redis.send(m, *args) }
end end
end end
def del(k) def del(k)
DiscourseRedis.ignore_readonly do
k = "#{DiscourseRedis.namespace}:#{k}" k = "#{DiscourseRedis.namespace}:#{k}"
@redis.del k @redis.del k
end end
end
def keys(pattern=nil) def keys(pattern=nil)
DiscourseRedis.ignore_readonly do
len = DiscourseRedis.namespace.length + 1 len = DiscourseRedis.namespace.length + 1
@redis.keys("#{DiscourseRedis.namespace}:#{pattern || '*'}").map{ @redis.keys("#{DiscourseRedis.namespace}:#{pattern || '*'}").map{
|k| k[len..-1] |k| k[len..-1]
} }
end end
end
def delete_prefixed(prefix) def delete_prefixed(prefix)
DiscourseRedis.ignore_readonly do
keys("#{prefix}*").each { |k| $redis.del(k) } keys("#{prefix}*").each { |k| $redis.del(k) }
end end
end
def flushdb def flushdb
DiscourseRedis.ignore_readonly do
keys.each{|k| del(k)} keys.each{|k| del(k)}
end end
end
def reconnect def reconnect
@redis.client.reconnect @redis.client.reconnect

View file

@ -51,7 +51,7 @@ class DistributedCache
return if @subscribed return if @subscribed
@lock.synchronize do @lock.synchronize do
return if @subscribed return if @subscribed
MessageBus.subscribe(channel_name) do |message| DiscourseBus.subscribe(channel_name) do |message|
@lock.synchronize do @lock.synchronize do
process_message(message) process_message(message)
end end
@ -63,7 +63,7 @@ class DistributedCache
def self.publish(hash, message) def self.publish(hash, message)
message[:origin] = hash.object_id message[:origin] = hash.object_id
message[:hash_key] = hash.key message[:hash_key] = hash.key
MessageBus.publish(channel_name, message, { user_ids: [-1] }) DiscourseBus.publish(channel_name, message, { user_ids: [-1] })
end end
def self.set(hash, key, value) def self.set(hash, key, value)

View file

@ -11,7 +11,7 @@ class MessageBusDiags
end end
def self.establish_peer_names def self.establish_peer_names
MessageBus.publish "/server-name", {channel: "/server-name-reply/#{my_id}"} DiscourseBus.publish "/server-name", {channel: "/server-name-reply/#{my_id}"}
end end
def self.seen_hosts def self.seen_hosts
@ -20,12 +20,12 @@ class MessageBusDiags
unless @subscribed unless @subscribed
MessageBus.subscribe "/server-name-reply/#{my_id}" do |msg| DiscourseBus.subscribe "/server-name-reply/#{my_id}" do |msg|
MessageBusDiags.seen_host(msg.data) MessageBusDiags.seen_host(msg.data)
end end
MessageBus.subscribe "/server-name" do |msg| DiscourseBus.subscribe "/server-name" do |msg|
MessageBus.publish msg.data["channel"], MessageBusDiags.my_id DiscourseBus.publish msg.data["channel"], MessageBusDiags.my_id
end end
@subscribed = true @subscribed = true
end end

View file

@ -221,7 +221,7 @@ module SiteSettingExtension
def ensure_listen_for_changes def ensure_listen_for_changes
unless @subscribed unless @subscribed
MessageBus.subscribe("/site_settings") do |message| DiscourseBus.subscribe("/site_settings") do |message|
process_message(message) process_message(message)
end end
@subscribed = true @subscribed = true
@ -233,10 +233,10 @@ module SiteSettingExtension
if data["process"] != process_id if data["process"] != process_id
begin begin
@last_message_processed = message.global_id @last_message_processed = message.global_id
MessageBus.on_connect.call(message.site_id) DiscourseBus.on_connect.call(message.site_id)
refresh! refresh!
ensure ensure
MessageBus.on_disconnect.call(message.site_id) DiscourseBus.on_disconnect.call(message.site_id)
end end
end end
end end
@ -294,7 +294,7 @@ module SiteSettingExtension
end end
def notify_changed! def notify_changed!
MessageBus.publish('/site_settings', {process: process_id}) DiscourseBus.publish('/site_settings', {process: process_id})
end end
def has_setting?(name) def has_setting?(name)

View file

@ -66,7 +66,7 @@ after_initialize do
post.custom_fields["#{VOTES_CUSTOM_FIELD}-#{user_id}"] = votes post.custom_fields["#{VOTES_CUSTOM_FIELD}-#{user_id}"] = votes
post.save_custom_fields post.save_custom_fields
MessageBus.publish("/polls/#{post_id}", { poll: poll }) DiscourseBus.publish("/polls/#{post_id}", { poll: poll })
render json: { poll: poll, vote: options } render json: { poll: poll, vote: options }
end end
@ -100,7 +100,7 @@ after_initialize do
post.custom_fields[POLLS_CUSTOM_FIELD] = polls post.custom_fields[POLLS_CUSTOM_FIELD] = polls
post.save_custom_fields post.save_custom_fields
MessageBus.publish("/polls/#{post_id}", { poll: polls[poll_name] }) DiscourseBus.publish("/polls/#{post_id}", { poll: polls[poll_name] })
render json: { poll: polls[poll_name] } render json: { poll: polls[poll_name] }
end end

View file

@ -10,7 +10,7 @@ describe ::DiscoursePoll::PollsController do
describe "#vote" do describe "#vote" do
it "works" do it "works" do
MessageBus.expects(:publish) DiscourseBus.expects(:publish)
xhr :put, :vote, { post_id: poll.id, poll_name: "poll", options: ["A"] } xhr :put, :vote, { post_id: poll.id, poll_name: "poll", options: ["A"] }
@ -76,7 +76,7 @@ describe ::DiscoursePoll::PollsController do
describe "#toggle_status" do describe "#toggle_status" do
it "works for OP" do it "works for OP" do
MessageBus.expects(:publish) DiscourseBus.expects(:publish)
xhr :put, :toggle_status, { post_id: poll.id, poll_name: "poll", status: "closed" } xhr :put, :toggle_status, { post_id: poll.id, poll_name: "poll", status: "closed" }
expect(response).to be_success expect(response).to be_success
@ -86,7 +86,7 @@ describe ::DiscoursePoll::PollsController do
it "works for staff" do it "works for staff" do
log_in(:moderator) log_in(:moderator)
MessageBus.expects(:publish) DiscourseBus.expects(:publish)
xhr :put, :toggle_status, { post_id: poll.id, poll_name: "poll", status: "closed" } xhr :put, :toggle_status, { post_id: poll.id, poll_name: "poll", status: "closed" }
expect(response).to be_success expect(response).to be_success

View file

@ -89,7 +89,7 @@ describe Discourse do
it "adds a key in redis and publish a message through the message bus" do it "adds a key in redis and publish a message through the message bus" do
$redis.expects(:set).with(Discourse.readonly_mode_key, 1) $redis.expects(:set).with(Discourse.readonly_mode_key, 1)
MessageBus.expects(:publish).with(Discourse.readonly_channel, true) DiscourseBus.expects(:publish).with(Discourse.readonly_channel, true)
Discourse.enable_readonly_mode Discourse.enable_readonly_mode
end end
@ -99,7 +99,7 @@ describe Discourse do
it "removes a key from redis and publish a message through the message bus" do it "removes a key from redis and publish a message through the message bus" do
$redis.expects(:del).with(Discourse.readonly_mode_key) $redis.expects(:del).with(Discourse.readonly_mode_key)
MessageBus.expects(:publish).with(Discourse.readonly_channel, false) DiscourseBus.expects(:publish).with(Discourse.readonly_channel, false)
Discourse.disable_readonly_mode Discourse.disable_readonly_mode
end end

View file

@ -72,7 +72,7 @@ describe PostCreator do
it "does not notify on system messages" do it "does not notify on system messages" do
admin = Fabricate(:admin) admin = Fabricate(:admin)
messages = MessageBus.track_publish do messages = DiscourseBus.track_publish do
p = PostCreator.create(admin, basic_topic_params.merge(post_type: Post.types[:moderator_action])) p = PostCreator.create(admin, basic_topic_params.merge(post_type: Post.types[:moderator_action]))
PostCreator.create(admin, basic_topic_params.merge(topic_id: p.topic_id, post_type: Post.types[:moderator_action])) PostCreator.create(admin, basic_topic_params.merge(topic_id: p.topic_id, post_type: Post.types[:moderator_action]))
end end
@ -93,7 +93,7 @@ describe PostCreator do
created_post = nil created_post = nil
reply = nil reply = nil
messages = MessageBus.track_publish do messages = DiscourseBus.track_publish do
created_post = PostCreator.new(admin, basic_topic_params.merge(category: cat.id)).create created_post = PostCreator.new(admin, basic_topic_params.merge(category: cat.id)).create
reply = PostCreator.new(admin, raw: "this is my test reply 123 testing", topic_id: created_post.topic_id).create reply = PostCreator.new(admin, raw: "this is my test reply 123 testing", topic_id: created_post.topic_id).create
end end
@ -118,7 +118,7 @@ describe PostCreator do
it 'generates the correct messages for a normal topic' do it 'generates the correct messages for a normal topic' do
p = nil p = nil
messages = MessageBus.track_publish do messages = DiscourseBus.track_publish do
p = creator.create p = creator.create
end end

View file

@ -573,7 +573,7 @@ describe Topic do
describe "make_banner!" do describe "make_banner!" do
it "changes the topic archetype to 'banner'" do it "changes the topic archetype to 'banner'" do
messages = MessageBus.track_publish do messages = DiscourseBus.track_publish do
topic.make_banner!(user) topic.make_banner!(user)
expect(topic.archetype).to eq(Archetype.banner) expect(topic.archetype).to eq(Archetype.banner)
end end
@ -597,7 +597,7 @@ describe Topic do
it "resets the topic archetype" do it "resets the topic archetype" do
topic.expects(:add_moderator_post) topic.expects(:add_moderator_post)
MessageBus.expects(:publish).with("/site/banner", nil) DiscourseBus.expects(:publish).with("/site/banner", nil)
topic.remove_banner!(user) topic.remove_banner!(user)
expect(topic.archetype).to eq(Archetype.default) expect(topic.archetype).to eq(Archetype.default)
end end