2013-02-05 14:16:51 -05:00
#
# A wrapper around redis that namespaces keys with the current site id
#
2014-01-06 16:50:04 +11:00
require_dependency 'cache'
2013-02-05 14:16:51 -05:00
class DiscourseRedis
2016-03-02 22:01:48 +08:00
# Process-wide (Singleton) tracker of whether the master Redis server is in
# use. While the application is running against the slave, it can check in
# the background whether the master is reachable again and, if so, switch
# the application back to it.
#
# The state flags (@master, @running, @last_checked) are guarded by @mutex.
class FallbackHandler
  include Singleton

  # Line present in the slave's INFO reply while its link to the master is up.
  MASTER_LINK_STATUS = "master_link_status:up".freeze
  # Client types whose connections to the slave are killed on fallback.
  CONNECTION_TYPES = %w{normal pubsub}.each(&:freeze).freeze

  def initialize
    @master = true
    @running = false
    @mutex = Mutex.new
    @slave_config = DiscourseRedis.slave_config
  end

  # Starts a background master check unless one is already running or one
  # finished within the last 5 seconds.
  #
  # Returns the spawned Thread, or nil when no check was started.
  def verify_master
    synchronize do
      return if @running || recently_checked?
      @running = true
    end

    Thread.new { initiate_fallback_to_master }
  end

  # Asks the slave whether its link to the master is up. If it is, kills the
  # client connections on the slave, clears Discourse's readonly state,
  # requests a refresh and marks the master as in use again.
  #
  # Returns true when the switch back to the master happened, nil otherwise.
  # Errors raised while talking to the slave propagate to the caller.
  def initiate_fallback_to_master
    slave_client = ::Redis::Client.new(@slave_config)
    logger.info " #{log_prefix} : Checking connection to master server... "

    if slave_client.call([:info]).split("\r\n").include?(MASTER_LINK_STATUS)
      logger.info " #{log_prefix} : Master server is active, killing all connections to slave... "

      CONNECTION_TYPES.each do |connection_type|
        slave_client.call([:client, [:kill, 'type', connection_type]])
      end

      Discourse.clear_readonly!
      Discourse.request_refresh!
      self.master = true
    end
  ensure
    # Written under the mutex because verify_master reads these under it.
    synchronize do
      @running = false
      @last_checked = Time.zone.now
    end
    # slave_client is nil when building the client itself failed; guarding
    # here keeps the original error from being replaced by a NoMethodError.
    slave_client.disconnect if slave_client
  end

  # Whether the master server is currently considered usable.
  def master
    synchronize { @master }
  end

  def master=(args)
    synchronize { @master = args }
  end

  # True when a master check completed no more than 5 seconds ago.
  def recently_checked?
    return false unless @last_checked

    Time.zone.now <= (@last_checked + 5.seconds)
  end

  # Used for testing
  def reset!
    @master = true
    @last_checked = nil
    @running = false
  end

  private

  def synchronize
    @mutex.synchronize { yield }
  end

  def logger
    Rails.logger
  end

  def log_prefix
    " #{self.class} "
  end
end
# Redis connector that probes the configured server before handing out
# connection options, and hands out the slave's options once the fallback
# handler has recorded that the master is down.
class Connector < Redis::Client::Connector
  MASTER = 'master'.freeze
  SLAVE = 'slave'.freeze

  def initialize(options)
    super(options)
    @slave_options = DiscourseRedis.slave_config(options)
    @fallback_handler = DiscourseRedis::FallbackHandler.instance
  end

  # Returns the options to connect with.
  #
  # Without a configured slave host the original options are returned
  # untouched. Otherwise the server is probed with a ROLE command:
  # * probe succeeds -> the original options;
  # * probe fails and the master is already marked down -> the slave options;
  # * probe fails for the first time -> the master is marked down and the
  #   error is re-raised.
  def resolve
    # NOTE(review): @options is presumably assigned by the superclass
    # initializer - confirm against the Redis client version in use.
    return @options unless @slave_options[:host]

    begin
      options = @options.dup
      options.delete(:connector)
      client = ::Redis::Client.new(options)
      # Only used as a reachability probe; the reply itself is discarded.
      client.call([:role])[0]
      @options
    rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
      # A consul service name may be deregistered for a redis container setup.
      # Any other plain RuntimeError is unrelated to connectivity; the exact
      # class comparison keeps subclasses of RuntimeError out of this check.
      raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"

      return @slave_options if !@fallback_handler.master
      @fallback_handler.master = false
      raise ex
    ensure
      # client is nil when building the probe client itself failed; guarding
      # here keeps the original error from being replaced by a NoMethodError.
      client.disconnect if client
    end
  end
