From ffa12c740b1bcc3fa3e6e73b2a16d1bf93333d5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Blin?= Date: Fri, 7 May 2021 15:45:10 -0400 Subject: [PATCH] multiplexed_socket: add ping/pong logic Detecting that a socket is down via a keep alive can take time, moreover if a relay is used, because the keep alive can continue to work even if the connection is half closed. In this patch, we add a mechanism where a beacon is sent over the multiplexed socket to know if the peer can answer. This message can be used to detect if a socket is usable and is sent whenever the user starts a call or that a connectivityChanged() occurs. Moreover, the multiplexedsocket can now sends a version to enable features and if a bad packet on the control socket comes, the socket is not stopped. GitLab: #542 Change-Id: Ia66d8d7f9b66bba02927f4ea41c21ef27089bceb --- configure.ac | 2 +- meson.build | 2 +- src/jamidht/connectionmanager.cpp | 99 +++-- src/jamidht/connectionmanager.h | 13 +- src/jamidht/jamiaccount.cpp | 26 +- src/jamidht/multiplexed_socket.cpp | 293 ++++++++++++- src/jamidht/multiplexed_socket.h | 61 ++- .../connectionManager/connectionManager.cpp | 415 +++++++++++++----- 8 files changed, 712 insertions(+), 199 deletions(-) diff --git a/configure.ac b/configure.ac index 6e5ede7b1..db6e14710 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ dnl Jami - configure.ac dnl Process this file with autoconf to produce a configure script. AC_PREREQ([2.69]) -AC_INIT([Jami Daemon],[10.0.0],[jami@gnu.org],[jami]) +AC_INIT([Jami Daemon],[10.0.1],[jami@gnu.org],[jami]) dnl Clear the implicit flags that default to '-g -O2', otherwise they dnl take precedence over the values we set via the diff --git a/meson.build b/meson.build index 76f778059..530a14feb 100644 --- a/meson.build +++ b/meson.build @@ -1,5 +1,5 @@ project('jami-daemon', ['c', 'cpp'], - version: '9.9.0', + version: '10.0.1', license: 'GPL3+', default_options: ['cpp_std=gnu++17', 'buildtype=debugoptimized'], meson_version:'>= 0.54' diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp index fd73ea9bc..b5f5451be 100644 --- a/src/jamidht/connectionmanager.cpp +++ b/src/jamidht/connectionmanager.cpp @@ -117,7 +117,9 @@ public: const dht::Value::Id& vid, const std::shared_ptr& cert); void connectDevice(const DeviceId& deviceId, const std::string& uri, ConnectCallback cb); - void connectDevice(const std::shared_ptr& cert, const std::string& name, ConnectCallback cb); + void connectDevice(const std::shared_ptr& cert, + const std::string& name, + ConnectCallback cb); /** * Send a ChannelRequest on the TLS socket. Triggers cb when ready * @param sock socket used to send the request @@ -386,29 +388,28 @@ ConnectionManager::Impl::connectDevice(const DeviceId& deviceId, cb(nullptr, deviceId); return; } - account.findCertificate( - deviceId, - [w = weak(), deviceId, name, cb = std::move(cb)]( - const std::shared_ptr& cert) { - if (!cert) { - JAMI_ERR("Invalid certificate found for device %s", deviceId.to_c_str()); - cb(nullptr, deviceId); - return; - } - if (auto shared = w.lock()) { - shared->connectDevice(cert, name, std::move(cb)); - } - }); + account.findCertificate(deviceId, + [w = weak(), deviceId, name, cb = std::move(cb)]( + const std::shared_ptr& cert) { + if (!cert) { + JAMI_ERR("Invalid certificate found for device %s", + deviceId.to_c_str()); + cb(nullptr, deviceId); + return; + } + if (auto shared = w.lock()) { + shared->connectDevice(cert, name, std::move(cb)); + } + }); } void -ConnectionManager::Impl::connectDevice(const std::shared_ptr& cert, const std::string& name, ConnectCallback cb) +ConnectionManager::Impl::connectDevice(const std::shared_ptr& cert, + const std::string& name, + ConnectCallback cb) { // Avoid dht operation in a DHT callback to avoid deadlocks - runOnMainThread([w=weak(), - name = std::move(name), - cert = std::move(cert), - cb = std::move(cb)] { + runOnMainThread([w = weak(), name = std::move(name), cert = std::move(cert), cb = std::move(cb)] { auto deviceId = cert->getId(); auto sthis = w.lock(); if (!sthis || sthis->isDestroying_) { @@ -421,7 +422,7 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptraccount.rand); --tentatives; } while (sthis->getPendingCallbacks(deviceId, vid).size() != 0 - && tentatives != MAX_TENTATIVES); + && tentatives != MAX_TENTATIVES); if (tentatives == MAX_TENTATIVES) { JAMI_ERR("Couldn't get a current random channel number"); cb(nullptr, deviceId); @@ -448,8 +449,7 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptrgetInfo(deviceId)) { std::lock_guard lk(info->mutex_); if (info->socket_) { - JAMI_DBG("Peer already connected to %s. Add a new channel", - deviceId.to_c_str()); + JAMI_DBG("Peer already connected to %s. Add a new channel", deviceId.to_c_str()); info->cbIds_.emplace(cbId); sthis->sendChannelRequest(info->socket_, name, deviceId, vid); return; @@ -457,8 +457,7 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptraccount.getIceOptions([w, - cbId, - deviceId = std::move(deviceId), - name = std::move(name), - cert = std::move(cert), - vid, - eraseInfo](auto&& ice_config) { + cbId, + deviceId = std::move(deviceId), + name = std::move(name), + cert = std::move(cert), + vid, + eraseInfo](auto&& ice_config) { auto sthis = w.lock(); if (!sthis) return; @@ -526,10 +525,10 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptrinfos_[{req.from, req.id}] = info; } JAMI_INFO("[Account:%s] accepting connection from %s", - shared->account.getAccountID().c_str(), - deviceId.c_str()); + shared->account.getAccountID().c_str(), + deviceId.c_str()); std::unique_lock lk {info->mutex_}; - info->ice_ = Manager::instance() - .getIceTransportFactory() - .createUTransport(shared->account.getAccountID().c_str(), 1, true, ice_config); + info->ice_ = Manager::instance().getIceTransportFactory().createUTransport( + shared->account.getAccountID().c_str(), 1, true, ice_config); if (not info->ice_) { JAMI_ERR("Cannot initialize ICE session."); if (shared->connReadyCb_) @@ -978,7 +976,8 @@ bool ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const { auto pending = pimpl_->getPendingCallbacks(deviceId); - return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.name == name; }) != pending.end(); + return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.name == name; }) + != pending.end(); } void @@ -1053,12 +1052,26 @@ void ConnectionManager::monitor() const { std::lock_guard lk(pimpl_->infosMtx_); - JAMI_DBG("ConnectionManager for account %s (%s), current status:", pimpl_->account.getAccountID().c_str(), pimpl_->account.getUserUri().c_str()); + JAMI_DBG("ConnectionManager for account %s (%s), current status:", + pimpl_->account.getAccountID().c_str(), + pimpl_->account.getUserUri().c_str()); for (const auto& [_, ci] : pimpl_->infos_) { if (ci->socket_) ci->socket_->monitor(); } - JAMI_DBG("ConnectionManager for account %s (%s), end status.", pimpl_->account.getAccountID().c_str(), pimpl_->account.getUserUri().c_str()); + JAMI_DBG("ConnectionManager for account %s (%s), end status.", + pimpl_->account.getAccountID().c_str(), + pimpl_->account.getUserUri().c_str()); +} + +void +ConnectionManager::connectivityChanged() +{ + std::lock_guard lk(pimpl_->infosMtx_); + for (const auto& [_, ci] : pimpl_->infos_) { + if (ci->socket_) + ci->socket_->sendBeacon(); + } } } // namespace jami diff --git a/src/jamidht/connectionmanager.h b/src/jamidht/connectionmanager.h index 43b80c58e..9239601a7 100644 --- a/src/jamidht/connectionmanager.h +++ b/src/jamidht/connectionmanager.h @@ -86,7 +86,9 @@ public: * @param cb Callback called when socket is ready ready */ void connectDevice(const DeviceId& deviceId, const std::string& name, ConnectCallback cb); - void connectDevice(const std::shared_ptr& cert, const std::string& name, ConnectCallback cb); + void connectDevice(const std::shared_ptr& cert, + const std::string& name, + ConnectCallback cb); /** * Check if we are already connecting to a device with a specific name @@ -133,8 +135,17 @@ public: * @return the number of active sockets */ std::size_t activeSockets() const; + + /** + * Log informations for all sockets + */ void monitor() const; + /** + * Send beacon on peers supporting it + */ + void connectivityChanged(); + private: ConnectionManager() = delete; class Impl; diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 93c1958bf..9d4689a87 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -688,6 +688,8 @@ JamiAccount::startOutgoingCall(const std::shared_ptr& call, const std:: if (!transport) continue; + sipConn.channel->sendBeacon(); + JAMI_WARN("[call %s] A channeled socket is detected with this peer.", call->getCallId().c_str()); @@ -774,11 +776,11 @@ JamiAccount::onConnectedOutgoingCall(const std::shared_ptr& call, return; const auto localAddress = ip_utils::getInterfaceAddr(shared->getLocalInterface(), - target.getFamily()); + target.getFamily()); IpAddr addrSdp = shared->getPublishedSameasLocal() - ? localAddress - : shared->getPublishedIpAddress(target.getFamily()); + ? localAddress + : shared->getPublishedIpAddress(target.getFamily()); // fallback on local address if (not addrSdp) @@ -967,8 +969,7 @@ JamiAccount::serialize(YAML::Emitter& out) const if (receiptSignature_.size() > 0) out << YAML::Key << Conf::RING_ACCOUNT_RECEIPT_SIG << YAML::Value << YAML::Binary(receiptSignature_.data(), receiptSignature_.size()); - out << YAML::Key << DRing::Account::ConfProperties::DEVICE_NAME << YAML::Value - << deviceName_; + out << YAML::Key << DRing::Account::ConfProperties::DEVICE_NAME << YAML::Value << deviceName_; out << YAML::Key << DRing::Account::ConfProperties::MANAGER_URI << YAML::Value << managerUri_; out << YAML::Key << DRing::Account::ConfProperties::MANAGER_USERNAME << YAML::Value << managerUsername_; @@ -2749,14 +2750,14 @@ JamiAccount::incomingCall(dht::IceCandidates&& msg, }); }; auto ice = createIceTransport(("sip:" + call->getCallId()).c_str(), - ICE_COMPONENTS, - false, - iceOptions); + ICE_COMPONENTS, + false, + iceOptions); iceOptions.tcpEnable = true; auto ice_tcp = createIceTransport(("sip:" + call->getCallId()).c_str(), - ICE_COMPONENTS, - true, - iceOptions); + ICE_COMPONENTS, + true, + iceOptions); std::weak_ptr wcall = call; Manager::instance().addTask([account = shared(), wcall, ice, ice_tcp, msg, from_cert, from] { @@ -2943,6 +2944,7 @@ JamiAccount::connectivityChanged() return; } dht_->connectivityChanged(); + connectionManager_->connectivityChanged(); // reset cache setPublishedAddress({}); } @@ -3716,7 +3718,7 @@ JamiAccount::onIsComposing(const std::string& peer, bool isWriting) void JamiAccount::getIceOptions(std::function cb) noexcept { - storeActiveIpAddress([this, cb=std::move(cb)] { + storeActiveIpAddress([this, cb = std::move(cb)] { auto opts = SIPAccountBase::getIceOptions(); auto publishedAddr = getPublishedIpAddress(); diff --git a/src/jamidht/multiplexed_socket.cpp b/src/jamidht/multiplexed_socket.cpp index ed21e4360..8981f6602 100644 --- a/src/jamidht/multiplexed_socket.cpp +++ b/src/jamidht/multiplexed_socket.cpp @@ -26,9 +26,29 @@ #include #include -namespace jami { - static constexpr std::size_t IO_BUFFER_SIZE {8192}; ///< Size of char buffer used by IO operations +static constexpr int MULTIPLEXED_SOCKET_VERSION {1}; + +struct ChanneledMessage +{ + uint16_t channel; + std::vector data; + MSGPACK_DEFINE(channel, data) +}; + +struct BeaconMsg +{ + bool p; + MSGPACK_DEFINE_MAP(p) +}; + +struct VersionMsg +{ + int v; + MSGPACK_DEFINE_MAP(v) +}; + +namespace jami { using clock = std::chrono::steady_clock; using time_point = clock::time_point; @@ -54,6 +74,7 @@ public: eventLoop(); } catch (const std::exception& e) { JAMI_ERR() << "[CNX] peer connection event loop failure: " << e.what(); + shutdown(); } }} {} @@ -97,6 +118,8 @@ public: return; stop.store(true); isShutdown_ = true; + if (beaconTask_) + beaconTask_->cancel(); if (onShutdown_) onShutdown_(); if (endpoint) { @@ -114,6 +137,8 @@ public: * Triggered when a new control packet is received */ void handleControlPacket(std::vector&& pkt); + void handleProtocolPacket(std::vector&& pkt); + bool handleProtocolMsg(const msgpack::object& o); /** * Triggered when a new packet on a channel is received */ @@ -124,6 +149,14 @@ public: void setOnReady(OnConnectionReadyCb&& cb) { onChannelReady_ = std::move(cb); } void setOnRequest(OnConnectionRequestCb&& cb) { onRequest_ = std::move(cb); } + // Beacon + void sendBeacon(const std::chrono::milliseconds& timeout); + void handleBeaconRequest(); + void handleBeaconResponse(); + std::atomic_int beaconCounter_ {0}; + + bool writeProtocolMessage(const msgpack::sbuffer& buffer); + msgpack::unpacker pac_ {}; MultiplexedSocket& parent_; @@ -155,6 +188,16 @@ public: std::mutex writeMtx {}; time_point start_ {clock::now()}; + std::shared_ptr beaconTask_ {}; + + // version related stuff + void sendVersion(); + void onVersion(int version); + std::atomic_bool canSendBeacon_ {false}; + std::atomic_bool answerBeacon_ {true}; + int version_ {MULTIPLEXED_SOCKET_VERSION}; + std::function onBeaconCb_ {}; + std::function onVersionCb_ {}; }; void @@ -168,6 +211,7 @@ MultiplexedSocket::Impl::eventLoop() } return true; }); + sendVersion(); std::error_code ec; while (!stop) { if (!endpoint) { @@ -192,12 +236,14 @@ MultiplexedSocket::Impl::eventLoop() while (pac_.next(oh) && !stop) { try { auto msg = oh.get().as(); - if (msg.channel == 0) + if (msg.channel == CONTROL_CHANNEL) handleControlPacket(std::move(msg.data)); + else if (msg.channel == PROTOCOL_CHANNEL) + handleProtocolPacket(std::move(msg.data)); else handleChannelPacket(msg.channel, std::move(msg.data)); - } catch (const msgpack::unpack_error& e) { - JAMI_WARN("Error when decoding msgpack message: %s", e.what()); + continue; + } catch (...) { } } } @@ -221,6 +267,95 @@ MultiplexedSocket::Impl::onAccept(const std::string& name, uint16_t channel) } } +void +MultiplexedSocket::Impl::sendBeacon(const std::chrono::milliseconds& timeout) +{ + if (!canSendBeacon_) + return; + beaconCounter_++; + JAMI_DBG("Send beacon to peer %s", deviceId.to_c_str()); + + msgpack::sbuffer buffer(8); + msgpack::packer pk(&buffer); + pk.pack(BeaconMsg {true}); + if (!writeProtocolMessage(buffer)) + return; + beaconTask_ = Manager::instance().scheduleTaskIn( + [w = parent_.weak()]() { + if (auto shared = w.lock()) { + if (shared->pimpl_->beaconCounter_ != 0) { + JAMI_ERR() << "Beacon doesn't get any response. Stopping socket"; + shared->shutdown(); + } + } + }, + timeout); +} + +void +MultiplexedSocket::Impl::handleBeaconRequest() +{ + if (!answerBeacon_) + return; + // Run this on dedicated thread because some callbacks can take time + dht::ThreadPool::io().run([w = parent_.weak()]() { + if (auto shared = w.lock()) { + msgpack::sbuffer buffer(8); + msgpack::packer pk(&buffer); + pk.pack(BeaconMsg {false}); + JAMI_DBG("Send beacon response to peer %s", shared->deviceId().to_c_str()); + shared->pimpl_->writeProtocolMessage(buffer); + } + }); +} + +void +MultiplexedSocket::Impl::handleBeaconResponse() +{ + JAMI_DBG("Get beacon response from peer %s", deviceId.to_c_str()); + beaconCounter_--; +} + +bool +MultiplexedSocket::Impl::writeProtocolMessage(const msgpack::sbuffer& buffer) +{ + std::error_code ec; + int wr = parent_.write(PROTOCOL_CHANNEL, + (const unsigned char*) buffer.data(), + buffer.size(), + ec); + return wr > 0; +} + +void +MultiplexedSocket::Impl::sendVersion() +{ + dht::ThreadPool::io().run([w = parent_.weak()]() { + if (auto shared = w.lock()) { + auto version = shared->pimpl_->version_; + msgpack::sbuffer buffer(8); + msgpack::packer pk(&buffer); + pk.pack(VersionMsg {version}); + shared->pimpl_->writeProtocolMessage(buffer); + } + }); +} + +void +MultiplexedSocket::Impl::onVersion(int version) +{ + // Check if version > 1 + if (version >= 1) { + JAMI_INFO() << "Enable beacon support for " << deviceId; + canSendBeacon_ = true; + } else { + JAMI_WARN("Peer %s uses an old version which doesn't support all the features (%d)", + deviceId.to_c_str(), + version); + canSendBeacon_ = false; + } +} + void MultiplexedSocket::Impl::onRequest(const std::string& name, uint16_t channel) { @@ -275,28 +410,30 @@ MultiplexedSocket::Impl::handleControlPacket(std::vector&& pkt) { // Run this on dedicated thread because some callbacks can take time dht::ThreadPool::io().run([w = parent_.weak(), pkt = std::move(pkt)]() { + auto shared = w.lock(); + if (!shared) + return; try { size_t off = 0; while (off != pkt.size()) { msgpack::unpacked result; msgpack::unpack(result, (const char*) pkt.data(), pkt.size(), off); - auto req = result.get().as(); - if (auto shared = w.lock()) { - if (req.state == ChannelRequestState::ACCEPT) { - shared->pimpl_->onAccept(req.name, req.channel); - } else if (req.state == ChannelRequestState::DECLINE) { - std::lock_guard lkSockets(shared->pimpl_->socketsMutex); - shared->pimpl_->channelDatas_.erase(req.channel); - shared->pimpl_->sockets.erase(req.channel); - } else if (shared->pimpl_->onRequest_) { - shared->pimpl_->onRequest(req.name, req.channel); - } + auto object = result.get(); + if (shared->pimpl_->handleProtocolMsg(object)) + continue; + auto req = object.as(); + if (req.state == ChannelRequestState::ACCEPT) { + shared->pimpl_->onAccept(req.name, req.channel); + } else if (req.state == ChannelRequestState::DECLINE) { + std::lock_guard lkSockets(shared->pimpl_->socketsMutex); + shared->pimpl_->channelDatas_.erase(req.channel); + shared->pimpl_->sockets.erase(req.channel); + } else if (shared->pimpl_->onRequest_) { + shared->pimpl_->onRequest(req.name, req.channel); } } } catch (const std::exception& e) { JAMI_ERR("Error on the control channel: %s", e.what()); - if (auto shared = w.lock()) - shared->pimpl_->stop.store(true); } }); } @@ -340,6 +477,60 @@ MultiplexedSocket::Impl::handleChannelPacket(uint16_t channel, std::vector 0) { + auto key = o.via.map.ptr[0].key.as(); + if (key == "p") { + auto msg = o.as(); + if (msg.p) + handleBeaconRequest(); + else + handleBeaconResponse(); + if (onBeaconCb_) + onBeaconCb_(msg.p); + return true; + } else if (key == "v") { + auto msg = o.as(); + onVersion(msg.v); + if (onVersionCb_) + onVersionCb_(msg.v); + return true; + } else { + JAMI_WARN("Unknown message type"); + } + } + } catch (const std::exception& e) { + JAMI_ERR() << e.what(); + } + return false; +} + +void +MultiplexedSocket::Impl::handleProtocolPacket(std::vector&& pkt) +{ + // Run this on dedicated thread because some callbacks can take time + dht::ThreadPool::io().run([w = parent_.weak(), pkt = std::move(pkt)]() { + auto shared = w.lock(); + if (!shared) + return; + try { + size_t off = 0; + while (off != pkt.size()) { + msgpack::unpacked result; + msgpack::unpack(result, (const char*) pkt.data(), pkt.size(), off); + auto object = result.get(); + if (shared->pimpl_->handleProtocolMsg(object)) + return; + } + } catch (const std::exception& e) { + JAMI_ERR("Error on the protocol channel: %s", e.what()); + } + }); +} + MultiplexedSocket::MultiplexedSocket(const DeviceId& deviceId, std::unique_ptr endpoint) : pimpl_(std::make_unique(*this, deviceId, std::move(endpoint))) @@ -358,6 +549,8 @@ MultiplexedSocket::addChannel(const std::string& name) std::lock_guard lk(pimpl_->socketsMutex); for (int i = 1; i < UINT16_MAX; ++i) { auto c = (offset + i) % UINT16_MAX; + if (c == CONTROL_CHANNEL || c == PROTOCOL_CHANNEL) + continue; auto& socket = pimpl_->sockets[c]; if (!socket) { auto& channel = pimpl_->channelDatas_[c]; @@ -575,6 +768,50 @@ MultiplexedSocket::monitor() const } } +void +MultiplexedSocket::sendBeacon(const std::chrono::milliseconds& timeout) +{ + pimpl_->sendBeacon(timeout); +} + +#ifdef DRING_TESTABLE +bool +MultiplexedSocket::canSendBeacon() const +{ + return pimpl_->canSendBeacon_; +} + +void +MultiplexedSocket::answerToBeacon(bool value) +{ + pimpl_->answerBeacon_ = value; +} + +void +MultiplexedSocket::setVersion(int version) +{ + pimpl_->version_ = version; +} + +void +MultiplexedSocket::setOnBeaconCb(const std::function& cb) +{ + pimpl_->onBeaconCb_ = cb; +} + +void +MultiplexedSocket::setOnVersionCb(const std::function& cb) +{ + pimpl_->onVersionCb_ = cb; +} + +void +MultiplexedSocket::sendVersion() +{ + pimpl_->sendVersion(); +} +#endif + //////////////////////////////////////////////////////////////// class ChannelSocket::Impl @@ -666,6 +903,16 @@ ChannelSocket::underlyingICE() const return {}; } +#ifdef DRING_TESTABLE +std::shared_ptr +ChannelSocket::underlyingSocket() const +{ + if (auto mtx = pimpl_->endpoint.lock()) + return mtx; + return {}; +} +#endif + void ChannelSocket::stop() { @@ -743,4 +990,14 @@ ChannelSocket::onShutdown(OnShutdownCb&& cb) } } +void +ChannelSocket::sendBeacon(const std::chrono::milliseconds& timeout) +{ + if (auto ep = pimpl_->endpoint.lock()) { + ep->sendBeacon(timeout); + } else { + shutdown(); + } +} + } // namespace jami diff --git a/src/jamidht/multiplexed_socket.h b/src/jamidht/multiplexed_socket.h index ab69b88dd..d75e420d0 100644 --- a/src/jamidht/multiplexed_socket.h +++ b/src/jamidht/multiplexed_socket.h @@ -35,7 +35,9 @@ using OnConnectionReadyCb using onChannelReadyCb = std::function; using OnShutdownCb = std::function; +static constexpr auto SEND_BEACON_TIMEOUT = std::chrono::milliseconds(3000); static constexpr uint16_t CONTROL_CHANNEL {0}; +static constexpr uint16_t PROTOCOL_CHANNEL {0xffff}; enum class ChannelRequestState { REQUEST, @@ -55,13 +57,6 @@ struct ChannelRequest MSGPACK_DEFINE(name, channel, state) }; -struct ChanneledMessage -{ - uint16_t channel; - std::vector data; - MSGPACK_DEFINE(channel, data) -}; - /** * A socket divided in channels over a TLS session */ @@ -129,8 +124,50 @@ public: std::shared_ptr underlyingICE() const; + /** + * Get informations from socket (channels opened) + */ void monitor() const; + /** + * Send a beacon on the socket and close if no response come + * @param timeout + */ + void sendBeacon(const std::chrono::milliseconds& timeout = SEND_BEACON_TIMEOUT); + +#ifdef DRING_TESTABLE + /** + * Check if we can send beacon on the socket + */ + bool canSendBeacon() const; + + /** + * Decide if yes or not we answer to beacon + * @param value New value + */ + void answerToBeacon(bool value); + + /** + * Change version sent to the peer + */ + void setVersion(int version); + + /** + * Set a callback to detect beacon messages + */ + void setOnBeaconCb(const std::function& cb); + + /** + * Set a callback to detect version messages + */ + void setOnVersionCb(const std::function& cb); + + /** + * Send the version + */ + void sendVersion(); +#endif + private: class Impl; std::unique_ptr pimpl_; @@ -186,6 +223,16 @@ public: std::shared_ptr underlyingICE() const; + /** + * Send a beacon on the socket and close if no response come + * @param timeout + */ + void sendBeacon(const std::chrono::milliseconds& timeout = SEND_BEACON_TIMEOUT); + +#ifdef DRING_TESTABLE + std::shared_ptr underlyingSocket() const; +#endif + private: class Impl; std::unique_ptr pimpl_; diff --git a/test/unitTest/connectionManager/connectionManager.cpp b/test/unitTest/connectionManager/connectionManager.cpp index abb226f84..ecf87d224 100644 --- a/test/unitTest/connectionManager/connectionManager.cpp +++ b/test/unitTest/connectionManager/connectionManager.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2019 Savoir-faire Linux Inc. + * Copyright (C) 2017-2021 Savoir-faire Linux Inc. * Author: Sébastien Blin * * This program is free software; you can redistribute it and/or modify @@ -69,6 +69,10 @@ private: void testFloodSocket(); void testDestroyWhileSending(); void testIsConnecting(); + void testCanSendBeacon(); + void testCannotSendBeacon(); + void testConnectivityChangeTriggerBeacon(); + void testOnNoBeaconTriggersShutdown(); CPPUNIT_TEST_SUITE(ConnectionManagerTest); CPPUNIT_TEST(testConnectDevice); @@ -86,6 +90,10 @@ private: CPPUNIT_TEST(testFloodSocket); CPPUNIT_TEST(testDestroyWhileSending); CPPUNIT_TEST(testIsConnecting); + CPPUNIT_TEST(testCanSendBeacon); + CPPUNIT_TEST(testCannotSendBeacon); + CPPUNIT_TEST(testConnectivityChangeTriggerBeacon); + CPPUNIT_TEST(testOnNoBeaconTriggersShutdown); CPPUNIT_TEST_SUITE_END(); }; @@ -121,7 +129,6 @@ ConnectionManagerTest::setUp() std::mutex mtx; std::unique_lock lk {mtx}; std::condition_variable cv; - auto accountsReady = 0; confHandlers.insert( DRing::exportable_callback( [&](const std::string&, const std::map&) { @@ -186,16 +193,15 @@ ConnectionManagerTest::testConnectDevice() return true; }); - aliceAccount->connectionManager() - .connectDevice(bobDeviceId, - "git://*", - [&cv, &successfullyConnected](std::shared_ptr socket, - const DeviceId&) { - if (socket) { - successfullyConnected = true; - } - cv.notify_one(); - }); + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "git://*", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + successfullyConnected = true; + } + cv.notify_one(); + }); CPPUNIT_ASSERT( cvReceive.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyReceive; })); CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyConnected; })); @@ -231,16 +237,15 @@ ConnectionManagerTest::testAcceptConnection() receiverConnected = socket && (name == "git://*"); }); - aliceAccount->connectionManager() - .connectDevice(bobDeviceId, - "git://*", - [&cv, &successfullyConnected](std::shared_ptr socket, - const DeviceId&) { - if (socket) { - successfullyConnected = true; - } - cv.notify_one(); - }); + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "git://*", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + successfullyConnected = true; + } + cv.notify_one(); + }); CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyReceive && successfullyConnected && receiverConnected; @@ -275,27 +280,25 @@ ConnectionManagerTest::testMultipleChannels() receiverConnected += 1; }); - aliceAccount->connectionManager() - .connectDevice(bobDeviceId, - "git://*", - [&cv, &successfullyConnected](std::shared_ptr socket, - const DeviceId&) { - if (socket) { - successfullyConnected = true; - } - cv.notify_one(); - }); + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "git://*", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + successfullyConnected = true; + } + cv.notify_one(); + }); - aliceAccount->connectionManager() - .connectDevice(bobDeviceId, - "sip://*", - [&cv, &successfullyConnected2](std::shared_ptr socket, - const DeviceId&) { - if (socket) { - successfullyConnected2 = true; - } - cv.notify_one(); - }); + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "sip://*", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + successfullyConnected2 = true; + } + cv.notify_one(); + }); CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyConnected && successfullyConnected2 && receiverConnected == 2; @@ -331,28 +334,26 @@ ConnectionManagerTest::testMultipleChannelsSameName() receiverConnected += 1; }); - aliceAccount->connectionManager() - .connectDevice(bobDeviceId, - "git://*", - [&cv, &successfullyConnected](std::shared_ptr socket, - const DeviceId&) { - if (socket) { - successfullyConnected = true; - } - cv.notify_one(); - }); + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "git://*", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + successfullyConnected = true; + } + cv.notify_one(); + }); // We can open two sockets with the same name, it will be two different channel - aliceAccount->connectionManager() - .connectDevice(bobDeviceId, - "git://*", - [&cv, &successfullyConnected2](std::shared_ptr socket, - const DeviceId&) { - if (socket) { - successfullyConnected2 = true; - } - cv.notify_one(); - }); + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "git://*", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + successfullyConnected2 = true; + } + cv.notify_one(); + }); CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyConnected && successfullyConnected2 && receiverConnected == 2; @@ -467,16 +468,15 @@ ConnectionManagerTest::testDeclineConnection() receiverConnected = true; }); - aliceAccount->connectionManager() - .connectDevice(bobDeviceId, - "git://*", - [&cv, &successfullyConnected](std::shared_ptr socket, - const DeviceId&) { - if (socket) { - successfullyConnected = true; - } - cv.notify_one(); - }); + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "git://*", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + successfullyConnected = true; + } + cv.notify_one(); + }); cv.wait_for(lk, std::chrono::seconds(30)); CPPUNIT_ASSERT(successfullyReceive); CPPUNIT_ASSERT(!successfullyConnected); @@ -513,16 +513,15 @@ ConnectionManagerTest::testAcceptsICERequest() receiverConnected = socket && (name == "git://*"); }); - aliceAccount->connectionManager() - .connectDevice(bobDeviceId, - "git://*", - [&cv, &successfullyConnected](std::shared_ptr socket, - const DeviceId&) { - if (socket) { - successfullyConnected = true; - } - cv.notify_one(); - }); + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "git://*", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + successfullyConnected = true; + } + cv.notify_one(); + }); CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return successfullyReceive && successfullyConnected && receiverConnected; @@ -559,16 +558,15 @@ ConnectionManagerTest::testDeclineICERequest() receiverConnected = socket && (name == "git://*"); }); - aliceAccount->connectionManager() - .connectDevice(bobDeviceId, - "git://*", - [&cv, &successfullyConnected](std::shared_ptr socket, - const DeviceId&) { - if (socket) { - successfullyConnected = true; - } - cv.notify_one(); - }); + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "git://*", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + successfullyConnected = true; + } + cv.notify_one(); + }); cv.wait_for(lk, std::chrono::seconds(30)); CPPUNIT_ASSERT(successfullyReceive); @@ -1017,37 +1015,222 @@ ConnectionManagerTest::testIsConnecting() std::condition_variable cv; bool successfullyConnected = false, successfullyReceive = false; - bobAccount->connectionManager().onChannelRequest( - [&](const DeviceId&, const std::string&) { - successfullyReceive = true; - cv.notify_one(); - std::this_thread::sleep_for(std::chrono::seconds(2)); - return true; - }); + bobAccount->connectionManager().onChannelRequest([&](const DeviceId&, const std::string&) { + successfullyReceive = true; + cv.notify_one(); + std::this_thread::sleep_for(std::chrono::seconds(2)); + return true; + }); CPPUNIT_ASSERT(!aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); - aliceAccount->connectionManager() - .connectDevice(bobDeviceId, - "sip", - [&cv, &successfullyConnected](std::shared_ptr socket, - const DeviceId&) { - if (socket) { - successfullyConnected = true; - } - cv.notify_one(); - }); + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "sip", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + successfullyConnected = true; + } + cv.notify_one(); + }); // connectDevice is full async, so isConnecting will be true after a few ms. - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { - return successfullyReceive; - })); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyReceive; })); CPPUNIT_ASSERT(aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); - CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { - return successfullyConnected; - })); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Just to wait for the callback to finish + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyConnected; })); + std::this_thread::sleep_for( + std::chrono::milliseconds(100)); // Just to wait for the callback to finish CPPUNIT_ASSERT(!aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); } +void +ConnectionManagerTest::testCanSendBeacon() +{ + auto aliceAccount = Manager::instance().getAccount(aliceId); + auto bobAccount = Manager::instance().getAccount(bobId); + auto bobDeviceId = DeviceId(std::string(bobAccount->currentDeviceId())); + + bobAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); + aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); + + std::mutex mtx; + std::unique_lock lk {mtx}; + std::condition_variable cv; + bool successfullyConnected = false; + + std::shared_ptr aliceSocket, bobSocket; + bobAccount->connectionManager().onChannelRequest( + [&](const DeviceId&, const std::string&) { return true; }); + bobAccount->connectionManager().onConnectionReady( + [&](const DeviceId&, const std::string&, std::shared_ptr socket) { + if (socket && socket->name() == "sip") + bobSocket = socket->underlyingSocket(); + cv.notify_one(); + }); + + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "sip", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + aliceSocket = socket->underlyingSocket(); + successfullyConnected = true; + } + cv.notify_one(); + }); + // connectDevice is full async, so isConnecting will be true after a few ms. + CPPUNIT_ASSERT( + cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; })); + CPPUNIT_ASSERT(aliceSocket->canSendBeacon()); + CPPUNIT_ASSERT(bobSocket->canSendBeacon()); +} + +void +ConnectionManagerTest::testCannotSendBeacon() +{ + auto aliceAccount = Manager::instance().getAccount(aliceId); + auto bobAccount = Manager::instance().getAccount(bobId); + auto bobDeviceId = DeviceId(std::string(bobAccount->currentDeviceId())); + + bobAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); + aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); + + std::mutex mtx; + std::unique_lock lk {mtx}; + std::condition_variable cv; + bool successfullyConnected = false; + + std::shared_ptr aliceSocket, bobSocket; + bobAccount->connectionManager().onChannelRequest( + [&](const DeviceId&, const std::string&) { return true; }); + bobAccount->connectionManager().onConnectionReady( + [&](const DeviceId&, const std::string&, std::shared_ptr socket) { + if (socket && socket->name() == "sip") + bobSocket = socket->underlyingSocket(); + cv.notify_one(); + }); + + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "sip", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + aliceSocket = socket->underlyingSocket(); + successfullyConnected = true; + } + cv.notify_one(); + }); + // connectDevice is full async, so isConnecting will be true after a few ms. + CPPUNIT_ASSERT( + cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; })); + + int version = 1412; + bobSocket->setOnVersionCb([&](auto v) { + version = v; + cv.notify_one(); + }); + aliceSocket->setVersion(0); + aliceSocket->sendVersion(); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return version == 0; })); + CPPUNIT_ASSERT(!bobSocket->canSendBeacon()); +} + +void +ConnectionManagerTest::testConnectivityChangeTriggerBeacon() +{ + auto aliceAccount = Manager::instance().getAccount(aliceId); + auto bobAccount = Manager::instance().getAccount(bobId); + auto bobDeviceId = DeviceId(std::string(bobAccount->currentDeviceId())); + + bobAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); + aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); + + std::mutex mtx; + std::unique_lock lk {mtx}; + std::condition_variable cv; + bool successfullyConnected = false; + + std::shared_ptr aliceSocket, bobSocket; + bobAccount->connectionManager().onChannelRequest( + [&](const DeviceId&, const std::string&) { return true; }); + bobAccount->connectionManager().onConnectionReady( + [&](const DeviceId&, const std::string&, std::shared_ptr socket) { + if (socket && socket->name() == "sip") + bobSocket = socket->underlyingSocket(); + cv.notify_one(); + }); + + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "sip", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + aliceSocket = socket->underlyingSocket(); + successfullyConnected = true; + } + cv.notify_one(); + }); + // connectDevice is full async, so isConnecting will be true after a few ms. + CPPUNIT_ASSERT( + cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; })); + + bool hasRequest = false; + bobSocket->setOnBeaconCb([&](auto p) { + if (p) + hasRequest = true; + cv.notify_one(); + }); + aliceAccount->connectionManager().connectivityChanged(); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(10), [&] { return hasRequest; })); +} + +void +ConnectionManagerTest::testOnNoBeaconTriggersShutdown() +{ + auto aliceAccount = Manager::instance().getAccount(aliceId); + auto bobAccount = Manager::instance().getAccount(bobId); + auto bobDeviceId = DeviceId(std::string(bobAccount->currentDeviceId())); + + bobAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); + aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); + + std::mutex mtx; + std::unique_lock lk {mtx}; + std::condition_variable cv; + bool successfullyConnected = false; + + std::shared_ptr aliceSocket, bobSocket; + bobAccount->connectionManager().onChannelRequest( + [&](const DeviceId&, const std::string&) { return true; }); + bobAccount->connectionManager().onConnectionReady( + [&](const DeviceId&, const std::string&, std::shared_ptr socket) { + if (socket && socket->name() == "sip") + bobSocket = socket->underlyingSocket(); + cv.notify_one(); + }); + + aliceAccount->connectionManager().connectDevice(bobDeviceId, + "sip", + [&](std::shared_ptr socket, + const DeviceId&) { + if (socket) { + aliceSocket = socket->underlyingSocket(); + successfullyConnected = true; + } + cv.notify_one(); + }); + // connectDevice is full async, so isConnecting will be true after a few ms. + CPPUNIT_ASSERT( + cv.wait_for(lk, std::chrono::seconds(30), [&] { return aliceSocket && bobSocket; })); + + bool isClosed = false; + aliceSocket->onShutdown([&] { + isClosed = true; + cv.notify_one(); + }); + bobSocket->answerToBeacon(false); + aliceAccount->connectionManager().connectivityChanged(); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(10), [&] { return isClosed; })); +} + } // namespace test } // namespace jami