From a89bcbe47a2c3a8b8b6d240ff4498069fba305e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Blin?= Date: Wed, 13 May 2020 11:16:46 -0400 Subject: [PATCH] jamiaccount: remove eventHandler and prefer callbacks IceTransport has the onNegoDone callback that can be used to replace the current mechanism about checking ice transports Change-Id: Iee96be646516f86136066418e38cbac3f8aefa21 --- src/ice_transport.h | 2 +- src/jamidht/jamiaccount.cpp | 136 +++++++++++++++--------------------- src/jamidht/jamiaccount.h | 8 +-- 3 files changed, 59 insertions(+), 87 deletions(-) diff --git a/src/ice_transport.h b/src/ice_transport.h index 5a7a59ae0..7ac2f2a40 100644 --- a/src/ice_transport.h +++ b/src/ice_transport.h @@ -73,7 +73,7 @@ struct TurnServerInfo { struct IceTransportOptions { bool upnpEnable {false}; IceTransportCompleteCb onInitDone {}; - IceTransportCompleteCb onNegoDone{}; + IceTransportCompleteCb onNegoDone {}; IceRecvInfo onRecvReady{}; // Detect that we have data to read but without destroying the buffer std::vector stunServers; std::vector turnServers; diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index a2425203d..44863b369 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -303,10 +303,6 @@ JamiAccount::JamiAccount(const std::string& accountID, bool /* presenceEnabled * JamiAccount::~JamiAccount() { shutdownConnections(); - if (eventHandler) { - eventHandler->destroy(); - eventHandler.reset(); - } if(peerDiscovery_){ peerDiscovery_->stopPublish(PEER_DISCOVERY_JAMI_SERVICE); peerDiscovery_->stopDiscovery(PEER_DISCOVERY_JAMI_SERVICE); @@ -493,20 +489,29 @@ JamiAccount::startOutgoingCall(const std::shared_ptr& call, const std:: auto dev_call = manager.callFactory.newCall(*this, manager.getNewCallID(), Call::CallType::OUTGOING, call->getDetails()); + + auto callId = dev_call->getCallId(); + auto onNegoDone = [callId, w=weak()](bool) { + if (auto shared = w.lock()) { + shared->checkPendingCall(callId); + } + }; + std::weak_ptr weak_dev_call = dev_call; + auto iceOptions = getIceOptions(); + iceOptions.onNegoDone = onNegoDone; dev_call->setIPToIP(true); dev_call->setSecure(isTlsEnabled()); auto ice = createIceTransport(("sip:" + dev_call->getCallId()).c_str(), - ICE_COMPONENTS, true, getIceOptions()); + ICE_COMPONENTS, true, iceOptions); if (not ice) { JAMI_WARN("[call %s] Can't create ICE", call->getCallId().c_str()); dev_call->removeCall(); return; } - auto ice_config = getIceOptions(); - ice_config.tcpEnable = true; - auto ice_tcp = createIceTransport(("sip:" + dev_call->getCallId()).c_str(), ICE_COMPONENTS, true, ice_config); + iceOptions.tcpEnable = true; + auto ice_tcp = createIceTransport(("sip:" + dev_call->getCallId()).c_str(), ICE_COMPONENTS, true, iceOptions); if (not ice_tcp) { JAMI_WARN("Can't create ICE over TCP, will only use UDP"); } @@ -595,7 +600,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr& call, const std:: ); std::lock_guard lock(sthis->callsMutex_); - sthis->pendingCalls_.emplace_back(PendingCall{ + sthis->pendingCalls_.emplace(call->getCallId(), PendingCall { std::chrono::steady_clock::now(), std::move(ice), std::move(ice_tcp), weak_dev_call, std::move(listenKey), @@ -604,7 +609,6 @@ JamiAccount::startOutgoingCall(const std::shared_ptr& call, const std:: peer_account, tls::CertificateStore::instance().getCertificate(toUri) }); - sthis->checkPendingCallsTask(); return false; }); }; @@ -1398,44 +1402,30 @@ JamiAccount::registerName(const std::string& password, const std::string& name) } #endif -bool -JamiAccount::handlePendingCallList() +void +JamiAccount::checkPendingCall(const std::string& callId) { - // Process pending call into a local list to not block threads depending on this list, - // as incoming call handlers. - decltype(pendingCalls_) pending_calls; - { - std::lock_guard lock(callsMutex_); - pending_calls = std::move(pendingCalls_); - pendingCalls_.clear(); + // Note only one check at a time. In fact, the UDP and TCP negotiation + // can finish at the same time and we need to avoid potential race conditions. + std::lock_guard lk(callsMutex_); + auto it = pendingCalls_.find(callId); + if (it == pendingCalls_.end()) return; + + bool incoming = !it->second.call_key; + bool handled; + try { + handled = handlePendingCall(it->second, incoming); + } catch (const std::exception& e) { + JAMI_ERR("[DHT] exception during pending call handling: %s", e.what()); + handled = true; // drop from pending list } - auto pc_iter = std::begin(pending_calls); - while (pc_iter != std::end(pending_calls)) { - bool incoming = !pc_iter->call_key; // do it now, handlePendingCall may invalidate pc data - bool handled; - - try { - handled = handlePendingCall(*pc_iter, incoming); - } catch (const std::exception& e) { - JAMI_ERR("[DHT] exception during pending call handling: %s", e.what()); - handled = true; // drop from pending list - } - - if (handled) { + if (handled) { + if (not incoming) { // Cancel pending listen (outgoing call) - if (not incoming) - dht_->cancelListen(pc_iter->call_key, std::move(pc_iter->listen_key)); - pc_iter = pending_calls.erase(pc_iter); - } else - ++pc_iter; - } - - // Re-integrate non-handled and valid pending calls - { - std::lock_guard lock(callsMutex_); - pendingCalls_.splice(std::end(pendingCalls_), pending_calls); - return not pendingCalls_.empty(); + dht_->cancelListen(it->second.call_key, std::move(it->second.listen_key)); + } + pendingCalls_.erase(it); } } @@ -1609,7 +1599,6 @@ JamiAccount::handlePendingCall(PendingCall& pc, bool incoming) call->setTransport(transport); if (incoming) { - std::lock_guard lock(callsMutex_); pendingSipCalls_.emplace_back(std::move(pc)); // copy of pc } else { // Be acknowledged on transport connection/disconnection @@ -2113,10 +2102,17 @@ JamiAccount::incomingCall(dht::IceCandidates&& msg, const std::shared_ptrgetCallId()).c_str(), ICE_COMPONENTS, false, getIceOptions()); - auto ice_config = getIceOptions(); - ice_config.tcpEnable = true; - auto ice_tcp = createIceTransport(("sip:" + call->getCallId()).c_str(), ICE_COMPONENTS, true, ice_config); + auto callId = call->getCallId(); + auto onNegoDone = [callId, w=weak()](bool) { + if (auto shared = w.lock()) { + shared->checkPendingCall(callId); + } + }; + auto iceOptions = getIceOptions(); + iceOptions.onNegoDone = onNegoDone; + auto ice = createIceTransport(("sip:"+call->getCallId()).c_str(), ICE_COMPONENTS, false, iceOptions); + iceOptions.tcpEnable = true; + auto ice_tcp = createIceTransport(("sip:" + call->getCallId()).c_str(), ICE_COMPONENTS, true, iceOptions); std::weak_ptr wcall = call; Manager::instance().addTask([account=shared(), wcall, ice, ice_tcp, msg, from_cert, from] { @@ -2200,20 +2196,17 @@ JamiAccount::replyToIncomingIceMsg(const std::shared_ptr& call, call->setPeerNumber(from); // Let the call handled by the PendingCall handler loop - { - std::lock_guard lock(callsMutex_); - pendingCalls_.emplace_back( - PendingCall{/*.start = */ started_time, - /*.ice_sp = */ udp_failed ? nullptr : ice, - /*.ice_tcp_sp = */ tcp_failed ? nullptr : ice_tcp, - /*.call = */ wcall, - /*.listen_key = */ {}, - /*.call_key = */ {}, - /*.from = */ peer_ice_msg.from, - /*.from_account = */ from_id, - /*.from_cert = */ from_cert}); - checkPendingCallsTask(); - } + std::lock_guard lock(callsMutex_); + pendingCalls_.emplace(call->getCallId(), + PendingCall{/*.start = */ started_time, + /*.ice_sp = */ udp_failed ? nullptr : ice, + /*.ice_tcp_sp = */ tcp_failed ? nullptr : ice_tcp, + /*.call = */ wcall, + /*.listen_key = */ {}, + /*.call_key = */ {}, + /*.from = */ peer_ice_msg.from, + /*.from_account = */ from_id, + /*.from_cert = */ from_cert}); } void @@ -2237,7 +2230,6 @@ JamiAccount::doUnregister(std::function released_cb) std::lock_guard lock(callsMutex_); pendingCalls_.clear(); pendingSipCalls_.clear(); - checkPendingCallsTask(); } dht_->join(); @@ -3094,22 +3086,6 @@ JamiAccount::getLastMessages(const uint64_t& base_timestamp) return SIPAccountBase::getLastMessages(base_timestamp); } -void -JamiAccount::checkPendingCallsTask() -{ - decltype(eventHandler) handler; - if (not pendingCalls_.empty()) { - handler = Manager::instance().scheduler().scheduleAtFixedRate([w = weak()] { - if (auto this_ = w.lock()) - return this_->handlePendingCallList(); - return false; - }, std::chrono::milliseconds(10)); - } - std::swap(handler, eventHandler); - if (handler) - handler->cancel(); -} - void JamiAccount::startAccountPublish() { diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 5c28f0fb1..8a314b9cc 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -507,7 +507,7 @@ private: */ void onPortMappingAdded(uint16_t port_used, bool success); - bool handlePendingCallList(); + void checkPendingCall(const std::string& callId); bool handlePendingCall(PendingCall& pc, bool incoming); void loadAccount(const std::string& archive_password = {}, const std::string& archive_pin = {}, const std::string& archive_path = {}); @@ -563,8 +563,6 @@ private: template std::shared_ptr createIceTransport(const Args&... args); - void checkPendingCallsTask(); - #if HAVE_RINGNS std::string nameServer_; std::string registeredName_; @@ -580,7 +578,7 @@ private: /** * DHT calls waiting for ICE negotiation */ - std::list pendingCalls_; + std::map pendingCalls_; /** * Incoming DHT calls that are not yet actual SIP calls. @@ -677,8 +675,6 @@ private: bool accountPeerDiscovery_ {false}; bool accountPublish_ {false}; - std::shared_ptr eventHandler {}; - /** * Avoid to refresh the cache multiple times */