turn: improve connectivity with unreachable TURN

Initializing a ICE session will need to gather candidates. The TURN
can be long to retrieve, and fails can be really long to detect.
There is at least 3 cases of failures:

+ IPv6 badly configured, which can cause a DNS resolution timeout
of several minutes (that's why the IP was cached)
+ Empty DNS entries, causing a resolution failure
+ A TURN server un-reachable or wrongly configured (e.g. 1.1.1.1)

The idea here is to resolve the TURN and test the connection before
caching it. And use it when cached.
This avoid all resolutions steps and we're basically sure that
it was working.

Other approaches:
+ Add a new callback in pjsip to detect that the TURN is taking too long
to remove it for next calls, but I prefer to not add another patch in
pj and it's not an ideal solution
+ trickle ICE to not wait for all candidates, but this is a big changes
and will generate more DHT messages
+ Do not retransmit messages, but this is against the RFC

Change-Id: Iaec4308bca8cbbbfa4d6b1b6d7a7759b8062a67a
GitLab: #781
This commit is contained in:
Sébastien Blin
2022-11-01 16:13:28 -04:00
committed by Adrien Béraud
parent 87c5f3cef8
commit 64ffbffcb5
12 changed files with 667 additions and 172 deletions

View File

@ -18,6 +18,10 @@ libconnectivity_la_SOURCES = \
./connectivity/peer_connection.h \
./connectivity/sip_utils.cpp \
./connectivity/sip_utils.h \
./connectivity/turn_transport.cpp \
./connectivity/turn_transport.h \
./connectivity/turn_cache.cpp \
./connectivity/turn_cache.h \
./connectivity/utf8_utils.cpp \
./connectivity/utf8_utils.h \
./connectivity/transport/peer_channel.h

View File

@ -52,9 +52,6 @@ using OnStateChangeCb = std::function<bool(tls::TlsSessionState state)>;
using OnReadyCb = std::function<void(bool ok)>;
using onShutdownCb = std::function<void(void)>;
class TurnTransport;
class ConnectedTurnTransport;
//==============================================================================
class Stream

View File

@ -0,0 +1,196 @@
/*
* Copyright (C) 2022 Savoir-faire Linux Inc.
*
* Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "connectivity/turn_cache.h"
#include "logger.h"
#include "fileutils.h"
#include "manager.h"
#include "opendht/thread_pool.h" // TODO remove asio
namespace jami {
TurnCache::TurnCache(const std::string& accountId,
const std::string& cachePath,
const TurnTransportParams& params,
bool enabled)
: accountId_(accountId)
, cachePath_(cachePath)
, io_context(Manager::instance().ioContext())
{
refreshTimer_ = std::make_unique<asio::steady_timer>(*io_context,
std::chrono::steady_clock::now());
reconfigure(params, enabled);
}
TurnCache::~TurnCache() {}
std::optional<IpAddr>
TurnCache::getResolvedTurn(uint16_t family) const
{
if (family == AF_INET && cacheTurnV4_) {
return *cacheTurnV4_;
} else if (family == AF_INET6 && cacheTurnV6_) {
return *cacheTurnV6_;
}
return std::nullopt;
}
void
TurnCache::reconfigure(const TurnTransportParams& params, bool enabled)
{
params_ = params;
enabled_ = enabled;
{
std::lock_guard<std::mutex> lk(cachedTurnMutex_);
// Force re-resolution
isRefreshing_ = false;
cacheTurnV4_.reset();
cacheTurnV6_.reset();
testTurnV4_.reset();
testTurnV6_.reset();
}
refreshTimer_->expires_at(std::chrono::steady_clock::now());
refreshTimer_->async_wait(std::bind(&TurnCache::refresh, this, std::placeholders::_1));
}
void
TurnCache::refresh(const asio::error_code& ec)
{
if (ec == asio::error::operation_aborted)
return;
// The resolution of the TURN server can take quite some time (if timeout).
// So, run this in its own io thread to avoid to block the main thread.
// Avoid multiple refresh
if (isRefreshing_.exchange(true))
return;
if (!enabled_) {
// In this case, we do not use any TURN server
std::lock_guard<std::mutex> lk(cachedTurnMutex_);
cacheTurnV4_.reset();
cacheTurnV6_.reset();
isRefreshing_ = false;
return;
}
JAMI_INFO("[Account %s] Refresh cache for TURN server resolution", accountId_.c_str());
// Retrieve old cached value if available.
// This means that we directly get the correct value when launching the application on the
// same network
// No need to resolve, it's already a valid address
auto server = params_.domain;
if (IpAddr::isValid(server, AF_INET)) {
testTurn(IpAddr(server, AF_INET));
return;
} else if (IpAddr::isValid(server, AF_INET6)) {
testTurn(IpAddr(server, AF_INET6));
return;
}
// Else cache resolution result
fileutils::recursive_mkdir(cachePath_ + DIR_SEPARATOR_STR + "domains", 0700);
auto pathV4 = cachePath_ + DIR_SEPARATOR_STR + "domains" + DIR_SEPARATOR_STR + "v4." + server;
IpAddr testV4, testV6;
if (auto turnV4File = std::ifstream(pathV4)) {
std::string content((std::istreambuf_iterator<char>(turnV4File)),
std::istreambuf_iterator<char>());
testV4 = IpAddr(content, AF_INET);
}
auto pathV6 = cachePath_ + DIR_SEPARATOR_STR + "domains" + DIR_SEPARATOR_STR + "v6." + server;
if (auto turnV6File = std::ifstream(pathV6)) {
std::string content((std::istreambuf_iterator<char>(turnV6File)),
std::istreambuf_iterator<char>());
testV6 = IpAddr(content, AF_INET6);
}
// Resolve just in case. The user can have a different connectivity
auto turnV4 = IpAddr {server, AF_INET};
{
if (turnV4) {
// Cache value to avoid a delay when starting up Jami
std::ofstream turnV4File(pathV4);
turnV4File << turnV4.toString();
} else
fileutils::remove(pathV4, true);
// Update TURN
testV4 = IpAddr(std::move(turnV4));
}
auto turnV6 = IpAddr {server, AF_INET6};
{
if (turnV6) {
// Cache value to avoid a delay when starting up Jami
std::ofstream turnV6File(pathV6);
turnV6File << turnV6.toString();
} else
fileutils::remove(pathV6, true);
// Update TURN
testV6 = IpAddr(std::move(turnV6));
}
if (testV4)
testTurn(testV4);
if (testV6)
testTurn(testV6);
refreshTurnDelay(!testV4 && !testV6);
}
void
TurnCache::testTurn(IpAddr server)
{
TurnTransportParams params = params_;
params.server = server;
std::lock_guard<std::mutex> lk(cachedTurnMutex_);
auto& turn = server.isIpv4() ? testTurnV4_ : testTurnV6_;
turn.reset(); // Stop previous TURN
try {
turn = std::make_unique<TurnTransport>(
params, std::move([this, server](bool ok) {
std::lock_guard<std::mutex> lk(cachedTurnMutex_);
auto& cacheTurn = server.isIpv4() ? cacheTurnV4_ : cacheTurnV6_;
if (!ok) {
JAMI_ERROR("Connection to {:s} failed - reset", server.toString());
cacheTurn.reset();
} else {
JAMI_DEBUG("Connection to {:s} ready", server.toString());
cacheTurn = std::make_unique<IpAddr>(server);
}
refreshTurnDelay(!cacheTurnV6_ && !cacheTurnV4_);
auto& turn = server.isIpv4() ? testTurnV4_ : testTurnV6_;
turn->shutdown();
}));
} catch (const std::exception& e) {
JAMI_ERROR("TurnTransport creation error: {}", e.what());
}
}
void
TurnCache::refreshTurnDelay(bool scheduleNext)
{
isRefreshing_ = false;
if (scheduleNext) {
JAMI_WARNING("[Account {:s}] Cache for TURN resolution failed.", accountId_);
refreshTimer_->expires_at(std::chrono::steady_clock::now() + turnRefreshDelay_);
refreshTimer_->async_wait(std::bind(&TurnCache::refresh, this, std::placeholders::_1));
if (turnRefreshDelay_ < std::chrono::minutes(30))
turnRefreshDelay_ *= 2;
} else {
JAMI_DEBUG("[Account {:s}] Cache refreshed for TURN resolution", accountId_);
turnRefreshDelay_ = std::chrono::seconds(10);
}
}
} // namespace jami

View File

@ -0,0 +1,88 @@
/*
* Copyright (C) 2022 Savoir-faire Linux Inc.
*
* Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#pragma once
#include "connectivity/ip_utils.h"
#include "connectivity/turn_transport.h"
#include <atomic>
#include <asio.hpp>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
namespace jami {
class TurnCache
{
public:
TurnCache(const std::string& accountId,
const std::string& cachePath,
const TurnTransportParams& params,
bool enabled);
~TurnCache();
std::optional<IpAddr> getResolvedTurn(uint16_t family = AF_INET) const;
/**
* Pass a new configuration for the cache
* @param param The new configuration
*/
void reconfigure(const TurnTransportParams& params, bool enabled);
/**
* Refresh cache from current configuration
*/
void refresh(const asio::error_code& ec = {});
private:
std::string accountId_;
std::string cachePath_;
TurnTransportParams params_;
std::atomic_bool enabled_ {false};
/**
* Avoid to refresh the cache multiple times
*/
std::atomic_bool isRefreshing_ {false};
/**
* This will cache the turn server resolution each time we launch
* Jami, or for each connectivityChange()
*/
void testTurn(IpAddr server);
std::unique_ptr<TurnTransport> testTurnV4_;
std::unique_ptr<TurnTransport> testTurnV6_;
// Used to detect if a turn server is down.
void refreshTurnDelay(bool scheduleNext);
std::chrono::seconds turnRefreshDelay_ {std::chrono::seconds(10)};
// Store resoved turn addresses
mutable std::mutex cachedTurnMutex_ {};
std::unique_ptr<IpAddr> cacheTurnV4_ {};
std::unique_ptr<IpAddr> cacheTurnV6_ {};
// io
std::shared_ptr<asio::io_context> io_context;
std::unique_ptr<asio::steady_timer> refreshTimer_;
};
} // namespace jami

