jamiaccount: make getIceOptions async and resolve ips

storeActiveIpAddress is done for each connectivityChange(), but
this logic is incorrect, because the ISP can change the IP of the
router without any connectivityChange(). This means that if no
connectivityChange() the address in the candidates will be incorrect
if the public ip is changed by the ISP.
The idea here is to get the public IP as soon as the IceOptions are
generated.
This changes the signature of getIceOptions() and removes it from
Account. After all, the account has no need to always manage ICE
the same way (maybe one day we will have accounts without ICE).

GitLab: #534
Change-Id: I56613e9ce48ef21f66dcf188d33334802aae601b
This commit is contained in:
Sébastien Blin
2021-04-27 11:02:11 -04:00
committed by Adrien Béraud
parent d97121c454
commit 451854b207
8 changed files with 262 additions and 256 deletions

View File

@ -668,14 +668,6 @@ Account::getActiveAccountCodecInfoList(MediaType mediaType) const
return accountCodecList;
}
const IceTransportOptions
Account::getIceOptions() const noexcept
{
IceTransportOptions opts;
opts.upnpEnable = getUPnPActive();
return opts;
}
const std::string&
Account::getUserAgentName()
{

View File

@ -197,11 +197,6 @@ public:
virtual std::map<std::string, std::string> getNearbyPeers() const { return {}; }
/**
* Store the local/public addresses used to register
*/
virtual void storeActiveIpAddress() {};
/**
* Return the status corresponding to the token.
*/
@ -305,8 +300,6 @@ public:
*/
IpAddr getUPnPIpAddress() const;
virtual const IceTransportOptions getIceOptions() const noexcept;
/**
* Random generator engine
* Logical account state shall never rely on the state of the random generator.

View File

@ -472,78 +472,87 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif
};
// If no socket exists, we need to initiate an ICE connection.
auto ice_config = sthis->account.getIceOptions();
ice_config.tcpEnable = true;
ice_config.onInitDone = [w,
cbId,
deviceId = std::move(deviceId),
name = std::move(name),
cert = std::move(cert),
vid,
eraseInfo](bool ok) {
sthis->account.getIceOptions(std::move([w,
cbId,
deviceId = std::move(deviceId),
name = std::move(name),
cert = std::move(cert),
vid,
eraseInfo](auto ice_config) {
auto sthis = w.lock();
if (!sthis)
return;
if (!ok) {
JAMI_ERR("Cannot initialize ICE session.");
for (const auto& pending : sthis->extractPendingCallbacks(deviceId))
pending.cb(nullptr, deviceId);
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
return;
}
dht::ThreadPool::io().run(
[w = std::move(w), deviceId = std::move(deviceId), vid = std::move(vid)] {
if (auto sthis = w.lock())
sthis->connectDeviceStartIce(deviceId, vid);
});
};
ice_config.onNegoDone = [w,
cbId,
deviceId = std::move(deviceId),
name = std::move(name),
cert = std::move(cert),
vid,
eraseInfo](bool ok) {
auto sthis = w.lock();
if (!sthis)
return;
if (!ok) {
JAMI_ERR("ICE negotiation failed.");
for (const auto& pending : sthis->extractPendingCallbacks(deviceId))
pending.cb(nullptr, deviceId);
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
return;
}
dht::ThreadPool::io().run([w = std::move(w),
deviceId = std::move(deviceId),
name = std::move(name),
cert = std::move(cert),
vid = std::move(vid)] {
ice_config.tcpEnable = true;
ice_config.onInitDone = [w,
cbId,
deviceId = std::move(deviceId),
name = std::move(name),
cert = std::move(cert),
vid,
eraseInfo](bool ok) {
auto sthis = w.lock();
if (!sthis)
return;
sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert);
});
};
if (!ok) {
JAMI_ERR("Cannot initialize ICE session.");
for (const auto& pending : sthis->extractPendingCallbacks(deviceId))
pending.cb(nullptr, deviceId);
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
return;
}
auto info = std::make_shared<ConnectionInfo>();
{
std::lock_guard<std::mutex> lk(sthis->infosMtx_);
sthis->infos_[{deviceId, vid}] = info;
}
std::unique_lock<std::mutex> lk {info->mutex_};
info->ice_ = Manager::instance().getIceTransportFactory().createUTransport(
sthis->account.getAccountID().c_str(), 1, false, ice_config);
dht::ThreadPool::io().run(
[w = std::move(w), deviceId = std::move(deviceId), vid = std::move(vid)] {
if (auto sthis = w.lock())
sthis->connectDeviceStartIce(deviceId, vid);
});
};
ice_config.onNegoDone = [w,
cbId,
deviceId = std::move(deviceId),
name = std::move(name),
cert = std::move(cert),
vid,
eraseInfo](bool ok) {
auto sthis = w.lock();
if (!sthis)
return;
if (!ok) {
JAMI_ERR("ICE negotiation failed.");
for (const auto& pending : sthis->extractPendingCallbacks(deviceId))
pending.cb(nullptr, deviceId);
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
return;
}
if (!info->ice_) {
JAMI_ERR("Cannot initialize ICE session.");
for (const auto& pending : sthis->extractPendingCallbacks(deviceId))
pending.cb(nullptr, deviceId);
eraseInfo();
return;
}
dht::ThreadPool::io().run([w = std::move(w),
deviceId = std::move(deviceId),
name = std::move(name),
cert = std::move(cert),
vid = std::move(vid)] {
auto sthis = w.lock();
if (!sthis)
return;
sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert);
});
};
auto info = std::make_shared<ConnectionInfo>();
{
std::lock_guard<std::mutex> lk(sthis->infosMtx_);
sthis->infos_[{deviceId, vid}] = info;
}
std::unique_lock<std::mutex> lk {info->mutex_};
info->ice_ = Manager::instance().getIceTransportFactory().createUTransport(
sthis->account.getAccountID().c_str(), 1, false, ice_config);
if (!info->ice_) {
JAMI_ERR("Cannot initialize ICE session.");
for (const auto& pending : sthis->extractPendingCallbacks(deviceId))
pending.cb(nullptr, deviceId);
eraseInfo();
}
}));
});
}
@ -816,76 +825,80 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
return;
}
// Note: used when the ice negotiation fails to erase
// all stored structures.
auto eraseInfo = [w = weak(), id = req.id, from = req.from] {
if (auto shared = w.lock()) {
std::lock_guard<std::mutex> lk(shared->infosMtx_);
shared->infos_.erase({from, id});
}
};
// Because the connection is accepted, create an ICE socket.
auto ice_config = account.getIceOptions();
ice_config.tcpEnable = true;
ice_config.onInitDone = [w = weak(), req, eraseInfo](bool ok) {
account.getIceOptions(std::move([w=weak(), req](auto ice_config) {
auto deviceId = req.from.toString();
auto shared = w.lock();
if (!shared)
return;
if (!ok) {
JAMI_ERR("Cannot initialize ICE session.");
if (shared->connReadyCb_)
shared->connReadyCb_(req.from, "", nullptr);
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
return;
}
// Note: used when the ice negotiation fails to erase
// all stored structures.
auto eraseInfo = [w, id = req.id, from = req.from] {
if (auto shared = w.lock()) {
std::lock_guard<std::mutex> lk(shared->infosMtx_);
shared->infos_.erase({from, id});
}
};
dht::ThreadPool::io().run([w = std::move(w), req = std::move(req)] {
ice_config.tcpEnable = true;
ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
auto shared = w.lock();
if (!shared)
return;
shared->onRequestStartIce(req);
});
};
if (!ok) {
JAMI_ERR("Cannot initialize ICE session.");
if (shared->connReadyCb_)
shared->connReadyCb_(req.from, "", nullptr);
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
return;
}
ice_config.onNegoDone = [w = weak(), req, eraseInfo](bool ok) {
auto shared = w.lock();
if (!shared)
return;
if (!ok) {
JAMI_ERR("ICE negotiation failed");
dht::ThreadPool::io().run([w = std::move(w), req = std::move(req)] {
auto shared = w.lock();
if (!shared)
return;
shared->onRequestStartIce(req);
});
};
ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
auto shared = w.lock();
if (!shared)
return;
if (!ok) {
JAMI_ERR("ICE negotiation failed");
if (shared->connReadyCb_)
shared->connReadyCb_(req.from, "", nullptr);
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
return;
}
dht::ThreadPool::io().run([w = std::move(w), req = std::move(req)] {
if (auto shared = w.lock())
shared->onRequestOnNegoDone(req);
});
};
// Negotiate a new ICE socket
auto info = std::make_shared<ConnectionInfo>();
{
std::lock_guard<std::mutex> lk(shared->infosMtx_);
shared->infos_[{req.from, req.id}] = info;
}
JAMI_INFO("[Account:%s] accepting connection from %s",
shared->account.getAccountID().c_str(),
deviceId.c_str());
std::unique_lock<std::mutex> lk {info->mutex_};
info->ice_ = Manager::instance()
.getIceTransportFactory()
.createUTransport(shared->account.getAccountID().c_str(), 1, true, ice_config);
if (not info->ice_) {
JAMI_ERR("Cannot initialize ICE session.");
if (shared->connReadyCb_)
shared->connReadyCb_(req.from, "", nullptr);
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
return;
eraseInfo();
}
dht::ThreadPool::io().run([w = std::move(w), req = std::move(req)] {
if (auto shared = w.lock())
shared->onRequestOnNegoDone(req);
});
};
// Negotiate a new ICE socket
auto info = std::make_shared<ConnectionInfo>();
{
std::lock_guard<std::mutex> lk(infosMtx_);
infos_[{req.from, req.id}] = info;
}
JAMI_INFO("[Account:%s] accepting connection from %s",
account.getAccountID().c_str(),
deviceId.c_str());
std::unique_lock<std::mutex> lk {info->mutex_};
info->ice_ = Manager::instance()
.getIceTransportFactory()
.createUTransport(account.getAccountID().c_str(), 1, true, ice_config);
if (not info->ice_) {
JAMI_ERR("Cannot initialize ICE session.");
if (connReadyCb_)
connReadyCb_(req.from, "", nullptr);
eraseInfo();
return;
}
}));
}
void

View File

@ -718,68 +718,69 @@ JamiAccount::onConnectedOutgoingCall(const std::shared_ptr<SIPCall>& call,
return;
JAMI_DBG("[call:%s] outgoing call connected to %s", call->getCallId().c_str(), to_id.c_str());
auto opts = getIceOptions();
opts.onInitDone = [w = weak(), target = std::move(target), call](bool ok) {
if (!ok) {
JAMI_ERR("ICE medias are not initialized");
return;
}
auto shared = w.lock();
if (!shared or !call)
return;
const auto localAddress = ip_utils::getInterfaceAddr(shared->getLocalInterface(),
target.getFamily());
IpAddr addrSdp = shared->getPublishedSameasLocal()
? localAddress
: shared->getPublishedIpAddress(target.getFamily());
// fallback on local address
if (not addrSdp)
addrSdp = localAddress;
// Initialize the session using ULAW as default codec in case of early media
// The session should be ready to receive media once the first INVITE is sent, before
// the session initialization is completed
if (!getSystemCodecContainer()->searchCodecByName("PCMA", jami::MEDIA_AUDIO))
JAMI_WARN("Could not instantiate codec for early media");
// Building the local SDP offer
auto& sdp = call->getSDP();
sdp.setPublishedIP(addrSdp);
auto mediaAttrList = call->getMediaAttributeList();
if (mediaAttrList.empty()) {
JAMI_ERR("Call [%s] has no media. Abort!", call->getCallId().c_str());
return;
}
if (not sdp.createOffer(mediaAttrList)) {
JAMI_ERR("Could not send outgoing INVITE request for new call");
return;
}
// Note: pj_ice_strans_create can call onComplete in the same thread
// This means that iceMutex_ in IceTransport can be locked when onInitDone is called
// So, we need to run the call creation in the main thread
// Also, we do not directly call SIPStartCall before receiving onInitDone, because
// there is an inside waitForInitialization that can block the thread.
runOnMainThread([w = std::move(w), target = std::move(target), call = std::move(call)] {
getIceOptions(std::move([=](auto opts) {
opts.onInitDone = [w = weak(), target = std::move(target), call](bool ok) {
if (!ok) {
JAMI_ERR("ICE medias are not initialized");
return;
}
auto shared = w.lock();
if (!shared)
if (!shared or !call)
return;
if (not shared->SIPStartCall(*call, target)) {
JAMI_ERR("Could not send outgoing INVITE request for new call");
const auto localAddress = ip_utils::getInterfaceAddr(shared->getLocalInterface(),
target.getFamily());
IpAddr addrSdp = shared->getPublishedSameasLocal()
? localAddress
: shared->getPublishedIpAddress(target.getFamily());
// fallback on local address
if (not addrSdp)
addrSdp = localAddress;
// Initialize the session using ULAW as default codec in case of early media
// The session should be ready to receive media once the first INVITE is sent, before
// the session initialization is completed
if (!getSystemCodecContainer()->searchCodecByName("PCMA", jami::MEDIA_AUDIO))
JAMI_WARN("Could not instantiate codec for early media");
// Building the local SDP offer
auto& sdp = call->getSDP();
sdp.setPublishedIP(addrSdp);
auto mediaAttrList = call->getMediaAttributeList();
if (mediaAttrList.empty()) {
JAMI_ERR("Call [%s] has no media. Abort!", call->getCallId().c_str());
return;
}
});
};
call->setIPToIP(true);
call->setPeerNumber(to_id);
call->initIceMediaTransport(true, std::move(opts));
if (not sdp.createOffer(mediaAttrList)) {
JAMI_ERR("Could not send outgoing INVITE request for new call");
return;
}
// Note: pj_ice_strans_create can call onComplete in the same thread
// This means that iceMutex_ in IceTransport can be locked when onInitDone is called
// So, we need to run the call creation in the main thread
// Also, we do not directly call SIPStartCall before receiving onInitDone, because
// there is an inside waitForInitialization that can block the thread.
runOnMainThread([w = std::move(w), target = std::move(target), call = std::move(call)] {
auto shared = w.lock();
if (!shared)
return;
if (not shared->SIPStartCall(*call, target)) {
JAMI_ERR("Could not send outgoing INVITE request for new call");
}
});
};
call->setIPToIP(true);
call->setPeerNumber(to_id);
call->initIceMediaTransport(true, std::move(opts));
}));
}
bool
@ -2695,46 +2696,47 @@ JamiAccount::incomingCall(dht::IceCandidates&& msg,
return;
}
auto callId = call->getCallId();
auto iceOptions = getIceOptions();
iceOptions.onNegoDone = [callId, w = weak()](bool) {
runOnMainThread([callId, w]() {
if (auto shared = w.lock())
shared->checkPendingCall(callId);
getIceOptions(std::move([=](auto iceOptions) {
iceOptions.onNegoDone = [callId, w = weak()](bool) {
runOnMainThread([callId, w]() {
if (auto shared = w.lock())
shared->checkPendingCall(callId);
});
};
auto ice = createIceTransport(("sip:" + call->getCallId()).c_str(),
ICE_COMPONENTS,
false,
iceOptions);
iceOptions.tcpEnable = true;
auto ice_tcp = createIceTransport(("sip:" + call->getCallId()).c_str(),
ICE_COMPONENTS,
true,
iceOptions);
std::weak_ptr<SIPCall> wcall = call;
Manager::instance().addTask([account = shared(), wcall, ice, ice_tcp, msg, from_cert, from] {
auto call = wcall.lock();
// call aborted?
if (not call)
return false;
if (ice->isFailed()) {
JAMI_ERR("[call:%s] ice init failed", call->getCallId().c_str());
call->onFailure(EIO);
return false;
}
// Loop until ICE transport is initialized.
// Note: we suppose that ICE init routine has a an internal timeout (bounded in time)
// and we let upper layers decide when the call shall be aborted (our first check upper).
if ((not ice->isInitialized()) || (ice_tcp && !ice_tcp->isInitialized()))
return true;
account->replyToIncomingIceMsg(call, ice, ice_tcp, msg, from_cert, from);
return false;
});
};
auto ice = createIceTransport(("sip:" + call->getCallId()).c_str(),
ICE_COMPONENTS,
false,
iceOptions);
iceOptions.tcpEnable = true;
auto ice_tcp = createIceTransport(("sip:" + call->getCallId()).c_str(),
ICE_COMPONENTS,
true,
iceOptions);
std::weak_ptr<SIPCall> wcall = call;
Manager::instance().addTask([account = shared(), wcall, ice, ice_tcp, msg, from_cert, from] {
auto call = wcall.lock();
// call aborted?
if (not call)
return false;
if (ice->isFailed()) {
JAMI_ERR("[call:%s] ice init failed", call->getCallId().c_str());
call->onFailure(EIO);
return false;
}
// Loop until ICE transport is initialized.
// Note: we suppose that ICE init routine has a an internal timeout (bounded in time)
// and we let upper layers decide when the call shall be aborted (our first check upper).
if ((not ice->isInitialized()) || (ice_tcp && !ice_tcp->isInitialized()))
return true;
account->replyToIncomingIceMsg(call, ice, ice_tcp, msg, from_cert, from);
return false;
});
}));
}
void
@ -3666,28 +3668,30 @@ JamiAccount::onIsComposing(const std::string& peer, bool isWriting)
}
}
const IceTransportOptions
JamiAccount::getIceOptions() const noexcept
void
JamiAccount::getIceOptions(std::function<void(IceTransportOptions)> cb) noexcept
{
auto opts = SIPAccountBase::getIceOptions();
auto publishedAddr = getPublishedIpAddress();
storeActiveIpAddress([this, cb=std::move(cb)] {
auto opts = SIPAccountBase::getIceOptions();
auto publishedAddr = getPublishedIpAddress();
if (publishedAddr) {
auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
publishedAddr.getFamily());
if (interfaceAddr) {
opts.accountLocalAddr = interfaceAddr;
opts.accountPublicAddr = publishedAddr;
if (publishedAddr) {
auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
publishedAddr.getFamily());
if (interfaceAddr) {
opts.accountLocalAddr = interfaceAddr;
opts.accountPublicAddr = publishedAddr;
}
}
}
return opts;
if (cb)
cb(std::move(opts));
});
}
void
JamiAccount::storeActiveIpAddress()
JamiAccount::storeActiveIpAddress(std::function<void()>&& cb)
{
dht_->getPublicAddress([this](std::vector<dht::SockAddr>&& results) {
dht_->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
bool hasIpv4 {false}, hasIpv6 {false};
for (auto& result : results) {
auto family = result.getFamily();
@ -3714,6 +3718,8 @@ JamiAccount::storeActiveIpAddress()
if (hasIpv4 and hasIpv6)
break;
}
if (cb)
cb();
});
}

View File

@ -435,12 +435,12 @@ public:
/**
* Store the local/public addresses used to register
*/
void storeActiveIpAddress() override;
void storeActiveIpAddress(std::function<void()>&& cb = {});
/**
* Create and return ICE options.
*/
const IceTransportOptions getIceOptions() const noexcept override;
void getIceOptions(std::function<void(IceTransportOptions)> cb) noexcept;
#ifdef DRING_TESTABLE
ConnectionManager& connectionManager() { return *connectionManager_; }

View File

@ -468,7 +468,8 @@ SIPAccountBase::generateVideoPort() const
const IceTransportOptions
SIPAccountBase::getIceOptions() const noexcept
{
auto opts = Account::getIceOptions(); // Local copy of global account ICE settings
IceTransportOptions opts;
opts.upnpEnable = getUPnPActive();
if (stunEnabled_)
opts.stunServers.emplace_back(StunServerInfo().setUri(stunServer_));

View File

@ -243,7 +243,7 @@ public:
void setStunServer(const std::string& srv) { stunServer_ = srv; }
const IceTransportOptions getIceOptions() const noexcept override;
const IceTransportOptions getIceOptions() const noexcept;
virtual void sendTextMessage(const std::string& to,
const std::map<std::string, std::string>& payloads,

View File

@ -262,10 +262,11 @@ UPnPContext::setPublicAddress(const IpAddr& addr)
if (not addr)
return;
JAMI_DBG("Setting the known public address to %s", addr.toString().c_str());
std::lock_guard<std::mutex> lock(mappingMutex_);
knownPublicAddress_ = std::move(addr);
if (knownPublicAddress_ != addr) {
knownPublicAddress_ = std::move(addr);
JAMI_DBG("Setting the known public address to %s", addr.toString().c_str());
}
}
bool