From 66d5d955577a57e7a7afe0f7ea00b924ee0a3b26 Mon Sep 17 00:00:00 2001 From: Olivier Dion Date: Fri, 13 Aug 2021 09:41:49 -0400 Subject: [PATCH] agent: Fix possible race conditions Change-Id: Ie6c21d1505f218a0de3bf79aec4565fc3713a922 --- test/agent/agent.cpp | 107 ++++++++++++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 36 deletions(-) diff --git a/test/agent/agent.cpp b/test/agent/agent.cpp index c29e179e9..aa656f8cb 100644 --- a/test/agent/agent.cpp +++ b/test/agent/agent.cpp @@ -57,8 +57,9 @@ Agent::ping(const std::string& conversation) { LOG_AGENT_STATE(); - auto cv = std::make_shared(); - auto pongReceived = std::make_shared(false); + std::mutex mtx; + std::condition_variable cv; + std::atomic pongReceived(false); std::string alphabet = "0123456789ABCDEF"; std::string messageSent; @@ -67,7 +68,7 @@ Agent::ping(const std::string& conversation) messageSent.push_back(alphabet[rand() % alphabet.size()]); } - onMessageReceived_.add([=](const std::string& /* accountID */, + onMessageReceived_.add([&](const std::string& /* accountID */, const std::string& conversationID, std::map message) { @@ -77,14 +78,15 @@ Agent::ping(const std::string& conversation) auto msg = message.at("body"); - if (pongReceived->load()) { + if (pongReceived.load()) { return false; } if (conversationID == conversation and message.at("author") != peerID_ and msg == "PONG:" + messageSent) { - *pongReceived = true; - cv->notify_one(); + std::unique_lock lk(mtx); + pongReceived.store(true); + cv.notify_one(); return false; } @@ -97,11 +99,10 @@ Agent::ping(const std::string& conversation) /* Waiting for echo */ - std::mutex mutex; - std::unique_lock lck(mutex); + std::unique_lock lk(mtx); - bool ret = std::cv_status::no_timeout == cv->wait_for(lck, std::chrono::seconds(30)) - and pongReceived->load(); + bool ret = (std::cv_status::no_timeout == cv.wait_for(lk, std::chrono::seconds(30)) and + pongReceived.load()); AGENT_INFO("Pong %s", ret ? "received" : "missing"); @@ -142,20 +143,19 @@ Agent::placeCall(const std::string& contact) { LOG_AGENT_STATE(); - auto cv = std::make_shared(); + std::mutex mtx; + std::condition_variable cv; + bool success(false); + bool over(false); - auto callID = DRing::placeCall(accountID_, contact); - auto success = std::make_shared>(false); - auto over = std::make_shared>(false); + std::string callID = ""; - if (callID.empty()) { - return false; - } - - onCallStateChanged_.add([=](const std::string& call_id, const std::string& state, signed code) { + onCallStateChanged_.add([&](const std::string& call_id, const std::string& state, signed code) { AGENT_INFO("[call:%s] In state %s : %d", callID.c_str(), state.c_str(), code); + std::unique_lock lk(mtx); + if (call_id != callID) { return true; } @@ -163,39 +163,42 @@ Agent::placeCall(const std::string& contact) bool ret = true; if ("CURRENT" == state) { - success->store(true); + success = true; } else if ("OVER" == state) { - over->store(true); + over = true; ret = false; } - cv->notify_one(); + cv.notify_one(); return ret; }); - std::mutex mtx; - std::unique_lock lck {mtx}; + callID = DRing::placeCall(accountID_, contact); AGENT_INFO("Waiting for call %s", callID.c_str()); /* TODO - Parametize me */ - cv->wait_for(lck, std::chrono::seconds(30), [=]{ - return success->load() or over->load(); - }); + { + std::unique_lock lk (mtx); + cv.wait_for(lk, std::chrono::seconds(30), [&]{ + return success or over; + }); + } - if (success->load()) { + if (success) { AGENT_INFO("[call:%s] to %s: SUCCESS", callID.c_str(), contact.c_str()); DRing::hangUp(callID); } else { AGENT_INFO("[call:%s] to %s: FAIL", callID.c_str(), contact.c_str()); } - if (not over->load()) { - cv->wait_for(lck, std::chrono::seconds(30), [=] { return over->load(); }); + if (not over) { + std::unique_lock lk (mtx); + cv.wait_for(lk, std::chrono::seconds(30), [&] { return over; }); } - return success->load(); + return success; } void @@ -230,8 +233,37 @@ Agent::activate(bool state) { LOG_AGENT_STATE(); + std::mutex mtx; + std::condition_variable cv; + bool done = false; + + onVolatileDetailsChanged_.add([&](const std::string& accountID, + const std::map& details) { + + if (accountID_ != accountID) { + return true; + } + + AGENT_INFO("Account is %s", + details.at(DRing::Account::VolatileProperties::ACTIVE).c_str()); + + std::unique_lock lk(mtx); + + done = true; + cv.notify_one(); + + return false; + }); + + DRing::setAccountActive(accountID_, state); + std::unique_lock lk(mtx); + + cv.wait_for(lk, std::chrono::seconds(10), [&]{ + return done; + }); + if (state) { waitForAnnouncement(); } @@ -482,12 +514,11 @@ Agent::registerStaticCallbacks() void Agent::waitForAnnouncement(std::chrono::seconds timeout) { - auto cv = std::make_shared(); + std::condition_variable cv; std::mutex mtx; - std::unique_lock lk {mtx}; - onVolatileDetailsChanged_.add([=](const std::string& accountID, + onVolatileDetailsChanged_.add([&](const std::string& accountID, const std::map& details) { if (accountID_ != accountID) { @@ -503,12 +534,16 @@ Agent::waitForAnnouncement(std::chrono::seconds timeout) return true; } - cv->notify_one(); + std::unique_lock lk (mtx); + + cv.notify_one(); return false; }); - AGENT_ASSERT(std::cv_status::no_timeout == cv->wait_for(lk, timeout), + std::unique_lock lk (mtx); + + AGENT_ASSERT(std::cv_status::no_timeout == cv.wait_for(lk, timeout), "Timeout while waiting for account announcement on DHT"); }