View File

@ -0,0 +1,207 @@
/*
* Copyright (C) 2004-2022 Savoir-faire Linux Inc.
*
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
* Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "connectivity/turn_transport.h"
#include "connectivity/sip_utils.h"
#include "manager.h"
#include <asio.hpp>
#include <pjnath.h>
#include <pjlib-util.h>
#include <pjlib.h>
#define TRY(ret) \
do { \
if ((ret) != PJ_SUCCESS) \
throw std::runtime_error(#ret " failed"); \
} while (0)
namespace jami {
class TurnTransport::Impl
{
public:
Impl(std::function<void(bool)>&& cb)
{
io_context = Manager::instance().ioContext();
cb_ = std::move(cb);
}
~Impl();
/**
* Detect new TURN state
*/
void onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state);
/**
* Pool events from pjsip
*/
void ioJob(const asio::error_code& ec);
void start()
{
// Start ioTask
std::lock_guard<std::mutex> lock(ioMutex_);
ioTask_ = std::make_unique<asio::steady_timer>(*io_context,
std::chrono::steady_clock::now());
ioTask_->async_wait(std::bind(&TurnTransport::Impl::ioJob, this, std::placeholders::_1));
}
void stop()
{
std::lock_guard<std::mutex> lock(ioMutex_);
stopped_ = true;
if (ioTask_)
ioTask_->cancel();
}
TurnTransportParams settings;
pj_caching_pool poolCache {};
pj_pool_t* pool {nullptr};
pj_stun_config stunConfig {};
pj_turn_sock* relay {nullptr};
pj_str_t relayAddr {};
IpAddr peerRelayAddr; // address where peers should connect to
IpAddr mappedAddr;
std::function<void(bool)> cb_;
// io
std::mutex ioMutex_;
std::shared_ptr<asio::io_context> io_context;
std::atomic_bool stopped_;
std::unique_ptr<asio::steady_timer> ioTask_;
};
TurnTransport::Impl::~Impl()
{
stop();
pj_caching_pool_destroy(&poolCache);
}
void
TurnTransport::Impl::onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state)
{
if (new_state == PJ_TURN_STATE_READY) {
pj_turn_session_info info;
pj_turn_sock_get_info(relay, &info);
peerRelayAddr = IpAddr {info.relay_addr};
mappedAddr = IpAddr {info.mapped_addr};
JAMI_DEBUG("TURN server ready, peer relay address: {:s}",
peerRelayAddr.toString(true, true).c_str());
cb_(true);
} else if (old_state <= PJ_TURN_STATE_READY and new_state > PJ_TURN_STATE_READY) {
JAMI_WARNING("TURN server disconnected ({:s})", pj_turn_state_name(new_state));
cb_(false);
}
}
void
TurnTransport::Impl::ioJob(const asio::error_code& ec)
{
if (ec == asio::error::operation_aborted)
return;
if (ec) {
JAMI_ERROR("Error: {:s}", ec.message());
return;
}
const pj_time_val delay = {0, 10};
pj_ioqueue_poll(stunConfig.ioqueue, &delay);
pj_timer_heap_poll(stunConfig.timer_heap, nullptr);
// If cancelled while running, ioTask_ expires at will crash
std::lock_guard<std::mutex> lock(ioMutex_);
if (!stopped_) {
ioTask_->expires_at(std::chrono::steady_clock::now());
ioTask_->async_wait(std::bind(&TurnTransport::Impl::ioJob, this, std::placeholders::_1));
}
}
TurnTransport::TurnTransport(const TurnTransportParams& params, std::function<void(bool)>&& cb)
: pimpl_ {new Impl(std::move(cb))}
{
auto server = params.server;
if (!server.getPort())
server.setPort(PJ_STUN_PORT);
if (server.isUnspecified())
throw std::invalid_argument("invalid turn server address");
pimpl_->settings = params;
// PJSIP memory pool
pj_caching_pool_init(&pimpl_->poolCache, &pj_pool_factory_default_policy, 0);
pimpl_->pool = pj_pool_create(&pimpl_->poolCache.factory, "TurnTransport", 512, 512, nullptr);
if (not pimpl_->pool)
throw std::runtime_error("pj_pool_create() failed");
// STUN config
pj_stun_config_init(&pimpl_->stunConfig, &pimpl_->poolCache.factory, 0, nullptr, nullptr);
// create global timer heap
TRY(pj_timer_heap_create(pimpl_->pool, 1000, &pimpl_->stunConfig.timer_heap));
// create global ioqueue
TRY(pj_ioqueue_create(pimpl_->pool, 16, &pimpl_->stunConfig.ioqueue));
// TURN callbacks
pj_turn_sock_cb relay_cb;
pj_bzero(&relay_cb, sizeof(relay_cb));
relay_cb.on_state =
[](pj_turn_sock* relay, pj_turn_state_t old_state, pj_turn_state_t new_state) {
auto pimpl = static_cast<Impl*>(pj_turn_sock_get_user_data(relay));
pimpl->onTurnState(old_state, new_state);
};
// TURN socket config
pj_turn_sock_cfg turn_sock_cfg;
pj_turn_sock_cfg_default(&turn_sock_cfg);
turn_sock_cfg.max_pkt_size = 4096;
// TURN socket creation
TRY(pj_turn_sock_create(&pimpl_->stunConfig,
server.getFamily(),
PJ_TURN_TP_TCP,
&relay_cb,
&turn_sock_cfg,
&*this->pimpl_,
&pimpl_->relay));
// TURN allocation setup
pj_turn_alloc_param turn_alloc_param;
pj_turn_alloc_param_default(&turn_alloc_param);
turn_alloc_param.peer_conn_type = PJ_TURN_TP_TCP;
pj_stun_auth_cred cred;
pj_bzero(&cred, sizeof(cred));
cred.type = PJ_STUN_AUTH_CRED_STATIC;
pj_cstr(&cred.data.static_cred.realm, pimpl_->settings.realm.c_str());
pj_cstr(&cred.data.static_cred.username, pimpl_->settings.username.c_str());
cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
pj_cstr(&cred.data.static_cred.data, pimpl_->settings.password.c_str());
pimpl_->relayAddr = pj_strdup3(pimpl_->pool, server.toString().c_str());
// TURN connection/allocation
JAMI_DEBUG("Connecting to TURN {:s}", server.toString(true, true));
TRY(pj_turn_sock_alloc(pimpl_->relay,
&pimpl_->relayAddr,
server.getPort(),
nullptr,
&cred,
&turn_alloc_param));
pimpl_->start();
}
TurnTransport::~TurnTransport() {}
void
TurnTransport::shutdown()
{
pimpl_->stop();
}
} // namespace jami

