finish Promise

This commit is contained in:
HJfod 2024-02-26 18:27:33 +02:00
parent 463cebf0c4
commit 456c1895be

View file

@ -9,9 +9,26 @@ namespace geode {
struct DefaultProgress { struct DefaultProgress {
std::string message; std::string message;
std::optional<uint8_t> percentage; std::optional<uint8_t> percentage;
DefaultProgress() = default;
DefaultProgress(auto msg) : message(msg) {}
DefaultProgress(auto msg, uint8_t percentage) : message(msg), percentage(percentage) {}
}; };
} }
class PromiseCancellationToken final {
private:
std::shared_ptr<std::atomic_bool> token = std::make_shared<std::atomic_bool>(false);
template <class T, class E, class P>
friend class Promise;
public:
inline operator bool() {
return *token;
}
};
template <class T = impl::DefaultValue, class E = impl::DefaultError, class P = impl::DefaultProgress> template <class T = impl::DefaultValue, class E = impl::DefaultError, class P = impl::DefaultProgress>
class PromiseEventFilter; class PromiseEventFilter;
@ -29,82 +46,57 @@ namespace geode {
using Expect = utils::MiniFunction<void(E)>; using Expect = utils::MiniFunction<void(E)>;
using Progress = utils::MiniFunction<void(P)>; using Progress = utils::MiniFunction<void(P)>;
using Finally = utils::MiniFunction<void()>; using Finally = utils::MiniFunction<void()>;
using SimpleExecutor = utils::MiniFunction<void(
Then resolve,
Expect reject
)>;
using Executor = utils::MiniFunction<void(
Then resolve,
Expect reject,
Progress progress,
PromiseCancellationToken cancellationToken
)>;
/** /**
* Create a Promise. Call the provided callbacks to notify the * Create a Promise. Call the provided callbacks to notify the
* listener when the Promise is finished. Use the other constructor * listener when the Promise is finished. Use the other constructor
* overloads to specify progress and handle cancellation. See the * overloads to specify progress and handle cancellation.
* class description for general information about Promises *
* @param threaded Whether the Promise should start executing in a new
* thread or not; if false, the Promise starts immediately executing in
* the current thread
*
* @note See the class description for general information about
* Promises
*/ */
Promise(utils::MiniFunction<void(Then resolve, Expect reject)>&& create) Promise(SimpleExecutor&& executor, bool threaded = true)
: Promise([create](auto resolve, auto reject, auto, auto const&) { : Promise([executor](auto resolve, auto reject, auto, auto) {
create(resolve, reject); executor(resolve, reject);
}) {} }, threaded) {}
/** /**
* Create a Promise. Call the provided callbacks to notify the * Create a Promise. Call the provided callbacks to notify the
* listener when the Promise is finished. If the user cancels the * listener when the Promise is finished. If the user cancels the
* Promise, this is reflected in the `cancelled` parameter; you can * Promise, this is reflected in the `cancelled` parameter; you can
* read from it, and if it's true, you can stop whatever you were doing * read from it, and if it's true, you can stop whatever you were doing
* and not call any of the other callbacks. See the class description * and not call any of the other callbacks.
* for general information about Promises *
* @param threaded Whether the Promise should start executing in a new
* thread or not; if false, the Promise starts immediately executing in
* the current thread
*
* @note See the class description for general information about
* Promises
*/ */
Promise(utils::MiniFunction<void( Promise(Executor&& executor, bool threaded = true) : m_data(std::make_shared<Data>()) {
Then resolve, if (threaded) {
Expect reject, std::thread([executor = std::move(executor), data = m_data]() mutable {
Progress progress, Promise::invoke_executor(std::move(executor), data);
std::atomic_bool const& cancelled }).detach();
)>&& create) : m_data(std::make_unique<Data>()) {
create(
[data = m_data](auto&& value) {
if (data->cancelled) return;
std::unique_lock<std::mutex> _(data->mutex);
bool handled = false;
if (data->thenHandler) {
Loader::get()->queueInMainThread([fun = std::move(data->thenHandler), v = std::move(value)] {
fun(v);
});
handled = true;
} }
if (data->finallyHandler) { else {
Loader::get()->queueInMainThread([fun = std::move(data->finallyHandler)] { Promise::invoke_executor(std::move(executor), m_data);
fun();
});
handled = true;
} }
if (!handled) {
data->result = Ok(std::move(value));
}
},
[data = m_data](auto&& error) {
if (data->cancelled) return;
std::unique_lock<std::mutex> _(data->mutex);
bool handled = false;
if (data->expectHandler) {
Loader::get()->queueInMainThread([fun = std::move(data->expectHandler), v = std::move(error)] {
fun(v);
});
handled = true;
}
if (data->finallyHandler) {
Loader::get()->queueInMainThread([fun = std::move(data->finallyHandler)] {
fun();
});
handled = true;
}
if (!handled) {
data->result = Err(std::move(error));
}
},
[data = m_data](auto&& p) {
if (data->cancelled) return;
std::unique_lock<std::mutex> _(data->mutex);
if (auto handler = data->progressHandler) {
handler(std::move(p));
}
},
m_data->cancelled
);
} }
/** /**
@ -112,13 +104,13 @@ namespace geode {
* listener at a time. If the Promise has already been resolved, the * listener at a time. If the Promise has already been resolved, the
* callback is immediately queued in the main thread * callback is immediately queued in the main thread
*/ */
Promise& then(Then handler) { Promise& then(Then&& handler) {
if (m_data->cancelled) return *this; if (m_data->cancellationToken) return *this;
std::unique_lock<std::mutex> _(m_data->mutex); std::unique_lock<std::mutex> _(m_data->mutex);
if (m_data->result.has_value()) { if (m_data->result.has_value()) {
auto v = std::move(m_data->result).value(); auto v = std::move(m_data->result).value();
if (v.isOk()) { if (v.index() == 0) {
Loader::get()->queueInMainThread([handler = std::move(handler), ok = std::move(v).unwrap()] { Loader::get()->queueInMainThread([handler = std::move(handler), ok = std::move(std::get<0>(v))] {
handler(ok); handler(ok);
}); });
} }
@ -133,13 +125,13 @@ namespace geode {
* listener at a time. If the Promise has already been resolved, the * listener at a time. If the Promise has already been resolved, the
* callback is immediately queued in the main thread * callback is immediately queued in the main thread
*/ */
Promise& expect(Expect handler) { Promise& expect(Expect&& handler) {
if (m_data->cancelled) return *this; if (m_data->cancellationToken) return *this;
std::unique_lock<std::mutex> _(m_data->mutex); std::unique_lock<std::mutex> _(m_data->mutex);
if (m_data->result.has_value()) { if (m_data->result.has_value()) {
auto v = std::move(m_data->result).value(); auto v = std::move(m_data->result).value();
if (v.isErr()) { if (v.index() == 1) {
Loader::get()->queueInMainThread([handler = std::move(handler), err = std::move(v).unwrapErr()] { Loader::get()->queueInMainThread([handler = std::move(handler), err = std::move(std::get<1>(v))] {
handler(err); handler(err);
}); });
} }
@ -154,8 +146,8 @@ namespace geode {
* only be one listener at a time. If the Promise has already been * only be one listener at a time. If the Promise has already been
* resolved, nothing happens * resolved, nothing happens
*/ */
Promise& progress(Progress handler) { Promise& progress(Progress&& handler) {
if (m_data->cancelled) return *this; if (m_data->cancellationToken) return *this;
std::unique_lock<std::mutex> _(m_data->mutex); std::unique_lock<std::mutex> _(m_data->mutex);
if (!m_data->result.has_value()) { if (!m_data->result.has_value()) {
m_data->progressHandler = handler; m_data->progressHandler = handler;
@ -168,8 +160,8 @@ namespace geode {
* If the Promise has already been resolved, the callback is * If the Promise has already been resolved, the callback is
* immediately queued in the main thread * immediately queued in the main thread
*/ */
Promise& finally(Finally handler) { Promise& finally(Finally&& handler) {
if (m_data->cancelled) return *this; if (m_data->cancellationToken) return *this;
std::unique_lock<std::mutex> _(m_data->mutex); std::unique_lock<std::mutex> _(m_data->mutex);
if (m_data->result.has_value()) { if (m_data->result.has_value()) {
Loader::get()->queueInMainThread([handler = std::move(handler)] { Loader::get()->queueInMainThread([handler = std::move(handler)] {
@ -191,13 +183,22 @@ namespace geode {
* cancel * cancel
*/ */
Promise& cancel() { Promise& cancel() {
if (m_data->cancelled) return *this; if (m_data->cancellationToken) return *this;
std::unique_lock<std::mutex> _(m_data->mutex); std::unique_lock<std::mutex> _(m_data->mutex);
m_data->thenHandler = nullptr; m_data->thenHandler = nullptr;
m_data->expectHandler = nullptr; m_data->expectHandler = nullptr;
m_data->progressHandler = nullptr; m_data->progressHandler = nullptr;
m_data->finallyHandler = nullptr; m_data->finallyHandler = nullptr;
m_data->cancelled = true; m_data->cancellationToken.token = true;
return *this;
}
/**
* Link this Promise to be cancelled alongside another Promise
*/
Promise& link(PromiseCancellationToken otherCancellationToken) {
if (m_data->cancellationToken) return *this;
m_data->cancellationToken = otherCancellationToken;
return *this; return *this;
} }
@ -218,12 +219,67 @@ namespace geode {
Expect expectHandler; Expect expectHandler;
Progress progressHandler; Progress progressHandler;
Finally finallyHandler; Finally finallyHandler;
std::optional<Result<T, E>> result; std::optional<std::variant<T, E>> result;
std::atomic_bool cancelled = false; PromiseCancellationToken cancellationToken;
}; };
// This has to be a shared_ptr so that the data can persist even after // This has to be a shared_ptr so that the data can persist even after
// the future is destroyed, as well as to share it between resolve, reject, and the likes // the future is destroyed, as well as to share it between resolve, reject, and the likes
std::shared_ptr<Data> m_data; std::shared_ptr<Data> m_data;
static void invoke_executor(Executor&& executor, std::shared_ptr<Data> data) {
executor(
[data](auto&& value) {
if (data->cancellationToken) return;
std::unique_lock<std::mutex> _(data->mutex);
bool handled = false;
if (data->thenHandler) {
Loader::get()->queueInMainThread([fun = std::move(data->thenHandler), v = std::move(value)] {
fun(v);
});
handled = true;
}
if (data->finallyHandler) {
Loader::get()->queueInMainThread([fun = std::move(data->finallyHandler)] {
fun();
});
handled = true;
}
if (!handled) {
data->result = std::variant<T, E>(std::in_place_index<0>, std::move(value));
}
},
[data](auto&& error) {
if (data->cancellationToken) return;
std::unique_lock<std::mutex> _(data->mutex);
bool handled = false;
if (data->expectHandler) {
Loader::get()->queueInMainThread([fun = std::move(data->expectHandler), v = std::move(error)] {
fun(v);
});
handled = true;
}
if (data->finallyHandler) {
Loader::get()->queueInMainThread([fun = std::move(data->finallyHandler)] {
fun();
});
handled = true;
}
if (!handled) {
data->result = std::variant<T, E>(std::in_place_index<1>, std::move(error));
}
},
[data](auto&& p) {
if (data->cancellationToken) return;
std::unique_lock<std::mutex> _(data->mutex);
if (auto handler = data->progressHandler) {
Loader::get()->queueInMainThread([p = std::move(p), handler]() mutable {
handler(std::move(p));
});
}
},
data->cancellationToken
);
}
}; };
/** /**