require 'shellwords'

require_dependency 'import/json_decoder'
require_dependency 'import/import'
require_dependency 'import/adapter/base'

# Load every file found in lib/import/adapter, except base.rb which is
# already required above.
(Dir.entries(File.join(Rails.root, 'lib', 'import', 'adapter')) - ['.', '..', 'base.rb']).each do |f|
  require_dependency "import/adapter/#{f}"
end

module Jobs

  # Job that loads an export archive into the database.
  #
  # Steps performed by #execute:
  #   1. extract tables.json from the archive into a temporary directory,
  #   2. move every exported table into the BACKUP_SCHEMA schema and create
  #      an empty table with the same definition in its place,
  #   3. insert the rows read from tables.json,
  #   4. re-create the indexes, primary keys and id sequences,
  #   5. extract the uploads/ directory from the archive.
  #
  # If steps 3-5 raise, the new tables are dropped and the backed up tables
  # are moved back (see #rollback), then the error is re-raised.
  class Importer < Jobs::Base

    sidekiq_options retry: false

    # Name of the postgresql schema that holds the original tables while the
    # import is running.
    BACKUP_SCHEMA = 'backup'

    def initialize
      @index_definitions = {} # table name => array of index definition statements
      @format = :json
      @warnings = []
    end

    # Runs the import.
    #
    # @param args [Hash] job arguments:
    #   :filename - path of the archive to import,
    #   :format   - format of the archive (defaults to :json),
    #   :user_id  - optional id of the user to remember in @user_info.
    # @raise [Import::ImportDisabledError] when SiteSetting.allow_import? is false
    # @raise [Import::ImportInProgressError] when an import is already running
    # @raise [Export::ExportInProgressError] when an export is running
    def execute(args)
      ordered_models_for_import.each { |model| model.primary_key } # a HACK to workaround cache problems

      # These checks are deliberately outside of the begin/ensure below:
      # finish_import clears the "import running" flag and disables maintenance
      # mode, which must not happen when this job refuses to start because
      # another import or export is the one that is actually running.
      raise Import::ImportDisabledError unless SiteSetting.allow_import?
      raise Import::ImportInProgressError if Import::is_import_running?
      raise Export::ExportInProgressError if Export::is_export_running?

      begin
        # Disable printing of NOTICE, DETAIL and other unimportant messages from postgresql
        User.exec_sql("SET client_min_messages TO WARNING")

        @format = args[:format] || :json
        @archive_filename = args[:filename]

        # After the import is done, we'll need to reload the user record and make sure it's the same person
        # before sending a notification
        user = args[:user_id] ? User.where(id: args[:user_id].to_i).first : nil
        # The user may not exist; in that case there is simply nobody to notify.
        @user_info = user ? { user_id: user.id, email: user.email } : nil

        start_import
        backup_tables
        begin
          load_data
          create_indexes
          extract_uploads
        rescue
          log "Performing a ROLLBACK because something went wrong!"
          rollback
          raise
        end
      ensure
        finish_import
      end
    end

    # @return the models to import, as given by Export.models_included_in_export
    def ordered_models_for_import
      Export.models_included_in_export
    end

    # Validates the job arguments, extracts tables.json, creates the decoder,
    # flags the import as started and enables maintenance mode.
    #
    # @raise [Import::FormatInvalidError] when the format is not :json
    # @raise [Import::FilenameMissingError] when no archive filename was given
    # @return [self]
    def start_import
      raise Import::FormatInvalidError if @format != :json
      raise Import::FilenameMissingError if @archive_filename.nil?

      extract_files
      @decoder = Import::JsonDecoder.new(File.join(tmp_directory, 'tables.json'))
      Import.set_import_started
      Discourse.enable_maintenance_mode
      self
    end

    # Path of the temporary working directory (tmp/import<timestamp> under
    # Rails.root). The directory is created on first use and the path is
    # memoized.
    def tmp_directory
      @tmp_directory ||= begin
        f = File.join(Rails.root, 'tmp', Time.now.strftime('import%Y%m%d%H%M%S'))
        Dir.mkdir(f) unless Dir[f].present?
        f
      end
    end

    # Extracts tables.json from the archive into the temporary directory.
    def extract_files
      FileUtils.cd(tmp_directory) do
        # The filename is shell-escaped so that paths containing spaces or
        # shell metacharacters are passed to tar as a single argument.
        `tar xvzf #{Shellwords.escape(@archive_filename)} tables.json`
      end
    end

    # Moves every table to import into the backup schema and creates an empty
    # replacement, all inside one transaction.
    #
    # @return [self]
    def backup_tables
      log " Backing up tables"
      ActiveRecord::Base.transaction do
        create_backup_schema
        ordered_models_for_import.each { |model| backup_and_setup_table(model) }
      end
      self
    end

    # Drops the backup schema if it exists (with everything in it) and creates
    # it again.
    #
    # @return [self]
    def create_backup_schema
      User.exec_sql("DROP SCHEMA IF EXISTS #{BACKUP_SCHEMA} CASCADE")
      User.exec_sql("CREATE SCHEMA #{BACKUP_SCHEMA}")
      self
    end

    # Remembers the index definitions of the model's table, moves the table to
    # the backup schema, and creates a new table LIKE the backed up one.
    # The LIKE clause does not include indexes; they are re-created from the
    # remembered definitions by #create_indexes.
    #
    # @return [self]
    def backup_and_setup_table(model)
      table = model.table_name
      log " #{table}"
      @index_definitions[table] = model.exec_sql("SELECT indexdef FROM pg_indexes WHERE tablename = '#{table}' and schemaname = 'public';").map { |x| x['indexdef'] }
      model.exec_sql("ALTER TABLE #{table} SET SCHEMA #{BACKUP_SCHEMA}")
      model.exec_sql("CREATE TABLE #{table} (LIKE #{BACKUP_SCHEMA}.#{table} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING COMMENTS INCLUDING STORAGE);")
      self
    end

    # Starts the decoder, which calls back #set_schema_info and #load_table.
    #
    # @return [self]
    def load_data
      log " Importing data"
      @decoder.start(
        callbacks: {
          schema_info: method(:set_schema_info),
          table_data: method(:load_table)
        }
      )
      self
    end

    # Decoder callback: validates the header of the export file and remembers
    # its schema version in @export_schema_version.
    #
    # @param arg [Hash] with keys :source, :version and :table_count
    # @raise [Import::UnsupportedExportSource] when the source is not 'discourse'
    # @raise [ArgumentError] when no version is given
    # @raise [Import::UnsupportedSchemaVersion] when the version is greater
    #   than Export.current_schema_version
    # @raise [Import::WrongTableCountError] when the table count does not
    #   match the number of models to import
    # @return [true]
    def set_schema_info(arg)
      unless arg[:source] && arg[:source].downcase == 'discourse'
        raise Import::UnsupportedExportSource
      end

      version = arg[:version]
      raise ArgumentError, "The schema version must be provided." if version.nil?
      unless version && version <= Export.current_schema_version
        raise Import::UnsupportedSchemaVersion, "Export file is from a newer version of Discourse. Upgrade and run migrations to import this file."
      end

      @export_schema_version = version

      if arg[:table_count] != ordered_models_for_import.size
        raise Import::WrongTableCountError, "Expected to find #{ordered_models_for_import.size} tables, but export file has #{arg[:table_count]} tables!"
      end

      true
    end

    # Decoder callback: inserts the rows of one table.
    #
    # Adapters registered for the export's schema version are applied to the
    # column names and to every row before inserting. Tables that do not
    # belong to an exported model are skipped with a warning.
    #
    # @param table_name [String] name of the table
    # @param fields_arg [Array] column names, in the order used by each row
    # @param row_data [Array<Array>] one array of values per row
    # @param row_count [Integer] number of rows (only used for logging)
    # @raise [Import::WrongFieldCountError] when the data has more fields than
    #   the model has columns
    # @return [true] when the table was loaded
    def load_table(table_name, fields_arg, row_data, row_count)
      fields = fields_arg.dup
      model = Export::models_included_in_export.find { |m| m.table_name == table_name }

      unless model
        return add_warning "Export file contained an unrecognized table named: #{table_name}! It was ignored."
      end

      @adapters ||= Import.adapters_for_version(@export_schema_version)
      log " #{table_name}: #{row_count} rows"

      table_adapters = @adapters[table_name] || []
      table_adapters.each do |adapter|
        fields = adapter.apply_to_column_names(table_name, fields)
      end

      column_info = model.columns
      if fields.size > column_info.size
        raise Import::WrongFieldCountError, "Table #{table_name} is expected to have #{column_info.size} fields, but got #{fields.size}! Maybe your Discourse server is older than the server that this export file comes from?"
      end

      # If there are fewer fields in the data than the model has, then insert only those fields and
      # hope that the table uses default values or allows null for the missing columns.
      # If the table doesn't have defaults or is not nullable, then a migration adapter should have been created
      # along with the migration.
      adapted_rows = row_data.map do |row|
        table_adapters.inject(row) { |r, adapter| adapter.apply_to_row(table_name, r) }
      end

      # Work column by column so that boolean columns can be converted from
      # the exported 'f' / other value to false / true (nil stays nil).
      # NOTE(review): the column type is looked up by position, which assumes
      # the data's fields are in the same order as model.columns - confirm.
      columns = adapted_rows.transpose.each_with_index.map do |col_values, col_num|
        if column_info[col_num].type == :boolean
          col_values.map { |v| v.nil? ? nil : v != 'f' }
        else
          col_values
        end
      end
      rows = columns.transpose

      parameter_markers = fields.map { "?" }.join(',')
      sql_stmt = "INSERT INTO #{table_name} (#{fields.join(',')}) VALUES (#{parameter_markers})"

      rows.each { |row| User.exec_sql(sql_stmt, *row) }

      true
    end

    # Re-creates, for every imported table, the indexes remembered by
    # #backup_and_setup_table, the primary key, and the sequence behind the
    # id column.
    #
    # @return [self]
    def create_indexes
      log " Creating indexes"
      pkey_pattern = / ([\S]{1,}_pkey) /

      ordered_models_for_import.each do |model|
        table = model.table_name
        log " #{table}"

        index_definitions = @index_definitions[table]
        index_definitions.each { |indexdef| model.exec_sql(indexdef) }

        # The indexdef statements don't create the primary keys, so we need to find the primary key and do it ourselves.
        pkey_index_def = index_definitions.find { |ixdef| ixdef =~ pkey_pattern }
        pkey_index_name = pkey_index_def && pkey_pattern.match(pkey_index_def)[1]
        if pkey_index_name
          model.exec_sql("ALTER TABLE ONLY #{table} ADD PRIMARY KEY USING INDEX #{pkey_index_name}")
        end

        if model.columns.map(&:name).include?('id')
          # Start the new sequence right after the highest imported id.
          max_id = model.exec_sql("SELECT MAX(id) AS max FROM #{table}")[0]['max'].to_i + 1
          seq_name = "#{table}_id_seq"
          model.exec_sql("CREATE SEQUENCE #{seq_name} START WITH #{max_id} INCREMENT BY 1 NO MINVALUE NO MAXVALUE CACHE 1")
          model.exec_sql("ALTER SEQUENCE #{seq_name} OWNED BY #{table}.id")
          model.exec_sql("ALTER TABLE #{table} ALTER COLUMN id SET DEFAULT nextval('#{seq_name}')")
        end
      end

      self
    end

    # Extracts the uploads/ directory of the archive into public/, if the
    # archive contains one. tar is told to keep files that are newer than the
    # ones in the archive.
    def extract_uploads
      archive = Shellwords.escape(@archive_filename)
      if `tar tf #{archive} | grep "uploads/"`.present?
        FileUtils.cd(File.join(Rails.root, 'public')) do
          `tar -xz --keep-newer-files -f #{archive} uploads/`
        end
      end
    end

    # Undoes the import: drops each (partially) imported table and moves the
    # backed up table back to the public schema. Both steps are best effort;
    # a failure is logged and the rollback continues with the next step.
    def rollback
      ordered_models_for_import.each do |model|
        log " #{model.table_name}"

        begin
          model.exec_sql("DROP TABLE IF EXISTS #{model.table_name}")
        rescue => e
          log " Failed to drop. #{e.class.name}: #{e.message}"
        end

        begin
          model.exec_sql("ALTER TABLE #{BACKUP_SCHEMA}.#{model.table_name} SET SCHEMA public")
        rescue => e
          log " Failed to restore. #{e.class.name}: #{e.message}"
        end
      end
    end

    # Cleans up after an import, successful or not: clears the "import
    # running" flag, disables maintenance mode, removes the temporary
    # directory and logs the collected warnings.
    def finish_import
      Import.set_import_is_not_running
      Discourse.disable_maintenance_mode
      FileUtils.rm_rf(tmp_directory) if Dir[tmp_directory].present?

      if @warnings.size > 0
        log "WARNINGS:"
        @warnings.each { |message| log " #{message}" }
      end

      # send_notification
    end

    # Currently a no-op that returns true; the notification code is disabled.
    def send_notification
      # Doesn't work. "WARNING: Can't mass-assign protected attributes: created_at"
      # Still a problem with the activerecord schema_cache I think.
      # if @user_info and @user_info[:user_id]
      #   user = User.where(id: @user_info[:user_id]).first
      #   if user and user.email == @user_info[:email]
      #     SystemMessage.new(user).create('import_succeeded')
      #   end
      # end
      true
    end

    # Remembers a warning message; warnings are logged by #finish_import.
    def add_warning(message)
      @warnings << message
    end

    # Prints the given messages to stdout and writes each of them, prefixed
    # with a timestamp and "[IMPORTER]", to the Rails log.
    #
    # @return [true]
    def log(*args)
      puts args
      args.each do |arg|
        Rails.logger.info "#{Time.now.to_formatted_s(:db)}: [IMPORTER] #{arg}"
      end
      true
    end

  end
end