# ==============================================================================
# lib/backup_restore.rb
# ==============================================================================
require_dependency "export/exporter"
require_dependency "import/importer"

# Entry points for backing up and restoring the database.
#
# Operations run in a forked child process (see .start!). Two Redis keys are
# used for coordination: one flags that an operation is running, the other asks
# the running operation to shut down.
module BackupRestore

  class OperationRunningError < RuntimeError; end

  DUMP_FILE = "dump.sql"
  METADATA_FILE = "meta.json"

  # Starts a backup in a forked process on behalf of +user_id+.
  # Returns true once the child has been forked and detached.
  def self.backup!(user_id, publish_to_message_bus = false)
    exporter = Export::Exporter.new(user_id, publish_to_message_bus)
    start! exporter
  end

  # Starts a restore of the archive +filename+ in a forked process.
  # Returns true once the child has been forked and detached.
  def self.restore!(user_id, filename, publish_to_message_bus = false)
    importer = Import::Importer.new(user_id, filename, publish_to_message_bus)
    start! importer
  end

  # Renames the "backup" schema back to "public" when a rollback is possible.
  # Raises OperationRunningError while a backup/restore is running.
  def self.rollback!
    raise BackupRestore::OperationRunningError if BackupRestore.is_operation_running?
    if can_rollback?
      rename_schema("backup", "public")
      # NOTE(review): `establish_app` is not defined in this file; presumably it
      # is provided elsewhere in the application -- confirm, otherwise this
      # raises NameError right after the schemas have been swapped.
      establish_app
    end
  end

  # Asks the running operation (if any) to stop. Always returns true.
  def self.cancel!
    set_shutdown_signal!
    true
  end

  def self.mark_as_running!
    # TODO: should acquire a lock and raise an exception if already running!
    $redis.set(running_key, "1")
  end

  def self.is_operation_running?
    !!$redis.get(running_key)
  end

  def self.mark_as_not_running!
    $redis.del(running_key)
  end

  def self.should_shutdown?
    !!$redis.get(shutdown_signal_key)
  end

  # True when the "backup" schema holds at least one table.
  #
  # This method used to be defined twice; the later definition (which only
  # checked that the schema existed) silently replaced the earlier one.
  # Requiring tables guarantees an empty schema is never renamed to "public".
  def self.can_rollback?
    backup_tables_count > 0
  end

  def self.operations_status
    {
      is_operation_running: is_operation_running?,
      can_rollback: can_rollback?,
    }
  end

  def self.current_version
    ActiveRecord::Migrator.current_version
  end

  # Drops +new_name+ (if present) and renames +old_name+ to it, in a single
  # transaction. Both names are interpolated verbatim into the SQL, so they
  # must only ever come from trusted code.
  def self.rename_schema(old_name, new_name)
    sql = <<-SQL
      BEGIN;
        DROP SCHEMA IF EXISTS #{new_name} CASCADE;
        ALTER SCHEMA #{old_name} RENAME TO #{new_name};
      COMMIT;
    SQL

    User.exec_sql(sql)
  end

  # NOTE(review): a bare `private` has no effect on `def self.` methods, so
  # everything below is still publicly callable. It is left that way on purpose
  # because code outside this file may rely on it.
  private

  def self.running_key
    "backup_restore_operation_is_running"
  end

  def self.shutdown_signal_key
    "backup_restore_operation_should_shutdown"
  end

  def self.set_shutdown_signal!
    $redis.set(shutdown_signal_key, "1")
  end

  def self.clear_shutdown_signal!
    $redis.del(shutdown_signal_key)
  end

  # Forks, runs +runner+ in the child and detaches from it.
  # The child always clears the shutdown signal and leaves through `exit!(0)`;
  # `Exception` is rescued deliberately so nothing escapes the forked process.
  def self.start!(runner)
    child = fork do
      begin
        after_fork
        runner.run
      rescue Exception => e
        puts "--------------------------------------------"
        puts "---------------- EXCEPTION -----------------"
        puts e.message
        puts e.backtrace.join("\n")
        puts "--------------------------------------------"
      ensure
        begin
          clear_shutdown_signal!
        rescue Exception => e
          puts "============================================"
          puts "================ EXCEPTION ================="
          puts e.message
          puts e.backtrace.join("\n")
          puts "============================================"
        ensure
          exit!(0)
        end
      end
    end

    Process.detach(child)

    true
  end

  # Re-establishes the connections that must not be shared with the parent.
  def self.after_fork
    $redis.client.reconnect
    Rails.cache.reconnect
    MessageBus.after_fork
  end

  def self.backup_tables_count
    User.exec_sql("SELECT COUNT(*) AS count FROM information_schema.tables WHERE table_schema = 'backup'")[0]['count'].to_i
  end

end

# ==============================================================================
# lib/export/exporter.rb
# ==============================================================================
module Export

  # Produces a .tar.gz archive containing a metadata file, a dump of the
  # public schema and (when present) the uploads of the current site.
  class Exporter

    # Raises BackupRestore::OperationRunningError when an operation is already
    # running and Discourse::InvalidParameters when the user cannot be found.
    def initialize(user_id, publish_to_message_bus = false)
      @user_id, @publish_to_message_bus = user_id, publish_to_message_bus

      ensure_no_operation_is_running
      ensure_we_have_a_user

      initialize_state
    end

    # Runs the whole backup. Exceptions (including the SystemExit used for
    # cancellation) are logged, not re-raised; clean up always happens.
    def run
      log "[STARTED]"
      log "'#{@user.username}' has started the backup!"

      mark_export_as_running

      listen_for_shutdown_signal

      enable_readonly_mode

      pause_sidekiq
      wait_for_sidekiq

      ensure_directory_exists(@tmp_directory)

      write_metadata

      dump_public_schema

      update_dump

      log "Finalizing backup..."

      ensure_directory_exists(@archive_directory)

      create_archive

      notify_user
    rescue SystemExit
      log "Backup process was cancelled!"
    rescue Exception => ex
      log "EXCEPTION: " + ex.message
      log ex.backtrace.join("\n")
    else
      @success = true
      "#{@archive_basename}.tar.gz"
    ensure
      clean_up
      @success ? log("[SUCCESS]") : log("[FAILED]")
    end

    protected

    def ensure_no_operation_is_running
      raise BackupRestore::OperationRunningError if BackupRestore.is_operation_running?
    end

    def ensure_we_have_a_user
      @user = User.where(id: @user_id).first
      raise Discourse::InvalidParameters.new(:user_id) unless @user
    end

    def initialize_state
      @success = false
      @current_db = RailsMultisite::ConnectionManagement.current_db
      @timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
      @tmp_directory = File.join(Rails.root, "tmp", "backups", @current_db, @timestamp)
      @dump_filename = File.join(@tmp_directory, BackupRestore::DUMP_FILE)
      @meta_filename = File.join(@tmp_directory, BackupRestore::METADATA_FILE)
      @archive_directory = File.join(Rails.root, "public", "backups", @current_db)
      @archive_basename = File.join(@archive_directory, @timestamp)
    end

    # Polls the shutdown flag while the operation is marked as running and
    # calls `exit` as soon as a shutdown is requested.
    def listen_for_shutdown_signal
      Thread.new do
        while BackupRestore.is_operation_running?
          exit if BackupRestore.should_shutdown?
          sleep 0.1
        end
      end
    end

    def mark_export_as_running
      log "Marking backup as running..."
      BackupRestore.mark_as_running!
    end

    def enable_readonly_mode
      log "Enabling readonly mode..."
      Discourse.enable_readonly_mode
    end

    def pause_sidekiq
      log "Pausing sidekiq..."
      Sidekiq.pause!
    end

    # Waits (15 checks, 2 seconds apart) for the sidekiq queues to drain.
    def wait_for_sidekiq
      log "Waiting for sidekiq to finish running jobs..."
      iterations = 0
      while (running = Sidekiq::Queue.all.map(&:size).sum) > 0
        log "  Waiting for #{running} jobs..."
        sleep 2
        iterations += 1
        raise "Sidekiq did not finish running all the jobs in the allowed time!" if iterations >= 15
      end
    end

    def write_metadata
      log "Writing metadata to '#{@meta_filename}'..."
      metadata = {
        source: "discourse",
        version: BackupRestore.current_version
      }
      File.write(@meta_filename, metadata.to_json)
    end

    # Runs pg_dump and forwards every line it prints to the log.
    # Raises when pg_dump exits with a non-zero status.
    def dump_public_schema
      log "Dumping the public schema of the database..."

      pg_dump_command = build_pg_dump_command
      # the command embeds the database password: never log it verbatim
      log "Running: #{redact_password(pg_dump_command)}"

      logs = Queue.new

      # nil is the end-of-stream marker: the consumer drains every queued line
      # before stopping instead of bailing out as soon as a flag is flipped
      log_thread = Thread.new do
        while (line = logs.pop)
          message = line.strip
          log(message) unless message.blank?
        end
      end

      IO.popen("#{pg_dump_command} 2>&1") do |pipe|
        begin
          pipe.each_line { |line| logs << line }
        ensure
          logs << nil
        end
      end
      status = $?

      # make sure all the output has been logged before moving on
      log_thread.join

      raise "pg_dump failed" unless status.success?
    end

    def build_pg_dump_command
      db_conf = Rails.configuration.database_configuration[Rails.env]
      host = db_conf["host"] || "localhost"
      password = db_conf["password"]
      username = db_conf["username"] || "postgres"
      database = db_conf["database"]

      [ "PGPASSWORD=#{password}",           # pass the password to pg_dump
        "pg_dump",                          # the pg_dump command
        "--exclude-schema=backup,restore",  # exclude both backup & restore schemes
        "--file='#{@dump_filename}'",       # output to the dump.sql file
        "--no-owner",                       # do not output commands to set ownership of objects
        "--no-privileges",                  # prevent dumping of access privileges
        "--verbose",                        # specifies verbose mode
        "--host=#{host}",                   # the hostname to connect to
        "--username=#{username}",           # the username to connect as
        database                            # the name of the database to dump
      ].join(" ")
    end

    # Replaces the value of the PGPASSWORD assignment so that +command+ can be
    # logged (and published to the message bus) safely.
    def redact_password(command)
      command.sub(/PGPASSWORD=\S*/, "PGPASSWORD=[FILTERED]")
    end

    # Rewrites the dump so that it restores into the "restore" schema.
    # Raises when the rewrite fails: an unmodified dump must not be archived.
    def update_dump
      log "Updating dump for more awesomeness..."

      sed_command = build_sed_command

      log "Running: #{sed_command}"

      raise "sed failed" unless system(sed_command)
    end

    def build_sed_command
      # in order to limit the downtime when restoring as much as possible
      # we force the restoration to happen in the "restore" schema

      # during the restoration, this make sure we
      #  - drop the "restore" schema if it exists
      #  - create the "restore" schema
      #  - prepend the "restore" schema into the search_path

      regexp = "^SET search_path = public, pg_catalog;$"

      replacement = [ "DROP SCHEMA IF EXISTS restore CASCADE;",
                      "CREATE SCHEMA restore;",
                      "SET search_path = restore, public, pg_catalog;",
                    ].join("\\n")

      # we only want to replace the VERY first occurence of the search_path command
      expression = "0,/#{regexp}/s//#{replacement}/"

      # I tried to use the --in-place argument but it was SLOOOWWWWwwwwww
      # so I output the result into another file and rename it back afterwards
      # (paths are quoted, like in the pg_dump command, to survive spaces)
      [ "sed --expression='#{expression}' < '#{@dump_filename}' > '#{@dump_filename}.tmp'",
        "&&",
        "mv '#{@dump_filename}.tmp' '#{@dump_filename}'",
      ].join(" ")
    end

    # Builds <timestamp>.tar.gz out of the metadata file, the dump and the
    # uploads directory of the current site (when there is one).
    # Commands are run without a shell so paths are never word-split.
    def create_archive
      log "Creating archive: #{File.basename(@archive_basename)}.tar.gz"

      tar_filename = "#{@archive_basename}.tar"

      log "Making sure archive does not already exist..."
      FileUtils.rm_f(tar_filename)
      FileUtils.rm_f("#{tar_filename}.gz")

      log "Creating empty archive..."
      system("tar", "--create", "--file", tar_filename, "--files-from", "/dev/null")

      log "Archiving metadata..."
      FileUtils.cd(File.dirname(@meta_filename)) do
        system("tar", "--append", "--file", tar_filename, File.basename(@meta_filename))
      end

      log "Archiving data dump..."
      FileUtils.cd(File.dirname(@dump_filename)) do
        system("tar", "--append", "--file", tar_filename, File.basename(@dump_filename))
      end

      upload_directory = "uploads/" + @current_db
      public_directory = File.join(Rails.root, "public")

      # the uploads live under public/, which is not necessarily the current
      # working directory: look for them where they are archived from
      if File.directory?(File.join(public_directory, upload_directory))

        log "Archiving uploads..."
        FileUtils.cd(public_directory) do
          system("tar", "--append", "--file", tar_filename, upload_directory)
        end

      end

      log "Gzipping archive..."
      system("gzip", tar_filename)
    end

    def notify_user
      log "Notifying '#{@user.username}' of the success of the backup..."
      # NOTE: will only notify if @user != Discourse.site_contact_user
      SystemMessage.create(@user, :export_succeeded)
    end

    # Undoes everything `run` set up. Each step is attempted even when a
    # previous one failed, so that a single error cannot leave the site in
    # readonly mode or the operation flagged as running.
    def clean_up
      log "Cleaning stuff up..."
      [ :remove_tmp_directory,
        :unpause_sidekiq,
        :disable_readonly_mode,
        :mark_export_as_not_running
      ].each do |step|
        begin
          send(step)
        rescue => ex
          log "Something went wrong during '#{step}': #{ex.message}"
        end
      end
      log "Finished!"
    end

    def remove_tmp_directory
      log "Removing tmp '#{@tmp_directory}' directory..."
      FileUtils.rm_rf(@tmp_directory) if Dir[@tmp_directory].present?
    rescue
      log "Something went wrong while removing the following tmp directory: #{@tmp_directory}"
    end

    def unpause_sidekiq
      log "Unpausing sidekiq..."
      Sidekiq.unpause!
    end

    def disable_readonly_mode
      log "Disabling readonly mode..."
      Discourse.disable_readonly_mode
    end

    def mark_export_as_not_running
      log "Marking backup as finished..."
      BackupRestore.mark_as_not_running!
    end

    def ensure_directory_exists(directory)
      log "Making sure '#{directory}' exists..."
      FileUtils.mkdir_p(directory)
    end

    # Best-effort logging: failures to print or publish are ignored on purpose
    # so that logging can never abort the operation.
    def log(message)
      puts(message) rescue nil
      publish_log(message) rescue nil
    end

    def publish_log(message)
      return unless @publish_to_message_bus
      data = { timestamp: Time.now, operation: "backup", message: message }
      MessageBus.publish("/admin/backups/logs", data, user_ids: [@user_id])
    end

  end

