remove spinlocks & make sent async response pimpl

This commit is contained in:
altalk23 2022-12-12 15:40:05 +03:00
parent ca61385f0d
commit a47937c379
4 changed files with 128 additions and 41 deletions
loader
include/Geode/utils
src

View file

@ -75,20 +75,8 @@ namespace geode::utils::web {
*/
class SentAsyncWebRequest {
private:
std::string m_id;
std::string m_url;
std::vector<AsyncThen> m_thens;
std::vector<AsyncExpect> m_expects;
std::vector<AsyncProgress> m_progresses;
std::vector<AsyncCancelled> m_cancelleds;
std::atomic<bool> m_paused = true;
std::atomic<bool> m_cancelled = false;
std::atomic<bool> m_finished = false;
std::atomic<bool> m_cleanedUp = false;
mutable std::mutex m_mutex;
std::variant<std::monostate, std::ostream*, ghc::filesystem::path> m_target =
std::monostate();
std::vector<std::string> m_httpHeaders;
class Impl;
std::shared_ptr<Impl> m_impl;
template <class T>
friend class AsyncWebResult;
@ -101,9 +89,11 @@ namespace geode::utils::web {
public:
/**
* Do not call this manually.
* Do not call these manually.
*/
SentAsyncWebRequest(AsyncWebRequest const&, std::string const& id);
SentAsyncWebRequest();
~SentAsyncWebRequest();
static std::shared_ptr<SentAsyncWebRequest> create(AsyncWebRequest const&, std::string const& id);
/**
* Cancel the request. Cleans up any downloaded files, but if you run

View file

@ -380,7 +380,10 @@ void Loader::Impl::updateAllDependencies() {
}
void Loader::Impl::waitForModsToBeLoaded() {
while (!m_earlyLoadFinished) {}
auto lock = std::unique_lock(m_earlyLoadFinishedMutex);
m_earlyLoadFinishedCV.wait(lock, [this] {
return bool(m_earlyLoadFinished);
});
}
bool Loader::Impl::didLastLaunchCrash() const {

View file

@ -49,6 +49,9 @@ public:
std::vector<ScheduledFunction> m_scheduledFunctions;
mutable std::mutex m_scheduledFunctionsMutex;
bool m_isSetup = false;
std::condition_variable m_earlyLoadFinishedCV;
std::mutex m_earlyLoadFinishedMutex;
std::atomic_bool m_earlyLoadFinished = false;
// InternalLoader

View file

@ -125,17 +125,70 @@ Result<std::string> web::fetch(std::string const& url) {
return Err("Error getting info: " + std::string(curl_easy_strerror(res)));
}
class SentAsyncWebRequest::Impl {
private:
enum class Status {
Paused,
Running,
Finished,
Cancelled,
CleanedUp,
};
std::string m_id;
std::string m_url;
std::vector<AsyncThen> m_thens;
std::vector<AsyncExpect> m_expects;
std::vector<AsyncProgress> m_progresses;
std::vector<AsyncCancelled> m_cancelleds;
Status m_status = Status::Paused;
std::atomic<bool> m_paused = true;
std::atomic<bool> m_cancelled = false;
std::atomic<bool> m_finished = false;
std::atomic<bool> m_cleanedUp = false;
std::condition_variable m_statusCV;
std::mutex m_statusMutex;
SentAsyncWebRequest* m_self;
mutable std::mutex m_mutex;
std::variant<std::monostate, std::ostream*, ghc::filesystem::path> m_target =
std::monostate();
std::vector<std::string> m_httpHeaders;
template <class T>
friend class AsyncWebResult;
friend class AsyncWebRequest;
void pause();
void resume();
void error(std::string const& error);
void doCancel();
public:
Impl(SentAsyncWebRequest* self, AsyncWebRequest const&, std::string const& id);
void cancel();
bool finished() const;
friend class SentAsyncWebRequest;
};
static std::unordered_map<std::string, SentAsyncWebRequestHandle> RUNNING_REQUESTS{};
static std::mutex RUNNING_REQUESTS_MUTEX;
SentAsyncWebRequest::SentAsyncWebRequest(AsyncWebRequest const& req, std::string const& id) :
SentAsyncWebRequest::Impl::Impl(SentAsyncWebRequest* self, AsyncWebRequest const& req, std::string const& id) :
m_id(id), m_url(req.m_url), m_target(req.m_target), m_httpHeaders(req.m_httpHeaders) {
#define AWAIT_RESUME() \
while (m_paused) {} \
if (m_cancelled) { \
this->doCancel(); \
return; \
}
{\
auto lock = std::unique_lock(m_statusMutex);\
m_statusCV.wait(lock, [this]() { \
return !m_paused; \
});\
if (m_cancelled) {\
this->doCancel();\
return;\
}\
}\
if (req.m_then) m_thens.push_back(req.m_then);
if (req.m_progress) m_progresses.push_back(req.m_progress);
@ -188,7 +241,7 @@ SentAsyncWebRequest::SentAsyncWebRequest(AsyncWebRequest const& req, std::string
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
struct ProgressData {
SentAsyncWebRequest* self;
SentAsyncWebRequest::Impl* self;
std::ofstream* file;
} data{this, file.get()};
@ -197,7 +250,10 @@ SentAsyncWebRequest::SentAsyncWebRequest(AsyncWebRequest const& req, std::string
CURLOPT_PROGRESSFUNCTION,
+[](void* ptr, double total, double now, double, double) -> int {
auto data = static_cast<ProgressData*>(ptr);
while (data->self->m_paused) {}
auto lock = std::unique_lock(data->self->m_statusMutex);
data->self->m_statusCV.wait(lock, [data]() {
return !data->self->m_paused;
});
if (data->self->m_cancelled) {
if (data->file) {
data->file->close();
@ -207,7 +263,7 @@ SentAsyncWebRequest::SentAsyncWebRequest(AsyncWebRequest const& req, std::string
Loader::get()->queueInGDThread([self = data->self, now, total]() {
std::lock_guard _(self->m_mutex);
for (auto& prog : self->m_progresses) {
prog(*self, now, total);
prog(*self->m_self, now, total);
}
});
return 0;
@ -229,7 +285,7 @@ SentAsyncWebRequest::SentAsyncWebRequest(AsyncWebRequest const& req, std::string
Loader::get()->queueInGDThread([this, ret]() {
std::lock_guard _(m_mutex);
for (auto& then : m_thens) {
then(*this, ret);
then(*this->m_self, ret);
}
std::lock_guard __(RUNNING_REQUESTS_MUTEX);
RUNNING_REQUESTS.erase(m_id);
@ -237,7 +293,7 @@ SentAsyncWebRequest::SentAsyncWebRequest(AsyncWebRequest const& req, std::string
}).detach();
}
void SentAsyncWebRequest::doCancel() {
void SentAsyncWebRequest::Impl::doCancel() {
if (m_cleanedUp) return;
m_cleanedUp = true;
@ -256,14 +312,14 @@ void SentAsyncWebRequest::doCancel() {
Loader::get()->queueInGDThread([this]() {
std::lock_guard _(m_mutex);
for (auto& canc : m_cancelleds) {
canc(*this);
canc(*this->m_self);
}
});
this->error("Request cancelled");
}
void SentAsyncWebRequest::cancel() {
void SentAsyncWebRequest::Impl::cancel() {
m_cancelled = true;
// if already finished, cancel anyway to clean up
if (m_finished) {
@ -271,20 +327,23 @@ void SentAsyncWebRequest::cancel() {
}
}
void SentAsyncWebRequest::pause() {
void SentAsyncWebRequest::Impl::pause() {
m_paused = true;
}
void SentAsyncWebRequest::resume() {
void SentAsyncWebRequest::Impl::resume() {
m_paused = false;
}
bool SentAsyncWebRequest::finished() const {
bool SentAsyncWebRequest::Impl::finished() const {
return m_finished;
}
void SentAsyncWebRequest::error(std::string const& error) {
while (m_paused) {};
void SentAsyncWebRequest::Impl::error(std::string const& error) {
auto lock = std::unique_lock(m_statusMutex);
m_statusCV.wait(lock, [this]() {
return bool(m_paused);
});
Loader::get()->queueInGDThread([this, error]() {
std::lock_guard _(m_mutex);
for (auto& expect : m_expects) {
@ -295,6 +354,38 @@ void SentAsyncWebRequest::error(std::string const& error) {
});
}
SentAsyncWebRequest::SentAsyncWebRequest() : m_impl() {}
SentAsyncWebRequest::~SentAsyncWebRequest() {}
std::shared_ptr<SentAsyncWebRequest> SentAsyncWebRequest::create(AsyncWebRequest const& request, std::string const& id) {
auto ret = std::make_shared<SentAsyncWebRequest>();
ret->m_impl = std::move(std::make_shared<SentAsyncWebRequest::Impl>(ret.get(), request, id));
return ret;
}
void SentAsyncWebRequest::doCancel() {
return m_impl->doCancel();
}
void SentAsyncWebRequest::cancel() {
return m_impl->cancel();
}
void SentAsyncWebRequest::pause() {
return m_impl->pause();
}
void SentAsyncWebRequest::resume() {
return m_impl->resume();
}
bool SentAsyncWebRequest::finished() const {
return m_impl->finished();
}
void SentAsyncWebRequest::error(std::string const& error) {
return m_impl->error(error);
}
AsyncWebRequest& AsyncWebRequest::join(std::string const& requestID) {
m_joinID = requestID;
return *this;
@ -341,16 +432,16 @@ SentAsyncWebRequestHandle AsyncWebRequest::send() {
static size_t COUNTER = 0;
if (m_joinID && RUNNING_REQUESTS.count(m_joinID.value())) {
auto& req = RUNNING_REQUESTS.at(m_joinID.value());
std::lock_guard _(req->m_mutex);
if (m_then) req->m_thens.push_back(m_then);
if (m_progress) req->m_progresses.push_back(m_progress);
if (m_expect) req->m_expects.push_back(m_expect);
if (m_cancelled) req->m_cancelleds.push_back(m_cancelled);
std::lock_guard _(req->m_impl->m_mutex);
if (m_then) req->m_impl->m_thens.push_back(m_then);
if (m_progress) req->m_impl->m_progresses.push_back(m_progress);
if (m_expect) req->m_impl->m_expects.push_back(m_expect);
if (m_cancelled) req->m_impl->m_cancelleds.push_back(m_cancelled);
ret = req;
}
else {
auto id = m_joinID.value_or("__anon_request_" + std::to_string(COUNTER++));
ret = std::make_shared<SentAsyncWebRequest>(*this, id);
ret = SentAsyncWebRequest::create(*this, id);
RUNNING_REQUESTS.insert({id, ret});
}