#pragma once #include "general.hpp" #include "MiniFunction.hpp" #include "../loader/Event.hpp" #include "../loader/Loader.hpp" namespace geode { template class [[nodiscard]] Task final { public: struct [[nodiscard]] Cancel final {}; class Result final { private: std::variant m_value; std::optional getValue() && { if (m_value.index() == 0) { return std::optional(std::move(std::get<0>(std::move(m_value)))); } return std::nullopt; } bool isCancelled() const { return m_value.index() == 1; } template friend class Task; public: Result(Result&&) = default; Result(Result const&) = delete; Result(T&& value) : m_value(std::in_place_index<0>, std::forward(value)) {} Result(Cancel const&) : m_value(std::in_place_index<1>, Cancel()) {} template Result(V&& value) requires std::is_constructible_v : m_value(std::in_place_index<0>, std::forward(value)) {} }; public: enum class Status { Pending, Finished, Cancelled, }; class Handle final { private: std::recursive_mutex m_mutex; Status m_status = Status::Pending; std::optional m_resultValue; bool m_finalEventPosted = false; std::unique_ptr m_mapListener = { nullptr, +[](void*) {} }; class PrivateMarker final {}; static std::shared_ptr create() { return std::make_shared(PrivateMarker()); } bool is(Status status) { std::unique_lock lock(m_mutex); return m_status == status; } template friend class Task; public: Handle(PrivateMarker) {} }; class Event final : public geode::Event { private: std::shared_ptr m_handle; std::variant m_value; EventListenerProtocol* m_for = nullptr; Event(std::shared_ptr handle, std::variant&& value) : m_handle(handle), m_value(std::move(value)) {} static Event createFinished(std::shared_ptr handle, T* value) { return Event(handle, std::variant(std::in_place_index<0>, value)); } static Event createProgressed(std::shared_ptr handle, P* value) { return Event(handle, std::variant(std::in_place_index<1>, value)); } static Event createCancelled(std::shared_ptr handle) { return Event(handle, std::variant(std::in_place_index<2>, Cancel())); } template friend class Task; public: T* getValue() { return m_value.index() == 0 ? std::get<0>(m_value) : nullptr; } T const* getValue() const { return m_value.index() == 0 ? std::get<0>(m_value) : nullptr; } P* getProgress() { return m_value.index() == 1 ? std::get<1>(m_value) : nullptr; } P const* getProgress() const { return m_value.index() == 1 ? std::get<1>(m_value) : nullptr; } bool isCancelled() const { return m_value.index() == 2; } void cancel() { Task::cancel(m_handle); } }; using Value = T; using Progress = P; using PostResult = utils::MiniFunction; using PostProgress = utils::MiniFunction; using HasBeenCancelled = utils::MiniFunction; using Run = utils::MiniFunction; using RunWithCallback = utils::MiniFunction; using Callback = void(Event*); private: EventListenerProtocol* m_listener = nullptr; std::shared_ptr m_handle; Task(std::shared_ptr handle) : m_handle(handle) {} static void finish(std::shared_ptr handle, T&& value) { if (!handle) return; std::unique_lock lock(handle->m_mutex); if (handle->m_status == Status::Pending) { handle->m_status = Status::Finished; handle->m_resultValue.emplace(std::move(value)); Loader::get()->queueInMainThread([handle, value = &*handle->m_resultValue]() mutable { Event::createFinished(handle, value).post(); std::unique_lock lock(handle->m_mutex); handle->m_finalEventPosted = true; }); } } static void progress(std::shared_ptr handle, P&& value) { if (!handle) return; std::unique_lock lock(handle->m_mutex); if (handle->m_status == Status::Pending) { Loader::get()->queueInMainThread([handle, value = std::move(value)]() mutable { Event::createProgressed(handle, &value).post(); }); } } static void cancel(std::shared_ptr handle) { if (!handle) return; std::unique_lock lock(handle->m_mutex); if (handle->m_status == Status::Pending) { handle->m_status = Status::Cancelled; Loader::get()->queueInMainThread([handle]() mutable { Event::createCancelled(handle).post(); std::unique_lock lock(handle->m_mutex); handle->m_finalEventPosted = true; }); } } template friend class Task; public: Task() : m_handle(nullptr) {} Task(Task const& other) : m_handle(other.m_handle) {} Task(Task&& other) : m_handle(std::move(other.m_handle)) {} Task& operator=(Task const& other) { m_handle = other.m_handle; return *this; } Task& operator=(Task&& other) { m_handle = std::move(other.m_handle); return *this; } bool operator==(Task const& other) const { return m_handle == other.m_handle; } bool operator!=(Task const& other) const { return m_handle != other.m_handle; } bool operator<(Task const& other) const { return m_handle < other.m_handle; } bool operator<=(Task const& other) const { return m_handle <= other.m_handle; } bool operator>(Task const& other) const { return m_handle > other.m_handle; } bool operator>=(Task const& other) const { return m_handle >= other.m_handle; } T* getFinishedValue() { if (m_handle && m_handle->m_resultValue) { return &*m_handle->m_resultValue; } return nullptr; } void cancel() { Task::cancel(m_handle); } bool isPending() { return m_handle && m_handle->is(Status::Pending); } bool isFinished() { return m_handle && m_handle->is(Status::Finished); } bool isCancelled() { return m_handle && m_handle->is(Status::Cancelled); } static Task immediate(T value) { auto task = Task(Handle::create()); Task::finish(task.m_handle, std::move(value)); return task; } static Task run(Run&& body) { auto task = Task(Handle::create()); std::thread([handle = std::weak_ptr(task.m_handle), body = std::move(body)] { utils::thread::setName(fmt::format("Task @{}", fmt::ptr(handle.lock()))); auto result = body( [handle](P progress) { Task::progress(handle.lock(), std::move(progress)); }, [handle]() -> bool { // The task has been cancelled if the user has explicitly cancelled it, // or if there is no one listening anymore auto lock = handle.lock(); return !(lock && lock->is(Status::Pending)); } ); if (result.isCancelled()) { Task::cancel(handle.lock()); } else { Task::finish(handle.lock(), std::move(*std::move(result).getValue())); } }).detach(); return task; } static Task runWithCallback(RunWithCallback&& body) { auto task = Task(Handle::create()); std::thread([handle = std::weak_ptr(task.m_handle), body = std::move(body)] { utils::thread::setName(fmt::format("Task (w/ callback) @{}", fmt::ptr(handle.lock()))); body( [handle](Result result) { if (result.isCancelled()) { Task::cancel(handle.lock()); } else { Task::finish(handle.lock(), std::move(*std::move(result).getValue())); } }, [handle](P progress) { Task::progress(handle.lock(), std::move(progress)); }, [handle]() -> bool { // The task has been cancelled if the user has explicitly cancelled it, // or if there is no one listening anymore auto lock = handle.lock(); return !(lock && lock->is(Status::Pending)); } ); }).detach(); return task; } template auto map(ResultMapper&& resultMapper, ProgressMapper&& progressMapper) { using T2 = decltype(resultMapper(std::declval())); using P2 = decltype(progressMapper(std::declval())); auto task = Task(Task::Handle::create()); // Lock the current task until we have managed to create our new one std::unique_lock lock(m_handle->m_mutex); // If the current task is cancelled, cancel the new one immediately if (m_handle->m_status == Status::Cancelled) { Task::cancel(task.m_handle); } // If the current task is finished, immediately map the value and post that else if (m_handle->m_status == Status::Finished) { Task::finish(task.m_handle, std::move(resultMapper(&*m_handle->m_resultValue))); } // Otherwise start listening and waiting for the current task to finish else { task.m_handle->m_mapListener = std::unique_ptr( static_cast(new EventListener( [ handle = std::weak_ptr(task.m_handle), resultMapper = std::move(resultMapper), progressMapper = std::move(progressMapper) ](Event* event) { if (auto v = event->getValue()) { Task::finish(handle.lock(), std::move(resultMapper(v))); } else if (auto p = event->getProgress()) { Task::progress(handle.lock(), std::move(progressMapper(p))); } else if (event->isCancelled()) { Task::cancel(handle.lock()); } }, *this )), +[](void* ptr) { delete static_cast*>(ptr); } ); } return task; } template requires std::copy_constructible

