mirror of
https://github.com/codeninjasllc/discourse.git
synced 2024-11-30 10:58:31 -05:00
FEATURE: AR adapter to failover to a replica DB server.
This commit is contained in:
parent
0ef141b2c3
commit
46589a1a0c
5 changed files with 208 additions and 2 deletions
|
@ -18,11 +18,14 @@ class GlobalSetting
|
|||
|
||||
def self.database_config
|
||||
hash = {"adapter" => "postgresql"}
|
||||
%w{pool timeout socket host port username password}.each do |s|
|
||||
%w{pool timeout socket host port username password replica_host replica_port}.each do |s|
|
||||
if val = self.send("db_#{s}")
|
||||
hash[s] = val
|
||||
end
|
||||
end
|
||||
|
||||
hash["adapter"] = "postgresql_fallback" if hash["replica_host"]
|
||||
|
||||
hostnames = [ hostname ]
|
||||
hostnames << backup_hostname if backup_hostname.present?
|
||||
|
||||
|
|
|
@ -1,7 +1,13 @@
|
|||
development:
|
||||
prepared_statements: false
|
||||
adapter: postgresql
|
||||
adapter: postgresql_fallback
|
||||
host: 172.17.0.2
|
||||
port: 6432
|
||||
database: discourse_development
|
||||
username: tgxworld
|
||||
password: test
|
||||
replica_host: 172.17.0.3
|
||||
replica_port: 6432
|
||||
min_messages: warning
|
||||
pool: 5
|
||||
timeout: 5000
|
||||
|
|
|
@ -43,6 +43,12 @@ db_password =
|
|||
# see: https://github.com/rails/rails/issues/21992
|
||||
db_prepared_statements = false
|
||||
|
||||
# host address for db replica server
|
||||
db_replica_host =
|
||||
|
||||
# port running replica db server, defaults to 5432 if not set
|
||||
db_replica_port =
|
||||
|
||||
# hostname running the forum
|
||||
hostname = "www.example.com"
|
||||
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
require 'active_record/connection_adapters/abstract_adapter'
|
||||
require 'active_record/connection_adapters/postgresql_adapter'
|
||||
require 'discourse'
|
||||
require 'concurrent'
|
||||
|
||||
class TaskObserver
|
||||
def update(time, result, ex)
|
||||
if result
|
||||
logger.info { "PG connection heartbeat successfully returned #{result}" }
|
||||
elsif ex.is_a?(Concurrent::TimeoutError)
|
||||
logger.warning { "PG connection heartbeat timed out".freeze }
|
||||
else
|
||||
if ex.message.include?("PG::UnableToSend")
|
||||
logger.info { "PG connection heartbeat: Master connection is not active.".freeze }
|
||||
else
|
||||
logger.error { "PG connection heartbeat failed with error: \"#{ex}\"" }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def logger
|
||||
Rails.logger
|
||||
end
|
||||
end
|
||||
|
||||
module ActiveRecord
|
||||
module ConnectionHandling
|
||||
def postgresql_fallback_connection(config)
|
||||
master_connection = postgresql_connection(config)
|
||||
|
||||
replica_connection = postgresql_connection(config.dup.merge({
|
||||
host: config[:replica_host], port: config[:replica_port]
|
||||
}))
|
||||
verify_replica(replica_connection)
|
||||
|
||||
klass = ConnectionAdapters::PostgreSQLFallbackAdapter.proxy_pass(master_connection.class)
|
||||
klass.new(master_connection, replica_connection, logger, config)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def verify_replica(connection)
|
||||
value = connection.raw_connection.exec("SELECT pg_is_in_recovery()").values[0][0]
|
||||
raise "Replica database server is not in recovery mode." if value == 'f'
|
||||
end
|
||||
end
|
||||
|
||||
module ConnectionAdapters
|
||||
class PostgreSQLFallbackAdapter < AbstractAdapter
|
||||
ADAPTER_NAME = "PostgreSQLFallback".freeze
|
||||
MAX_FAILURE = 5
|
||||
HEARTBEAT_INTERVAL = 5
|
||||
|
||||
attr_reader :main_connection
|
||||
|
||||
def self.all_methods(klass)
|
||||
methods = []
|
||||
|
||||
(klass.ancestors - AbstractAdapter.ancestors).each do |_klass|
|
||||
%w(public protected private).map do |level|
|
||||
methods << _klass.send("#{level}_instance_methods", false)
|
||||
end
|
||||
end
|
||||
|
||||
methods.flatten.uniq.sort
|
||||
end
|
||||
|
||||
def self.proxy_pass(klass)
|
||||
Class.new(self) do
|
||||
(self.all_methods(klass) - self.all_methods(self)).each do |method|
|
||||
self.class_eval <<-EOF
|
||||
def #{method}(*args, &block)
|
||||
proxy_method(:#{method}, *args, &block)
|
||||
end
|
||||
EOF
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(master_connection, replica_connection, logger, config)
|
||||
super(nil, logger, config)
|
||||
|
||||
@master_connection = master_connection
|
||||
@main_connection = @master_connection
|
||||
@replica_connection = replica_connection
|
||||
@failure_count = 0
|
||||
load!
|
||||
end
|
||||
|
||||
def proxy_method(method, *args, &block)
|
||||
@main_connection.send(method, *args, &block)
|
||||
rescue ActiveRecord::StatementInvalid => e
|
||||
if e.message.include?("PG::UnableToSend") && @main_connection == @master_connection
|
||||
@failure_count += 1
|
||||
|
||||
if @failure_count == MAX_FAILURE
|
||||
Discourse.enable_readonly_mode if !Discourse.readonly_mode?
|
||||
@main_connection = @replica_connection
|
||||
load!
|
||||
connection_heartbeart(@master_connection)
|
||||
@failure_count = 0
|
||||
else
|
||||
proxy_method(method, *args, &block)
|
||||
end
|
||||
end
|
||||
|
||||
raise e
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def load!
|
||||
@visitor = @main_connection.visitor
|
||||
@connection = @main_connection.raw_connection
|
||||
end
|
||||
|
||||
def connection_heartbeart(connection, interval = HEARTBEAT_INTERVAL)
|
||||
timer_task = Concurrent::TimerTask.new(execution_interval: interval) do |task|
|
||||
connection.reconnect!
|
||||
|
||||
if connection.active?
|
||||
@main_connection = connection
|
||||
load!
|
||||
Discourse.disable_readonly_mode if Discourse.readonly_mode?
|
||||
task.shutdown
|
||||
end
|
||||
end
|
||||
|
||||
timer_task.add_observer(TaskObserver.new)
|
||||
timer_task.execute
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,55 @@
|
|||
require 'rails_helper'
|
||||
require_dependency 'active_record/connection_adapters/postgresql_fallback_adapter'
|
||||
|
||||
describe ActiveRecord::ConnectionAdapters::PostgreSQLFallbackAdapter do
|
||||
let(:master_connection) { ActiveRecord::Base.connection }
|
||||
let(:replica_connection) { master_connection.dup }
|
||||
let(:adapter) { described_class.new(master_connection, replica_connection, nil, nil) }
|
||||
|
||||
before :each do
|
||||
ActiveRecord::Base.clear_all_connections!
|
||||
end
|
||||
|
||||
describe "proxy_method" do
|
||||
context "when master connection is not active" do
|
||||
before do
|
||||
replica_connection.stubs(:send)
|
||||
master_connection.stubs(:send).raises(ActiveRecord::StatementInvalid.new('PG::UnableToSend'))
|
||||
master_connection.stubs(:reconnect!)
|
||||
master_connection.stubs(:active?).returns(false)
|
||||
|
||||
@old_const = described_class::HEARTBEAT_INTERVAL
|
||||
described_class.const_set("HEARTBEAT_INTERVAL", 0.1)
|
||||
end
|
||||
|
||||
after do
|
||||
Discourse.disable_readonly_mode
|
||||
described_class.const_set("HEARTBEAT_INTERVAL", @old_const)
|
||||
end
|
||||
|
||||
it "should set site to readonly mode and carry out failover and switch back procedures" do
|
||||
expect(adapter.main_connection).to eq(master_connection)
|
||||
adapter.proxy_method('some method')
|
||||
expect(Discourse.readonly_mode?).to eq(true)
|
||||
expect(adapter.main_connection).to eq(replica_connection)
|
||||
|
||||
master_connection.stubs(:active?).returns(true)
|
||||
sleep 0.15
|
||||
|
||||
expect(Discourse.readonly_mode?).to eq(false)
|
||||
expect(adapter.main_connection).to eq(master_connection)
|
||||
end
|
||||
end
|
||||
|
||||
it 'should raise errors not related to the database connection' do
|
||||
master_connection.stubs(:send).raises(StandardError.new)
|
||||
expect { adapter.proxy_method('some method') }.to raise_error(StandardError)
|
||||
end
|
||||
|
||||
it 'should proxy methods successfully' do
|
||||
expect(adapter.proxy_method(:execute, 'SELECT 1').values[0][0]).to eq("1")
|
||||
expect(adapter.proxy_method(:active?)).to eq(true)
|
||||
expect(adapter.proxy_method(:raw_connection)).to eq(master_connection.raw_connection)
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue