diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp index 472ace5c5..aa13ca098 100644 --- a/src/jamidht/connectionmanager.cpp +++ b/src/jamidht/connectionmanager.cpp @@ -103,6 +103,10 @@ public: isDestroying_ = true; { std::lock_guard lk(connectCbsMtx_); + // Call all pending callbacks that channel is not ready + for (auto& [deviceId, pcbs] : pendingCbs_) + for (auto& pending : pcbs) + pending.cb(nullptr, deviceId); pendingCbs_.clear(); } removeUnusedConnections(); diff --git a/test/unitTest/connectionManager/connectionManager.cpp b/test/unitTest/connectionManager/connectionManager.cpp index aa54b1270..603ed5629 100644 --- a/test/unitTest/connectionManager/connectionManager.cpp +++ b/test/unitTest/connectionManager/connectionManager.cpp @@ -32,6 +32,7 @@ #include "common.h" using namespace DRing::Account; +using namespace std::literals::chrono_literals; namespace jami { namespace test { @@ -75,6 +76,7 @@ private: void testCannotSendBeacon(); void testConnectivityChangeTriggerBeacon(); void testOnNoBeaconTriggersShutdown(); + void testShutdownWhileNegotiating(); CPPUNIT_TEST_SUITE(ConnectionManagerTest); CPPUNIT_TEST(testConnectDevice); @@ -97,6 +99,7 @@ private: CPPUNIT_TEST(testCannotSendBeacon); CPPUNIT_TEST(testConnectivityChangeTriggerBeacon); CPPUNIT_TEST(testOnNoBeaconTriggersShutdown); + CPPUNIT_TEST(testShutdownWhileNegotiating); CPPUNIT_TEST_SUITE_END(); }; @@ -149,9 +152,8 @@ ConnectionManagerTest::testConnectDevice() } cv.notify_one(); }); - CPPUNIT_ASSERT( - cvReceive.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyReceive; })); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyConnected; })); + CPPUNIT_ASSERT(cvReceive.wait_for(lk, 60s, [&] { return successfullyReceive; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyConnected; })); } void @@ -195,7 +197,7 @@ ConnectionManagerTest::testAcceptConnection() cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { + CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyReceive && successfullyConnected && receiverConnected; })); } @@ -248,7 +250,7 @@ ConnectionManagerTest::testMultipleChannels() cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { + CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyConnected && successfullyConnected2 && receiverConnected == 2; })); CPPUNIT_ASSERT(aliceAccount->connectionManager().activeSockets() == 1); @@ -303,7 +305,7 @@ ConnectionManagerTest::testMultipleChannelsOneDeclined() cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { + CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyNotConnected && successfullyConnected2 && receiverConnected == 1; })); CPPUNIT_ASSERT(aliceAccount->connectionManager().activeSockets() == 1); @@ -358,7 +360,7 @@ ConnectionManagerTest::testMultipleChannelsSameName() cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { + CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyConnected && successfullyConnected2 && receiverConnected == 2; })); } @@ -435,7 +437,7 @@ ConnectionManagerTest::testSendReceiveData() cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { + CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return events == 4 && successfullyReceive && successfullyConnected && successfullyConnected2 && dataOk && dataOk2; })); @@ -482,7 +484,7 @@ ConnectionManagerTest::testDeclineConnection() } cv.notify_one(); }); - cv.wait_for(lk, std::chrono::seconds(30)); + cv.wait_for(lk, 30s); CPPUNIT_ASSERT(successfullyReceive); CPPUNIT_ASSERT(!successfullyConnected); CPPUNIT_ASSERT(!receiverConnected); @@ -528,7 +530,7 @@ ConnectionManagerTest::testAcceptsICERequest() cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyReceive && successfullyConnected && receiverConnected; })); } @@ -573,7 +575,7 @@ ConnectionManagerTest::testDeclineICERequest() cv.notify_one(); }); - cv.wait_for(lk, std::chrono::seconds(30)); + cv.wait_for(lk, 30s); CPPUNIT_ASSERT(successfullyReceive); CPPUNIT_ASSERT(!receiverConnected); CPPUNIT_ASSERT(!successfullyConnected); @@ -622,11 +624,9 @@ ConnectionManagerTest::testChannelRcvShutdown() } }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { - return bobSock && successfullyConnected; - })); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return bobSock && successfullyConnected; })); bobSock->shutdown(); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return shutdownReceived; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return shutdownReceived; })); } void @@ -676,8 +676,8 @@ ConnectionManagerTest::testChannelSenderShutdown() } }); - rcv.wait_for(lk, std::chrono::seconds(30)); - scv.wait_for(lk, std::chrono::seconds(30)); + rcv.wait_for(lk, 30s); + scv.wait_for(lk, 30s); CPPUNIT_ASSERT(shutdownReceived); CPPUNIT_ASSERT(successfullyReceive); CPPUNIT_ASSERT(successfullyConnected); @@ -734,10 +734,10 @@ ConnectionManagerTest::testCloseConnectionWithDevice() } }); - rcv.wait_for(lk, std::chrono::seconds(30)); + rcv.wait_for(lk, 30s); // This should trigger onShutdown aliceAccount->connectionManager().closeConnectionsWith(bobDeviceId); - CPPUNIT_ASSERT(scv.wait_for(lk, std::chrono::seconds(60), [&] { + CPPUNIT_ASSERT(scv.wait_for(lk, 60s, [&] { return events == 2 && successfullyReceive && successfullyConnected && receiverConnected; })); } @@ -768,7 +768,7 @@ ConnectionManagerTest::testShutdownCallbacks() } else { chan2cv.notify_one(); // Do not return directly. Let the connection be closed - std::this_thread::sleep_for(std::chrono::seconds(10)); + std::this_thread::sleep_for(10s); } return true; }); @@ -788,7 +788,7 @@ ConnectionManagerTest::testShutdownCallbacks() } }); // Connect first channel. This will initiate a mx sock - CPPUNIT_ASSERT(rcv.wait_for(lk, std::chrono::seconds(30), [&] { + CPPUNIT_ASSERT(rcv.wait_for(lk, 30s, [&] { return successfullyReceive && successfullyConnected && receiverConnected; })); @@ -801,11 +801,11 @@ ConnectionManagerTest::testShutdownCallbacks() channel2NotConnected = !socket; rcv.notify_one(); }); - chan2cv.wait_for(lk, std::chrono::seconds(30)); + chan2cv.wait_for(lk, 30s); // This should trigger onShutdown for second callback bobAccount->connectionManager().closeConnectionsWith(aliceDeviceId); - CPPUNIT_ASSERT(rcv.wait_for(lk, std::chrono::seconds(30), [&] { return channel2NotConnected; })); + CPPUNIT_ASSERT(rcv.wait_for(lk, 30s, [&] { return channel2NotConnected; })); } void @@ -849,7 +849,7 @@ ConnectionManagerTest::testFloodSocket() } cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyReceive && successfullyConnected && receiverConnected; })); CPPUNIT_ASSERT(receiverConnected); @@ -865,9 +865,7 @@ ConnectionManagerTest::testFloodSocket() } cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { - return successfullyConnected && receiverConnected; - })); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; })); successfullyConnected = false; receiverConnected = false; aliceAccount->connectionManager().connectDevice(bobDeviceId, @@ -880,9 +878,7 @@ ConnectionManagerTest::testFloodSocket() } cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { - return successfullyConnected && receiverConnected; - })); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; })); std::mutex mtxRcv {}; std::string alphabet, shouldRcv, rcv1, rcv2, rcv3; for (int i = 0; i < 100; ++i) @@ -908,7 +904,7 @@ ConnectionManagerTest::testFloodSocket() sendSock3->write(reinterpret_cast(send.data()), send.size(), ec); CPPUNIT_ASSERT(!ec); } - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { + CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return shouldRcv == rcv1 && shouldRcv == rcv2 && shouldRcv == rcv3; })); } @@ -956,7 +952,7 @@ ConnectionManagerTest::testDestroyWhileSending() } cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyReceive && successfullyConnected && receiverConnected; })); successfullyConnected = false; @@ -971,9 +967,7 @@ ConnectionManagerTest::testDestroyWhileSending() } cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { - return successfullyConnected && receiverConnected; - })); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; })); successfullyConnected = false; receiverConnected = false; aliceAccount->connectionManager().connectDevice(bobDeviceId, @@ -986,9 +980,7 @@ ConnectionManagerTest::testDestroyWhileSending() } cv.notify_one(); }); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { - return successfullyConnected && receiverConnected; - })); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; })); std::mutex mtxRcv {}; std::string alphabet; for (int i = 0; i < 100; ++i) @@ -1027,7 +1019,7 @@ ConnectionManagerTest::testIsConnecting() [&](const std::shared_ptr&, const std::string&) { successfullyReceive = true; cv.notify_one(); - std::this_thread::sleep_for(std::chrono::seconds(2)); + std::this_thread::sleep_for(2s); return true; }); @@ -1042,9 +1034,9 @@ ConnectionManagerTest::testIsConnecting() cv.notify_one(); }); // connectDevice is full async, so isConnecting will be true after a few ms. - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyReceive; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyReceive; })); CPPUNIT_ASSERT(aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyConnected; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyConnected; })); std::this_thread::sleep_for( std::chrono::milliseconds(100)); // Just to wait for the callback to finish CPPUNIT_ASSERT(!aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); @@ -1086,8 +1078,7 @@ ConnectionManagerTest::testCanSendBeacon() cv.notify_one(); }); // connectDevice is full async, so isConnecting will be true after a few ms. - CPPUNIT_ASSERT( - cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; })); CPPUNIT_ASSERT(aliceSocket->canSendBeacon()); CPPUNIT_ASSERT(bobSocket->canSendBeacon()); } @@ -1128,8 +1119,7 @@ ConnectionManagerTest::testCannotSendBeacon() cv.notify_one(); }); // connectDevice is full async, so isConnecting will be true after a few ms. - CPPUNIT_ASSERT( - cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; })); int version = 1412; bobSocket->setOnVersionCb([&](auto v) { @@ -1138,7 +1128,7 @@ ConnectionManagerTest::testCannotSendBeacon() }); aliceSocket->setVersion(0); aliceSocket->sendVersion(); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return version == 0; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return version == 0; })); CPPUNIT_ASSERT(!bobSocket->canSendBeacon()); } @@ -1178,8 +1168,7 @@ ConnectionManagerTest::testConnectivityChangeTriggerBeacon() cv.notify_one(); }); // connectDevice is full async, so isConnecting will be true after a few ms. - CPPUNIT_ASSERT( - cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; })); bool hasRequest = false; bobSocket->setOnBeaconCb([&](auto p) { @@ -1188,7 +1177,7 @@ ConnectionManagerTest::testConnectivityChangeTriggerBeacon() cv.notify_one(); }); aliceAccount->connectionManager().connectivityChanged(); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(10), [&] { return hasRequest; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return hasRequest; })); } void @@ -1227,8 +1216,7 @@ ConnectionManagerTest::testOnNoBeaconTriggersShutdown() cv.notify_one(); }); // connectDevice is full async, so isConnecting will be true after a few ms. - CPPUNIT_ASSERT( - cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; })); bool isClosed = false; aliceSocket->onShutdown([&] { @@ -1237,7 +1225,41 @@ ConnectionManagerTest::testOnNoBeaconTriggersShutdown() }); bobSocket->answerToBeacon(false); aliceAccount->connectionManager().connectivityChanged(); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(10), [&] { return isClosed; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return isClosed; })); +} + +void +ConnectionManagerTest::testShutdownWhileNegotiating() +{ + auto aliceAccount = Manager::instance().getAccount(aliceId); + auto bobAccount = Manager::instance().getAccount(bobId); + auto bobDeviceId = DeviceId(std::string(bobAccount->currentDeviceId())); + + aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); + + std::mutex mtx; + std::unique_lock lk {mtx}; + std::condition_variable cv; + bool successfullyReceive = false; + bool notConnected = false; + + bobAccount->connectionManager().onICERequest([&](const DeviceId&) { + successfullyReceive = true; + cv.notify_one(); + return true; + }); + + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "git://*", + [&](std::shared_ptr socket, + const DeviceId&) { + notConnected = !socket; + cv.notify_one(); + }); + + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyReceive; })); + Manager::instance().setAccountActive(aliceId, false, true); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return notConnected; })); } } // namespace test