mirror of
https://github.com/geode-sdk/geode.git
synced 2025-03-13 22:49:52 -04:00
new promise impl (does not compile for shit)
This commit is contained in:
parent
c762283b49
commit
8101ae50ab
5 changed files with 480 additions and 330 deletions
|
@ -16,190 +16,353 @@ namespace geode {
|
|||
};
|
||||
}
|
||||
|
||||
class PromiseCancellationToken final {
|
||||
private:
|
||||
std::shared_ptr<std::atomic_bool> token = std::make_shared<std::atomic_bool>(false);
|
||||
|
||||
template <class T, class E, class P>
|
||||
friend class Promise;
|
||||
|
||||
public:
|
||||
inline operator bool() {
|
||||
return *token;
|
||||
}
|
||||
};
|
||||
struct CancelledState final {};
|
||||
|
||||
template <class T = impl::DefaultValue, class E = impl::DefaultError, class P = impl::DefaultProgress>
|
||||
class PromiseEventFilter;
|
||||
|
||||
/**
|
||||
* Represents an asynchronous `Result`. Similar to `Future` in Rust, or
|
||||
* `Promise` in JavaScript. May have only one of each kind of callback. Can
|
||||
* also be used to monitor the progress of the upcoming value. All
|
||||
* callbacks are always run in the main thread, so interacting with UI is
|
||||
* safe
|
||||
*/
|
||||
template <class T = impl::DefaultValue, class E = impl::DefaultError, class P = impl::DefaultProgress>
|
||||
class [[nodiscard]] Promise final {
|
||||
class Promise final {
|
||||
public:
|
||||
using Then = utils::MiniFunction<void(T)>;
|
||||
using Expect = utils::MiniFunction<void(E)>;
|
||||
using Progress = utils::MiniFunction<void(P)>;
|
||||
using Finally = utils::MiniFunction<void()>;
|
||||
using SimpleExecutor = utils::MiniFunction<void(
|
||||
Then resolve,
|
||||
Expect reject
|
||||
)>;
|
||||
using Executor = utils::MiniFunction<void(
|
||||
Then resolve,
|
||||
Expect reject,
|
||||
Progress progress,
|
||||
PromiseCancellationToken cancellationToken
|
||||
)>;
|
||||
using Value = T;
|
||||
using Error = E;
|
||||
using Progress = P;
|
||||
|
||||
/**
|
||||
* Create a Promise. Call the provided callbacks to notify the
|
||||
* listener when the Promise is finished. Use the other constructor
|
||||
* overloads to specify progress and handle cancellation.
|
||||
*
|
||||
* @param threaded Whether the Promise should start executing in a new
|
||||
* thread or not; if false, the Promise starts immediately executing in
|
||||
* the current thread
|
||||
*
|
||||
* @note See the class description for general information about
|
||||
* Promises
|
||||
*/
|
||||
Promise(SimpleExecutor&& executor, bool threaded = true)
|
||||
: Promise([executor](auto resolve, auto reject, auto, auto) {
|
||||
executor(resolve, reject);
|
||||
}, threaded) {}
|
||||
using OnResolved = utils::MiniFunction<void(Value)>;
|
||||
using OnRejected = utils::MiniFunction<void(Error)>;
|
||||
using OnProgress = utils::MiniFunction<void(Progress)>;
|
||||
using OnFinished = utils::MiniFunction<void()>;
|
||||
using OnCancelled = utils::MiniFunction<void()>;
|
||||
|
||||
using State = std::variant<Value, Error, Progress, CancelledState>;
|
||||
using OnStateChange = utils::MiniFunction<void(State)>;
|
||||
|
||||
// These are needed if for example Value and Error are the same type
|
||||
static constexpr size_t STATE_VALUE_INDEX = 0;
|
||||
static constexpr size_t STATE_ERROR_INDEX = 1;
|
||||
static constexpr size_t STATE_PROGRESS_INDEX = 2;
|
||||
static constexpr size_t STATE_CANCELLED_INDEX = 3;
|
||||
|
||||
Promise() : m_data(std::make_shared<Data>()) {}
|
||||
|
||||
Promise(utils::MiniFunction<void(OnResolved, OnRejected)> source, bool threaded = true)
|
||||
: Promise([source](auto resolve, auto reject, auto, auto) {
|
||||
source(resolve, reject);
|
||||
}) {}
|
||||
|
||||
/**
|
||||
* Create a Promise. Call the provided callbacks to notify the
|
||||
* listener when the Promise is finished. If the user cancels the
|
||||
* Promise, this is reflected in the `cancelled` parameter; you can
|
||||
* read from it, and if it's true, you can stop whatever you were doing
|
||||
* and not call any of the other callbacks.
|
||||
*
|
||||
* @param threaded Whether the Promise should start executing in a new
|
||||
* thread or not; if false, the Promise starts immediately executing in
|
||||
* the current thread
|
||||
*
|
||||
* @note See the class description for general information about
|
||||
* Promises
|
||||
*/
|
||||
Promise(Executor&& executor, bool threaded = true) : m_data(std::make_shared<Data>()) {
|
||||
Promise(utils::MiniFunction<void(OnResolved, OnRejected, OnProgress, OnCancelled)> source, bool threaded = true)
|
||||
: Promise([source](auto onStateChanged) {
|
||||
source(
|
||||
[onStateChanged](auto&& value) {
|
||||
onStateChanged(State(std::in_place_index<STATE_VALUE_INDEX>, value));
|
||||
},
|
||||
[onStateChanged](auto&& error) {
|
||||
onStateChanged(State(std::in_place_index<STATE_ERROR_INDEX>, error));
|
||||
},
|
||||
[onStateChanged](auto&& progress) {
|
||||
onStateChanged(State(std::in_place_index<STATE_PROGRESS_INDEX>, progress));
|
||||
},
|
||||
[onStateChanged]() {
|
||||
onStateChanged(State(std::in_place_index<STATE_CANCELLED_INDEX>, CancelledState()));
|
||||
}
|
||||
);
|
||||
}) {}
|
||||
|
||||
Promise(utils::MiniFunction<void(OnStateChange)> source, bool threaded = true) : m_data(std::make_shared<Data>()) {
|
||||
if (threaded) {
|
||||
std::thread([executor = std::move(executor), data = m_data]() mutable {
|
||||
Promise::invoke_executor(std::move(executor), data);
|
||||
std::thread([source = std::move(source), data = m_data]() mutable {
|
||||
Promise::invoke_source(std::move(source), data);
|
||||
}).detach();
|
||||
}
|
||||
else {
|
||||
Promise::invoke_executor(std::move(executor), m_data);
|
||||
Promise::invoke_source(std::move(source), m_data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a listener for when the Promise finishes. There may only be one
|
||||
* listener at a time. If the Promise has already been resolved, the
|
||||
* callback is immediately queued in the main thread
|
||||
*/
|
||||
Promise& then(Then&& handler) {
|
||||
if (m_data->cancellationToken) return *this;
|
||||
Promise then(utils::MiniFunction<void(Value)>&& callback) {
|
||||
return this->template then<Value>([callback](auto value) {
|
||||
callback(value);
|
||||
return std::move(value);
|
||||
});
|
||||
}
|
||||
template <class T2>
|
||||
requires (!std::is_void_v<T2>)
|
||||
Promise<T2, E, P> then(utils::MiniFunction<T2(T)>&& callback) {
|
||||
if (m_data->cancelled) return make_cancelled<T2, E, P>();
|
||||
std::unique_lock<std::mutex> _(m_data->mutex);
|
||||
|
||||
// Check if this Promise has already been resolved, and if so
|
||||
// immediately queue the callback with the value
|
||||
if (m_data->result.has_value()) {
|
||||
auto v = std::move(m_data->result).value();
|
||||
auto v = m_data->result.value();
|
||||
if (v.index() == 0) {
|
||||
Loader::get()->queueInMainThread([handler = std::move(handler), ok = std::move(std::get<0>(v))] {
|
||||
handler(ok);
|
||||
Loader::get()->queueInMainThread([callback = std::move(callback), ok = std::move(std::get<0>(v))] {
|
||||
callback(ok);
|
||||
});
|
||||
}
|
||||
return make_cancelled<T2, E, P>();
|
||||
}
|
||||
else {
|
||||
m_data->thenHandler = handler;
|
||||
}
|
||||
return *this;
|
||||
|
||||
return Promise<T2, E, P>([data = m_data, callback](auto fwdStateToNextPromise) {
|
||||
data->callback = [fwdStateToNextPromise, callback](auto&& state) {
|
||||
// Can't use std::visit if Value and Error are the same >:(
|
||||
switch (state.index()) {
|
||||
case STATE_VALUE_INDEX: {
|
||||
auto mapped = callback(std::get<STATE_VALUE_INDEX>(state));
|
||||
fwdStateToNextPromise(Promise<T2, E, P>::State(
|
||||
std::in_place_index<STATE_VALUE_INDEX>,
|
||||
std::move(mapped)
|
||||
));
|
||||
} break;
|
||||
|
||||
case STATE_ERROR_INDEX: {
|
||||
fwdStateToNextPromise(Promise<T2, E, P>::State(
|
||||
std::in_place_index<STATE_ERROR_INDEX>,
|
||||
std::move(std::get<STATE_ERROR_INDEX>(state))
|
||||
));
|
||||
} break;
|
||||
|
||||
case STATE_PROGRESS_INDEX: {
|
||||
fwdStateToNextPromise(Promise<T2, E, P>::State(
|
||||
std::in_place_index<STATE_PROGRESS_INDEX>,
|
||||
std::move(std::get<STATE_PROGRESS_INDEX>(state))
|
||||
));
|
||||
} break;
|
||||
|
||||
case STATE_CANCELLED_INDEX: {
|
||||
fwdStateToNextPromise(Promise<T2, E, P>::State(
|
||||
std::in_place_index<STATE_CANCELLED_INDEX>,
|
||||
std::move(std::get<STATE_CANCELLED_INDEX>(state))
|
||||
));
|
||||
} break;
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
/**
|
||||
* Add a listener for when the Promise fails. There may only be one
|
||||
* listener at a time. If the Promise has already been resolved, the
|
||||
* callback is immediately queued in the main thread
|
||||
*/
|
||||
Promise& expect(Expect&& handler) {
|
||||
if (m_data->cancellationToken) return *this;
|
||||
|
||||
template <class T2, class E2>
|
||||
requires (!std::is_void_v<T2>)
|
||||
Promise<T2, E2, P> then(utils::MiniFunction<Result<T2, E2>(Result<T, E>)>&& callback) {
|
||||
if (m_data->cancelled) return make_cancelled<T2, E2, P>();
|
||||
std::unique_lock<std::mutex> _(m_data->mutex);
|
||||
|
||||
// Check if this Promise has already been resolved, and if so
|
||||
// immediately queue the callback with the value
|
||||
if (m_data->result.has_value()) {
|
||||
auto v = std::move(m_data->result).value();
|
||||
auto v = m_data->result.value();
|
||||
if (v.index() == 0) {
|
||||
Loader::get()->queueInMainThread([callback = std::move(callback), ok = std::move(std::get<0>(v))] {
|
||||
(void)callback(Ok(ok));
|
||||
});
|
||||
}
|
||||
else {
|
||||
Loader::get()->queueInMainThread([callback = std::move(callback), err = std::move(std::get<1>(v))] {
|
||||
(void)callback(Err(err));
|
||||
});
|
||||
}
|
||||
return make_cancelled<T2, E2, P>();
|
||||
}
|
||||
|
||||
return Promise<T2, E2, P>([data = m_data, callback](auto fwdStateToNextPromise) {
|
||||
data->callback = [fwdStateToNextPromise, callback](auto&& state) {
|
||||
// Can't use std::visit if Value and Error are the same >:(
|
||||
switch (state.index()) {
|
||||
case STATE_VALUE_INDEX: {
|
||||
auto mapped = callback(Ok(std::move(std::get<STATE_VALUE_INDEX>(state))));
|
||||
if (mapped) {
|
||||
fwdStateToNextPromise(Promise<T2, E2, P>::State(
|
||||
std::in_place_index<STATE_VALUE_INDEX>,
|
||||
std::move(mapped).unwrap()
|
||||
));
|
||||
}
|
||||
else {
|
||||
fwdStateToNextPromise(Promise<T2, E2, P>::State(
|
||||
std::in_place_index<STATE_ERROR_INDEX>,
|
||||
std::move(mapped).unwrapErr()
|
||||
));
|
||||
}
|
||||
} break;
|
||||
|
||||
case STATE_ERROR_INDEX: {
|
||||
auto mapped = callback(Err(std::move(std::get<STATE_ERROR_INDEX>(state))));
|
||||
if (mapped) {
|
||||
fwdStateToNextPromise(Promise<T2, E2, P>::State(
|
||||
std::in_place_index<STATE_VALUE_INDEX>,
|
||||
std::move(mapped).unwrap()
|
||||
));
|
||||
}
|
||||
else {
|
||||
fwdStateToNextPromise(Promise<T2, E2, P>::State(
|
||||
std::in_place_index<STATE_ERROR_INDEX>,
|
||||
std::move(mapped).unwrapErr()
|
||||
));
|
||||
}
|
||||
} break;
|
||||
|
||||
case STATE_PROGRESS_INDEX: {
|
||||
fwdStateToNextPromise(Promise<T2, E2, P>::State(
|
||||
std::in_place_index<STATE_PROGRESS_INDEX>,
|
||||
std::move(std::get<STATE_PROGRESS_INDEX>(state))
|
||||
));
|
||||
} break;
|
||||
|
||||
case STATE_CANCELLED_INDEX: {
|
||||
fwdStateToNextPromise(Promise<T2, E2, P>::State(
|
||||
std::in_place_index<STATE_CANCELLED_INDEX>,
|
||||
std::move(std::get<STATE_CANCELLED_INDEX>(state))
|
||||
));
|
||||
} break;
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
Promise expect(utils::MiniFunction<void(Error)>&& callback) {
|
||||
return this->template expect<Error>([callback](auto error) {
|
||||
callback(error);
|
||||
return std::move(error);
|
||||
});
|
||||
}
|
||||
template <class E2>
|
||||
requires (!std::is_void_v<E2>)
|
||||
Promise<T, E2, P> expect(utils::MiniFunction<E2(E)>&& callback) {
|
||||
if (m_data->cancelled) return make_cancelled<T, E2, P>();
|
||||
std::unique_lock<std::mutex> _(m_data->mutex);
|
||||
|
||||
// Check if this Promise has already been resolved, and if so
|
||||
// immediately queue the callback with the value
|
||||
if (m_data->result.has_value()) {
|
||||
auto v = m_data->result.value();
|
||||
if (v.index() == 1) {
|
||||
Loader::get()->queueInMainThread([handler = std::move(handler), err = std::move(std::get<1>(v))] {
|
||||
handler(err);
|
||||
Loader::get()->queueInMainThread([callback = std::move(callback), err = std::move(std::get<1>(v))] {
|
||||
callback(err);
|
||||
});
|
||||
}
|
||||
return make_cancelled<T, E2, P>();
|
||||
}
|
||||
else {
|
||||
m_data->expectHandler = handler;
|
||||
}
|
||||
return *this;
|
||||
|
||||
return Promise<T, E2, P>([data = m_data, callback](auto fwdStateToNextPromise) {
|
||||
data->callback = [fwdStateToNextPromise, callback](auto&& state) {
|
||||
// Can't use std::visit if Value and Error are the same >:(
|
||||
switch (state.index()) {
|
||||
case STATE_VALUE_INDEX: {
|
||||
fwdStateToNextPromise(Promise<T, E2, P>::State(
|
||||
std::in_place_index<STATE_VALUE_INDEX>,
|
||||
std::move(std::get<STATE_VALUE_INDEX>(state))
|
||||
));
|
||||
} break;
|
||||
|
||||
case STATE_ERROR_INDEX: {
|
||||
auto mapped = callback(std::get<STATE_ERROR_INDEX>(state));
|
||||
fwdStateToNextPromise(Promise<T, E2, P>::State(
|
||||
std::in_place_index<STATE_ERROR_INDEX>,
|
||||
std::move(mapped)
|
||||
));
|
||||
} break;
|
||||
|
||||
case STATE_PROGRESS_INDEX: {
|
||||
fwdStateToNextPromise(Promise<T, E2, P>::State(
|
||||
std::in_place_index<STATE_PROGRESS_INDEX>,
|
||||
std::move(std::get<STATE_PROGRESS_INDEX>(state))
|
||||
));
|
||||
} break;
|
||||
|
||||
case STATE_CANCELLED_INDEX: {
|
||||
fwdStateToNextPromise(Promise<T, E2, P>::State(
|
||||
std::in_place_index<STATE_CANCELLED_INDEX>,
|
||||
std::move(std::get<STATE_CANCELLED_INDEX>(state))
|
||||
));
|
||||
} break;
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
/**
|
||||
* Add a listener for when the Promise's progress is updated. There may
|
||||
* only be one listener at a time. If the Promise has already been
|
||||
* resolved, nothing happens
|
||||
*/
|
||||
Promise& progress(Progress&& handler) {
|
||||
if (m_data->cancellationToken) return *this;
|
||||
std::unique_lock<std::mutex> _(m_data->mutex);
|
||||
if (!m_data->result.has_value()) {
|
||||
m_data->progressHandler = handler;
|
||||
}
|
||||
return *this;
|
||||
|
||||
Promise progress(utils::MiniFunction<void(Progress)>&& callback) {
|
||||
return this->template progress<Progress>([callback](auto prog) {
|
||||
callback(prog);
|
||||
return std::move(prog);
|
||||
});
|
||||
}
|
||||
/**
|
||||
* Add a listener for when the Promise is finished, regardless of if
|
||||
* it was succesful or not. There may only be one listener at a time.
|
||||
* If the Promise has already been resolved, the callback is
|
||||
* immediately queued in the main thread
|
||||
*/
|
||||
Promise& finally(Finally&& handler) {
|
||||
if (m_data->cancellationToken) return *this;
|
||||
template <class P2>
|
||||
requires (!std::is_void_v<P2>)
|
||||
Promise<T, E, P2> progress(utils::MiniFunction<P2(P)>&& callback) {
|
||||
if (m_data->cancelled) return make_cancelled<T, E, P2>();
|
||||
std::unique_lock<std::mutex> _(m_data->mutex);
|
||||
|
||||
// Check if this Promise has already been resolved
|
||||
if (m_data->result.has_value()) {
|
||||
Loader::get()->queueInMainThread([handler = std::move(handler)] {
|
||||
handler();
|
||||
});
|
||||
return make_cancelled<T, E, P2>();
|
||||
}
|
||||
else {
|
||||
m_data->finallyHandler = handler;
|
||||
}
|
||||
return *this;
|
||||
|
||||
return Promise<T, E, P2>([data = m_data, callback](auto fwdStateToNextPromise) {
|
||||
data->callback = [fwdStateToNextPromise, callback](auto&& state) {
|
||||
if (state.index() == STATE_PROGRESS_INDEX) {
|
||||
auto mapped = callback(std::get<STATE_PROGRESS_INDEX>(state));
|
||||
fwdStateToNextPromise(Promise<T, E, P2>::State(std::in_place_index<STATE_PROGRESS_INDEX>, mapped));
|
||||
}
|
||||
else {
|
||||
fwdStateToNextPromise(state);
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel the Promise. Removes all listeners and sets the signal for
|
||||
* cancelling. Whether or not the promise actually can interrupt its
|
||||
* operation depends on the Promise; as such, this is not guaranteed to
|
||||
* actually stop the operation that created the Promise, but it is
|
||||
* guaranteed that the listener will not be notified after a call to
|
||||
* cancel
|
||||
*/
|
||||
Promise& cancel() {
|
||||
if (m_data->cancellationToken) return *this;
|
||||
Promise finally(utils::MiniFunction<void()>&& callback) {
|
||||
if (m_data->cancelled) return make_cancelled();
|
||||
std::unique_lock<std::mutex> _(m_data->mutex);
|
||||
m_data->thenHandler = nullptr;
|
||||
m_data->expectHandler = nullptr;
|
||||
m_data->progressHandler = nullptr;
|
||||
m_data->finallyHandler = nullptr;
|
||||
m_data->cancellationToken.token = true;
|
||||
return *this;
|
||||
|
||||
// Check if this Promise has already been resolved, and if so
|
||||
// immediately queue the callback with the value
|
||||
if (m_data->result.has_value()) {
|
||||
Loader::get()->queueInMainThread([callback = std::move(callback)] {
|
||||
callback();
|
||||
});
|
||||
return make_cancelled();
|
||||
}
|
||||
|
||||
return Promise([data = m_data, callback](auto fwdStateToNextPromise) {
|
||||
data->callback = [fwdStateToNextPromise, callback](auto&& state) {
|
||||
if (state.index() == STATE_VALUE_INDEX || state.index() == STATE_ERROR_INDEX) {
|
||||
callback();
|
||||
}
|
||||
fwdStateToNextPromise(state);
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Link this Promise to be cancelled alongside another Promise
|
||||
*/
|
||||
Promise& link(PromiseCancellationToken otherCancellationToken) {
|
||||
if (m_data->cancellationToken) return *this;
|
||||
m_data->cancellationToken = otherCancellationToken;
|
||||
return *this;
|
||||
Promise cancelled(utils::MiniFunction<void()>&& callback) {
|
||||
if (m_data->cancelled) {
|
||||
Loader::get()->queueInMainThread([callback = std::move(callback)] {
|
||||
callback();
|
||||
});
|
||||
return make_cancelled();
|
||||
}
|
||||
std::unique_lock<std::mutex> _(m_data->mutex);
|
||||
|
||||
if (m_data->result.has_value()) {
|
||||
return make_cancelled();
|
||||
}
|
||||
|
||||
return Promise([data = m_data, callback](auto fwdStateToNextPromise) {
|
||||
data->callback = [fwdStateToNextPromise, callback](auto&& state) {
|
||||
if (state.index() == STATE_CANCELLED_INDEX) {
|
||||
callback();
|
||||
}
|
||||
fwdStateToNextPromise(state);
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
void resolve(Value&& value) {
|
||||
invoke_callback(State(std::in_place_index<STATE_VALUE_INDEX>, std::move(value)), m_data);
|
||||
}
|
||||
void reject(Error&& error) {
|
||||
invoke_callback(State(std::in_place_index<STATE_ERROR_INDEX>, std::move(error)), m_data);
|
||||
}
|
||||
void cancel() {
|
||||
m_data->cancelled = true;
|
||||
invoke_callback(State(std::in_place_index<STATE_CANCELLED_INDEX>, CancelledState()), m_data);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -212,73 +375,45 @@ namespace geode {
|
|||
PromiseEventFilter<T, E, P> listen();
|
||||
|
||||
private:
|
||||
struct Data {
|
||||
// Mutex for handlers & result
|
||||
struct Data final {
|
||||
std::mutex mutex;
|
||||
Then thenHandler;
|
||||
Expect expectHandler;
|
||||
Progress progressHandler;
|
||||
Finally finallyHandler;
|
||||
std::optional<std::variant<T, E>> result;
|
||||
PromiseCancellationToken cancellationToken;
|
||||
OnStateChange callback;
|
||||
std::optional<std::variant<Value, Error>> result;
|
||||
std::atomic_bool cancelled;
|
||||
};
|
||||
// This has to be a shared_ptr so that the data can persist even after
|
||||
// the future is destroyed, as well as to share it between resolve, reject, and the likes
|
||||
std::shared_ptr<Data> m_data;
|
||||
|
||||
static void invoke_executor(Executor&& executor, std::shared_ptr<Data> data) {
|
||||
executor(
|
||||
[data](auto&& value) {
|
||||
if (data->cancellationToken) return;
|
||||
std::unique_lock<std::mutex> _(data->mutex);
|
||||
bool handled = false;
|
||||
if (data->thenHandler) {
|
||||
Loader::get()->queueInMainThread([fun = std::move(data->thenHandler), v = std::move(value)] {
|
||||
fun(v);
|
||||
});
|
||||
handled = true;
|
||||
}
|
||||
if (data->finallyHandler) {
|
||||
Loader::get()->queueInMainThread([fun = std::move(data->finallyHandler)] {
|
||||
fun();
|
||||
});
|
||||
handled = true;
|
||||
}
|
||||
if (!handled) {
|
||||
data->result = std::variant<T, E>(std::in_place_index<0>, std::move(value));
|
||||
}
|
||||
},
|
||||
[data](auto&& error) {
|
||||
if (data->cancellationToken) return;
|
||||
std::unique_lock<std::mutex> _(data->mutex);
|
||||
bool handled = false;
|
||||
if (data->expectHandler) {
|
||||
Loader::get()->queueInMainThread([fun = std::move(data->expectHandler), v = std::move(error)] {
|
||||
fun(v);
|
||||
});
|
||||
handled = true;
|
||||
}
|
||||
if (data->finallyHandler) {
|
||||
Loader::get()->queueInMainThread([fun = std::move(data->finallyHandler)] {
|
||||
fun();
|
||||
});
|
||||
handled = true;
|
||||
}
|
||||
if (!handled) {
|
||||
data->result = std::variant<T, E>(std::in_place_index<1>, std::move(error));
|
||||
}
|
||||
},
|
||||
[data](auto&& p) {
|
||||
if (data->cancellationToken) return;
|
||||
std::unique_lock<std::mutex> _(data->mutex);
|
||||
if (auto handler = data->progressHandler) {
|
||||
Loader::get()->queueInMainThread([p = std::move(p), handler]() mutable {
|
||||
handler(std::move(p));
|
||||
});
|
||||
}
|
||||
},
|
||||
data->cancellationToken
|
||||
);
|
||||
template <class T2 = Value, class E2 = Error, class P2 = Progress>
|
||||
static Promise<T2, E2, P2> make_cancelled() {
|
||||
auto ret = Promise<T2, E2, P2>();
|
||||
ret.cancel();
|
||||
return std::move(ret);
|
||||
}
|
||||
|
||||
static void invoke_callback(State&& state, std::shared_ptr<Data> data) {
|
||||
if (data->cancelled) return;
|
||||
std::unique_lock<std::mutex> _(data->mutex);
|
||||
|
||||
if (data->callback) {
|
||||
data->callback(State(state));
|
||||
}
|
||||
|
||||
// Store the state to let future installed callbacks be immediately resolved
|
||||
if (state.index() == STATE_VALUE_INDEX) {
|
||||
data->result = std::variant<Value, Error>(std::in_place_index<0>, std::get<0>(std::move(state)));
|
||||
}
|
||||
else if (state.index() == STATE_ERROR_INDEX) {
|
||||
data->result = std::variant<Value, Error>(std::in_place_index<1>, std::get<1>(std::move(state)));
|
||||
}
|
||||
else if (state.index() == STATE_CANCELLED_INDEX) {
|
||||
data->cancelled = true;
|
||||
}
|
||||
}
|
||||
|
||||
static void invoke_source(utils::MiniFunction<void(OnStateChange)>&& source, std::shared_ptr<Data> data) {
|
||||
source([data](auto&& state) {
|
||||
invoke_callback(std::move(state), data);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -35,35 +35,35 @@ static Result<matjson::Value, ServerError> parseServerPayload(web::WebResponse&&
|
|||
return Ok(obj["payload"]);
|
||||
}
|
||||
|
||||
static void parseServerError(auto reject, auto error) {
|
||||
static ServerError parseServerError(auto error) {
|
||||
// The server should return errors as `{ "error": "...", "payload": "" }`
|
||||
if (auto asJson = error.json()) {
|
||||
auto json = asJson.unwrap();
|
||||
if (json.is_object() && json.contains("error")) {
|
||||
reject(ServerError(
|
||||
return ServerError(
|
||||
error.code(),
|
||||
"{}", json.template get<std::string>("error")
|
||||
));
|
||||
);
|
||||
}
|
||||
else {
|
||||
reject(ServerError(error.code(), "Unknown (not valid JSON)"));
|
||||
return ServerError(error.code(), "Unknown (not valid JSON)");
|
||||
}
|
||||
}
|
||||
// But if we get something else for some reason, return that
|
||||
else {
|
||||
reject(ServerError(
|
||||
return ServerError(
|
||||
error.code(),
|
||||
"{}", error.string().unwrapOr("Unknown (not a valid string)")
|
||||
));
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
static void parseServerProgress(auto progress, auto prog, auto msg) {
|
||||
static ServerProgress parseServerProgress(auto prog, auto msg) {
|
||||
if (auto per = prog.downloadProgress()) {
|
||||
progress({ msg, static_cast<uint8_t>(*per) });
|
||||
return ServerProgress(msg, static_cast<uint8_t>(*per));
|
||||
}
|
||||
else {
|
||||
progress({ msg });
|
||||
return ServerProgress(msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -335,83 +335,83 @@ ServerPromise<ServerModsList> server::getMods(ModsQuery const& query) {
|
|||
req.param("page", std::to_string(query.page + 1));
|
||||
req.param("per_page", std::to_string(query.pageSize));
|
||||
|
||||
return ServerPromise<ServerModsList>([req = std::move(req)](auto resolve, auto reject, auto progress, auto cancel) mutable {
|
||||
req.get(getServerAPIBaseURL() + "/mods")
|
||||
.then([resolve, reject](auto value) {
|
||||
return req.get(getServerAPIBaseURL() + "/mods")
|
||||
.then<ServerModsList, ServerError>([](auto result) -> Result<ServerModsList, ServerError> {
|
||||
if (result) {
|
||||
auto value = std::move(result).unwrap();
|
||||
|
||||
// Store the code, since the value is moved afterwards
|
||||
auto code = value.code();
|
||||
|
||||
// Parse payload
|
||||
auto payload = parseServerPayload(std::move(value));
|
||||
if (!payload) {
|
||||
return reject(payload.unwrapErr());
|
||||
return Err(payload.unwrapErr());
|
||||
}
|
||||
// Parse response
|
||||
auto list = ServerModsList::parse(payload.unwrap());
|
||||
if (!list) {
|
||||
return reject(ServerError(code, "Unable to parse response: {}", list.unwrapErr()));
|
||||
return Err(ServerError(code, "Unable to parse response: {}", list.unwrapErr()));
|
||||
}
|
||||
resolve(list.unwrap());
|
||||
})
|
||||
.expect([resolve, reject](auto error) {
|
||||
return Ok(list.unwrap());
|
||||
}
|
||||
else {
|
||||
auto error = std::move(result).unwrapErr();
|
||||
// Treat a 404 as empty mods list
|
||||
if (error.code() == 404) {
|
||||
return resolve(ServerModsList());
|
||||
return Ok(ServerModsList());
|
||||
}
|
||||
parseServerError(reject, error);
|
||||
})
|
||||
.progress([progress](auto prog) {
|
||||
parseServerProgress(progress, prog, "Downloading mods");
|
||||
})
|
||||
.link(cancel);
|
||||
});
|
||||
return Err(parseServerError(error));
|
||||
}
|
||||
})
|
||||
.progress<ServerProgress>([](auto prog) {
|
||||
return parseServerProgress(prog, "Downloading mods");
|
||||
});
|
||||
}
|
||||
|
||||
ServerPromise<ServerModMetadata> server::getMod(std::string const& id) {
|
||||
auto req = web::WebRequest();
|
||||
req.userAgent(getServerUserAgent());
|
||||
return ServerPromise<ServerModMetadata>([req = std::move(req), id](auto resolve, auto reject, auto progress, auto cancel) mutable {
|
||||
req.get(getServerAPIBaseURL() + "/mods/" + id)
|
||||
.then([resolve, reject](auto value) {
|
||||
return req.get(getServerAPIBaseURL() + "/mods/" + id)
|
||||
.then<ServerModMetadata, ServerError>([](auto result) -> Result<ServerModMetadata, ServerError> {
|
||||
if (result) {
|
||||
auto value = result.unwrap();
|
||||
|
||||
// Store the code, since the value is moved afterwards
|
||||
auto code = value.code();
|
||||
|
||||
// Parse payload
|
||||
auto payload = parseServerPayload(std::move(value));
|
||||
if (!payload) {
|
||||
return reject(payload.unwrapErr());
|
||||
return Err(payload.unwrapErr());
|
||||
}
|
||||
// Parse response
|
||||
auto list = ServerModMetadata::parse(payload.unwrap());
|
||||
if (!list) {
|
||||
return reject(ServerError(code, "Unable to parse response: {}", list.unwrapErr()));
|
||||
return Err(ServerError(code, "Unable to parse response: {}", list.unwrapErr()));
|
||||
}
|
||||
resolve(list.unwrap());
|
||||
})
|
||||
.expect([reject](auto error) {
|
||||
parseServerError(reject, error);
|
||||
})
|
||||
.progress([progress, id](auto prog) {
|
||||
parseServerProgress(progress, prog, "Downloading logo for " + id);
|
||||
})
|
||||
.link(cancel);
|
||||
});
|
||||
return Ok(list.unwrap());
|
||||
}
|
||||
else {
|
||||
return Err(parseServerError(result.unwrapErr()));
|
||||
}
|
||||
})
|
||||
.progress<ServerProgress>([id](auto prog) {
|
||||
return parseServerProgress(prog, "Downloading metadata for " + id);
|
||||
});
|
||||
}
|
||||
|
||||
ServerPromise<ByteVector> server::getModLogo(std::string const& id) {
|
||||
auto req = web::WebRequest();
|
||||
req.userAgent(getServerUserAgent());
|
||||
return ServerPromise<ByteVector>([req = std::move(req), id](auto resolve, auto reject, auto progress, auto cancel) mutable {
|
||||
req.get(getServerAPIBaseURL() + "/mods/" + id + "/logo")
|
||||
.then([resolve](auto response) {
|
||||
resolve(response.data());
|
||||
})
|
||||
.expect([reject](auto error) {
|
||||
parseServerError(reject, error);
|
||||
})
|
||||
.progress([progress, id](auto prog) {
|
||||
parseServerProgress(progress, prog, "Downloading logo for " + id);
|
||||
})
|
||||
.link(cancel);
|
||||
});
|
||||
return req.get(getServerAPIBaseURL() + "/mods/" + id + "/logo")
|
||||
.then<ByteVector>([](auto response) {
|
||||
return response.data();
|
||||
})
|
||||
.expect<ServerError>([](auto error) {
|
||||
return parseServerError(error);
|
||||
})
|
||||
.progress<ServerProgress>([id](auto prog) {
|
||||
return parseServerProgress(prog, "Downloading logo for " + id);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -90,8 +90,9 @@ namespace server {
|
|||
Args&&... args
|
||||
) : code(code), details(fmt::vformat(format, fmt::make_format_args(args...))) {}
|
||||
};
|
||||
using ServerProgress = Promise<>::Progress;
|
||||
template <class T>
|
||||
using ServerPromise = Promise<T, ServerError>;
|
||||
using ServerPromise = Promise<T, ServerError, ServerProgress>;
|
||||
|
||||
std::string getServerAPIBaseURL();
|
||||
std::string getServerUserAgent();
|
||||
|
@ -115,10 +116,12 @@ namespace server {
|
|||
using Extract = decltype(detail::ExtractServerReqParams(F));
|
||||
using Result = Extract::Result;
|
||||
using Query = Extract::Query;
|
||||
using Cached = std::variant<ServerPromise<Result>, Result>;
|
||||
|
||||
private:
|
||||
class Cache final {
|
||||
using Pending = ServerPromise<Result>;
|
||||
using CachedOrPending = std::variant<Pending, Result>;
|
||||
|
||||
std::mutex m_mutex;
|
||||
// I know this looks like a goofy choice over just
|
||||
// `std::unordered_map`, but hear me out:
|
||||
|
@ -142,18 +145,55 @@ namespace server {
|
|||
// lightning-fast (🚀), and besides the main performance benefit
|
||||
// comes from the lack of a web request - not how many extra
|
||||
// milliseconds we can squeeze out of a map access
|
||||
std::vector<std::pair<Query, Result>> m_values;
|
||||
std::vector<std::pair<Query, CachedOrPending>> m_values;
|
||||
size_t m_sizeLimit = 20;
|
||||
|
||||
template <class As>
|
||||
As* find(Query const& query) {
|
||||
auto it = std::find_if(m_values.begin(), m_values.end(), [](auto const& q) {
|
||||
return q.first == query;
|
||||
});
|
||||
if (it == m_values.end()) {
|
||||
return nullptr;
|
||||
}
|
||||
if (auto as = std::get_if<As>(&it->second)) {
|
||||
return as;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
public:
|
||||
std::optional<Result> get(Query const& query) {
|
||||
std::unique_lock _(m_mutex);
|
||||
// mfw no std::optional::map
|
||||
if (auto found = ranges::find(m_values, [&query](auto const& q) { return q.first == query; })) {
|
||||
return found->second;
|
||||
if (auto v = this->template find<Result>(query)) {
|
||||
return *v;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
ServerPromise<Result> pend(Query&& query, auto func) {
|
||||
std::unique_lock _(m_mutex);
|
||||
ServerPromise<Result> ret;
|
||||
if (auto prev = this->template find<Pending>(query)) {
|
||||
return prev;
|
||||
}
|
||||
else {
|
||||
ret = ServerPromise<Result>([this, func, query = Query(query)](auto resolve, auto reject, auto progress, auto cancelled) {
|
||||
func(Query(query))
|
||||
.then([this, resolve, query = std::move(query)](auto res) {
|
||||
this->add(Query(query), Result(res));
|
||||
resolve(res);
|
||||
})
|
||||
.expect([reject](auto err) {
|
||||
reject(err);
|
||||
})
|
||||
.progress([progress](auto prog) {
|
||||
progress(prog);
|
||||
})
|
||||
.link(cancelled);
|
||||
});
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
void add(Query&& query, Result&& result) {
|
||||
std::unique_lock _(m_mutex);
|
||||
auto value = std::make_pair(std::move(query), std::move(result));
|
||||
|
@ -186,32 +226,15 @@ namespace server {
|
|||
Cache m_cache;
|
||||
|
||||
ServerPromise<Result> fetch(Query const& query) {
|
||||
return ServerPromise<Result>([this, query = Query(query)](auto resolve, auto reject, auto progress, auto cancelled) {
|
||||
F(Query(query))
|
||||
.then([this, resolve, query = std::move(query)](auto res) {
|
||||
m_cache.add(Query(query), Result(res));
|
||||
resolve(res);
|
||||
})
|
||||
.expect([reject](auto err) {
|
||||
reject(err);
|
||||
})
|
||||
.progress([progress](auto prog) {
|
||||
progress(prog);
|
||||
})
|
||||
.link(cancelled);
|
||||
});
|
||||
}
|
||||
|
||||
static ServerResultCache makeShared() {
|
||||
listenForSettingChanges<int64_t>("server-cache-size-limit", +[](int64_t size) {
|
||||
ServerResultCache::shared().setSizeLimit(size);
|
||||
});
|
||||
return ServerResultCache();
|
||||
return m_cache.pend(Query(query));
|
||||
}
|
||||
|
||||
public:
|
||||
static ServerResultCache<F>& shared() {
|
||||
static auto inst = makeShared();
|
||||
static auto inst = ServerResultCache();
|
||||
static auto _ = listenForSettingChanges<int64_t>("server-cache-size-limit", +[](int64_t size) {
|
||||
ServerResultCache::shared().setSizeLimit(size);
|
||||
});
|
||||
return inst;
|
||||
}
|
||||
|
||||
|
|
|
@ -89,24 +89,21 @@ static auto loadInstalledModsPage(server::ModsQuery&& query) {
|
|||
}
|
||||
|
||||
static auto loadServerModsPage(server::ModsQuery&& query) {
|
||||
return ModListSource::ProviderPromise([query = std::move(query)](auto resolve, auto reject, auto progress, auto cancelled) {
|
||||
server::getMods(query)
|
||||
.then([resolve, reject](server::ServerModsList list) {
|
||||
return server::getMods(query)
|
||||
.then<ModListSource::ProvidedMods>([](server::ServerModsList list) {
|
||||
auto content = ModListSource::ProvidedMods();
|
||||
for (auto&& mod : std::move(list.mods)) {
|
||||
content.mods.push_back(ModSource(std::move(mod)));
|
||||
}
|
||||
content.totalModCount = list.totalModCount;
|
||||
resolve(content);
|
||||
return content;
|
||||
})
|
||||
.expect([reject](auto error) {
|
||||
reject(ModListSource::LoadPageError("Error loading mods", error.details));
|
||||
.expect<ModListSource::LoadPageError>([](auto error) {
|
||||
return ModListSource::LoadPageError("Error loading mods", error.details);
|
||||
})
|
||||
.progress([progress](auto prog) {
|
||||
progress(prog.percentage);
|
||||
})
|
||||
.link(cancelled);
|
||||
});
|
||||
.progress<std::optional<uint8_t>>([](auto prog) {
|
||||
return prog.percentage;
|
||||
});
|
||||
}
|
||||
|
||||
typename ModListSource::PagePromise ModListSource::loadPage(size_t page, bool update) {
|
||||
|
@ -124,33 +121,30 @@ typename ModListSource::PagePromise ModListSource::loadPage(size_t page, bool up
|
|||
});
|
||||
}
|
||||
m_cachedPages.erase(page);
|
||||
return PagePromise([this, page](auto resolve, auto reject, auto progress, auto cancelled) {
|
||||
m_provider.get(server::ModsQuery {
|
||||
.query = m_query,
|
||||
.page = page,
|
||||
// todo: loader setting to change this
|
||||
.pageSize = PER_PAGE,
|
||||
})
|
||||
.then([page, this, resolve, reject](auto data) {
|
||||
if (data.totalModCount == 0 || data.mods.empty()) {
|
||||
return reject(ModListSource::LoadPageError("No mods found :("));
|
||||
}
|
||||
auto pageData = Page();
|
||||
for (auto mod : std::move(data.mods)) {
|
||||
pageData.push_back(ModItem::create(std::move(mod)));
|
||||
}
|
||||
m_cachedItemCount = data.totalModCount;
|
||||
m_cachedPages.insert({ page, pageData });
|
||||
resolve(pageData);
|
||||
})
|
||||
.expect([this, reject = reject](auto error) {
|
||||
reject(error);
|
||||
})
|
||||
.progress([this, progress = std::move(progress)](auto prog) {
|
||||
progress(prog);
|
||||
})
|
||||
.link(cancelled);
|
||||
}, false);
|
||||
return m_provider.get(server::ModsQuery {
|
||||
.query = m_query,
|
||||
.page = page,
|
||||
// todo: loader setting to change this
|
||||
.pageSize = PER_PAGE,
|
||||
})
|
||||
.then<Page, ModListSource::LoadPageError>([page, this](auto result) -> Result<Page, ModListSource::LoadPageError> {
|
||||
if (result) {
|
||||
auto data = result.unwrap();
|
||||
if (data.totalModCount == 0 || data.mods.empty()) {
|
||||
return Err(ModListSource::LoadPageError("No mods found :("));
|
||||
}
|
||||
auto pageData = Page();
|
||||
for (auto mod : std::move(data.mods)) {
|
||||
pageData.push_back(ModItem::create(std::move(mod)));
|
||||
}
|
||||
m_cachedItemCount = data.totalModCount;
|
||||
m_cachedPages.insert({ page, pageData });
|
||||
return Ok(pageData);
|
||||
}
|
||||
else {
|
||||
return Err(result.unwrapErr());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
std::optional<size_t> ModListSource::getPageCount() const {
|
||||
|
|
|
@ -131,12 +131,10 @@ WebPromise WebRequest::send(std::string_view method, std::string_view url) {
|
|||
WebResponse response;
|
||||
Impl* impl;
|
||||
WebPromise::Progress progress;
|
||||
PromiseCancellationToken cancelled;
|
||||
} responseData = {
|
||||
.response = WebResponse(),
|
||||
.impl = impl.get(),
|
||||
.progress = progress,
|
||||
.cancelled = cancelled,
|
||||
};
|
||||
|
||||
// Store downloaded response data into a byte vector
|
||||
|
|
Loading…
Reference in a new issue