end

# ==============================================================================
# lib/import/importer.rb
# ==============================================================================
module Import

  class ImportDisabledError < RuntimeError; end
  class FilenameMissingError < RuntimeError; end

  # Restores an archive: the dump is loaded into the "restore" schema, which
  # is then swapped with "public" (the previous "public" becomes "backup").
  class Importer

    # Raises ImportDisabledError, BackupRestore::OperationRunningError,
    # Discourse::InvalidParameters or FilenameMissingError when the matching
    # precondition is not met.
    def initialize(user_id, filename, publish_to_message_bus = false)
      @user_id, @filename, @publish_to_message_bus = user_id, filename, publish_to_message_bus

      ensure_import_is_enabled
      ensure_no_operation_is_running
      ensure_we_have_a_user
      ensure_we_have_a_filename

      initialize_state
    end

    # Runs the whole restore. Exceptions (including the SystemExit used for
    # cancellation) are logged and trigger a rollback; clean up always happens.
    def run
      log "[STARTED]"
      log "'#{@user_info[:username]}' has started the restore!"

      mark_import_as_running

      listen_for_shutdown_signal

      enable_readonly_mode

      pause_sidekiq
      wait_for_sidekiq

      ensure_directory_exists(@tmp_directory)

      copy_archive_to_tmp_directory
      unzip_archive

      extract_metadata
      validate_metadata

      extract_dump

      restore_dump

      #----------- CRITICAL --------------
      switch_schema!
      #----------- CRITICAL --------------

      log "Finalizing restore..."

      migrate_database

      reconnect_database

      extract_uploads

      notify_user
    rescue SystemExit
      log "Restore process was cancelled!"
      rollback
    rescue Exception => ex
      log "EXCEPTION: " + ex.message
      log ex.backtrace.join("\n")
      rollback
    else
      @success = true
    ensure
      clean_up
      @success ? log("[SUCCESS]") : log("[FAILED]")
    end

    protected

    def ensure_import_is_enabled
      raise Import::ImportDisabledError unless SiteSetting.allow_import?
    end

    def ensure_no_operation_is_running
      raise BackupRestore::OperationRunningError if BackupRestore.is_operation_running?
    end

    def ensure_we_have_a_user
      user = User.where(id: @user_id).first
      raise Discourse::InvalidParameters.new(:user_id) unless user
      # keep some user data around to check them against the newly restored database
      @user_info = { id: user.id, username: user.username, email: user.email }
    end

    def ensure_we_have_a_filename
      raise Import::FilenameMissingError if @filename.nil?
    end

    def initialize_state
      @success = false
      # set once this run has swapped the schemas (see #switch_schema! / #rollback)
      @schema_switched = false
      @current_db = RailsMultisite::ConnectionManagement.current_db
      @current_version = BackupRestore.current_version
      @timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
      @tmp_directory = File.join(Rails.root, "tmp", "restores", @current_db, @timestamp)
      @archive_filename = File.join(@tmp_directory, @filename)
      # strips the last 3 characters, i.e. assumes the archive name ends in ".gz"
      @tar_filename = @archive_filename[0...-3]
      @meta_filename = File.join(@tmp_directory, BackupRestore::METADATA_FILE)
      @dump_filename = File.join(@tmp_directory, BackupRestore::DUMP_FILE)
    end

    # Polls the shutdown flag while the operation is marked as running and
    # calls `exit` as soon as a shutdown is requested.
    def listen_for_shutdown_signal
      Thread.new do
        while BackupRestore.is_operation_running?
          exit if BackupRestore.should_shutdown?
          sleep 0.1
        end
      end
    end

    def mark_import_as_running
      log "Marking restore as running..."
      BackupRestore.mark_as_running!
    end

    def enable_readonly_mode
      log "Enabling readonly mode..."
      Discourse.enable_readonly_mode
    end

    def pause_sidekiq
      log "Pausing sidekiq..."
      Sidekiq.pause!
    end

    # Waits (20 checks, 5 seconds apart) for the sidekiq queues to drain.
    def wait_for_sidekiq
      log "Waiting for sidekiq to finish running jobs..."
      iterations = 0
      while (running = Sidekiq::Queue.all.map(&:size).sum) > 0
        log "  Waiting for #{running} jobs..."
        sleep 5
        iterations += 1
        raise "Sidekiq did not finish running all the jobs in the allowed time!" if iterations >= 20
      end
    end

    # The filename is supplied by the caller: it is never handed to a shell.
    def copy_archive_to_tmp_directory
      log "Copying archive to tmp directory..."
      source = File.join(Backup.base_directory, @filename)
      FileUtils.cp(source, @archive_filename)
    end

    def unzip_archive
      log "Unzipping archive..."
      FileUtils.cd(@tmp_directory) { system("gzip", "--decompress", @archive_filename) }
    end

    def extract_metadata
      log "Extracting metadata file..."
      FileUtils.cd(@tmp_directory) do
        system("tar", "--extract", "--file", @tar_filename, BackupRestore::METADATA_FILE)
      end
      @metadata = Oj.load_file(@meta_filename)
    end

    # Refuses archives created by a more recent schema version than ours.
    def validate_metadata
      log "Validating metadata..."
      log "  Current version: #{@current_version}"
      log "  Restored version: #{@metadata["version"]}"

      error = "You're trying to import a more recent version of the schema. You should migrate first!"
      raise error if @metadata["version"] > @current_version
    end

    def extract_dump
      log "Extracting dump file..."
      FileUtils.cd(@tmp_directory) do
        system("tar", "--extract", "--file", @tar_filename, BackupRestore::DUMP_FILE)
      end
    end

    # Feeds the dump to psql and forwards every line it prints to the log.
    # Raises when any of those lines contains "ERROR:".
    def restore_dump
      log "Restoring dump file... (can be quite long)"

      psql_command = build_psql_command
      # the command embeds the database password: never log it verbatim
      log "Running: #{redact_password(psql_command)}"

      logs = Queue.new
      has_error = false

      # nil is the end-of-stream marker: the consumer drains every queued line
      # before stopping, so no "ERROR:" line can be skipped
      log_thread = Thread.new do
        while (line = logs.pop)
          message = line.strip
          has_error ||= (message =~ /ERROR:/)
          log(message) unless message.blank?
        end
      end

      IO.popen("#{psql_command} 2>&1") do |pipe|
        begin
          pipe.each_line { |line| logs << line }
        ensure
          logs << nil
        end
      end

      # wait for the whole output to be inspected before trusting has_error,
      # otherwise a failed restore could go on and switch the schemas
      log_thread.join

      # psql does not return a valid exit code when an error happens
      raise "psql failed" if has_error
    end

    def build_psql_command
      db_conf = Rails.configuration.database_configuration[Rails.env]
      host = db_conf["host"] || "localhost"
      password = db_conf["password"]
      username = db_conf["username"] || "postgres"
      database = db_conf["database"]

      [ "PGPASSWORD=#{password}",     # pass the password to psql
        "psql",                       # the psql command
        "--dbname='#{database}'",     # connect to database *dbname*
        "--file='#{@dump_filename}'", # read the dump
        "--single-transaction",       # all or nothing (also runs COPY commands faster)
        "--host=#{host}",             # the hostname to connect to
        "--username=#{username}"      # the username to connect as
      ].join(" ")
    end

    # Replaces the value of the PGPASSWORD assignment so that +command+ can be
    # logged (and published to the message bus) safely.
    def redact_password(command)
      command.sub(/PGPASSWORD=\S*/, "PGPASSWORD=[FILTERED]")
    end

    # Atomically makes "restore" the new "public" schema, keeping the previous
    # "public" around as "backup".
    def switch_schema!
      log "Switching schemas..."

      sql = <<-SQL
        BEGIN;
          DROP SCHEMA IF EXISTS backup CASCADE;
          ALTER SCHEMA public RENAME TO backup;
          ALTER SCHEMA restore RENAME TO public;
        COMMIT;
      SQL

      User.exec_sql(sql)

      # from now on the "backup" schema is the state we had before this restore
      @schema_switched = true
    end

    def migrate_database
      log "Migrating the database..."
      Discourse::Application.load_tasks
      ENV["VERSION"] = @current_version.to_s
      Rake::Task["db:migrate:up"].invoke
    end

    def reconnect_database
      log "Reconnecting to the database..."
      ActiveRecord::Base.establish_connection
    end

    # Extracts the "uploads/" entries of the archive (if any) into public/.
    def extract_uploads
      log "Extracting uploads..."
      archived_paths = IO.popen(["tar", "--list", "--file", @tar_filename], &:read)
      if archived_paths.lines.any? { |path| path.include?("uploads/") }
        FileUtils.cd(File.join(Rails.root, "public")) do
          system("tar", "--extract", "--keep-newer-files", "--file", @tar_filename, "uploads/")
        end
      end
    end

    def notify_user
      if user = User.where(email: @user_info[:email]).first
        log "Notifying '#{user.username}' of the success of the restore..."
        # NOTE: will only notify if user != Discourse.site_contact_user
        SystemMessage.create(user, :import_succeeded)
      else
        log "Could not send notification to '#{@user_info[:username]}' (#{@user_info[:email]}), because the user does not exists..."
      end
    end

    # Puts the "backup" schema back in place, but only when this very run
    # created it: a "backup" schema left over by a previous restore must never
    # overwrite the live "public" schema because of an early failure.
    def rollback
      log "Trying to rollback..."
      if @schema_switched && BackupRestore.can_rollback?
        log "Rolling back to previous working state..."
        BackupRestore.rename_schema("backup", "public")
      else
        log "No backup schema was created yet!"
      end
    end

    # Undoes everything `run` set up. Each step is attempted even when a
    # previous one failed, so that a single error cannot leave the site in
    # readonly mode or the operation flagged as running.
    def clean_up
      log "Cleaning stuff up..."
      [ :remove_tmp_directory,
        :unpause_sidekiq,
        :disable_readonly_mode,
        :mark_import_as_not_running
      ].each do |step|
        begin
          send(step)
        rescue => ex
          log "Something went wrong during '#{step}': #{ex.message}"
        end
      end
      log "Finished!"
    end

    def remove_tmp_directory
      log "Removing tmp '#{@tmp_directory}' directory..."
      FileUtils.rm_rf(@tmp_directory) if Dir[@tmp_directory].present?
    rescue
      log "Something went wrong while removing the following tmp directory: #{@tmp_directory}"
    end

    def unpause_sidekiq
      log "Unpausing sidekiq..."
      Sidekiq.unpause!
    end

    def disable_readonly_mode
      log "Disabling readonly mode..."
      Discourse.disable_readonly_mode
    end

    def mark_import_as_not_running
      log "Marking restore as finished..."
      BackupRestore.mark_as_not_running!
    end

    def ensure_directory_exists(directory)
      log "Making sure #{directory} exists..."
      FileUtils.mkdir_p(directory)
    end

    # Best-effort logging: failures to print or publish are ignored on purpose
    # so that logging can never abort the operation.
    def log(message)
      puts(message) rescue nil
      publish_log(message) rescue nil
    end

    def publish_log(message)
      return unless @publish_to_message_bus
      data = { timestamp: Time.now, operation: "restore", message: message }
      MessageBus.publish("/admin/backups/logs", data, user_ids: [@user_id])
    end

  end

end