auto map(ResultMapper&& resultMapper) { return this->map(std::move(resultMapper), +[](P* p) -> P { return *p; }); } ListenerResult handle(utils::MiniFunction fn, Event* e) { if (e->m_handle == m_handle && (!e->m_for || e->m_for == m_listener)) { fn(e); } return ListenerResult::Propagate; } // todo: i believe alk wanted tasks to be in their own pool EventListenerPool* getPool() const { return DefaultEventListenerPool::get(); } void setListener(EventListenerProtocol* listener) { m_listener = listener; if (!m_handle) return; // If this task has already been finished and the finish event // isn't pending in the event queue, immediately queue up a // finish event for this listener std::unique_lock lock(m_handle->m_mutex); if (m_handle->m_finalEventPosted) { if (m_handle->m_status == Status::Finished) { Loader::get()->queueInMainThread([handle = m_handle, listener = m_listener, value = &*m_handle->m_resultValue]() { auto ev = Event::createFinished(handle, value); ev.m_for = listener; ev.post(); }); } else { Loader::get()->queueInMainThread([handle = m_handle, listener = m_listener]() { auto ev = Event::createCancelled(handle); ev.m_for = listener; ev.post(); }); } } } EventListenerProtocol* getListener() const { return m_listener; } }; static_assert(is_filter>, "The Task class must be a valid event filter!"); }