/* * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Guillaume Roguez * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "ice_transport.h" #include "ice_socket.h" #include "logger.h" #include "sip/sip_utils.h" #include "manager.h" #include "upnp/upnp_control.h" #include "transport/peer_channel.h" #include "jami/callmanager_interface.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "pj/limits.h" #define TRY(ret) \ do { \ if ((ret) != PJ_SUCCESS) \ throw std::runtime_error(#ret " failed"); \ } while (0) // Validate that the component ID is within the expected range #define ASSERT_COMP_ID(compId, compCount) \ do { \ if ((compId) == 0 or (compId) > (compCount)) \ throw std::runtime_error("Invalid component ID " + (std::to_string(compId))); \ } while (0) namespace jami { static constexpr unsigned STUN_MAX_PACKET_SIZE {8192}; static constexpr uint16_t IPV6_HEADER_SIZE = 40; ///< Size in bytes of IPV6 packet header static constexpr uint16_t IPV4_HEADER_SIZE = 20; ///< Size in bytes of IPV4 packet header static constexpr int MAX_CANDIDATES {32}; static constexpr int MAX_DESTRUCTION_TIMEOUT {3000}; static constexpr int HANDLE_EVENT_DURATION {500}; 
//==============================================================================

// NOTE(review): in this copy of the file several template argument lists look
// like they were lost (e.g. `std::lock_guard;`, `std::vector>`,
// `std::unique_ptr>`, `reinterpret_cast(pkt)`). The tokens are kept exactly as
// found; confirm the intended types against version control.
using MutexGuard = std::lock_guard;
using MutexLock = std::unique_lock;

using namespace upnp;

//==============================================================================

/**
 * Private implementation of IceTransport.
 *
 * Owns the pjnath ICE stream transport handle (icest_), its configuration
 * (config_), the per-component receive state and the optional UPnP
 * controller and mappings.
 */
class IceTransport::Impl
{
public:
    /** Stores the session name; ICE resources are created by initIceInstance(). */
    Impl(const char* name);
    /** Stops the event thread and destroys the ICE transport, ioqueue and timer heap. */
    ~Impl();

    /**
     * Applies the options, builds the STUN/TURN/server-reflexive configuration,
     * creates the ICE stream transport and starts the event polling thread.
     * Throws std::runtime_error when a pjproject call fails.
     */
    void initIceInstance(const IceTransportOptions& options);

    /**
     * Completion handler for ICE initialization/negotiation: logs the result,
     * forwards it to on_initdone_cb_ / on_negodone_cb_ and notifies iceCV_.
     */
    void onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status);

    /**
     * Incoming data handler: passes the packet to the component callback when
     * one is set, otherwise writes it to the component's peer channel.
     */
    void onReceiveData(unsigned comp_id, void* pkt, pj_size_t size);

    /**
     * Set/change transport role as initiator.
     * Should be called before start method.
     */
    bool setInitiatorSession();

    /**
     * Set/change transport role as slave.
     * Should be called before start method.
     */
    bool setSlaveSession();

    /**
     * Initializes the ICE session with the given role and caches the local
     * ufrag/pwd. Returns false when there is no transport or pjnath fails.
     */
    bool createIceSession(pj_ice_sess_role role);

    /**
     * Must be called while holding iceMutex_
     */
    void getUFragPwd();

    /** Builds a textual dump of the selected local/remote pair of every component. */
    std::string link() const;

    // Non-mutex protected of public versions
    bool _isInitialized() const;
    bool _isStarted() const;
    bool _isRunning() const;
    bool _isFailed() const;

    /**
     * Returns the local (remote == false) or remote (remote == true) candidate
     * of the valid pair of a component, or nullptr when unavailable.
     */
    const pj_ice_sess_cand* getSelectedCandidate(unsigned comp_id, bool remote) const;
    IpAddr getLocalAddress(unsigned comp_id) const;
    IpAddr getRemoteAddress(unsigned comp_id) const;
    /** Returns the pjnath type name of a candidate, or "?" when unknown. */
    static const char* getCandidateType(const pj_ice_sess_cand* cand);
    bool isTcpEnabled() const { return config_.protocol == PJ_ICE_TP_TCP; }
    /** Appends a default STUN transport config for address family af. */
    bool addStunConfig(int af);
    /** Requests one UPnP mapping per component and records the usable ones. */
    void requestUpnpMappings();
    /** True when a UPnP controller exists and there is one mapping per component. */
    bool hasUpnp() const;
    // Take a list of address pairs (local/public) and add them as
    // reflexive candidates using STUN config.
    void addServerReflexiveCandidates(const std::vector>& addrList);
    // Generate server reflexive candidates using the published (DHT/Account) address
    std::vector> setupGenericReflexiveCandidates();
    // Generate server reflexive candidates using UPNP mappings.
    std::vector> setupUpnpReflexiveCandidates();
    void setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr);
    const IpAddr getDefaultRemoteAddress(unsigned comp_id) const;
    /**
     * Polls the timer heap, then the ioqueue for at most max_msec.
     * Returns whether the timer heap reported a pending timer.
     */
    bool handleEvents(unsigned max_msec);
    /** Drains ioqueue events and timers; returns the remaining timer count, -1 on error. */
    int flushTimerHeapAndIoQueue();
    /** Polls the ioqueue without waiting; returns the number of events, negative on error. */
    int checkEventQueue(int maxEventToPoll);

    std::string sessionName_ {};
    // Pool released through pj_pool_release() by the deleter set in the constructor.
    std::unique_ptr> pool_ {};
    bool isTcp_ {false};
    bool upnpEnabled_ {false};
    IceTransportCompleteCb on_initdone_cb_ {};
    IceTransportCompleteCb on_negodone_cb_ {};
    IceRecvInfo on_recv_cb_ {};
    mutable std::mutex iceMutex_ {};
    pj_ice_strans* icest_ {nullptr};
    unsigned streamsCount_ {0};
    unsigned compCountPerStream_ {0};
    // Total number of components: streamsCount_ * compCountPerStream_.
    unsigned compCount_ {0};
    std::string local_ufrag_ {};
    std::string local_pwd_ {};
    pj_sockaddr remoteAddr_ {};
    std::condition_variable iceCV_ {};
    pj_ice_strans_cfg config_ {};
    std::string last_errmsg_ {};
    std::atomic_bool is_stopped_ {false};

    // Owning copy of the bytes [pkt, pkt + size) of a received packet.
    struct Packet
    {
        Packet(void* pkt, pj_size_t size)
            : data {reinterpret_cast(pkt), reinterpret_cast(pkt) + size}
        {}
        std::vector data {};
    };

    // Per-component receive state: optional callback plus a packet queue.
    struct ComponentIO
    {
        std::mutex mutex;
        std::condition_variable cv;
        std::deque queue;
        IceRecvCb cb;
    };

    // NOTE: Component IDs start from 1, while these three vectors
    // are indexed from 0. Conversion from ID to vector index must
    // be done properly.
    std::vector compIO_ {};
    std::vector peerChannels_ {};
    std::vector iceDefaultRemoteAddr_;

    // ICE controlling role. True for controller agents and false for
    // controlled agents
    std::atomic_bool initiatorSession_ {true};

    // Local/Public addresses used by the account owning the ICE instance.
