diff --git a/lib/scheduler/manager.rb b/lib/scheduler/manager.rb
index b1449973a..838c4db9d 100644
--- a/lib/scheduler/manager.rb
+++ b/lib/scheduler/manager.rb
@@ -12,32 +12,78 @@ module Scheduler
 
     class Runner
       def initialize(manager)
+        @mutex = Mutex.new
         @queue = Queue.new
         @manager = manager
+        @reschedule_orphans_thread = Thread.new do
+          while true
+            sleep 1.minute
+            @mutex.synchronize do
+              reschedule_orphans
+            end
+          end
+        end
+        @keep_alive_thread = Thread.new do
+          while true
+            @mutex.synchronize do
+              keep_alive
+            end
+            sleep (@manager.keep_alive_duration / 2)
+          end
+        end
         @thread = Thread.new do
           while true
-            klass = @queue.deq
-            failed = false
-            start = Time.now.to_f
-            info = @manager.schedule_info(klass)
-            begin
-              info.prev_result = "RUNNING"
-              info.write!
-              klass.new.perform
-            rescue => e
-              Scheduler::Manager.handle_exception(e)
-              failed = true
-            end
-            duration = ((Time.now.to_f - start) * 1000).to_i
-            info.prev_duration = duration
-            info.prev_result = failed ? "FAILED" : "OK"
-            info.write!
+            process_queue
           end
         end
       end
 
+      def keep_alive
+        @manager.keep_alive
+      rescue => ex
+        Scheduler::Manager.handle_exception(ex)
+      end
+
+      def reschedule_orphans
+        @manager.reschedule_orphans!
+      rescue => ex
+        Scheduler::Manager.handle_exception(ex)
+      end
+
+      def process_queue
+        klass = @queue.deq
+        # hack alert, I need to both deq and set @running atomically.
+        @running = true
+        failed = false
+        start = Time.now.to_f
+        info = @mutex.synchronize { @manager.schedule_info(klass) }
+        begin
+          info.prev_result = "RUNNING"
+          @mutex.synchronize { info.write! }
+          klass.new.perform
+        rescue => e
+          Scheduler::Manager.handle_exception(e)
+          failed = true
+        end
+        duration = ((Time.now.to_f - start) * 1000).to_i
+        info.prev_duration = duration
+        info.prev_result = failed ? "FAILED" : "OK"
+        info.current_owner = nil
+        attempts(3) do
+          @mutex.synchronize { info.write! }
+        end
+      rescue => ex
+        Scheduler::Manager.handle_exception(ex)
+      ensure
+        @running = false
+      end
+
       def stop!
-        @thread.kill
+        @mutex.synchronize do
+          @thread.kill
+          @keep_alive_thread.kill
+          @reschedule_orphans_thread.kill
+        end
       end
 
       def enq(klass)
@@ -48,7 +94,23 @@ module Scheduler
         while !@queue.empty? && !(@queue.num_waiting > 0)
           sleep 0.001
         end
+        # this is a hack, but is only used for test anyway
+        sleep 0.001
+        while @running
+          sleep 0.001
+        end
       end
+
+      def attempts(n)
+        n.times {
+          begin
+            yield; break
+          rescue
+            sleep Random.rand
+          end
+        }
+      end
+
     end
 
     def self.without_runner(redis=nil)
@@ -94,22 +156,42 @@ module Scheduler
       end
     end
 
+    def reschedule_orphans!
+      lock do
+        redis.zrange(Manager.queue_key, 0, -1).each do |key|
+          klass = get_klass(key)
+          next unless klass
+          info = schedule_info(klass)
+
+          if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
+            (info.current_owner.blank? || !redis.get(info.current_owner))
+            info.prev_result = 'ORPHAN'
+            info.next_run = Time.now.to_i
+            info.write!
+          end
+        end
+      end
+    end
+
+    def get_klass(name)
+      name.constantize
+    rescue NameError
+      nil
+    end
+
     def tick
       lock do
         (key, due), _ = redis.zrange Manager.queue_key, 0, 0, withscores: true
         return unless key
         if due.to_i <= Time.now.to_i
-          klass = begin
-                    key.constantize
-                  rescue NameError
-                    nil
-                  end
+          klass = get_klass(key)
           return unless klass
           info = schedule_info(klass)
           info.prev_run = Time.now.to_i
           info.prev_result = "QUEUED"
           info.prev_duration = -1
           info.next_run = nil
+          info.current_owner = identity_key
           info.schedule!
           @runner.enq(klass)
         end
@@ -126,6 +208,13 @@ module Scheduler
       self.class.current = nil
     end
 
+    def keep_alive_duration
+      60
+    end
+
+    def keep_alive
+      redis.setex identity_key, keep_alive_duration, ""
+    end
 
     def lock
       got_lock = false
@@ -157,6 +246,7 @@ module Scheduler
       redis.del Manager.lock_key
     end
 
