From e048f1b9996352f31d9458ef3d9e1dfc6443985e Mon Sep 17 00:00:00 2001 From: Adrien Beraud Date: Wed, 5 Feb 2025 14:04:17 -0500 Subject: [PATCH] conversation: mutex is History Change-Id: I178b437e99cf31c13622df136d635ff2147a04a9 --- src/jamidht/conversation.cpp | 67 +++++++++++++++++------------------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index 50bf4c1e9..bf658536b 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -122,6 +122,12 @@ using MessageList = std::list>; struct History { + // While loading the history, we need to avoid: + // - reloading history (can just be ignored) + // - adding new commits (should wait for history to be loaded) + std::mutex mutex {}; + std::condition_variable cv {}; + bool loading {false}; MessageList messageList {}; std::map> quickAccess {}; std::map>> pendingEditions {}; @@ -670,19 +676,12 @@ public: /** * Loaded history represents the linearized history to show for clients */ - mutable std::mutex loadingMtx_; mutable History loadedHistory_ {}; std::vector> addToHistory( const std::vector>& commits, bool messageReceived = false, bool commitFromSelf = false, History* history = nullptr) const; - // While loading the history, we need to avoid: - // - reloading history (can just be ignored) - // - adding new commits (should wait for history to be loaded) - bool isLoadingHistory_ {false}; - mutable std::mutex historyMtx_ {}; - mutable std::condition_variable historyCv_ {}; void handleReaction(History& history, const std::shared_ptr& sharedCommit) const; @@ -879,15 +878,13 @@ Conversation::Impl::loadMessages(const LogOptions& options) std::vector Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory) { + auto history = optHistory ? optHistory : &loadedHistory_; - std::lock_guard lk(loadingMtx_); - - if (!optHistory) { - std::lock_guard lock(historyMtx_); - if (!repository_ || isLoadingHistory_) - return {}; - isLoadingHistory_ = true; + // history->mutex is locked by the caller + if (!repository_ || history->loading) { + return {}; } + history->loading = true; // By convention, if options.nbOfCommits is zero, then we // don't impose a limit on the number of commits returned. @@ -956,17 +953,17 @@ Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory replies.erase(it); } std::shared_ptr firstMsg; - if (!optHistory && msgList.empty() && !loadedHistory_.messageList.empty()) { + if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) { firstMsg = *loadedHistory_.messageList.rbegin(); } - auto added = addToHistory({message}, false, false, optHistory); + auto added = addToHistory({message}, false, false, history); if (!added.empty() && firstMsg) { emitSignal(accountId_, repository_->id(), *firstMsg); } msgList.insert(msgList.end(), added.begin(), added.end()); - }, + }, /* postCondition */ [&](auto, auto, auto) { // Stop logging if there was a limit set on the number of commits @@ -979,18 +976,15 @@ Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory options.from, options.logIfNotFound); + history->loading = false; + history->cv.notify_all(); + // Convert for client (remove ptr) std::vector ret; ret.reserve(msgList.size()); for (const auto& msg: msgList) { ret.emplace_back(*msg); } - if (!optHistory) { - std::lock_guard lock(historyMtx_); - isLoadingHistory_ = false; - historyCv_.notify_all(); - } - return ret; } @@ -1180,9 +1174,9 @@ Conversation::Impl::addToHistory(const std::vectorgetUsername(); - if (messageReceived && (!optHistory && isLoadingHistory_)) { - std::unique_lock lk(historyMtx_); - historyCv_.wait(lk, [&] { return !isLoadingHistory_; }); + if (messageReceived && (optHistory == &loadedHistory_ && optHistory->loading)) { + std::unique_lock lk(optHistory->mutex); + optHistory->cv.wait(lk, [&] { return !optHistory->loading; }); } std::vector> messages; auto addCommit = [&](const auto& commit) { @@ -1200,7 +1194,7 @@ Conversation::Impl::addToHistory(const std::vector(); sharedCommit->fromMapStringString(commit); // Set message status based on cache (only on history for client) - if (!commitFromSelf && optHistory == nullptr) { + if (!commitFromSelf && optHistory == &loadedHistory_) { std::lock_guard lk(messageStatusMtx_); for (const auto& member: repository_->members()) { // If we have a status cached, use it @@ -1708,6 +1702,7 @@ Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options return; dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] { if (auto sthis = w.lock()) { + std::lock_guard lk(sthis->pimpl_->loadedHistory_.mutex); cb(sthis->pimpl_->loadMessages2(options)); } }); @@ -1716,7 +1711,7 @@ Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options void Conversation::clearCache() { - std::lock_guard lk(pimpl_->loadingMtx_); + std::lock_guard lk(pimpl_->loadedHistory_.mutex); pimpl_->loadedHistory_.messageList.clear(); pimpl_->loadedHistory_.quickAccess.clear(); pimpl_->loadedHistory_.pendingEditions.clear(); @@ -1730,17 +1725,16 @@ Conversation::clearCache() std::string Conversation::lastCommitId() const { + { + std::lock_guard lk(pimpl_->loadedHistory_.mutex); + if (!pimpl_->loadedHistory_.messageList.empty()) + return (*pimpl_->loadedHistory_.messageList.begin())->id; + } LogOptions options; options.nbOfCommits = 1; options.skipMerge = true; History optHistory; - { - std::lock_guard lk(pimpl_->historyMtx_); - if (!pimpl_->loadedHistory_.messageList.empty()) - return (*pimpl_->loadedHistory_.messageList.begin())->id; - } - - std::lock_guard lk(pimpl_->writeMtx_); + std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex); auto res = pimpl_->loadMessages2(options, &optHistory); if (res.empty()) return {}; @@ -2241,7 +2235,7 @@ Conversation::Impl::updateStatus(const std::string& uri, options.logIfNotFound = false; options.fastLog = true; History optHistory; - std::lock_guard lk(historyMtx_); // Avoid to announce messages while updating status. + std::lock_guard lk(optHistory.mutex); // Avoid to announce messages while updating status. auto res = loadMessages2(options, &optHistory); if (res.size() == 0) { // In this case, commit is not received yet, so we cache it @@ -2562,6 +2556,7 @@ Conversation::countInteractions(const std::string& toId, options.logIfNotFound = false; options.fastLog = true; History history; + std::lock_guard lk(history.mutex); auto res = pimpl_->loadMessages2(options, &history); return res.size(); }