connectionmanager: announce non ready channel on shutdown

This is useful for the conversation_module if any channel is pending
while shutting down, as the previous fetch status was not updated.
More generally, every connectDevice() should call the associated
callback.

Add a unit test to replicate this scenario.

Change-Id: I72f2975dc15dd4bac3f55c2f899ebb1ae5a7a7f3
This commit is contained in:
Sébastien Blin
2022-08-08 12:00:28 -04:00
parent 03ed11b92c
commit 3bc5888fe2
2 changed files with 78 additions and 52 deletions

View File

@ -103,6 +103,10 @@ public:
isDestroying_ = true; isDestroying_ = true;
{ {
std::lock_guard<std::mutex> lk(connectCbsMtx_); std::lock_guard<std::mutex> lk(connectCbsMtx_);
// Call all pending callbacks that channel is not ready
for (auto& [deviceId, pcbs] : pendingCbs_)
for (auto& pending : pcbs)
pending.cb(nullptr, deviceId);
pendingCbs_.clear(); pendingCbs_.clear();
} }
removeUnusedConnections(); removeUnusedConnections();

View File

@ -32,6 +32,7 @@
#include "common.h" #include "common.h"
using namespace DRing::Account; using namespace DRing::Account;
using namespace std::literals::chrono_literals;
namespace jami { namespace jami {
namespace test { namespace test {
@ -75,6 +76,7 @@ private:
void testCannotSendBeacon(); void testCannotSendBeacon();
void testConnectivityChangeTriggerBeacon(); void testConnectivityChangeTriggerBeacon();
void testOnNoBeaconTriggersShutdown(); void testOnNoBeaconTriggersShutdown();
void testShutdownWhileNegotiating();
CPPUNIT_TEST_SUITE(ConnectionManagerTest); CPPUNIT_TEST_SUITE(ConnectionManagerTest);
CPPUNIT_TEST(testConnectDevice); CPPUNIT_TEST(testConnectDevice);
@ -97,6 +99,7 @@ private:
CPPUNIT_TEST(testCannotSendBeacon); CPPUNIT_TEST(testCannotSendBeacon);
CPPUNIT_TEST(testConnectivityChangeTriggerBeacon); CPPUNIT_TEST(testConnectivityChangeTriggerBeacon);
CPPUNIT_TEST(testOnNoBeaconTriggersShutdown); CPPUNIT_TEST(testOnNoBeaconTriggersShutdown);
CPPUNIT_TEST(testShutdownWhileNegotiating);
CPPUNIT_TEST_SUITE_END(); CPPUNIT_TEST_SUITE_END();
}; };
@ -149,9 +152,8 @@ ConnectionManagerTest::testConnectDevice()
} }
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT( CPPUNIT_ASSERT(cvReceive.wait_for(lk, 60s, [&] { return successfullyReceive; }));
cvReceive.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyReceive; })); CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyConnected; }));
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyConnected; }));
} }
void void
@ -195,7 +197,7 @@ ConnectionManagerTest::testAcceptConnection()
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
return successfullyReceive && successfullyConnected && receiverConnected; return successfullyReceive && successfullyConnected && receiverConnected;
})); }));
} }
@ -248,7 +250,7 @@ ConnectionManagerTest::testMultipleChannels()
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
return successfullyConnected && successfullyConnected2 && receiverConnected == 2; return successfullyConnected && successfullyConnected2 && receiverConnected == 2;
})); }));
CPPUNIT_ASSERT(aliceAccount->connectionManager().activeSockets() == 1); CPPUNIT_ASSERT(aliceAccount->connectionManager().activeSockets() == 1);
@ -303,7 +305,7 @@ ConnectionManagerTest::testMultipleChannelsOneDeclined()
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
return successfullyNotConnected && successfullyConnected2 && receiverConnected == 1; return successfullyNotConnected && successfullyConnected2 && receiverConnected == 1;
})); }));
CPPUNIT_ASSERT(aliceAccount->connectionManager().activeSockets() == 1); CPPUNIT_ASSERT(aliceAccount->connectionManager().activeSockets() == 1);
@ -358,7 +360,7 @@ ConnectionManagerTest::testMultipleChannelsSameName()
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
return successfullyConnected && successfullyConnected2 && receiverConnected == 2; return successfullyConnected && successfullyConnected2 && receiverConnected == 2;
})); }));
} }
@ -435,7 +437,7 @@ ConnectionManagerTest::testSendReceiveData()
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
return events == 4 && successfullyReceive && successfullyConnected && successfullyConnected2 return events == 4 && successfullyReceive && successfullyConnected && successfullyConnected2
&& dataOk && dataOk2; && dataOk && dataOk2;
})); }));
@ -482,7 +484,7 @@ ConnectionManagerTest::testDeclineConnection()
} }
cv.notify_one(); cv.notify_one();
}); });
cv.wait_for(lk, std::chrono::seconds(30)); cv.wait_for(lk, 30s);
CPPUNIT_ASSERT(successfullyReceive); CPPUNIT_ASSERT(successfullyReceive);
CPPUNIT_ASSERT(!successfullyConnected); CPPUNIT_ASSERT(!successfullyConnected);
CPPUNIT_ASSERT(!receiverConnected); CPPUNIT_ASSERT(!receiverConnected);
@ -528,7 +530,7 @@ ConnectionManagerTest::testAcceptsICERequest()
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] {
return successfullyReceive && successfullyConnected && receiverConnected; return successfullyReceive && successfullyConnected && receiverConnected;
})); }));
} }
@ -573,7 +575,7 @@ ConnectionManagerTest::testDeclineICERequest()
cv.notify_one(); cv.notify_one();
}); });
cv.wait_for(lk, std::chrono::seconds(30)); cv.wait_for(lk, 30s);
CPPUNIT_ASSERT(successfullyReceive); CPPUNIT_ASSERT(successfullyReceive);
CPPUNIT_ASSERT(!receiverConnected); CPPUNIT_ASSERT(!receiverConnected);
CPPUNIT_ASSERT(!successfullyConnected); CPPUNIT_ASSERT(!successfullyConnected);
@ -622,11 +624,9 @@ ConnectionManagerTest::testChannelRcvShutdown()
} }
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return bobSock && successfullyConnected; }));
return bobSock && successfullyConnected;
}));
bobSock->shutdown(); bobSock->shutdown();
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return shutdownReceived; })); CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return shutdownReceived; }));
} }
void void
@ -676,8 +676,8 @@ ConnectionManagerTest::testChannelSenderShutdown()
} }
}); });
rcv.wait_for(lk, std::chrono::seconds(30)); rcv.wait_for(lk, 30s);
scv.wait_for(lk, std::chrono::seconds(30)); scv.wait_for(lk, 30s);
CPPUNIT_ASSERT(shutdownReceived); CPPUNIT_ASSERT(shutdownReceived);
CPPUNIT_ASSERT(successfullyReceive); CPPUNIT_ASSERT(successfullyReceive);
CPPUNIT_ASSERT(successfullyConnected); CPPUNIT_ASSERT(successfullyConnected);
@ -734,10 +734,10 @@ ConnectionManagerTest::testCloseConnectionWithDevice()
} }
}); });
rcv.wait_for(lk, std::chrono::seconds(30)); rcv.wait_for(lk, 30s);
// This should trigger onShutdown // This should trigger onShutdown
aliceAccount->connectionManager().closeConnectionsWith(bobDeviceId); aliceAccount->connectionManager().closeConnectionsWith(bobDeviceId);
CPPUNIT_ASSERT(scv.wait_for(lk, std::chrono::seconds(60), [&] { CPPUNIT_ASSERT(scv.wait_for(lk, 60s, [&] {
return events == 2 && successfullyReceive && successfullyConnected && receiverConnected; return events == 2 && successfullyReceive && successfullyConnected && receiverConnected;
})); }));
} }
@ -768,7 +768,7 @@ ConnectionManagerTest::testShutdownCallbacks()
} else { } else {
chan2cv.notify_one(); chan2cv.notify_one();
// Do not return directly. Let the connection be closed // Do not return directly. Let the connection be closed
std::this_thread::sleep_for(std::chrono::seconds(10)); std::this_thread::sleep_for(10s);
} }
return true; return true;
}); });
@ -788,7 +788,7 @@ ConnectionManagerTest::testShutdownCallbacks()
} }
}); });
// Connect first channel. This will initiate a mx sock // Connect first channel. This will initiate a mx sock
CPPUNIT_ASSERT(rcv.wait_for(lk, std::chrono::seconds(30), [&] { CPPUNIT_ASSERT(rcv.wait_for(lk, 30s, [&] {
return successfullyReceive && successfullyConnected && receiverConnected; return successfullyReceive && successfullyConnected && receiverConnected;
})); }));
@ -801,11 +801,11 @@ ConnectionManagerTest::testShutdownCallbacks()
channel2NotConnected = !socket; channel2NotConnected = !socket;
rcv.notify_one(); rcv.notify_one();
}); });
chan2cv.wait_for(lk, std::chrono::seconds(30)); chan2cv.wait_for(lk, 30s);
// This should trigger onShutdown for second callback // This should trigger onShutdown for second callback
bobAccount->connectionManager().closeConnectionsWith(aliceDeviceId); bobAccount->connectionManager().closeConnectionsWith(aliceDeviceId);
CPPUNIT_ASSERT(rcv.wait_for(lk, std::chrono::seconds(30), [&] { return channel2NotConnected; })); CPPUNIT_ASSERT(rcv.wait_for(lk, 30s, [&] { return channel2NotConnected; }));
} }
void void
@ -849,7 +849,7 @@ ConnectionManagerTest::testFloodSocket()
} }
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] {
return successfullyReceive && successfullyConnected && receiverConnected; return successfullyReceive && successfullyConnected && receiverConnected;
})); }));
CPPUNIT_ASSERT(receiverConnected); CPPUNIT_ASSERT(receiverConnected);
@ -865,9 +865,7 @@ ConnectionManagerTest::testFloodSocket()
} }
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
return successfullyConnected && receiverConnected;
}));
successfullyConnected = false; successfullyConnected = false;
receiverConnected = false; receiverConnected = false;
aliceAccount->connectionManager().connectDevice(bobDeviceId, aliceAccount->connectionManager().connectDevice(bobDeviceId,
@ -880,9 +878,7 @@ ConnectionManagerTest::testFloodSocket()
} }
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
return successfullyConnected && receiverConnected;
}));
std::mutex mtxRcv {}; std::mutex mtxRcv {};
std::string alphabet, shouldRcv, rcv1, rcv2, rcv3; std::string alphabet, shouldRcv, rcv1, rcv2, rcv3;
for (int i = 0; i < 100; ++i) for (int i = 0; i < 100; ++i)
@ -908,7 +904,7 @@ ConnectionManagerTest::testFloodSocket()
sendSock3->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec); sendSock3->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
CPPUNIT_ASSERT(!ec); CPPUNIT_ASSERT(!ec);
} }
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
return shouldRcv == rcv1 && shouldRcv == rcv2 && shouldRcv == rcv3; return shouldRcv == rcv1 && shouldRcv == rcv2 && shouldRcv == rcv3;
})); }));
} }
@ -956,7 +952,7 @@ ConnectionManagerTest::testDestroyWhileSending()
} }
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] {
return successfullyReceive && successfullyConnected && receiverConnected; return successfullyReceive && successfullyConnected && receiverConnected;
})); }));
successfullyConnected = false; successfullyConnected = false;
@ -971,9 +967,7 @@ ConnectionManagerTest::testDestroyWhileSending()
} }
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
return successfullyConnected && receiverConnected;
}));
successfullyConnected = false; successfullyConnected = false;
receiverConnected = false; receiverConnected = false;
aliceAccount->connectionManager().connectDevice(bobDeviceId, aliceAccount->connectionManager().connectDevice(bobDeviceId,
@ -986,9 +980,7 @@ ConnectionManagerTest::testDestroyWhileSending()
} }
cv.notify_one(); cv.notify_one();
}); });
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
return successfullyConnected && receiverConnected;
}));
std::mutex mtxRcv {}; std::mutex mtxRcv {};
std::string alphabet; std::string alphabet;
for (int i = 0; i < 100; ++i) for (int i = 0; i < 100; ++i)
@ -1027,7 +1019,7 @@ ConnectionManagerTest::testIsConnecting()
[&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) {
successfullyReceive = true; successfullyReceive = true;
cv.notify_one(); cv.notify_one();
std::this_thread::sleep_for(std::chrono::seconds(2)); std::this_thread::sleep_for(2s);
return true; return true;
}); });
@ -1042,9 +1034,9 @@ ConnectionManagerTest::testIsConnecting()
cv.notify_one(); cv.notify_one();
}); });
// connectDevice is full async, so isConnecting will be true after a few ms. // connectDevice is full async, so isConnecting will be true after a few ms.
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyReceive; })); CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyReceive; }));
CPPUNIT_ASSERT(aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); CPPUNIT_ASSERT(aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip"));
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyConnected; })); CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyConnected; }));
std::this_thread::sleep_for( std::this_thread::sleep_for(
std::chrono::milliseconds(100)); // Just to wait for the callback to finish std::chrono::milliseconds(100)); // Just to wait for the callback to finish
CPPUNIT_ASSERT(!aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); CPPUNIT_ASSERT(!aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip"));
@ -1086,8 +1078,7 @@ ConnectionManagerTest::testCanSendBeacon()
cv.notify_one(); cv.notify_one();
}); });
// connectDevice is full async, so isConnecting will be true after a few ms. // connectDevice is full async, so isConnecting will be true after a few ms.
CPPUNIT_ASSERT( CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; }));
cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; }));
CPPUNIT_ASSERT(aliceSocket->canSendBeacon()); CPPUNIT_ASSERT(aliceSocket->canSendBeacon());
CPPUNIT_ASSERT(bobSocket->canSendBeacon()); CPPUNIT_ASSERT(bobSocket->canSendBeacon());
} }
@ -1128,8 +1119,7 @@ ConnectionManagerTest::testCannotSendBeacon()
cv.notify_one(); cv.notify_one();
}); });
// connectDevice is full async, so isConnecting will be true after a few ms. // connectDevice is full async, so isConnecting will be true after a few ms.
CPPUNIT_ASSERT( CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; }));
cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; }));
int version = 1412; int version = 1412;
bobSocket->setOnVersionCb([&](auto v) { bobSocket->setOnVersionCb([&](auto v) {
@ -1138,7 +1128,7 @@ ConnectionManagerTest::testCannotSendBeacon()
}); });
aliceSocket->setVersion(0); aliceSocket->setVersion(0);
aliceSocket->sendVersion(); aliceSocket->sendVersion();
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return version == 0; })); CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return version == 0; }));
CPPUNIT_ASSERT(!bobSocket->canSendBeacon()); CPPUNIT_ASSERT(!bobSocket->canSendBeacon());
} }
@ -1178,8 +1168,7 @@ ConnectionManagerTest::testConnectivityChangeTriggerBeacon()
cv.notify_one(); cv.notify_one();
}); });
// connectDevice is full async, so isConnecting will be true after a few ms. // connectDevice is full async, so isConnecting will be true after a few ms.
CPPUNIT_ASSERT( CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; }));
cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; }));
bool hasRequest = false; bool hasRequest = false;
bobSocket->setOnBeaconCb([&](auto p) { bobSocket->setOnBeaconCb([&](auto p) {
@ -1188,7 +1177,7 @@ ConnectionManagerTest::testConnectivityChangeTriggerBeacon()
cv.notify_one(); cv.notify_one();
}); });
aliceAccount->connectionManager().connectivityChanged(); aliceAccount->connectionManager().connectivityChanged();
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(10), [&] { return hasRequest; })); CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return hasRequest; }));
} }
void void
@ -1227,8 +1216,7 @@ ConnectionManagerTest::testOnNoBeaconTriggersShutdown()
cv.notify_one(); cv.notify_one();
}); });
// connectDevice is full async, so isConnecting will be true after a few ms. // connectDevice is full async, so isConnecting will be true after a few ms.
CPPUNIT_ASSERT( CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; }));
cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; }));
bool isClosed = false; bool isClosed = false;
aliceSocket->onShutdown([&] { aliceSocket->onShutdown([&] {
@ -1237,7 +1225,41 @@ ConnectionManagerTest::testOnNoBeaconTriggersShutdown()
}); });
bobSocket->answerToBeacon(false); bobSocket->answerToBeacon(false);
aliceAccount->connectionManager().connectivityChanged(); aliceAccount->connectionManager().connectivityChanged();
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(10), [&] { return isClosed; })); CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return isClosed; }));
}
void
ConnectionManagerTest::testShutdownWhileNegotiating()
{
auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId);
auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId);
auto bobDeviceId = DeviceId(std::string(bobAccount->currentDeviceId()));
aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });
std::mutex mtx;
std::unique_lock<std::mutex> lk {mtx};
std::condition_variable cv;
bool successfullyReceive = false;
bool notConnected = false;
bobAccount->connectionManager().onICERequest([&](const DeviceId&) {
successfullyReceive = true;
cv.notify_one();
return true;
});
aliceAccount->connectionManager().connectDevice(bobDeviceId,
"git://*",
[&](std::shared_ptr<ChannelSocket> socket,
const DeviceId&) {
notConnected = !socket;
cv.notify_one();
});
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyReceive; }));
Manager::instance().setAccountActive(aliceId, false, true);
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return notConnected; }));
} }
} // namespace test } // namespace test