jamiaccount: remove eventHandler and prefer callbacks

IceTransport has the onNegoDone callback that can be used to
replace the current mechanism about checking ice transports

Change-Id: Iee96be646516f86136066418e38cbac3f8aefa21
This commit is contained in:
Sébastien Blin
2020-05-13 11:16:46 -04:00
committed by Adrien Béraud
parent c7d85bb926
commit a89bcbe47a
3 changed files with 59 additions and 87 deletions

View File

@ -73,7 +73,7 @@ struct TurnServerInfo {
struct IceTransportOptions {
bool upnpEnable {false};
IceTransportCompleteCb onInitDone {};
IceTransportCompleteCb onNegoDone{};
IceTransportCompleteCb onNegoDone {};
IceRecvInfo onRecvReady{}; // Detect that we have data to read but without destroying the buffer
std::vector<StunServerInfo> stunServers;
std::vector<TurnServerInfo> turnServers;

View File

@ -303,10 +303,6 @@ JamiAccount::JamiAccount(const std::string& accountID, bool /* presenceEnabled *
JamiAccount::~JamiAccount()
{
shutdownConnections();
if (eventHandler) {
eventHandler->destroy();
eventHandler.reset();
}
if(peerDiscovery_){
peerDiscovery_->stopPublish(PEER_DISCOVERY_JAMI_SERVICE);
peerDiscovery_->stopDiscovery(PEER_DISCOVERY_JAMI_SERVICE);
@ -493,20 +489,29 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
auto dev_call = manager.callFactory.newCall<SIPCall, JamiAccount>(*this, manager.getNewCallID(),
Call::CallType::OUTGOING,
call->getDetails());
auto callId = dev_call->getCallId();
auto onNegoDone = [callId, w=weak()](bool) {
if (auto shared = w.lock()) {
shared->checkPendingCall(callId);
}
};
std::weak_ptr<SIPCall> weak_dev_call = dev_call;
auto iceOptions = getIceOptions();
iceOptions.onNegoDone = onNegoDone;
dev_call->setIPToIP(true);
dev_call->setSecure(isTlsEnabled());
auto ice = createIceTransport(("sip:" + dev_call->getCallId()).c_str(),
ICE_COMPONENTS, true, getIceOptions());
ICE_COMPONENTS, true, iceOptions);
if (not ice) {
JAMI_WARN("[call %s] Can't create ICE", call->getCallId().c_str());
dev_call->removeCall();
return;
}
auto ice_config = getIceOptions();
ice_config.tcpEnable = true;
auto ice_tcp = createIceTransport(("sip:" + dev_call->getCallId()).c_str(), ICE_COMPONENTS, true, ice_config);
iceOptions.tcpEnable = true;
auto ice_tcp = createIceTransport(("sip:" + dev_call->getCallId()).c_str(), ICE_COMPONENTS, true, iceOptions);
if (not ice_tcp) {
JAMI_WARN("Can't create ICE over TCP, will only use UDP");
}
@ -595,7 +600,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
);
std::lock_guard<std::mutex> lock(sthis->callsMutex_);
sthis->pendingCalls_.emplace_back(PendingCall{
sthis->pendingCalls_.emplace(call->getCallId(), PendingCall {
std::chrono::steady_clock::now(),
std::move(ice), std::move(ice_tcp), weak_dev_call,
std::move(listenKey),
@ -604,7 +609,6 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
peer_account,
tls::CertificateStore::instance().getCertificate(toUri)
});
sthis->checkPendingCallsTask();
return false;
});
};
@ -1398,44 +1402,30 @@ JamiAccount::registerName(const std::string& password, const std::string& name)
}
#endif
bool
JamiAccount::handlePendingCallList()
void
JamiAccount::checkPendingCall(const std::string& callId)
{
// Process pending call into a local list to not block threads depending on this list,
// as incoming call handlers.
decltype(pendingCalls_) pending_calls;
{
std::lock_guard<std::mutex> lock(callsMutex_);
pending_calls = std::move(pendingCalls_);
pendingCalls_.clear();
// Note only one check at a time. In fact, the UDP and TCP negotiation
// can finish at the same time and we need to avoid potential race conditions.
std::lock_guard<std::mutex> lk(callsMutex_);
auto it = pendingCalls_.find(callId);
if (it == pendingCalls_.end()) return;
bool incoming = !it->second.call_key;
bool handled;
try {
handled = handlePendingCall(it->second, incoming);
} catch (const std::exception& e) {
JAMI_ERR("[DHT] exception during pending call handling: %s", e.what());
handled = true; // drop from pending list
}
auto pc_iter = std::begin(pending_calls);
while (pc_iter != std::end(pending_calls)) {
bool incoming = !pc_iter->call_key; // do it now, handlePendingCall may invalidate pc data
bool handled;
try {
handled = handlePendingCall(*pc_iter, incoming);
} catch (const std::exception& e) {
JAMI_ERR("[DHT] exception during pending call handling: %s", e.what());
handled = true; // drop from pending list
}
if (handled) {
if (handled) {
if (not incoming) {
// Cancel pending listen (outgoing call)
if (not incoming)
dht_->cancelListen(pc_iter->call_key, std::move(pc_iter->listen_key));
pc_iter = pending_calls.erase(pc_iter);
} else
++pc_iter;
}
// Re-integrate non-handled and valid pending calls
{
std::lock_guard<std::mutex> lock(callsMutex_);
pendingCalls_.splice(std::end(pendingCalls_), pending_calls);
return not pendingCalls_.empty();
dht_->cancelListen(it->second.call_key, std::move(it->second.listen_key));
}
pendingCalls_.erase(it);
}
}
@ -1609,7 +1599,6 @@ JamiAccount::handlePendingCall(PendingCall& pc, bool incoming)
call->setTransport(transport);
if (incoming) {
std::lock_guard<std::mutex> lock(callsMutex_);
pendingSipCalls_.emplace_back(std::move(pc)); // copy of pc
} else {
// Be acknowledged on transport connection/disconnection
@ -2113,10 +2102,17 @@ JamiAccount::incomingCall(dht::IceCandidates&& msg, const std::shared_ptr<dht::c
if (!call) {
return;
}
auto ice = createIceTransport(("sip:"+call->getCallId()).c_str(), ICE_COMPONENTS, false, getIceOptions());
auto ice_config = getIceOptions();
ice_config.tcpEnable = true;
auto ice_tcp = createIceTransport(("sip:" + call->getCallId()).c_str(), ICE_COMPONENTS, true, ice_config);
auto callId = call->getCallId();
auto onNegoDone = [callId, w=weak()](bool) {
if (auto shared = w.lock()) {
shared->checkPendingCall(callId);
}
};
auto iceOptions = getIceOptions();
iceOptions.onNegoDone = onNegoDone;
auto ice = createIceTransport(("sip:"+call->getCallId()).c_str(), ICE_COMPONENTS, false, iceOptions);
iceOptions.tcpEnable = true;
auto ice_tcp = createIceTransport(("sip:" + call->getCallId()).c_str(), ICE_COMPONENTS, true, iceOptions);
std::weak_ptr<SIPCall> wcall = call;
Manager::instance().addTask([account=shared(), wcall, ice, ice_tcp, msg, from_cert, from] {
@ -2200,20 +2196,17 @@ JamiAccount::replyToIncomingIceMsg(const std::shared_ptr<SIPCall>& call,
call->setPeerNumber(from);
// Let the call handled by the PendingCall handler loop
{
std::lock_guard<std::mutex> lock(callsMutex_);
pendingCalls_.emplace_back(
PendingCall{/*.start = */ started_time,
/*.ice_sp = */ udp_failed ? nullptr : ice,
/*.ice_tcp_sp = */ tcp_failed ? nullptr : ice_tcp,
/*.call = */ wcall,
/*.listen_key = */ {},
/*.call_key = */ {},
/*.from = */ peer_ice_msg.from,
/*.from_account = */ from_id,
/*.from_cert = */ from_cert});
checkPendingCallsTask();
}
std::lock_guard<std::mutex> lock(callsMutex_);
pendingCalls_.emplace(call->getCallId(),
PendingCall{/*.start = */ started_time,
/*.ice_sp = */ udp_failed ? nullptr : ice,
/*.ice_tcp_sp = */ tcp_failed ? nullptr : ice_tcp,
/*.call = */ wcall,
/*.listen_key = */ {},
/*.call_key = */ {},
/*.from = */ peer_ice_msg.from,
/*.from_account = */ from_id,
/*.from_cert = */ from_cert});
}
void
@ -2237,7 +2230,6 @@ JamiAccount::doUnregister(std::function<void(bool)> released_cb)
std::lock_guard<std::mutex> lock(callsMutex_);
pendingCalls_.clear();
pendingSipCalls_.clear();
checkPendingCallsTask();
}
dht_->join();
@ -3094,22 +3086,6 @@ JamiAccount::getLastMessages(const uint64_t& base_timestamp)
return SIPAccountBase::getLastMessages(base_timestamp);
}
void
JamiAccount::checkPendingCallsTask()
{
decltype(eventHandler) handler;
if (not pendingCalls_.empty()) {
handler = Manager::instance().scheduler().scheduleAtFixedRate([w = weak()] {
if (auto this_ = w.lock())
return this_->handlePendingCallList();
return false;
}, std::chrono::milliseconds(10));
}
std::swap(handler, eventHandler);
if (handler)
handler->cancel();
}
void
JamiAccount::startAccountPublish()
{

View File

@ -507,7 +507,7 @@ private:
*/
void onPortMappingAdded(uint16_t port_used, bool success);
bool handlePendingCallList();
void checkPendingCall(const std::string& callId);
bool handlePendingCall(PendingCall& pc, bool incoming);
void loadAccount(const std::string& archive_password = {}, const std::string& archive_pin = {}, const std::string& archive_path = {});
@ -563,8 +563,6 @@ private:
template <class... Args>
std::shared_ptr<IceTransport> createIceTransport(const Args&... args);
void checkPendingCallsTask();
#if HAVE_RINGNS
std::string nameServer_;
std::string registeredName_;
@ -580,7 +578,7 @@ private:
/**
* DHT calls waiting for ICE negotiation
*/
std::list<PendingCall> pendingCalls_;
std::map<std::string, PendingCall> pendingCalls_;
/**
* Incoming DHT calls that are not yet actual SIP calls.
@ -677,8 +675,6 @@ private:
bool accountPeerDiscovery_ {false};
bool accountPublish_ {false};
std::shared_ptr<RepeatedTask> eventHandler {};
/**
* Avoid to refresh the cache multiple times
*/