+
     def self.discover_schedules
       schedules = []
       ObjectSpace.each_object(Scheduler::Schedule) do |schedule|
@@ -165,6 +255,18 @@ module Scheduler
       schedules
     end
 
+    @mutex = Mutex.new
+    def self.seq
+      @mutex.synchronize do
+        @i ||= 0
+        @i += 1
+      end
+    end
+
+    def identity_key
+      @identity_key ||= "_scheduler_#{`hostname`}:#{Process.pid}:#{self.class.seq}"
+    end
+
     def self.lock_key
       "_scheduler_lock_"
     end
diff --git a/lib/scheduler/schedule_info.rb b/lib/scheduler/schedule_info.rb
index d8aeaa71f..18d12147b 100644
--- a/lib/scheduler/schedule_info.rb
+++ b/lib/scheduler/schedule_info.rb
@@ -3,7 +3,8 @@ module Scheduler
     attr_accessor :next_run,
                   :prev_run,
                   :prev_duration,
-                  :prev_result
+                  :prev_result,
+                  :current_owner
 
     def initialize(klass, manager)
       @klass = klass
@@ -21,10 +22,11 @@ module Scheduler
         @prev_run = data["prev_run"]
         @prev_result = data["prev_result"]
         @prev_duration = data["prev_duration"]
+        @current_owner = data["current_owner"]
       end
     rescue
       # corrupt redis
-      @next_run = @prev_run = @prev_result = @prev_duration = nil
+      @next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil
     end
 
     def valid?
@@ -57,14 +59,15 @@ module Scheduler
         next_run: @next_run,
         prev_run: @prev_run,
         prev_duration: @prev_duration,
-        prev_result: @prev_result
+        prev_result: @prev_result,
+        current_owner: @current_owner
       }.to_json
       redis.zadd Manager.queue_key, @next_run , @klass
     end
 
     def del!
       clear!
-      @next_run = @prev_run = @prev_result = @prev_duration = nil
+      @next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil
     end
 
     def key
diff --git a/lib/scheduler/views/scheduler.erb b/lib/scheduler/views/scheduler.erb
index dcd3e4157..b46bd4c2f 100644
--- a/lib/scheduler/views/scheduler.erb
+++ b/lib/scheduler/views/scheduler.erb
@@ -15,6 +15,7 @@
           <th style="width: 15%">Last Run</th>
           <th style="width: 15%">Last Result</th>
           <th style="width: 15%">Last Duration</th>
+          <th style="width: 15%">Last Owner</th>
           <th style="width: 15%">Next Run Due</th>
           <th style="width: 10%">Actions</th>
         </thead>
@@ -37,6 +38,9 @@
             <td>
               <%= @info.prev_duration %>
             </td>
+            <td>
+              <%= @info.current_owner %>
+            </td>
             <td>
               <% next_run = @info.next_run %>
               <% if next_run.nil? %>
diff --git a/spec/components/scheduler/manager_spec.rb b/spec/components/scheduler/manager_spec.rb
index b1ff8e0ee..2f9dbe8e5 100644
--- a/spec/components/scheduler/manager_spec.rb
+++ b/spec/components/scheduler/manager_spec.rb
@@ -23,19 +23,60 @@ describe Scheduler::Manager do
         sleep 0.001
       end
     end
+
+    class SuperLongJob
+      extend ::Scheduler::Schedule
+
+      every 10.minutes
+
+      def perform
+        sleep 1000
+      end
+    end
   end
 
   let(:manager) { Scheduler::Manager.new(DiscourseRedis.new) }
 
   before do
+    $redis.del manager.class.lock_key
     $redis.del manager.class.queue_key
+    manager.remove(Testing::RandomJob)
+    manager.remove(Testing::SuperLongJob)
   end
 
   after do
     manager.stop!
+    manager.remove(Testing::RandomJob)
+    manager.remove(Testing::SuperLongJob)
+  end
+
+  describe '#sync' do
+
+    it 'increases' do
+      Scheduler::Manager.seq.should == Scheduler::Manager.seq - 1
+    end
   end
 
   describe '#tick' do
+
+    it 'should recover from crashed manager' do
+
+      info = manager.schedule_info(Testing::SuperLongJob)
+      info.next_run = Time.now.to_i - 1
+      info.write!
+
+      manager.tick
+      manager.stop!
+
+      $redis.del manager.identity_key
+
+      manager = Scheduler::Manager.new(DiscourseRedis.new)
+      manager.reschedule_orphans!
+
+      info = manager.schedule_info(Testing::SuperLongJob)
+      info.next_run.should <= Time.now.to_i
+    end
+
     it 'should only run pending job once' do
 
       Testing::RandomJob.runs = 0
@@ -48,6 +89,7 @@ describe Scheduler::Manager do
         Thread.new do
           manager = Scheduler::Manager.new(DiscourseRedis.new)
           manager.blocking_tick
+          manager.stop!
         end
       end.map(&:join)