IpAddr accountLocalAddr_ {}; IpAddr accountPublicAddr_ {}; // STUN and TURN servers std::vector stunServers_; std::vector turnServers_; /** * Returns the IP of each candidate for a given component in the ICE session */ struct LocalCandidate { IpAddr addr; pj_ice_cand_transport transport; }; std::shared_ptr upnp_ {}; std::mutex upnpMutex_ {}; std::map upnpMappings_; std::mutex upnpMappingsMutex_ {}; bool onlyIPv4Private_ {true}; // IO/Timer events are handled by following thread std::thread thread_ {}; std::atomic_bool threadTerminateFlags_ {false}; // Wait data on components pj_size_t lastSentLen_ {}; std::condition_variable waitDataCv_ = {}; std::atomic_bool destroying_ {false}; onShutdownCb scb {}; }; //============================================================================== /** * Add stun/turn configuration or default host as candidates */ static void add_stun_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const StunServerInfo& info) { if (cfg.stun_tp_cnt >= PJ_ICE_MAX_STUN) throw std::runtime_error("Too many STUN configurations"); IpAddr ip {info.uri}; // Given URI cannot be DNS resolved or not IPv4 or IPv6? // This prevents a crash into PJSIP when ip.toString() is called. if (ip.getFamily() == AF_UNSPEC) { JAMI_DBG("[ice (%s)] STUN server '%s' not used, unresolvable address", (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"), info.uri.c_str()); return; } auto& stun = cfg.stun_tp[cfg.stun_tp_cnt++]; pj_ice_strans_stun_cfg_default(&stun); pj_strdup2_with_null(&pool, &stun.server, ip.toString().c_str()); stun.af = ip.getFamily(); if (!(stun.port = ip.getPort())) stun.port = PJ_STUN_PORT; stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; stun.conn_type = cfg.stun.conn_type; JAMI_DBG("[ice (%s)] added stun server '%s', port %u", (cfg.protocol == PJ_ICE_TP_TCP ? 
"TCP" : "UDP"), pj_strbuf(&stun.server), stun.port); } static void add_turn_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const TurnServerInfo& info) { if (cfg.turn_tp_cnt >= PJ_ICE_MAX_TURN) throw std::runtime_error("Too many TURN servers"); IpAddr ip {info.uri}; // Same comment as add_stun_server() if (ip.getFamily() == AF_UNSPEC) { JAMI_DBG("[ice (%s)] TURN server '%s' not used, unresolvable address", (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"), info.uri.c_str()); return; } auto& turn = cfg.turn_tp[cfg.turn_tp_cnt++]; pj_ice_strans_turn_cfg_default(&turn); pj_strdup2_with_null(&pool, &turn.server, ip.toString().c_str()); turn.af = ip.getFamily(); if (!(turn.port = ip.getPort())) turn.port = PJ_STUN_PORT; turn.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; turn.conn_type = cfg.turn.conn_type; // Authorization (only static plain password supported yet) if (not info.password.empty()) { turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; pj_strset(&turn.auth_cred.data.static_cred.realm, (char*) info.realm.c_str(), info.realm.size()); pj_strset(&turn.auth_cred.data.static_cred.username, (char*) info.username.c_str(), info.username.size()); pj_strset(&turn.auth_cred.data.static_cred.data, (char*) info.password.c_str(), info.password.size()); } JAMI_DBG("[ice (%s)] added turn server '%s', port %u", (cfg.protocol == PJ_ICE_TP_TCP ? 
"TCP" : "UDP"), pj_strbuf(&turn.server), turn.port); } //============================================================================== IceTransport::Impl::Impl(const char* name) : sessionName_(name) , pool_(nullptr, [](pj_pool_t* pool) { pj_pool_release(pool); }) , thread_() { JAMI_DBG("[ice:%p] Creating IceTransport session for \"%s\"", this, name); } IceTransport::Impl::~Impl() { JAMI_DBG("[ice:%p] destroying %p", this, icest_); threadTerminateFlags_ = true; iceCV_.notify_all(); if (thread_.joinable()) { thread_.join(); } pj_ice_strans* strans = nullptr; std::swap(strans, icest_); assert(strans); // must be done before ioqueue/timer destruction JAMI_INFO("[ice:%p] Destroying ice_strans %p", pj_ice_strans_get_user_data(strans), strans); pj_ice_strans_stop_ice(strans); pj_ice_strans_destroy(strans); // NOTE: This last timer heap and IO queue polling is necessary to close // TURN socket. // Because when destroying the TURN session pjproject creates a pj_timer // to postpone the TURN destruction. This timer is only called if we poll // the event queue. int ret = flushTimerHeapAndIoQueue(); if (ret < 0) { JAMI_ERR("[ice:%p] IO queue polling failed", this); } else if (ret > 0) { JAMI_ERR("[ice:%p] Unexpected left timer in timer heap. 
Please report the bug", this); } if (checkEventQueue(1) > 0) { JAMI_WARN("[ice:%p] Unexpected left events in IO queue", this); } if (config_.stun_cfg.ioqueue) pj_ioqueue_destroy(config_.stun_cfg.ioqueue); if (config_.stun_cfg.timer_heap) pj_timer_heap_destroy(config_.stun_cfg.timer_heap); JAMI_DBG("[ice:%p] done destroying", this); } void IceTransport::Impl::initIceInstance(const IceTransportOptions& options) { isTcp_ = options.tcpEnable; upnpEnabled_ = options.upnpEnable; on_initdone_cb_ = options.onInitDone; on_negodone_cb_ = options.onNegoDone; streamsCount_ = options.streamsCount; compCountPerStream_ = options.compCountPerStream; compCount_ = streamsCount_ * compCountPerStream_; compIO_ = std::vector(compCount_); peerChannels_ = std::vector(compCount_); iceDefaultRemoteAddr_.reserve(compCount_); initiatorSession_ = options.master; accountLocalAddr_ = std::move(options.accountLocalAddr); accountPublicAddr_ = std::move(options.accountPublicAddr); stunServers_ = std::move(options.stunServers); turnServers_ = std::move(options.turnServers); JAMI_DBG("[ice:%p] Initializing the session - comp count %u - as a %s", this, compCount_, initiatorSession_ ? "master" : "slave"); if (upnpEnabled_) upnp_.reset(new upnp::Controller()); auto& iceTransportFactory = Manager::instance().getIceTransportFactory(); config_ = iceTransportFactory.getIceCfg(); // config copy if (isTcp_) { config_.protocol = PJ_ICE_TP_TCP; config_.stun.conn_type = PJ_STUN_TP_TCP; config_.turn.conn_type = PJ_TURN_TP_TCP; } else { config_.protocol = PJ_ICE_TP_UDP; config_.stun.conn_type = PJ_STUN_TP_UDP; config_.turn.conn_type = PJ_TURN_TP_UDP; } pool_.reset( pj_pool_create(iceTransportFactory.getPoolFactory(), "IceTransport.pool", 512, 512, NULL)); if (not pool_) throw std::runtime_error("pj_pool_create() failed"); // Note: For server reflexive candidates, UPNP mappings will // be used if available. 
Then, the public address learnt during // the account registration process will be added only if it // differs from the UPNP public address. // Also note that UPNP candidates should be added first in order // to have a higher priority when performing the connectivity // checks. // STUN configs layout: // - index 0 : host IPv4 // - index 1 : host IPv6 // - index 2 : upnp/generic srflx IPv4. // - index 3 : generic srflx (if upnp exists and different) config_.stun_tp_cnt = 0; JAMI_DBG("[ice:%p] Add host candidates", this); addStunConfig(pj_AF_INET()); addStunConfig(pj_AF_INET6()); std::vector> upnpSrflxCand; if (upnp_) { requestUpnpMappings(); upnpSrflxCand = setupUpnpReflexiveCandidates(); if (not upnpSrflxCand.empty()) { addServerReflexiveCandidates(upnpSrflxCand); JAMI_DBG("[ice:%p] Added UPNP srflx candidates:", this); } } auto genericSrflxCand = setupGenericReflexiveCandidates(); if (not genericSrflxCand.empty()) { // Generic srflx candidates will be added only if different // from upnp candidates. 
if (upnpSrflxCand.empty() or (upnpSrflxCand[0].second.toString() != genericSrflxCand[0].second.toString())) { addServerReflexiveCandidates(genericSrflxCand); JAMI_DBG("[ice:%p] Added generic srflx candidates:", this); } } if (upnpSrflxCand.empty() and genericSrflxCand.empty()) { JAMI_WARN("[ice:%p] No server reflexive candidates added", this); } pj_ice_strans_cb icecb; pj_bzero(&icecb, sizeof(icecb)); icecb.on_rx_data = [](pj_ice_strans* ice_st, unsigned comp_id, void* pkt, pj_size_t size, const pj_sockaddr_t* /*src_addr*/, unsigned /*src_addr_len*/) { if (auto* tr = static_cast(pj_ice_strans_get_user_data(ice_st))) tr->onReceiveData(comp_id, pkt, size); else JAMI_WARN("null IceTransport"); }; icecb.on_ice_complete = [](pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status) { if (auto* tr = static_cast(pj_ice_strans_get_user_data(ice_st))) tr->onComplete(ice_st, op, status); else JAMI_WARN("null IceTransport"); }; icecb.on_data_sent = [](pj_ice_strans* ice_st, pj_ssize_t size) { if (auto* tr = static_cast(pj_ice_strans_get_user_data(ice_st))) { std::lock_guard lk(tr->iceMutex_); tr->lastSentLen_ += size; tr->waitDataCv_.notify_all(); } else JAMI_WARN("null IceTransport"); }; icecb.on_destroy = [](pj_ice_strans* ice_st) { if (auto* tr = static_cast(pj_ice_strans_get_user_data(ice_st))) { tr->destroying_ = true; tr->waitDataCv_.notify_all(); if (tr->scb) tr->scb(); } else { JAMI_WARN("null IceTransport"); } }; // Add STUN servers for (auto& server : stunServers_) add_stun_server(*pool_, config_, server); // Add TURN servers for (auto& server : turnServers_) add_turn_server(*pool_, config_, server); static constexpr auto IOQUEUE_MAX_HANDLES = std::min(PJ_IOQUEUE_MAX_HANDLES, 64); TRY(pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap)); TRY(pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue)); std::ostringstream sessionName {}; // We use the instance pointer as the PJNATH session name in order // to easily identify 
the logs reported by PJNATH. sessionName << this; pj_status_t status = pj_ice_strans_create(sessionName.str().c_str(), &config_, compCount_, this, &icecb, &icest_); if (status != PJ_SUCCESS || icest_ == nullptr) { throw std::runtime_error("pj_ice_strans_create() failed"); } // Must be created after any potential failure thread_ = std::thread([this] { while (not threadTerminateFlags_) { // NOTE: handleEvents can return false in this case // but here we don't care if there is event or not. handleEvents(HANDLE_EVENT_DURATION); } }); // Init to invalid addresses iceDefaultRemoteAddr_.reserve(compCount_); } bool IceTransport::Impl::_isInitialized() const { if (auto icest = icest_) { auto state = pj_ice_strans_get_state(icest); return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED; } return false; } bool IceTransport::Impl::_isStarted() const { if (auto icest = icest_) { auto state = pj_ice_strans_get_state(icest); return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED; } return false; } bool IceTransport::Impl::_isRunning() const { if (auto icest = icest_) { auto state = pj_ice_strans_get_state(icest); return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED; } return false; } bool IceTransport::Impl::_isFailed() const { if (auto icest = icest_) return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED; return false; } bool IceTransport::Impl::handleEvents(unsigned max_msec) { // By tests, never seen more than two events per 500ms static constexpr auto MAX_NET_EVENTS = 2; pj_time_val max_timeout = {0, static_cast(max_msec)}; pj_time_val timeout = {0, 0}; unsigned net_event_count = 0; pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout); auto hasActiveTimer = timeout.sec != PJ_MAXINT32 || timeout.msec != PJ_MAXINT32; // timeout limitation if (hasActiveTimer) pj_time_val_normalize(&timeout); if (PJ_TIME_VAL_GT(timeout, max_timeout)) { timeout = max_timeout; } do { auto 
n_events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout); // timeout if (not n_events) return hasActiveTimer; // error if (n_events < 0) { const auto err = pj_get_os_error(); // Kept as debug as some errors are "normal" in regular context last_errmsg_ = sip_utils::sip_strerror(err); JAMI_DBG("[ice:%p] ioqueue error %d: %s", this, err, last_errmsg_.c_str()); std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout))); return hasActiveTimer; } net_event_count += n_events; timeout.sec = timeout.msec = 0; } while (net_event_count < MAX_NET_EVENTS); return hasActiveTimer; } int IceTransport::Impl::flushTimerHeapAndIoQueue() { pj_time_val timerTimeout = {0, 0}; pj_time_val defaultWaitTime = {0, HANDLE_EVENT_DURATION}; bool hasActiveTimer = false; std::chrono::milliseconds totalWaitTime {0}; auto const start = std::chrono::steady_clock::now(); // We try to process pending events as fast as possible to // speed-up the release. int maxEventToProcess = 10; do { if (checkEventQueue(maxEventToProcess) < 0) return -1; pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timerTimeout); hasActiveTimer = !(timerTimeout.sec == PJ_MAXINT32 && timerTimeout.msec == PJ_MAXINT32); if (hasActiveTimer) { pj_time_val_normalize(&timerTimeout); auto waitTime = std::chrono::milliseconds( std::min(PJ_TIME_VAL_MSEC(timerTimeout), PJ_TIME_VAL_MSEC(defaultWaitTime))); std::this_thread::sleep_for(waitTime); totalWaitTime += waitTime; } } while (hasActiveTimer && totalWaitTime < std::chrono::milliseconds(MAX_DESTRUCTION_TIMEOUT)); auto duration = std::chrono::duration_cast( std::chrono::steady_clock::now() - start); JAMI_DBG("[ice:%p] Timer heap flushed after %ld ms", this, duration.count()); return static_cast(pj_timer_heap_count(config_.stun_cfg.timer_heap)); } int IceTransport::Impl::checkEventQueue(int maxEventToPoll) { pj_time_val timeout = {0, 0}; int eventCount = 0; int events = 0; do { events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout); if (events < 0) { 
const auto err = pj_get_os_error(); last_errmsg_ = sip_utils::sip_strerror(err); JAMI_ERR("[ice:%p] ioqueue error %d: %s", this, err, last_errmsg_.c_str()); return events; } eventCount += events; } while (events > 0 && eventCount < maxEventToPoll); return eventCount; } void IceTransport::Impl::onComplete(pj_ice_strans*, pj_ice_strans_op op, pj_status_t status) { const char* opname = op == PJ_ICE_STRANS_OP_INIT ? "initialization" : op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" : "unknown_op"; const bool done = status == PJ_SUCCESS; if (done) { JAMI_DBG("[ice:%p] %s %s success", this, (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"), opname); } else { last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] %s %s failed: %s", this, (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"), opname, last_errmsg_.c_str()); } if (done and op == PJ_ICE_STRANS_OP_INIT) { if (initiatorSession_) setInitiatorSession(); else setSlaveSession(); } if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_) on_initdone_cb_(done); else if (op == PJ_ICE_STRANS_OP_NEGOTIATION) { if (done) { // Dump of connection pairs auto out = link(); JAMI_DBG("[ice:%p] %s connection pairs ([comp id] local [type] <-> remote [type]):\n%s", this, (config_.protocol == PJ_ICE_TP_TCP ? 
"TCP" : "UDP"), out.c_str()); } if (on_negodone_cb_) on_negodone_cb_(done); } // Unlock waitForXXX APIs iceCV_.notify_all(); } std::string IceTransport::Impl::link() const { std::ostringstream out; for (unsigned strm = 0; strm < streamsCount_; strm++) { for (unsigned i = 1; i <= compCountPerStream_; i++) { auto absIdx = strm * streamsCount_ + i; auto laddr = getLocalAddress(absIdx); auto raddr = getRemoteAddress(absIdx); if (laddr and raddr) { out << " [" << i << "] " << laddr.toString(true, true) << " [" << getCandidateType(getSelectedCandidate(absIdx, false)) << "] " << " <-> " << raddr.toString(true, true) << " [" << getCandidateType(getSelectedCandidate(absIdx, true)) << "] " << '\n'; } else { out << " [" << i << "] disabled\n"; } } } return out.str(); } bool IceTransport::Impl::setInitiatorSession() { JAMI_DBG("[ice:%p] as master", this); initiatorSession_ = true; if (_isInitialized()) { std::lock_guard lk(iceMutex_); if (not icest_) { return false; } auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLING); if (status != PJ_SUCCESS) { last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] role change failed: %s", this, last_errmsg_.c_str()); return false; } return true; } return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLING); } bool IceTransport::Impl::setSlaveSession() { JAMI_DBG("[ice:%p] as slave", this); initiatorSession_ = false; if (_isInitialized()) { std::lock_guard lk(iceMutex_); if (not icest_) { return false; } auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLED); if (status != PJ_SUCCESS) { last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] role change failed: %s", this, last_errmsg_.c_str()); return false; } return true; } return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLED); } const pj_ice_sess_cand* IceTransport::Impl::getSelectedCandidate(unsigned comp_id, bool remote) const { ASSERT_COMP_ID(comp_id, compCount_); // Return the selected candidate pair. 
Might not be the nominated pair if // ICE has not concluded yet, but should be the nominated pair afterwards. if (not _isRunning()) { JAMI_ERR("[ice:%p] ICE transport is not running", this); return nullptr; } std::lock_guard lk(iceMutex_); if (not icest_) { return nullptr; } const auto* sess = pj_ice_strans_get_valid_pair(icest_, comp_id); if (sess == nullptr) { JAMI_ERR("[ice:%p] Component %i has no valid pair", this, comp_id); return nullptr; } if (remote) return sess->rcand; else return sess->lcand; } IpAddr IceTransport::Impl::getLocalAddress(unsigned comp_id) const { ASSERT_COMP_ID(comp_id, compCount_); if (auto cand = getSelectedCandidate(comp_id, false)) return cand->addr; JAMI_ERR("[ice:%p] No local address for component %i", this, comp_id); return {}; } IpAddr IceTransport::Impl::getRemoteAddress(unsigned comp_id) const { ASSERT_COMP_ID(comp_id, compCount_); if (auto cand = getSelectedCandidate(comp_id, true)) return cand->addr; JAMI_ERR("[ice:%p] No remote address for component %i", this, comp_id); return {}; } const char* IceTransport::Impl::getCandidateType(const pj_ice_sess_cand* cand) { auto name = cand ? pj_ice_get_cand_type_name(cand->type) : nullptr; return name ? 
name : "?"; } void IceTransport::Impl::getUFragPwd() { if (icest_) { pj_str_t local_ufrag, local_pwd; pj_ice_strans_get_ufrag_pwd(icest_, &local_ufrag, &local_pwd, nullptr, nullptr); local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen); local_pwd_.assign(local_pwd.ptr, local_pwd.slen); } } bool IceTransport::Impl::createIceSession(pj_ice_sess_role role) { std::lock_guard lk(iceMutex_); if (not icest_) { return false; } if (pj_ice_strans_init_ice(icest_, role, nullptr, nullptr) != PJ_SUCCESS) { JAMI_ERR("[ice:%p] pj_ice_strans_init_ice() failed", this); return false; } // Fetch some information on local configuration getUFragPwd(); JAMI_DBG("[ice:%p] (local) ufrag=%s, pwd=%s", this, local_ufrag_.c_str(), local_pwd_.c_str()); return true; } bool IceTransport::Impl::addStunConfig(int af) { if (config_.stun_tp_cnt >= PJ_ICE_MAX_STUN) { JAMI_ERR("Max number of STUN configurations reached (%i)", PJ_ICE_MAX_STUN); return false; } if (af != pj_AF_INET() and af != pj_AF_INET6()) { JAMI_ERR("Invalid address familly (%i)", af); return false; } auto& stun = config_.stun_tp[config_.stun_tp_cnt++]; pj_ice_strans_stun_cfg_default(&stun); stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; stun.af = af; stun.conn_type = config_.stun.conn_type; JAMI_DBG("[ice:%p] added host stun config for %s transport", this, config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"); return true; } void IceTransport::Impl::requestUpnpMappings() { // Must be called once ! std::lock_guard lock(upnpMutex_); if (not upnp_) return; auto transport = isTcpEnabled() ? PJ_CAND_TCP_PASSIVE : PJ_CAND_UDP; auto portType = transport == PJ_CAND_UDP ? PortType::UDP : PortType::TCP; // Request upnp mapping for each component. for (unsigned id = 1; id <= compCount_; id++) { // Set port number to 0 to get any available port. Mapping requestedMap(portType); // Request the mapping Mapping::sharedPtr_t mapPtr = upnp_->reserveMapping(requestedMap); // To use a mapping, it must be valid, open and has valid host address. 
if (mapPtr and mapPtr->getMapKey() and (mapPtr->getState() == MappingState::OPEN) and mapPtr->hasValidHostAddress()) { std::lock_guard lock(upnpMappingsMutex_); auto ret = upnpMappings_.emplace(mapPtr->getMapKey(), *mapPtr); if (ret.second) { JAMI_DBG("[ice:%p] UPNP mapping %s successfully allocated", this, mapPtr->toString(true).c_str()); } else { JAMI_WARN("[ice:%p] UPNP mapping %s already in the list!", this, mapPtr->toString().c_str()); } } else { JAMI_WARN("[ice:%p] UPNP mapping request failed!", this); upnp_->releaseMapping(requestedMap); } } } bool IceTransport::Impl::hasUpnp() const { return upnp_ and upnpMappings_.size() == compCount_; } void IceTransport::Impl::addServerReflexiveCandidates( const std::vector>& addrList) { if (addrList.size() != compCount_) { JAMI_WARN("[ice:%p] Provided addr list size %lu does not match component count %u", this, addrList.size(), compCount_); return; } // Add config for server reflexive candidates (UPNP or from DHT). if (not addStunConfig(pj_AF_INET())) return; assert(config_.stun_tp_cnt > 0 && config_.stun_tp_cnt < PJ_ICE_MAX_STUN); auto& stun = config_.stun_tp[config_.stun_tp_cnt - 1]; for (unsigned id = 1; id <= compCount_; id++) { auto idx = id - 1; auto& localAddr = addrList[idx].first; auto& publicAddr = addrList[idx].second; JAMI_DBG("[ice:%p] Add srflx reflexive candidates [%s : %s] for comp %u", this, localAddr.toString(true).c_str(), publicAddr.toString(true).c_str(), id); pj_sockaddr_cp(&stun.cfg.user_mapping[idx].local_addr, localAddr.pjPtr()); pj_sockaddr_cp(&stun.cfg.user_mapping[idx].mapped_addr, publicAddr.pjPtr()); if (isTcpEnabled()) { if (publicAddr.getPort() == 9) { stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_ACTIVE; } else { stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_PASSIVE; } } else { stun.cfg.user_mapping[idx].tp_type = PJ_CAND_UDP; } } stun.cfg.user_mapping_cnt = compCount_; assert(stun.cfg.user_mapping_cnt <= PJ_ICE_MAX_COMP); } std::vector> 
IceTransport::Impl::setupGenericReflexiveCandidates() { if (not accountLocalAddr_) { JAMI_WARN("[ice:%p] Missing local address, generic srflx candidates wont be generated!", this); return {}; } if (not accountPublicAddr_) { JAMI_WARN("[ice:%p] Missing public address, generic srflx candidates wont be generated!", this); return {}; } std::vector> addrList; auto isTcp = isTcpEnabled(); addrList.reserve(compCount_); for (unsigned id = 1; id <= compCount_; id++) { // For TCP, the type is set to active, because most likely the incoming // connection will be blocked by the NAT. // For UDP use random port number. uint16_t port = isTcp ? 9 : upnp::Controller::generateRandomPort(isTcp ? PortType::TCP : PortType::UDP); accountLocalAddr_.setPort(port); accountPublicAddr_.setPort(port); addrList.emplace_back(accountLocalAddr_, accountPublicAddr_); } return addrList; } std::vector> IceTransport::Impl::setupUpnpReflexiveCandidates() { // Add UPNP server reflexive candidates if available. if (not hasUpnp()) return {}; std::lock_guard lock(upnpMappingsMutex_); if (static_cast(upnpMappings_.size()) < compCount_) { JAMI_WARN("[ice:%p] Not enough mappings %lu. Expected %u", this, upnpMappings_.size(), compCount_); return {}; } std::vector> addrList; addrList.reserve(upnpMappings_.size()); for (auto const& [_, map] : upnpMappings_) { assert(map.getMapKey()); IpAddr localAddr {map.getInternalAddress()}; localAddr.setPort(map.getInternalPort()); IpAddr publicAddr {map.getExternalAddress()}; publicAddr.setPort(map.getExternalPort()); addrList.emplace_back(localAddr, publicAddr); } return addrList; } void IceTransport::Impl::setDefaultRemoteAddress(unsigned compId, const IpAddr& addr) { ASSERT_COMP_ID(compId, compCount_); iceDefaultRemoteAddr_[compId - 1] = addr; // The port does not matter. Set it 0 to avoid confusion. 
iceDefaultRemoteAddr_[compId - 1].setPort(0);
}

/**
 * Returns a copy of the default remote address stored for a component.
 * Throws std::runtime_error (ASSERT_COMP_ID) when compId is out of range.
 */
const IpAddr
IceTransport::Impl::getDefaultRemoteAddress(unsigned compId) const
{
    ASSERT_COMP_ID(compId, compCount_);
    return iceDefaultRemoteAddr_[compId - 1];
}

/**
 * Handles a packet received on a component.
 *
 * Empty packets are ignored. When the component has a receive callback the
 * packet is handed to it (under the component mutex); otherwise the bytes
 * are written to the peer channel of the component.
 */
void
IceTransport::Impl::onReceiveData(unsigned comp_id, void* pkt, pj_size_t size)
{
    ASSERT_COMP_ID(comp_id, compCount_);

    if (size == 0)
        return;

    {
        // Component IDs are 1-based, the vectors are 0-based.
        auto& io = compIO_[comp_id - 1];
        std::lock_guard lk(io.mutex);

        if (io.cb) {
            io.cb((uint8_t*) pkt, size);
            return;
        }
    }

    // No callback registered: queue the data in the peer channel.
    std::error_code ec;
    auto err = peerChannels_.at(comp_id - 1).write((const char*) pkt, size, ec);
    if (err < 0) {
        JAMI_ERR("[ice:%p] rx: channel is closed", this);
    }
}

//==============================================================================

// NOTE(review): `std::make_unique(name)` (and `std::vector&&` / `std::vector`
// further down) look like they lost their template arguments in this copy of
// the file; the tokens are kept as found — confirm against version control.
IceTransport::IceTransport(const char* name)
    : pimpl_ {std::make_unique(name)}
{}

/** Marks the transport as stopped and cancels the pending operations. */
IceTransport::~IceTransport()
{
    isStopped_ = true;
    cancelOperations();
}

/** Forwards to Impl::initIceInstance(). */
void
IceTransport::initIceInstance(const IceTransportOptions& options)
{
    pimpl_->initIceInstance(options);
}

// The state getters below take iceMutex_ and forward to the unlocked
// Impl::_isXxx() versions.

bool
IceTransport::isInitialized() const
{
    std::lock_guard lk(pimpl_->iceMutex_);
    return pimpl_->_isInitialized();
}

bool
IceTransport::isStarted() const
{
    std::lock_guard lk {pimpl_->iceMutex_};
    return pimpl_->_isStarted();
}

bool
IceTransport::isRunning() const
{
    std::lock_guard lk {pimpl_->iceMutex_};
    return pimpl_->_isRunning();
}

/** Returns the is_stopped_ flag, set when starting the negotiation failed. */
bool
IceTransport::isStopped() const
{
    std::lock_guard lk {pimpl_->iceMutex_};
    return pimpl_->is_stopped_;
}

bool
IceTransport::isFailed() const
{
    std::lock_guard lk {pimpl_->iceMutex_};
    return pimpl_->_isFailed();
}

/** Total number of components (streams x components per stream). */
unsigned
IceTransport::getComponentCount() const
{
    return pimpl_->compCount_;
}

bool
IceTransport::setSlaveSession()
{
    return pimpl_->setSlaveSession();
}

bool
IceTransport::setInitiatorSession()
{
    return pimpl_->setInitiatorSession();
}

/** Returns a copy of the last recorded error message (read without locking). */
std::string
IceTransport::getLastErrMsg() const
{
    return pimpl_->last_errmsg_;
}

/**
 * Tells whether this agent is the controlling one: asks pjnath for the
 * current role once the transport is initialized, otherwise returns the
 * role configured for the upcoming session.
 */
bool
IceTransport::isInitiator() const
{
    if (isInitialized()) {
        std::lock_guard lk(pimpl_->iceMutex_);
        if (pimpl_->icest_) {
            return pj_ice_strans_get_role(pimpl_->icest_) == PJ_ICE_SESS_ROLE_CONTROLLING;
        }
        return false;
    }
    return pimpl_->initiatorSession_;
}

/**
 * Starts the ICE negotiation with the given remote credentials and
 * candidates.
 *
 * The candidate list is trimmed when it holds more than
 * PJ_ICE_ST_MAX_CAND - 1 candidates per component. On failure paths that log
 * an error, is_stopped_ is set.
 *
 * @return true when pjnath accepted to start the negotiation.
 */
bool
IceTransport::startIce(const Attribute& rem_attrs, std::vector&& rem_candidates)
{
    if (not isInitialized()) {
        JAMI_ERR("[ice:%p] not initialized transport", pimpl_.get());
        pimpl_->is_stopped_ = true;
        return false;
    }

    // pj_ice_strans_start_ice crashes if remote candidates array is empty
    if (rem_candidates.empty()) {
        JAMI_ERR("[ice:%p] start failed: no remote candidates", pimpl_.get());
        pimpl_->is_stopped_ = true;
        return false;
    }

    // max(1, ...) avoids a division by zero when no component is configured.
    auto comp_cnt = std::max(1u, getComponentCount());
    if (rem_candidates.size() / comp_cnt > PJ_ICE_ST_MAX_CAND - 1) {
        std::vector rcands;
        rcands.reserve(PJ_ICE_ST_MAX_CAND - 1);
        JAMI_WARN("[ice:%p] too much candidates detected, trim list.", pimpl_.get());
        // Just trim some candidates. To avoid to only take host candidates, iterate
        // through the whole list and select some host, some turn and peer reflexives
        // It should give at least enough infos to negotiate.
        auto maxHosts = 8;
        auto maxRelays = PJ_ICE_MAX_TURN;
        for (auto& c : rem_candidates) {
            if (c.type == PJ_ICE_CAND_TYPE_HOST) {
                if (maxHosts == 0)
                    continue;
                maxHosts -= 1;
            } else if (c.type == PJ_ICE_CAND_TYPE_RELAYED) {
                if (maxRelays == 0)
                    continue;
                maxRelays -= 1;
            }
            // The trimmed list holds at most PJ_ICE_ST_MAX_CAND - 1 entries in total.
            if (rcands.size() == PJ_ICE_ST_MAX_CAND - 1)
                break;
            rcands.emplace_back(std::move(c));
        }
        rem_candidates = std::move(rcands);
    }

    pj_str_t ufrag, pwd;
    JAMI_DBG("[ice:%p] negotiation starting (%zu remote candidates)",
             pimpl_.get(),
             rem_candidates.size());

    std::unique_lock lk(pimpl_->iceMutex_);

    // NOTE(review): unlike the other failure paths, this one neither logs nor
    // sets is_stopped_ — confirm that this is intended.
    if (not pimpl_->icest_) {
        return false;
    }

    auto status = pj_ice_strans_start_ice(pimpl_->icest_,
                                          pj_strset(&ufrag,
                                                    (char*) rem_attrs.ufrag.c_str(),
                                                    rem_attrs.ufrag.size()),
                                          pj_strset(&pwd,
                                                    (char*) rem_attrs.pwd.c_str(),
                                                    rem_attrs.pwd.size()),
                                          rem_candidates.size(),
                                          rem_candidates.data());
    if (status != PJ_SUCCESS) {
        pimpl_->last_errmsg_ = sip_utils::sip_strerror(status);
        JAMI_ERR("[ice:%p] start failed: %s", pimpl_.get(), pimpl_->last_errmsg_.c_str());
        pimpl_->is_stopped_ = true;
        return false;
    }

    return true;
}

bool
IceTransport::startIce(const SDP& sdp)
{
    if (pimpl_->streamsCount_ != 1) {
        JAMI_ERR("Expected exactly one stream per SDP (found %u streams)", pimpl_->streamsCount_);
        return false;
    }

    if (not isInitialized()) {
        JAMI_ERR("[ice:%p] not initialized transport", pimpl_.get());
        pimpl_->is_stopped_ = true;
        return false;
    }

    for (unsigned id = 1; id <= getComponentCount(); id++) {
        auto candVec = getLocalCandidates(id);
        for (auto const& cand : candVec) {
            JAMI_DBG("[ice:%p] Using local candidate %s for comp %u",
                     pimpl_.get(),
                     cand.c_str(),
                     id);
        }
    }

    JAMI_DBG("[ice:%p] negotiation starting (%zu remote candidates)",
             pimpl_.get(),
             sdp.candidates.size());
    pj_str_t ufrag, pwd;

    std::vector rem_candidates;
    rem_candidates.reserve(sdp.candidates.size());
    IceCandidate cand;
    for (const auto& line : sdp.candidates) {
        if (parseIceAttributeLine(0, line, cand))
            rem_candidates.emplace_back(cand);
    }

    std::unique_lock lk(pimpl_->iceMutex_);
if (not pimpl_->icest_) { return false; } auto status = pj_ice_strans_start_ice(pimpl_->icest_, pj_strset(&ufrag, (char*) sdp.ufrag.c_str(), sdp.ufrag.size()), pj_strset(&pwd, (char*) sdp.pwd.c_str(), sdp.pwd.size()), rem_candidates.size(), rem_candidates.data()); if (status != PJ_SUCCESS) { pimpl_->last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] start failed: %s", pimpl_.get(), pimpl_->last_errmsg_.c_str()); pimpl_->is_stopped_ = true; return false; } return true; } bool IceTransport::stop() { pimpl_->is_stopped_ = true; if (isStarted()) { std::lock_guard lk {pimpl_->iceMutex_}; if (not pimpl_->icest_) return false; auto status = pj_ice_strans_stop_ice(pimpl_->icest_); if (status != PJ_SUCCESS) { pimpl_->last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] ICE stop failed: %s", pimpl_.get(), pimpl_->last_errmsg_.c_str()); return false; } } return true; } void IceTransport::cancelOperations() { isCancelled_ = true; for (auto& c : pimpl_->peerChannels_) { c.stop(); } } IpAddr IceTransport::getLocalAddress(unsigned comp_id) const { return pimpl_->getLocalAddress(comp_id); } IpAddr IceTransport::getRemoteAddress(unsigned comp_id) const { // Return the default remote address if set. // Note that the default remote addresses are the addresses // set in the 'c=' and 'a=rtcp' lines of the received SDP. // See pj_ice_strans_sendto2() for more details. 
if (pimpl_->getDefaultRemoteAddress(comp_id)) { return pimpl_->getDefaultRemoteAddress(comp_id); } return pimpl_->getRemoteAddress(comp_id); } const IceTransport::Attribute IceTransport::getLocalAttributes() const { return {pimpl_->local_ufrag_, pimpl_->local_pwd_}; } std::vector IceTransport::getLocalCandidates(unsigned comp_id) const { ASSERT_COMP_ID(comp_id, getComponentCount()); std::vector res; pj_ice_sess_cand cand[MAX_CANDIDATES]; unsigned cand_cnt = PJ_ARRAY_SIZE(cand); { std::lock_guard lk {pimpl_->iceMutex_}; if (not pimpl_->icest_) return res; if (pj_ice_strans_enum_cands(pimpl_->icest_, comp_id, &cand_cnt, cand) != PJ_SUCCESS) { JAMI_ERR("[ice:%p] pj_ice_strans_enum_cands() failed", pimpl_.get()); return res; } } res.reserve(cand_cnt); for (unsigned i = 0; i < cand_cnt; ++i) { /** Section 4.5, RFC 6544 (https://tools.ietf.org/html/rfc6544) * candidate-attribute = "candidate" ":" foundation SP component-id * SP "TCP" SP priority SP connection-address SP port SP cand-type [SP * rel-addr] [SP rel-port] SP tcp-type-ext * *(SP extension-att-name SP * extension-att-value) * * tcp-type-ext = "tcptype" SP tcp-type * tcp-type = "active" / "passive" / "so" */ char ipaddr[PJ_INET6_ADDRSTRLEN]; std::string tcp_type; if (cand[i].transport != PJ_CAND_UDP) { tcp_type += " tcptype"; switch (cand[i].transport) { case PJ_CAND_TCP_ACTIVE: tcp_type += " active"; break; case PJ_CAND_TCP_PASSIVE: tcp_type += " passive"; break; case PJ_CAND_TCP_SO: default: tcp_type += " so"; break; } } res.emplace_back( fmt::format("{} {} {} {} {} {} typ {}{}", std::string_view(cand[i].foundation.ptr, cand[i].foundation.slen), cand[i].comp_id, (cand[i].transport == PJ_CAND_UDP ? 
"UDP" : "TCP"), cand[i].prio, pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0), pj_sockaddr_get_port(&cand[i].addr), pj_ice_get_cand_type_name(cand[i].type), tcp_type)); } return res; } std::vector IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const { ASSERT_COMP_ID(compId, getComponentCount()); std::vector res; pj_ice_sess_cand cand[MAX_CANDIDATES]; unsigned cand_cnt = MAX_CANDIDATES; { std::lock_guard lk {pimpl_->iceMutex_}; if (not pimpl_->icest_) return res; // In the implementation, the component IDs are enumerated globally // (per SDP: 1, 2, 3, 4, ...). This is simpler because we create // only one pj_ice_strans instance. However, the component IDs are // enumerated per stream in the generated SDP (1, 2, 1, 2, ...) in // order to be compliant with the spec. auto globalCompId = streamIdx * 2 + compId; if (pj_ice_strans_enum_cands(pimpl_->icest_, globalCompId, &cand_cnt, cand) != PJ_SUCCESS) { JAMI_ERR("[ice:%p] pj_ice_strans_enum_cands() failed", pimpl_.get()); return res; } } res.reserve(cand_cnt); // Build ICE attributes according to RFC 6544, section 4.5. for (unsigned i = 0; i < cand_cnt; ++i) { char ipaddr[PJ_INET6_ADDRSTRLEN]; std::string tcp_type; if (cand[i].transport != PJ_CAND_UDP) { tcp_type += " tcptype"; switch (cand[i].transport) { case PJ_CAND_TCP_ACTIVE: tcp_type += " active"; break; case PJ_CAND_TCP_PASSIVE: tcp_type += " passive"; break; case PJ_CAND_TCP_SO: default: tcp_type += " so"; break; } } res.emplace_back( fmt::format("{} {} {} {} {} {} typ {}{}", std::string_view(cand[i].foundation.ptr, cand[i].foundation.slen), compId, (cand[i].transport == PJ_CAND_UDP ? 
"UDP" : "TCP"), cand[i].prio, pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0), pj_sockaddr_get_port(&cand[i].addr), pj_ice_get_cand_type_name(cand[i].type), tcp_type)); } return res; } bool IceTransport::parseIceAttributeLine(unsigned streamIdx, const std::string& line, IceCandidate& cand) const { // Silently ignore empty lines if (line.empty()) return false; if (streamIdx >= pimpl_->streamsCount_) { throw std::runtime_error("Stream index " + std::to_string(streamIdx) + " is invalid!"); } int af, cnt; char foundation[32], transport[12], ipaddr[80], type[32], tcp_type[32]; pj_str_t tmpaddr; unsigned comp_id, prio, port; pj_status_t status; pj_bool_t is_tcp = PJ_FALSE; // Parse ICE attribute line according to RFC-6544 section 4.5. // TODO/WARNING: There is no fail-safe in case of malformed attributes. cnt = sscanf(line.c_str(), "%31s %u %11s %u %79s %u typ %31s tcptype %31s\n", foundation, &comp_id, transport, &prio, ipaddr, &port, type, tcp_type); if (cnt != 7 && cnt != 8) { JAMI_ERR("[ice:%p] Invalid ICE candidate line: %s", pimpl_.get(), line.c_str()); return false; } if (strcmp(transport, "TCP") == 0) { is_tcp = PJ_TRUE; } pj_bzero(&cand, sizeof(IceCandidate)); if (strcmp(type, "host") == 0) cand.type = PJ_ICE_CAND_TYPE_HOST; else if (strcmp(type, "srflx") == 0) cand.type = PJ_ICE_CAND_TYPE_SRFLX; else if (strcmp(type, "prflx") == 0) cand.type = PJ_ICE_CAND_TYPE_PRFLX; else if (strcmp(type, "relay") == 0) cand.type = PJ_ICE_CAND_TYPE_RELAYED; else { JAMI_WARN("[ice:%p] invalid remote candidate type '%s'", pimpl_.get(), type); return false; } if (is_tcp) { if (strcmp(tcp_type, "active") == 0) cand.transport = PJ_CAND_TCP_ACTIVE; else if (strcmp(tcp_type, "passive") == 0) cand.transport = PJ_CAND_TCP_PASSIVE; else if (strcmp(tcp_type, "so") == 0) cand.transport = PJ_CAND_TCP_SO; else { JAMI_WARN("[ice:%p] invalid transport type type '%s'", pimpl_.get(), tcp_type); return false; } } else { cand.transport = PJ_CAND_UDP; } // If the component Id is 
enumerated relative to media, convert // it to absolute enumeration. if (comp_id <= pimpl_->compCountPerStream_) { comp_id += pimpl_->compCountPerStream_ * streamIdx; } cand.comp_id = (pj_uint8_t) comp_id; cand.prio = prio; if (strchr(ipaddr, ':')) af = pj_AF_INET6(); else { af = pj_AF_INET(); pimpl_->onlyIPv4Private_ &= IpAddr(ipaddr).isPrivate(); } tmpaddr = pj_str(ipaddr); pj_sockaddr_init(af, &cand.addr, NULL, 0); status = pj_sockaddr_set_str_addr(af, &cand.addr, &tmpaddr); if (status != PJ_SUCCESS) { JAMI_WARN("[ice:%p] invalid IP address '%s'", pimpl_.get(), ipaddr); return false; } pj_sockaddr_set_port(&cand.addr, (pj_uint16_t) port); pj_strdup2(pimpl_->pool_.get(), &cand.foundation, foundation); return true; } ssize_t IceTransport::recv(unsigned compId, unsigned char* buf, size_t len, std::error_code& ec) { ASSERT_COMP_ID(compId, getComponentCount()); auto& io = pimpl_->compIO_[compId - 1]; std::lock_guard lk(io.mutex); if (io.queue.empty()) { ec = std::make_error_code(std::errc::resource_unavailable_try_again); return -1; } auto& packet = io.queue.front(); const auto count = std::min(len, packet.data.size()); std::copy_n(packet.data.begin(), count, buf); if (count == packet.data.size()) { io.queue.pop_front(); } else { packet.data.erase(packet.data.begin(), packet.data.begin() + count); } ec.clear(); return count; } ssize_t IceTransport::recvfrom(unsigned compId, char* buf, size_t len, std::error_code& ec) { ASSERT_COMP_ID(compId, getComponentCount()); return pimpl_->peerChannels_.at(compId - 1).read(buf, len, ec); } void IceTransport::setOnRecv(unsigned compId, IceRecvCb cb) { ASSERT_COMP_ID(compId, getComponentCount()); auto& io = pimpl_->compIO_[compId - 1]; std::lock_guard lk(io.mutex); io.cb = std::move(cb); if (io.cb) { // Flush existing queue using the callback for (const auto& packet : io.queue) io.cb((uint8_t*) packet.data.data(), packet.data.size()); io.queue.clear(); } } void IceTransport::setOnShutdown(onShutdownCb&& cb) { pimpl_->scb = cb; } 
ssize_t IceTransport::send(unsigned compId, const unsigned char* buf, size_t len) { ASSERT_COMP_ID(compId, getComponentCount()); auto remote = getRemoteAddress(compId); if (!remote) { JAMI_ERR("[ice:%p] can't find remote address for component %d", pimpl_.get(), compId); errno = EINVAL; return -1; } std::unique_lock lk(pimpl_->iceMutex_); if (not pimpl_->icest_) { return -1; } auto status = pj_ice_strans_sendto2(pimpl_->icest_, compId, buf, len, remote.pjPtr(), remote.getLength()); if (status == PJ_EPENDING && isTCPEnabled()) { // NOTE; because we are in TCP, the sent size will count the header (2 // bytes length). pimpl_->waitDataCv_.wait(lk, [&] { return pimpl_->lastSentLen_ >= static_cast(len) or pimpl_->destroying_.load(); }); pimpl_->lastSentLen_ = 0; } else if (status != PJ_SUCCESS && status != PJ_EPENDING) { if (status == PJ_EBUSY) { errno = EAGAIN; } else { pimpl_->last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] ice send failed: %s", pimpl_.get(), pimpl_->last_errmsg_.c_str()); errno = EIO; } return -1; } return len; } int IceTransport::waitForInitialization(std::chrono::milliseconds timeout) { std::unique_lock lk(pimpl_->iceMutex_); if (!pimpl_->iceCV_.wait_for(lk, timeout, [this] { return pimpl_->threadTerminateFlags_ or pimpl_->_isInitialized() or pimpl_->_isFailed(); })) { JAMI_WARN("[ice:%p] waitForInitialization: timeout", pimpl_.get()); return -1; } return not(pimpl_->threadTerminateFlags_ or pimpl_->_isFailed()); } ssize_t IceTransport::waitForData(unsigned compId, std::chrono::milliseconds timeout, std::error_code& ec) { ASSERT_COMP_ID(compId, getComponentCount()); return pimpl_->peerChannels_.at(compId - 1).wait(timeout, ec); } std::vector IceTransport::parseSDPList(const std::vector& msg) { std::vector sdp_list; try { size_t off = 0; while (off != msg.size()) { msgpack::unpacked result; msgpack::unpack(result, (const char*) msg.data(), msg.size(), off); SDP sdp; if (result.get().type == msgpack::type::POSITIVE_INTEGER) { // 
Version 1 msgpack::unpack(result, (const char*) msg.data(), msg.size(), off); std::tie(sdp.ufrag, sdp.pwd) = result.get().as>(); msgpack::unpack(result, (const char*) msg.data(), msg.size(), off); auto comp_cnt = result.get().as(); while (comp_cnt-- > 0) { msgpack::unpack(result, (const char*) msg.data(), msg.size(), off); auto candidates = result.get().as>(); sdp.candidates.reserve(sdp.candidates.size() + candidates.size()); sdp.candidates.insert(sdp.candidates.end(), candidates.begin(), candidates.end()); } } else { result.get().convert(sdp); } sdp_list.emplace_back(std::move(sdp)); } } catch (const msgpack::unpack_error& e) { JAMI_WARN("Error parsing sdp: %s", e.what()); } return sdp_list; } bool IceTransport::isTCPEnabled() { return pimpl_->isTcpEnabled(); } ICESDP IceTransport::parseIceCandidates(std::string_view sdp_msg) { if (pimpl_->streamsCount_ != 1) { JAMI_ERR("Expected exactly one stream per SDP (found %u streams)", pimpl_->streamsCount_); return {}; } ICESDP res; int nr = 0; for (std::string_view line; jami::getline(sdp_msg, line); nr++) { if (nr == 0) { res.rem_ufrag = line; } else if (nr == 1) { res.rem_pwd = line; } else { IceCandidate cand; if (parseIceAttributeLine(0, std::string(line), cand)) { JAMI_DBG("[ice:%p] Add remote candidate: %.*s", pimpl_.get(), (int) line.size(), line.data()); res.rem_candidates.emplace_back(cand); } } } return res; } void IceTransport::setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr) { pimpl_->setDefaultRemoteAddress(comp_id, addr); } std::string IceTransport::link() const { return pimpl_->link(); } //============================================================================== IceTransportFactory::IceTransportFactory() : cp_(new pj_caching_pool(), [](pj_caching_pool* p) { pj_caching_pool_destroy(p); delete p; }) , ice_cfg_() { pj_caching_pool_init(cp_.get(), NULL, 0); pj_ice_strans_cfg_default(&ice_cfg_); ice_cfg_.stun_cfg.pf = &cp_->factory; // v2.4.5 of PJNATH has a default of 100ms but RFC 5389 since 
version 14 requires // a minimum of 500ms on fixed-line links. Our usual case is wireless links. // This solves too long ICE exchange by DHT. // Using 500ms with default PJ_STUN_MAX_TRANSMIT_COUNT (7) gives around 33s before timeout. ice_cfg_.stun_cfg.rto_msec = 500; // See https://tools.ietf.org/html/rfc5245#section-8.1.1.2 // If enabled, it may help speed-up the connectivity, but may cause // the nomination of sub-optimal pairs. ice_cfg_.opt.aggressive = PJ_FALSE; } IceTransportFactory::~IceTransportFactory() {} std::shared_ptr IceTransportFactory::createTransport(const char* name) { try { return std::make_shared(name); } catch (const std::exception& e) { JAMI_ERR("%s", e.what()); return nullptr; } } std::unique_ptr IceTransportFactory::createUTransport(const char* name) { try { return std::make_unique(name); } catch (const std::exception& e) { JAMI_ERR("%s", e.what()); return nullptr; } } //============================================================================== void IceSocketTransport::setOnRecv(RecvCb&& cb) { return ice_->setOnRecv(compId_, cb); } bool IceSocketTransport::isInitiator() const { return ice_->isInitiator(); } void IceSocketTransport::shutdown() { ice_->cancelOperations(); } int IceSocketTransport::maxPayload() const { auto ip_header_size = (ice_->getRemoteAddress(compId_).getFamily() == AF_INET) ? 
IPV4_HEADER_SIZE : IPV6_HEADER_SIZE; return STANDARD_MTU_SIZE - ip_header_size - UDP_HEADER_SIZE; } int IceSocketTransport::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const { if (!ice_->isRunning()) return -1; return ice_->waitForData(compId_, timeout, ec); } std::size_t IceSocketTransport::write(const ValueType* buf, std::size_t len, std::error_code& ec) { auto res = ice_->send(compId_, buf, len); if (res < 0) { ec.assign(errno, std::generic_category()); return 0; } ec.clear(); return res; } std::size_t IceSocketTransport::read(ValueType* buf, std::size_t len, std::error_code& ec) { if (!ice_->isRunning()) return 0; try { auto res = reliable_ ? ice_->recvfrom(compId_, reinterpret_cast(buf), len, ec) : ice_->recv(compId_, buf, len, ec); return (res < 0) ? 0 : res; } catch (const std::exception& e) { JAMI_ERR("IceSocketTransport::read exception: %s", e.what()); } return 0; } IpAddr IceSocketTransport::localAddr() const { return ice_->getLocalAddress(compId_); } IpAddr IceSocketTransport::remoteAddr() const { return ice_->getRemoteAddress(compId_); } //============================================================================== void IceSocket::close() { if (ice_transport_) ice_transport_->setOnRecv(compId_, {}); ice_transport_.reset(); } ssize_t IceSocket::send(const unsigned char* buf, size_t len) { if (not ice_transport_) return -1; return ice_transport_->send(compId_, buf, len); } ssize_t IceSocket::waitForData(std::chrono::milliseconds timeout) { if (not ice_transport_) return -1; std::error_code ec; return ice_transport_->waitForData(compId_, timeout, ec); } void IceSocket::setOnRecv(IceRecvCb cb) { if (ice_transport_) ice_transport_->setOnRecv(compId_, cb); } uint16_t IceSocket::getTransportOverhead() { if (not ice_transport_) return 0; return (ice_transport_->getRemoteAddress(compId_).getFamily() == AF_INET) ? 
IPV4_HEADER_SIZE : IPV6_HEADER_SIZE; } void IceSocket::setDefaultRemoteAddress(const IpAddr& addr) { if (ice_transport_) ice_transport_->setDefaultRemoteAddress(compId_, addr); } } // namespace jami