View File

@ -0,0 +1,59 @@
/*
* Copyright (C) 2022 Savoir-faire Linux Inc.
*
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
* Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#pragma once
#include "connectivity/ip_utils.h"
#include <functional>
#include <memory>
#include <string>
namespace jami {
struct TurnTransportParams
{
IpAddr server;
std::string domain; // Used by cache_turn
// Plain Credentials
std::string realm;
std::string username;
std::string password;
};
/**
* This class is used to test connection to TURN servers
* No other logic is implemented.
*/
class TurnTransport
{
public:
TurnTransport(const TurnTransportParams& param, std::function<void(bool)>&& cb);
~TurnTransport();
void shutdown();
private:
TurnTransport() = delete;
class Impl;
std::unique_ptr<Impl> pimpl_;
};
} // namespace jami

View File

@ -290,8 +290,7 @@ JamiAccount::JamiAccount(const std::string& accountId)
, dataPath_(cachePath_ + DIR_SEPARATOR_STR "values")
, connectionManager_ {}
, nonSwarmTransferManager_(std::make_shared<TransferManager>(accountId, ""))
{
}
{}
JamiAccount::~JamiAccount() noexcept
{
@ -902,7 +901,7 @@ JamiAccount::loadConfig()
registeredName_ = config().registeredName;
try {
auto str = fileutils::loadCacheTextFile(cachePath_ + DIR_SEPARATOR_STR "dhtproxy",
std::chrono::hours(24 * 7));
std::chrono::hours(24 * 7));
std::string err;
Json::Value root;
Json::CharReaderBuilder rbuilder;
@ -1965,11 +1964,14 @@ JamiAccount::doRegister_()
getAccountID().c_str(),
name.c_str());
if (this->config().turnEnabled && !cacheTurnV4_) {
// If TURN is enabled, but no TURN cached, there can be a temporary resolution
// error to solve. Sometimes, a connectivity change is not enough, so even if
// this case is really rare, it should be easy to avoid.
cacheTurnServers();
if (this->config().turnEnabled && turnCache_) {
auto addr = turnCache_->getResolvedTurn();
if (addr == std::nullopt) {
// If TURN is enabled, but no TURN cached, there can be a temporary
// resolution error to solve. Sometimes, a connectivity change is not
// enough, so even if this case is really rare, it should be easy to avoid.
turnCache_->refresh();
}
}
auto uri = Uri(name);
@ -2278,7 +2280,7 @@ JamiAccount::setRegistrationState(RegistrationState state,
if (registrationState_ != state) {
if (state == RegistrationState::REGISTERED) {
JAMI_WARN("[Account %s] connected", getAccountID().c_str());
cacheTurnServers();
turnCache_->refresh();
storeActiveIpAddress();
} else if (state == RegistrationState::TRYING) {
JAMI_WARN("[Account %s] connecting…", getAccountID().c_str());
@ -2342,11 +2344,10 @@ JamiAccount::setCertificateStatus(const std::string& cert_id,
{
bool done = accountManager_ ? accountManager_->setCertificateStatus(cert_id, status) : false;
if (done) {
findCertificate(cert_id);
emitSignal<libjami::ConfigurationSignal::CertificateStateChanged>(getAccountID(),
cert_id,
tls::TrustStore::statusToStr(
status));
dht_->findCertificate(dht::InfoHash(cert_id),
[](const std::shared_ptr<dht::crypto::Certificate>& crt) {});
emitSignal<libjami::ConfigurationSignal::CertificateStateChanged>(
getAccountID(), cert_id, tls::TrustStore::statusToStr(status));
}
return done;
}
@ -3532,106 +3533,6 @@ JamiAccount::handleMessage(const std::string& from, const std::pair<std::string,
return false;
}
void
JamiAccount::cacheTurnServers()
{
// The resolution of the TURN server can take quite some time (if timeout).
// So, run this in its own io thread to avoid to block the main thread.
dht::ThreadPool::io().run([w = weak()] {
auto this_ = w.lock();
if (not this_)
return;
// Avoid multiple refresh
if (this_->isRefreshing_.exchange(true))
return;
const auto& conf = this_->config();
if (!conf.turnEnabled) {
// In this case, we do not use any TURN server
std::lock_guard<std::mutex> lk(this_->cachedTurnMutex_);
this_->cacheTurnV4_.reset();
this_->cacheTurnV6_.reset();
this_->isRefreshing_ = false;
return;
}
JAMI_INFO("[Account %s] Refresh cache for TURN server resolution",
this_->getAccountID().c_str());
// Retrieve old cached value if available.
// This means that we directly get the correct value when launching the application on the
// same network
std::string server = conf.turnServer.empty() ? DEFAULT_TURN_SERVER : conf.turnServer;
// No need to resolve, it's already a valid address
if (IpAddr::isValid(server, AF_INET)) {
this_->cacheTurnV4_ = std::make_unique<IpAddr>(server, AF_INET);
this_->isRefreshing_ = false;
return;
} else if (IpAddr::isValid(server, AF_INET6)) {
this_->cacheTurnV6_ = std::make_unique<IpAddr>(server, AF_INET6);
this_->isRefreshing_ = false;
return;
}
// Else cache resolution result
fileutils::recursive_mkdir(this_->cachePath_ + DIR_SEPARATOR_STR + "domains", 0700);
auto pathV4 = this_->cachePath_ + DIR_SEPARATOR_STR + "domains" + DIR_SEPARATOR_STR + "v4."
+ server;
if (auto turnV4File = std::ifstream(pathV4)) {
std::string content((std::istreambuf_iterator<char>(turnV4File)),
std::istreambuf_iterator<char>());
std::lock_guard<std::mutex> lk(this_->cachedTurnMutex_);
this_->cacheTurnV4_ = std::make_unique<IpAddr>(content, AF_INET);
}
auto pathV6 = this_->cachePath_ + DIR_SEPARATOR_STR + "domains" + DIR_SEPARATOR_STR + "v6."
+ server;
if (auto turnV6File = std::ifstream(pathV6)) {
std::string content((std::istreambuf_iterator<char>(turnV6File)),
std::istreambuf_iterator<char>());
std::lock_guard<std::mutex> lk(this_->cachedTurnMutex_);
this_->cacheTurnV6_ = std::make_unique<IpAddr>(content, AF_INET6);
}
// Resolve just in case. The user can have a different connectivity
auto turnV4 = IpAddr {server, AF_INET};
{
if (turnV4) {
// Cache value to avoid a delay when starting up Jami
std::ofstream turnV4File(pathV4);
turnV4File << turnV4.toString();
} else
fileutils::remove(pathV4, true);
std::lock_guard<std::mutex> lk(this_->cachedTurnMutex_);
// Update TURN
this_->cacheTurnV4_ = std::make_unique<IpAddr>(std::move(turnV4));
}
auto turnV6 = IpAddr {server, AF_INET6};
{
if (turnV6) {
// Cache value to avoid a delay when starting up Jami
std::ofstream turnV6File(pathV6);
turnV6File << turnV6.toString();
} else
fileutils::remove(pathV6, true);
std::lock_guard<std::mutex> lk(this_->cachedTurnMutex_);
// Update TURN
this_->cacheTurnV6_ = std::make_unique<IpAddr>(std::move(turnV6));
}
this_->isRefreshing_ = false;
if (!this_->cacheTurnV6_ && !this_->cacheTurnV4_) {
JAMI_WARN("[Account %s] Cache for TURN resolution failed.",
this_->getAccountID().c_str());
Manager::instance().scheduleTaskIn(
[w]() {
if (auto shared = w.lock())
shared->cacheTurnServers();
},
this_->turnRefreshDelay_);
if (this_->turnRefreshDelay_ < std::chrono::minutes(30))
this_->turnRefreshDelay_ *= 2;
} else {
JAMI_INFO("[Account %s] Cache refreshed for TURN resolution",
this_->getAccountID().c_str());
this_->turnRefreshDelay_ = std::chrono::seconds(10);
}
});
}
void
JamiAccount::callConnectionClosed(const DeviceId& deviceId, bool eraseDummy)
{

View File

@ -36,6 +36,7 @@
#include "data_transfer.h"
#include "uri.h"
#include "jamiaccount_config.h"
#include "connectivity/peer_connection.h"
#include "noncopyable.h"
#include "connectivity/ip_utils.h"
@ -611,7 +612,8 @@ private:
struct BuddyInfo;
struct DiscoveredPeer;
inline std::string getProxyConfigKey() const {
inline std::string getProxyConfigKey() const
{
const auto& conf = config();
return dht::InfoHash::get(conf.proxyServer + conf.proxyListUrl).toString();
}
@ -785,7 +787,12 @@ private:
* This will cache the turn server resolution each time we launch
* Jami, or for each connectivityChange()
*/
// TODO move in separate class
void testTurn(IpAddr server);
void cacheTurnServers();
std::unique_ptr<TurnTransport> testTurnV4_;
std::unique_ptr<TurnTransport> testTurnV6_;
void refreshTurnDelay(bool scheduleNext);
std::chrono::seconds turnRefreshDelay_ {std::chrono::seconds(10)};

View File

@ -29,6 +29,8 @@ libjami_sources = files(
'connectivity/multiplexed_socket.cpp',
'connectivity/peer_connection.cpp',
'connectivity/sip_utils.cpp',
'connectivity/turn_cache.cpp',
'connectivity/turn_transport.cpp',
'connectivity/utf8_utils.cpp',
'im/instant_messaging.cpp',
'im/message_engine.cpp',

View File

@ -143,6 +143,20 @@ SIPAccountBase::loadConfig()
IpAddr publishedIp {conf.publishedIp};
if (not conf.publishedSameasLocal and publishedIp)
setPublishedAddress(publishedIp);
TurnTransportParams turnParams;
turnParams.domain = conf.turnServer;
turnParams.username = conf.turnServerUserName;
turnParams.password = conf.turnServerPwd;
turnParams.realm = conf.turnServerRealm;
if (!turnCache_) {
auto cachePath = fileutils::get_cache_dir() + DIR_SEPARATOR_STR + getAccountID();
turnCache_ = std::make_unique<TurnCache>(getAccountID(),
cachePath,
turnParams,
conf.turnEnabled);
} else {
turnCache_->reconfigure(turnParams, conf.turnEnabled);
}
}
std::map<std::string, std::string>
@ -241,13 +255,11 @@ SIPAccountBase::getIceOptions() const noexcept
// if (config().stunEnabled)
// opts.stunServers.emplace_back(StunServerInfo().setUri(stunServer_));
if (config().turnEnabled) {
auto cached = false;
std::lock_guard<std::mutex> lk(cachedTurnMutex_);
cached = cacheTurnV4_ || cacheTurnV6_;
if (cacheTurnV4_ && *cacheTurnV4_) {
if (config().turnEnabled && turnCache_) {
auto turnAddr = turnCache_->getResolvedTurn();
if (turnAddr != std::nullopt) {
opts.turnServers.emplace_back(TurnServerInfo()
.setUri(cacheTurnV4_->toString(true))
.setUri(turnAddr->toString(true))
.setUsername(config().turnServerUserName)
.setPassword(config().turnServerPwd)
.setRealm(config().turnServerRealm));
@ -261,14 +273,6 @@ SIPAccountBase::getIceOptions() const noexcept
// .setPassword(turnServerPwd_)
// .setRealm(turnServerRealm_));
//}
// Nothing cached, so do the resolution
if (!cached) {
opts.turnServers.emplace_back(TurnServerInfo()
.setUri(config().turnServer)
.setUsername(config().turnServerUserName)
.setPassword(config().turnServerPwd)
.setRealm(config().turnServerRealm));
}
}
return opts;
}

View File

@ -28,6 +28,7 @@
#include "connectivity/sip_utils.h"
#include "connectivity/ip_utils.h"
#include "connectivity/turn_cache.h"
#include "noncopyable.h"
#include "im/message_engine.h"
#include "sipaccountbase_config.h"
@ -85,7 +86,8 @@ public:
virtual ~SIPAccountBase() noexcept;
const SipAccountBaseConfig& config() const {
const SipAccountBaseConfig& config() const
{
return *static_cast<const SipAccountBaseConfig*>(&Account::config());
}
@ -216,21 +218,6 @@ public:
public: // overloaded methods
virtual void flush() override;
/**
* Return current turn resolved addresses
* @return {unique_ptr(v4 resolved), unique_ptr(v6 resolved)}
*/
std::array<std::unique_ptr<IpAddr>, 2> turnCache()
{
std::lock_guard<std::mutex> lk {cachedTurnMutex_};
std::array<std::unique_ptr<IpAddr>, 2> result = {};
if (cacheTurnV4_ && *cacheTurnV4_)
result[0] = std::make_unique<IpAddr>(*cacheTurnV4_);
if (cacheTurnV6_ && *cacheTurnV6_)
result[1] = std::make_unique<IpAddr>(*cacheTurnV6_);
return result;
}
protected:
/**
* Retrieve volatile details such as recent registration errors
@ -279,9 +266,7 @@ protected:
std::chrono::steady_clock::time_point::min()};
std::shared_ptr<Task> composingTimeout_;
mutable std::mutex cachedTurnMutex_ {};
std::unique_ptr<IpAddr> cacheTurnV4_ {};
std::unique_ptr<IpAddr> cacheTurnV6_ {};
std::unique_ptr<TurnCache> turnCache_;
private:
NON_COPYABLE(SIPAccountBase);

View File

@ -38,7 +38,7 @@
using namespace libjami::Account;
using namespace libjami::Call::Details;
using namespace std::literals::chrono_literals;
namespace jami {
namespace test {
@ -48,7 +48,8 @@ public:
CallTest()
{
// Init daemon
libjami::init(libjami::InitFlag(libjami::LIBJAMI_FLAG_DEBUG | libjami::LIBJAMI_FLAG_CONSOLE_LOG));
libjami::init(
libjami::InitFlag(libjami::LIBJAMI_FLAG_DEBUG | libjami::LIBJAMI_FLAG_CONSOLE_LOG));
if (not Manager::instance().initialized)
CPPUNIT_ASSERT(libjami::start("jami-sample.yml"));
}
@ -68,6 +69,7 @@ private:
void testDeclineMultiDevice();
void testTlsInfosPeerCertificate();
void testSocketInfos();
void testInvalidTurn();
CPPUNIT_TEST_SUITE(CallTest);
CPPUNIT_TEST(testCall);
@ -76,6 +78,7 @@ private:
CPPUNIT_TEST(testDeclineMultiDevice);
CPPUNIT_TEST(testTlsInfosPeerCertificate);
CPPUNIT_TEST(testSocketInfos);
CPPUNIT_TEST(testInvalidTurn);
CPPUNIT_TEST_SUITE_END();
};
@ -138,12 +141,12 @@ CallTest::testCall()
JAMI_INFO("Start call between alice and Bob");
auto call = libjami::placeCallWithMedia(aliceId, bobUri, {});
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return callReceived.load(); }));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return callReceived.load(); }));
JAMI_INFO("Stop call between alice and Bob");
callStopped = 0;
Manager::instance().hangupCall(aliceId, call);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return callStopped == 2; }));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return callStopped == 2; }));
}
void
@ -190,17 +193,16 @@ CallTest::testCachedCall()
successfullyConnected = true;
cv.notify_one();
});
CPPUNIT_ASSERT(
cv.wait_for(lk, std::chrono::seconds(30), [&] { return successfullyConnected.load(); }));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected.load(); }));
JAMI_INFO("Start call between alice and Bob");
auto call = libjami::placeCallWithMedia(aliceId, bobUri, {});
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return callReceived.load(); }));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return callReceived.load(); }));
callStopped = 0;
JAMI_INFO("Stop call between alice and Bob");
Manager::instance().hangupCall(aliceId, call);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return callStopped == 2; }));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return callStopped == 2; }));
}
void
@ -292,14 +294,12 @@ CallTest::testDeclineMultiDevice()
JAMI_INFO("Start call between alice and Bob");
auto call = libjami::placeCallWithMedia(aliceId, bobUri, {});
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
return callReceived == 2 && !callIdBob.empty();
}));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return callReceived == 2 && !callIdBob.empty(); }));
JAMI_INFO("Stop call between alice and Bob");
callStopped = 0;
Manager::instance().refuseCall(bobId, callIdBob);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] {
return callStopped.load() >= 3; /* >= because there is subcalls */
}));
}
@ -344,11 +344,10 @@ CallTest::testTlsInfosPeerCertificate()
JAMI_INFO("Start call between alice and Bob");
auto callId = libjami::placeCallWithMedia(aliceId, bobUri, {});
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return !bobCallId.empty(); }));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return !bobCallId.empty(); }));
Manager::instance().answerCall(bobId, bobCallId);
CPPUNIT_ASSERT(
cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceCallState == "CURRENT"; }));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceCallState == "CURRENT"; }));
auto call = std::dynamic_pointer_cast<SIPCall>(aliceAccount->getCall(callId));
auto* transport = call->getTransport();
@ -360,7 +359,7 @@ CallTest::testTlsInfosPeerCertificate()
JAMI_INFO("Stop call between alice and Bob");
callStopped = 0;
Manager::instance().hangupCall(aliceId, callId);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return callStopped == 2; }));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return callStopped == 2; }));
}
void
@ -413,12 +412,10 @@ CallTest::testSocketInfos()
JAMI_INFO("Start call between alice and Bob");
auto callId = libjami::placeCallWithMedia(aliceId, bobUri, {});
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return !bobCallId.empty(); }));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return !bobCallId.empty(); }));
Manager::instance().answerCall(bobId, bobCallId);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
return aliceCallState == "CURRENT" && mediaReady;
}));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceCallState == "CURRENT" && mediaReady; }));
JAMI_INFO("Detail debug");
auto details = libjami::getCallDetails(aliceId, callId);
@ -434,7 +431,55 @@ CallTest::testSocketInfos()
JAMI_INFO("Stop call between alice and Bob");
callStopped = 0;
Manager::instance().hangupCall(aliceId, callId);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return callStopped == 2; }));
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return callStopped == 2; }));
}
void
CallTest::testInvalidTurn()
{
auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId);
auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId);
auto bobUri = bobAccount->getUsername();
auto aliceUri = aliceAccount->getUsername();
std::mutex mtx;
std::unique_lock<std::mutex> lk {mtx};
std::condition_variable cv;
std::map<std::string, std::shared_ptr<libjami::CallbackWrapperBase>> confHandlers;
std::atomic_bool callReceived {false};
std::atomic<int> callStopped {0};
// Watch signals
confHandlers.insert(libjami::exportable_callback<libjami::CallSignal::IncomingCallWithMedia>(
[&](const std::string&,
const std::string&,
const std::string&,
const std::vector<std::map<std::string, std::string>>&) {
callReceived = true;
cv.notify_one();
}));
confHandlers.insert(libjami::exportable_callback<libjami::CallSignal::StateChange>(
[&](const std::string&, const std::string&, const std::string& state, signed) {
if (state == "OVER") {
callStopped += 1;
if (callStopped == 2)
cv.notify_one();
}
}));
libjami::registerSignalHandlers(confHandlers);
std::map<std::string, std::string> details;
details[ConfProperties::TURN::SERVER] = "1.1.1.1";
libjami::setAccountDetails(aliceId, details);
JAMI_INFO("Start call between alice and Bob");
auto call = libjami::placeCallWithMedia(aliceId, bobUri, {});
CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return callReceived.load(); }));
JAMI_INFO("Stop call between alice and Bob");
callStopped = 0;
Manager::instance().hangupCall(aliceId, call);
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return callStopped == 2; }));
}
} // namespace test