end
2013-02-25 19:42:20 +03:00
2013-03-24 23:19:59 -07:00
# Opens a plain Redis connection (no key namespacing).
#
# config - connection options; DiscourseRedis.config is used when omitted.
def self.raw_connection(config = nil)
  Redis.new(config || self.config)
end
# Default Redis connection options, as provided by GlobalSetting.
def self.config
  GlobalSetting.redis_config
end
2016-03-02 22:01:48 +08:00
# Builds the connection options for the slave server: a copy of +options+
# whose :host and :port are replaced by its :slave_host and :slave_port
# (nil when those are not set). +options+ itself is left unmodified.
def self.slave_config(options = config)
  slave_endpoint = { host: options[:slave_host], port: options[:slave_port] }
  options.merge(slave_endpoint)
end
2015-04-24 13:10:43 -04:00
# config - Redis connection options; DiscourseRedis.config is used when
#          none are given.
def initialize(config = nil)
  @config = config ? config : DiscourseRedis.config
  @redis = DiscourseRedis.raw_connection(@config)
end
2016-03-02 22:01:48 +08:00
# The shared FallbackHandler instance, looked up once and then memoized.
def self.fallback_handler
  return @fallback_handler if @fallback_handler

  @fallback_handler = DiscourseRedis::FallbackHandler.instance
end
2013-12-20 16:34:34 -05:00
# Returns the underlying Redis connection.
# Only use this if you want to store and fetch data that's shared between sites.
def without_namespace
  @redis
end
2015-04-24 13:10:43 -04:00
# Runs the given block and returns its value. A Redis::CommandError whose
# message mentions READONLY (a write sent to a read-only server) is
# swallowed: the readonly state is reported to Discourse and, when the
# fallback handler says the master is down, a master check is triggered.
# Any other Redis::CommandError is re-raised.
def self.ignore_readonly
  yield
rescue Redis::CommandError => ex
  # The error text starts with the READONLY token, so the pattern must not
  # require any whitespace in front of it.
  raise ex unless ex.message =~ /READONLY/

  unless Discourse.recently_readonly?
    STDERR.puts " WARN: Redis is in a readonly state. Performed a noop "
  end

  fallback_handler.verify_master if !fallback_handler.master
  Discourse.received_readonly!
end
2013-02-05 14:16:51 -05:00
# Forwards any command the underlying Redis connection understands straight
# to it, wrapped in DiscourseRedis.ignore_readonly. Arguments are passed
# through untouched - no namespace prefix is applied on this path.
def method_missing(meth, *args, &block)
  if @redis.respond_to?(meth)
    DiscourseRedis.ignore_readonly { @redis.send(meth, *args, &block) }
  else
    super
  end
end

# Keeps respond_to? in agreement with method_missing: a method counts as
# supported when the underlying connection responds to it publicly.
def respond_to_missing?(meth, include_private = false)
  # Nil-safe so respond_to? still works before @redis has been assigned.
  (@redis && @redis.respond_to?(meth)) || super
end
# Proxy key methods through, but prefix the key with the current namespace
# ("<namespace>:<key>"). This is the same prefix format that #keys strips
# off again, so every method below must build keys identically.
#
# Only the first argument is prefixed. A block, if given, is forwarded to
# the underlying command.
# NOTE(review): several listed commands take more than one key (rename,
# sdiff, sinter, sunion, rpoplpush, ...); their remaining keys are passed
# through without a namespace - confirm this is intended.
[
  :append, :blpop, :brpop, :brpoplpush, :decr, :decrby, :exists, :expire, :expireat, :get, :getbit, :getrange, :getset,
  :hdel, :hexists, :hget, :hgetall, :hincrby, :hincrbyfloat, :hkeys, :hlen, :hmget, :hmset, :hset, :hsetnx, :hvals, :incr,
  :incrby, :incrbyfloat, :lindex, :linsert, :llen, :lpop, :lpush, :lpushx, :lrange, :lrem, :lset, :ltrim,
  :mapped_hmset, :mapped_hmget, :mapped_mget, :mapped_mset, :mapped_msetnx, :move, :mset,
  :msetnx, :persist, :pexpire, :pexpireat, :psetex, :pttl, :rename, :renamenx, :rpop, :rpoplpush, :rpush, :rpushx, :sadd, :scard,
  :sdiff, :set, :setbit, :setex, :setnx, :setrange, :sinter, :sismember, :smembers, :sort, :spop, :srandmember, :srem, :strlen,
  :sunion, :ttl, :type, :watch, :zadd, :zcard, :zcount, :zincrby, :zrange, :zrangebyscore, :zrank, :zrem, :zremrangebyrank,
  :zremrangebyscore, :zrevrange, :zrevrangebyscore, :zrevrank
].each do |m|
  define_method m do |*args, &block|
    args[0] = "#{namespace}:#{args[0]}"
    DiscourseRedis.ignore_readonly { @redis.send(m, *args, &block) }
  end
end

# MGET where every given key is prefixed with the namespace.
def mget(*args)
  args.map! { |a| "#{namespace}:#{a}" }
  DiscourseRedis.ignore_readonly { @redis.mget(*args) }
end

# Deletes a single key inside the current namespace.
def del(k)
  DiscourseRedis.ignore_readonly do
    k = "#{namespace}:#{k}"
    @redis.del k
  end
end

# Lists the keys of the current namespace matching +pattern+ (all of them
# when no pattern is given), with the "<namespace>:" prefix removed.
def keys(pattern = nil)
  DiscourseRedis.ignore_readonly do
    # Length of "<namespace>:" - the part stripped from every returned key.
    len = namespace.length + 1
    @redis.keys("#{namespace}:#{pattern || '*'}").map { |k| k[len..-1] }
  end
end

# Deletes every key of the current namespace that starts with +prefix+.
def delete_prefixed(prefix)
  DiscourseRedis.ignore_readonly do
    # Delete through this instance, so the keys are removed from the same
    # connection they were listed from.
    keys("#{prefix}*").each { |k| del(k) }
  end
end

# Deletes every key of the current namespace only; keys belonging to other
# namespaces are left alone.
def flushdb
  DiscourseRedis.ignore_readonly do
    keys.each { |k| del(k) }
  end
end
# Re-establishes the connection of the underlying Redis client.
def reconnect
  connection = @redis.client
  connection.reconnect
end
2015-05-06 09:53:10 +10:00
# Namespace used to prefix keys: the current multisite database name.
def namespace
  RailsMultisite::ConnectionManagement.current_db
end
2013-02-05 14:16:51 -05:00
# Class-level variant of #namespace. Logs a deprecation warning on every
# call, then returns the current multisite database name.
def self.namespace
  deprecation_notice = " DiscourseRedis.namespace is going to be deprecated, do not use it! "
  Rails.logger.warn(deprecation_notice)
  RailsMultisite::ConnectionManagement.current_db
end
2013-03-11 05:33:20 -07:00
# Builds the cache store object: a fresh Cache instance.
def self.new_redis_store
  Cache.new
end
2013-02-05 14:16:51 -05:00
end