filetransfer: support channeled file transfers

With this patch, Jami will be able to use a current opened socket with a peer
to transmit files. This means that no ICE negotiation will be necessary
and if multiple files are transferred via the same socket

Change-Id: I8eaf7c38595bbf8e86098d6c8ad21afc9210fe6b
Gitlab: #228
This commit is contained in:
Sébastien Blin
2020-06-08 16:18:20 -04:00
committed by Adrien Béraud
parent e781b9c73a
commit b5c090ec06
13 changed files with 543 additions and 79 deletions

View File

@ -27,6 +27,7 @@
#include "string_utils.h"
#include "map_utils.h"
#include "client/ring_signal.h"
#include "jamidht/p2p.h"
#include <thread>
#include <stdexcept>
@ -41,6 +42,7 @@
#include <cstdlib> // mkstemp
#include <opendht/rng.h>
#include <opendht/thread_pool.h>
namespace jami {
@ -52,6 +54,8 @@ generateUID()
return dist(rd);
}
constexpr const uint32_t MAX_BUFFER_SIZE {65534}; /* Channeled max packet size */
//==============================================================================
class DataTransfer : public Stream
@ -105,6 +109,8 @@ public:
const DRing::DataTransferId id;
virtual void cancel() {}
protected:
mutable std::mutex infoMutex_;
mutable DRing::DataTransferInfo info_;
@ -256,9 +262,64 @@ public:
bool write(const std::vector<uint8_t>& buffer) override;
void emit(DRing::DataTransferEventCode code) const override;
void cancel() override {
if (auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId))
account->closePeerConnection(peerUri_, id);
}
void setOnRecv(std::function<void(std::vector<uint8_t>&&)>&& cb) override {
bool send = false;
{
std::lock_guard<std::mutex> lock(onRecvCbMtx_);
if (cb) send = true;
onRecvCb_ = std::move(cb);
}
if (send) {
std::vector<uint8_t> buf;
sendHeader(buf); // Pass headers to the new callback
}
}
private:
SubOutgoingFileTransfer() = delete;
void sendHeader(std::vector<uint8_t>& buf) const {
std::stringstream ss;
ss << "Content-Length: " << info_.totalSize << '\n'
<< "Display-Name: " << info_.displayName << '\n'
<< "Offset: 0\n"
<< '\n';
auto header = ss.str();
buf.resize(header.size());
std::copy(std::begin(header), std::end(header), std::begin(buf));
headerSent_ = true;
emit(DRing::DataTransferEventCode::wait_peer_acceptance);
if (onRecvCb_)
onRecvCb_(std::move(buf));
}
void sendFile() const {
dht::ThreadPool::computation().run([this]() {
while (!input_.eof() && onRecvCb_) {
std::vector<uint8_t> buf;
buf.resize(MAX_BUFFER_SIZE);
input_.read(reinterpret_cast<char*>(&buf[0]), buf.size());
buf.resize(input_.gcount());
if (buf.size()) {
std::lock_guard<std::mutex> lk {infoMutex_};
info_.bytesProgress += buf.size();
metaInfo_->updateInfo(info_);
}
if (onRecvCb_)
onRecvCb_(std::move(buf));
}
JAMI_DBG() << "FTP#" << getId() << ": sent " << info_.bytesProgress << " bytes";
emit(DRing::DataTransferEventCode::finished);
});
}
mutable std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo_;
mutable std::ifstream input_;
std::size_t tx_ {0};
@ -267,6 +328,8 @@ private:
const std::string peerUri_;
mutable std::unique_ptr<std::thread> timeoutThread_;
mutable std::atomic_bool stopTimeout_ {false};
std::mutex onRecvCbMtx_;
std::function<void(std::vector<uint8_t>&&)> onRecvCb_ {};
};
SubOutgoingFileTransfer::SubOutgoingFileTransfer(DRing::DataTransferId tid,
@ -301,10 +364,6 @@ SubOutgoingFileTransfer::closeAndEmit(DRing::DataTransferEventCode code) const n
started_ = false; // NOTE: replace DataTransfer::close(); which is non const
input_.close();
// We don't need the connection anymore. Can close it.
auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId);
account->closePeerConnection(peerUri_, id);
if (info_.lastEvent < DRing::DataTransferEventCode::finished)
emit(code);
}
@ -314,18 +373,7 @@ SubOutgoingFileTransfer::read(std::vector<uint8_t>& buf) const
{
// Need to send headers?
if (!headerSent_) {
std::stringstream ss;
ss << "Content-Length: " << info_.totalSize << '\n'
<< "Display-Name: " << info_.displayName << '\n'
<< "Offset: 0\n"
<< '\n';
auto header = ss.str();
buf.resize(header.size());
std::copy(std::begin(header), std::end(header), std::begin(buf));
headerSent_ = true;
emit(DRing::DataTransferEventCode::wait_peer_acceptance);
sendHeader(buf);
return true;
}
@ -365,6 +413,8 @@ SubOutgoingFileTransfer::write(const std::vector<uint8_t>& buffer)
if (buffer.size() == 3 and buffer[0] == 'G' and buffer[1] == 'O' and buffer[2] == '\n') {
peerReady_ = true;
emit(DRing::DataTransferEventCode::ongoing);
if (onRecvCb_)
sendFile();
} else {
// consider any other response as a cancel msg
JAMI_WARN() << "FTP#" << getId() << ": refused by peer";
@ -408,6 +458,7 @@ class OutgoingFileTransfer final : public DataTransfer
{
public:
OutgoingFileTransfer(DRing::DataTransferId tid, const DRing::DataTransferInfo& info);
~OutgoingFileTransfer() {}
std::shared_ptr<DataTransfer> startNewOutgoing(const std::string& peer_uri) {
auto newTransfer = std::make_shared<SubOutgoingFileTransfer>(id, peer_uri, this->metaInfo_);
@ -416,6 +467,14 @@ public:
return newTransfer;
}
bool cancel(bool channeled) {
if (channeled)
cancelChanneled_ = true;
else
cancelIce_ = true;
return cancelChanneled_ && cancelIce_;
}
bool hasBeenStarted() const override
{
// Started if one subtransfer is started
@ -427,9 +486,17 @@ public:
void close() noexcept override;
void cancel() {
for (const auto& subtransfer: subtransfer_)
subtransfer->cancel();
}
private:
OutgoingFileTransfer() = delete;
std::atomic_bool cancelChanneled_ {false};
std::atomic_bool cancelIce_ {false};
mutable std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo_;
mutable std::ifstream input_;
mutable std::vector<std::shared_ptr<SubOutgoingFileTransfer>> subtransfer_;
@ -456,8 +523,9 @@ OutgoingFileTransfer::OutgoingFileTransfer(DRing::DataTransferId tid, const DRin
void
OutgoingFileTransfer::close() noexcept
{
for (const auto& subtransfer : subtransfer_)
subtransfer->close();
if (cancelChanneled_ && cancelIce_)
for (const auto& subtransfer : subtransfer_)
subtransfer->close();
}
//==============================================================================
@ -465,7 +533,7 @@ OutgoingFileTransfer::close() noexcept
class IncomingFileTransfer final : public DataTransfer
{
public:
IncomingFileTransfer(DRing::DataTransferId, const DRing::DataTransferInfo&);
IncomingFileTransfer(DRing::DataTransferId, const DRing::DataTransferInfo&, DRing::DataTransferId);
bool start() override;
@ -477,16 +545,24 @@ public:
bool write(const uint8_t* buffer, std::size_t length) override;
void cancel() override {
auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId);
if (account) account->closePeerConnection(info_.peer, internalId_);
}
private:
IncomingFileTransfer() = delete;
DRing::DataTransferId internalId_;
std::ofstream fout_;
std::promise<void> filenamePromise_;
};
IncomingFileTransfer::IncomingFileTransfer(DRing::DataTransferId tid,
const DRing::DataTransferInfo& info)
: DataTransfer(tid)
const DRing::DataTransferInfo& info,
DRing::DataTransferId internalId)
: DataTransfer(tid), internalId_(internalId)
{
JAMI_WARN() << "[FTP] incoming transfert of " << info.totalSize << " byte(s): " << info.displayName;
@ -531,6 +607,11 @@ IncomingFileTransfer::start()
void
IncomingFileTransfer::close() noexcept
{
{
std::lock_guard<std::mutex> lk {infoMutex_};
if (info_.lastEvent >= DRing::DataTransferEventCode::finished)
return;
}
DataTransfer::close();
try {
@ -539,10 +620,6 @@ IncomingFileTransfer::close() noexcept
fout_.close();
// We don't need the connection anymore. Can close it.
auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId);
account->closePeerConnection(info_.peer, id);
JAMI_DBG() << "[FTP] file closed, rx " << info_.bytesProgress
<< " on " << info_.totalSize;
if (info_.bytesProgress >= info_.totalSize)
@ -588,7 +665,7 @@ public:
std::shared_ptr<DataTransfer> createOutgoingFileTransfer(const DRing::DataTransferInfo& info,
DRing::DataTransferId& tid);
std::shared_ptr<IncomingFileTransfer> createIncomingFileTransfer(const DRing::DataTransferInfo&);
std::shared_ptr<IncomingFileTransfer> createIncomingFileTransfer(const DRing::DataTransferInfo&, const DRing::DataTransferId&);
std::shared_ptr<DataTransfer> getTransfer(const DRing::DataTransferId&);
void cancel(DataTransfer&);
void onConnectionRequestReply(const DRing::DataTransferId&, PeerConnection*);
@ -625,10 +702,10 @@ DataTransferFacade::Impl::createOutgoingFileTransfer(const DRing::DataTransferIn
}
std::shared_ptr<IncomingFileTransfer>
DataTransferFacade::Impl::createIncomingFileTransfer(const DRing::DataTransferInfo& info)
DataTransferFacade::Impl::createIncomingFileTransfer(const DRing::DataTransferInfo& info, const DRing::DataTransferId& internal_id)
{
auto tid = generateUID();
auto transfer = std::make_shared<IncomingFileTransfer>(tid, info);
auto transfer = std::make_shared<IncomingFileTransfer>(tid, info, internal_id);
{
std::lock_guard<std::mutex> lk {mapMutex_};
map_.emplace(tid, transfer);
@ -648,7 +725,8 @@ DataTransferFacade::Impl::onConnectionRequestReply(const DRing::DataTransferId&
connection->getPeerUri()
)
);
} else if (not transfer->hasBeenStarted()) {
} else if (std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->cancel(false)
and not transfer->hasBeenStarted()) {
transfer->emit(DRing::DataTransferEventCode::unjoinable_peer);
cancel(*transfer);
}
@ -705,6 +783,21 @@ DataTransferFacade::sendFile(const DRing::DataTransferInfo& info,
info.peer, tid,
[this, tid] (PeerConnection* connection) {
pimpl_->onConnectionRequestReply(tid, connection);
},
[this, tid](const std::shared_ptr<ChanneledOutgoingTransfer>& out) {
if (auto transfer = pimpl_->getTransfer(tid))
if (out)
out->linkTransfer(
std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->startNewOutgoing(out->peer())
);
},
[this, tid]() {
if (auto transfer = pimpl_->getTransfer(tid))
if (std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->cancel(true)
and not transfer->hasBeenStarted()) {
transfer->emit(DRing::DataTransferEventCode::unjoinable_peer);
pimpl_->cancel(*transfer);
}
});
return DRing::DataTransferError::success;
} catch (const std::exception& ex) {
@ -734,6 +827,7 @@ DRing::DataTransferError
DataTransferFacade::cancel(const DRing::DataTransferId& id) noexcept
{
if (auto transfer = pimpl_->getTransfer(id)) {
transfer->cancel();
pimpl_->cancel(*transfer);
return DRing::DataTransferError::success;
}
@ -744,7 +838,12 @@ void
DataTransferFacade::close(const DRing::DataTransferId &id) noexcept
{
std::lock_guard<std::mutex> lk {pimpl_->mapMutex_};
pimpl_->map_.erase(id);
const auto& iter = pimpl_->map_.find(static_cast<uint64_t>(id));
if (iter != std::end(pimpl_->map_)) {
// NOTE: don't erase from map. The client can retrieve
// related info() to know if the file is finished.
iter->second->close();
}
}
DRing::DataTransferError
@ -779,16 +878,24 @@ DataTransferFacade::info(const DRing::DataTransferId& id,
return DRing::DataTransferError::unknown;
}
DRing::DataTransferId
DataTransferFacade::createIncomingTransfer(const DRing::DataTransferInfo &info, const DRing::DataTransferId& internal_id)
{
auto transfer = pimpl_->createIncomingFileTransfer(info, internal_id);
if (!transfer)
return {};
return transfer->getId();
}
IncomingFileInfo
DataTransferFacade::onIncomingFileRequest(const DRing::DataTransferInfo &info) {
auto transfer = pimpl_->createIncomingFileTransfer(info);
if (!transfer)
return {};
auto filename = transfer->requestFilename();
if (!filename.empty())
if (transfer->start())
return {transfer->getId(), std::static_pointer_cast<Stream>(transfer)};
return {transfer->getId(), nullptr};
DataTransferFacade::onIncomingFileRequest(const DRing::DataTransferId& id)
{
if (auto transfer = std::static_pointer_cast<IncomingFileTransfer>(pimpl_->getTransfer(id))) {
auto filename = transfer->requestFilename();
if (!filename.empty() && transfer->start())
return {id, std::static_pointer_cast<Stream>(transfer)};
}
return {id, nullptr};
}
} // namespace jami

View File

@ -67,9 +67,12 @@ public:
DRing::DataTransferError bytesProgress(const DRing::DataTransferId& id, int64_t& total,
int64_t& progress) const noexcept;
/// Used by p2p.cpp
DRing::DataTransferId createIncomingTransfer(const DRing::DataTransferInfo &info, const DRing::DataTransferId& internal_id);
/// Create an IncomingFileTransfer object.
/// \return a shared pointer on created Stream object, or nullptr in case of error
IncomingFileInfo onIncomingFileRequest(const DRing::DataTransferInfo &info);
IncomingFileInfo onIncomingFileRequest(const DRing::DataTransferId& id);
private:
class Impl;

View File

@ -35,10 +35,12 @@ namespace jami {
//==============================================================================
FtpServer::FtpServer(const std::string& account_id,
const std::string& peer_uri)
const std::string& peer_uri,
const DRing::DataTransferId& outId)
: Stream()
, accountId_ {account_id}
, peerUri_ {peer_uri}
, outId_ {outId}
{}
DRing::DataTransferId
@ -46,6 +48,8 @@ FtpServer::getId() const
{
// Because FtpServer is just the protocol on the top of a stream so the id
// of the stream is the id of out_.
if (isTreatingRequest_)
return transferId_;
return out_.id;
}
@ -67,13 +71,28 @@ FtpServer::startNewFile()
info.totalSize = fileSize_;
info.bytesProgress = 0;
rx_ = 0;
out_ = Manager::instance().dataTransfers->onIncomingFileRequest(info); // we block here until answer from client
transferId_ = Manager::instance().dataTransfers->createIncomingTransfer(info, outId_); // return immediately
isTreatingRequest_ = true;
out_ = Manager::instance().dataTransfers->onIncomingFileRequest(transferId_); // we block here until answer from client
isTreatingRequest_ = false;
if (!out_.stream) {
JAMI_DBG() << "[FTP] transfer aborted by client";
closed_ = true; // send NOK msg at next read()
} else {
go_ = true;
}
if (onRecvCb_) {
std::vector<uint8_t> buffer;
if (go_) {
buffer.resize(3);
buffer[0] = 'G'; buffer[1] = 'O'; buffer[2] = '\n';
} else {
buffer.resize(4);
buffer[0] = 'N'; buffer[1] = 'G'; buffer[2] = 'O'; buffer[3] = '\n';
}
onRecvCb_(std::move(buffer));
}
return bool(out_.stream);
}

View File

@ -30,16 +30,22 @@
namespace jami {
using RecvCb = std::function<void(std::vector<uint8_t>&& buf)>;
class FtpServer final : public Stream
{
public:
FtpServer(const std::string& account_id, const std::string& peer_uri);
FtpServer(const std::string& account_id, const std::string& peer_uri, const DRing::DataTransferId& outId = 0);
bool read(std::vector<uint8_t>& buffer) const override;
bool write(const std::vector<uint8_t>& buffer) override;
DRing::DataTransferId getId() const override;
void close() noexcept override;
void setOnRecv(RecvCb&& cb) {
onRecvCb_ = cb;
}
private:
bool parseStream(const std::vector<uint8_t>&);
bool parseLine(const std::string&);
@ -54,7 +60,10 @@ private:
const std::string accountId_;
const std::string peerUri_;
std::atomic_bool isTreatingRequest_ {false};
DRing::DataTransferId transferId_ {0};
IncomingFileInfo out_ {0, nullptr};
DRing::DataTransferId outId_ {0};
std::size_t fileSize_ {0};
std::size_t rx_ {0};
std::stringstream headerStream_;
@ -63,6 +72,8 @@ private:
mutable bool closed_ {false};
mutable bool go_ {false};
FtpState state_ {FtpState::PARSE_HEADERS};
RecvCb onRecvCb_ {};
};
} // namespace jami

View File

@ -19,6 +19,8 @@ libringacc_la_SOURCES = \
connectionmanager.cpp \
channeled_transport.h \
channeled_transport.cpp \
channeled_transfers.h \
channeled_transfers.cpp \
multiplexed_socket.h \
multiplexed_socket.cpp \
sips_transport_ice.cpp \

View File

@ -0,0 +1,106 @@
/*
* Copyright (C) 2020 Savoir-faire Linux Inc.
*
* Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "channeled_transfers.h"
#include "ftp_server.h"
#include "multiplexed_socket.h"
#include <opendht/thread_pool.h>
#include "jamiaccount.h"
namespace jami {
ChanneledOutgoingTransfer::ChanneledOutgoingTransfer(const std::shared_ptr<ChannelSocket>& channel)
: channel_(channel)
{}
ChanneledOutgoingTransfer::~ChanneledOutgoingTransfer()
{
channel_->setOnRecv({});
file_->setOnRecv({});
channel_->shutdown();
}
std::string
ChanneledOutgoingTransfer::peer() const
{
return channel_ ? "" : channel_->deviceId();
}
void
ChanneledOutgoingTransfer::linkTransfer(const std::shared_ptr<Stream>& file)
{
if (!file) return;
file_ = file;
channel_->setOnRecv([this](const uint8_t* buf, size_t len) {
dht::ThreadPool::io().run([
rx=std::vector<uint8_t>(buf, buf+len),
file=std::weak_ptr<Stream>(file_)] {
if (auto f = file.lock())
f->write(rx);
});
return len;
});
file_->setOnRecv([channel = std::weak_ptr<ChannelSocket>(channel_)](std::vector<uint8_t>&& data) {
if (auto c = channel.lock()) {
std::error_code ec;
c->write(data.data(), data.size(), ec);
}
});
}
ChanneledIncomingTransfer::ChanneledIncomingTransfer(const std::shared_ptr<ChannelSocket>& channel, const std::shared_ptr<FtpServer>& ftp)
: ftp_ (ftp)
, channel_(channel)
{
channel_->setOnRecv([this](const uint8_t* buf, size_t len) {
dht::ThreadPool::io().run([
rx=std::vector<uint8_t>(buf, buf+len),
ftp=std::weak_ptr<FtpServer>(ftp_)] {
if (auto f = ftp.lock())
f->write(rx);
});
return len;
});
ftp_->setOnRecv([channel = std::weak_ptr<ChannelSocket>(channel_)](std::vector<uint8_t>&& data) {
if (auto c = channel.lock()) {
std::error_code ec;
c->write(data.data(), data.size(), ec);
}
});
}
ChanneledIncomingTransfer::~ChanneledIncomingTransfer()
{
channel_->setOnRecv({});
channel_->shutdown();
}
DRing::DataTransferId
ChanneledIncomingTransfer::id() const
{
if (ftp_)
return ftp_->getId();
return 0;
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright (C) 2020 Savoir-faire Linux Inc.
*
* Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#pragma once
#include <string>
#include <memory>
#include "dring/datatransfer_interface.h"
namespace jami {
class ChannelSocket;
class Stream;
class FtpServer;
class ChanneledOutgoingTransfer {
public:
ChanneledOutgoingTransfer(const std::shared_ptr<ChannelSocket>& channel);
~ChanneledOutgoingTransfer();
void linkTransfer(const std::shared_ptr<Stream>& file);
std::string peer() const;
private:
std::shared_ptr<ChannelSocket> channel_ {};
std::shared_ptr<Stream> file_;
};
class ChanneledIncomingTransfer {
public:
ChanneledIncomingTransfer(const std::shared_ptr<ChannelSocket>& channel, const std::shared_ptr<FtpServer>& ftp);
~ChanneledIncomingTransfer();
DRing::DataTransferId id() const;
private:
std::shared_ptr<FtpServer> ftp_;
std::shared_ptr<ChannelSocket> channel_;
};
}

View File

@ -2001,18 +2001,39 @@ JamiAccount::doRegister_()
auto result = fut.get();
return result;
});
connectionManager_->onChannelRequest([](const std::string& /* deviceId */, const std::string& name) {
connectionManager_->onChannelRequest([this](const std::string& /* deviceId */, const std::string& name) {
if (name == "sip") {
return true;
} else if (name.substr(0, 7) == "file://") {
auto tid_str = name.substr(7);
uint64_t tid;
std::istringstream iss(tid_str);
iss >> tid;
if (dhtPeerConnector_->onIncomingChannelRequest(tid)) {
incomingFileTransfers_.emplace(tid_str);
return true;
}
}
return false;
});
connectionManager_->onConnectionReady([this](const std::string& deviceId, const std::string& name, std::shared_ptr<ChannelSocket> channel) {
if (channel && name == "sip") {
if (channel) {
auto cert = tls::CertificateStore::instance().getCertificate(deviceId);
if (!cert || !cert->issuer) return;
auto peerId = cert->issuer->getId().toString();
if (channel) cacheSIPConnection(std::move(channel), peerId, deviceId);
if (name == "sip") {
cacheSIPConnection(std::move(channel), peerId, deviceId);
} else if (name.substr(0, 7) == "file://") {
auto tid_str = name.substr(7);
auto it = incomingFileTransfers_.find(tid_str);
// Note, outgoing file transfers are ignored.
if (it == incomingFileTransfers_.end()) return;
incomingFileTransfers_.erase(it);
uint64_t tid;
std::istringstream iss(tid_str);
iss >> tid;
dhtPeerConnector_->onIncomingConnection(peerId, tid, std::move(channel));
}
}
});
@ -3038,9 +3059,11 @@ JamiAccount::publicAddresses()
void
JamiAccount::requestPeerConnection(const std::string& peer_id, const DRing::DataTransferId& tid,
const std::function<void(PeerConnection*)>& connect_cb)
const std::function<void(PeerConnection*)>& connect_cb,
const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb,
const std::function<void()>& onChanneledCancelled)
{
dhtPeerConnector_->requestConnection(peer_id, tid, connect_cb);
dhtPeerConnector_->requestConnection(peer_id, tid, connect_cb, channeledConnectedCb, onChanneledCancelled);
}
void

View File

@ -78,6 +78,7 @@ class AccountManager;
struct AccountInfo;
class ChannelSocket;
class SipTransport;
class ChanneledOutgoingTransfer;
/**
* @brief Ring Account is build on top of SIPAccountBase and uses DHT to handle call connectivity.
@ -353,7 +354,9 @@ public:
/// /// \param[in] tid linked outgoing data transfer
///
void requestPeerConnection(const std::string& peer, const DRing::DataTransferId& tid,
const std::function<void(PeerConnection*)>& connect_cb);
const std::function<void(PeerConnection*)>& connect_cb,
const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb,
const std::function<void()>& onChanneledCancelled);
///
/// Close a E2E connection between a given peer and a given transfer id.
@ -716,6 +719,9 @@ private:
* @param deviceId Device linked to that transport
*/
void cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, const std::string& peerId, const std::string& deviceId);
// File transfers
std::set<std::string> incomingFileTransfers_ {};
};
static inline std::ostream& operator<< (std::ostream& os, const JamiAccount& acc)

View File

@ -30,6 +30,8 @@
#include "peer_connection.h"
#include "turn_transport.h"
#include "account_manager.h"
#include "multiplexed_socket.h"
#include "connectionmanager.h"
#include <opendht/default_types.h>
#include <opendht/rng.h>
@ -101,22 +103,20 @@ public:
dht::Value::Id id = dht::Value::INVALID_ID;
uint32_t protocol {protocol_version}; ///< Protocol identification. First bit reserved to indicate a request (0) or a response (1)
std::vector<std::string> addresses; ///< Request: public addresses for TURN permission. Response: TURN relay addresses (only 1 in current implementation)
MSGPACK_DEFINE_MAP(id, protocol, addresses)
uint64_t tid {0};
MSGPACK_DEFINE_MAP(id, protocol, addresses, tid)
PeerConnectionMsg() = default;
PeerConnectionMsg(dht::Value::Id id, uint32_t aprotocol, const std::string& arelay)
: id {id}, protocol {aprotocol}, addresses {{arelay}} {}
PeerConnectionMsg(dht::Value::Id id, uint32_t aprotocol, const std::vector<std::string>& asrelay)
: id {id}, protocol {aprotocol}, addresses {asrelay} {}
PeerConnectionMsg(dht::Value::Id id, uint32_t aprotocol, const std::string& arelay, uint64_t transfer_id)
: id {id}, protocol {aprotocol}, addresses {{arelay}}, tid {transfer_id} {}
PeerConnectionMsg(dht::Value::Id id, uint32_t aprotocol, const std::vector<std::string>& asrelay, uint64_t transfer_id)
: id {id}, protocol {aprotocol}, addresses {asrelay}, tid {transfer_id} {}
bool isRequest() const noexcept { return (protocol & 1) == 0; }
PeerConnectionMsg respond(const IpAddr& relay) const {
return {id, protocol|1, relay.toString(true, true)};
return {id, protocol|1, relay.toString(true, true), tid};
}
PeerConnectionMsg respond(const std::vector<std::string>& addresses) const {
return {id, protocol|1, addresses};
return {id, protocol|1, addresses, tid};
}
};
@ -172,6 +172,7 @@ public:
std::mutex clientsMutex_;
void cancel(const std::string& peer_id, const DRing::DataTransferId& tid);
void cancelChanneled(const DRing::DataTransferId& tid);
void onRequestMsg(PeerConnectionMsg&&);
void onTrustedRequestMsg(PeerConnectionMsg&&, const std::shared_ptr<dht::crypto::Certificate>&,
@ -203,6 +204,15 @@ public:
std::weak_ptr<DhtPeerConnector::Impl const> weak() const {
return std::static_pointer_cast<DhtPeerConnector::Impl const>(shared_from_this());
}
// For Channeled transports
std::mutex channeledIncomingMtx_;
std::map<DRing::DataTransferId, std::unique_ptr<ChanneledIncomingTransfer>> channeledIncoming_;
std::mutex channeledOutgoingMtx_;
std::map<DRing::DataTransferId, std::shared_ptr<ChanneledOutgoingTransfer>> channeledOutgoing_;
std::mutex incomingTransfersMtx_;
std::set<DRing::DataTransferId> incomingTransfers_;
};
//==============================================================================
@ -222,11 +232,14 @@ public:
const std::shared_ptr<dht::crypto::Certificate>& peer_cert,
const std::vector<std::string>& public_addresses,
const ListenerFunction& connect_cb)
: parent_ {parent}
, tid_ {tid}
: tid_ {tid}
, parent_ {parent}
, peer_ {peer_h}
, publicAddresses_ {public_addresses}
, peerCertificate_ {peer_cert} {
auto shared = parent_.account.lock();
if (!shared) return;
waitId_ = ValueIdDist()(shared->rand);
addListener(connect_cb);
processTask_ = std::async(
std::launch::async,
@ -243,7 +256,6 @@ public:
for (auto& cb: listeners_)
cb(nullptr);
connection_.reset();
}
bool hasAlreadyAResponse() {
@ -274,6 +286,7 @@ public:
responseCV_.notify_all();
}
const DRing::DataTransferId tid_;
private:
void process() {
// Add ice msg into the addresses
@ -304,10 +317,10 @@ private:
// Prepare connection request as a DHT message
PeerConnectionMsg request;
request.id = ValueIdDist()(acc->rand); /* Random id for the message unicity */
waitId_ = request.id;
request.id = waitId_; /* Random id for the message unicity */
request.addresses = {icemsg.str()};
request.addresses.insert(request.addresses.end(), publicAddresses_.begin(), publicAddresses_.end());
request.tid = tid_;
// Send connection request through DHT
JAMI_DBG() << acc << "[CNX] request connection to " << peer_;
@ -419,7 +432,6 @@ private:
}
Impl& parent_;
const DRing::DataTransferId tid_;
const dht::InfoHash peer_;
std::vector<std::string> publicAddresses_;
@ -493,10 +505,19 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
auto acc = account.lock();
if (!acc) return;
if (request.tid != 0) {
std::lock_guard<std::mutex> lk(incomingTransfersMtx_);
if (incomingTransfers_.find(request.tid) != incomingTransfers_.end()) {
JAMI_INFO("Incoming request for id(%lu) is already treated via channeled socket", request.tid);
return;
}
incomingTransfers_.emplace(request.tid);
}
// Save peer certificate for later TLS session (MUST BE DONE BEFORE TURN PEER AUTHORIZATION)
certMap_.emplace(cert->getId(), std::make_pair(cert, peer_h));
auto sendRelayV4 = false, sendRelayV6 = false, sendIce = false, hasPubIp = false;
auto sendIce = false, hasPubIp = false;
struct IceReady {
std::mutex mtx {};
@ -722,6 +743,29 @@ DhtPeerConnector::Impl::cancel(const std::string& peer_id, const DRing::DataTran
});
}
void
DhtPeerConnector::Impl::cancelChanneled(const DRing::DataTransferId& tid) {
dht::ThreadPool::io().run([w=weak(), tid] {
auto shared = w.lock();
if (!shared) return;
// Cancel outgoing files
DRing::DataTransferId finalId = tid;
{
std::lock_guard<std::mutex> lk(shared->channeledIncomingMtx_);
auto it = shared->channeledIncoming_.find(tid);
if (it != shared->channeledIncoming_.end()) {
finalId = it->second->id();
}
shared->channeledIncoming_.erase(tid);
}
{
std::lock_guard<std::mutex> lk(shared->channeledOutgoingMtx_);
shared->channeledOutgoing_.erase(tid);
}
Manager::instance().dataTransfers->close(finalId);
});
}
//==============================================================================
DhtPeerConnector::DhtPeerConnector(JamiAccount& account)
@ -761,7 +805,9 @@ DhtPeerConnector::onDhtConnected(const std::string& device_id)
void
DhtPeerConnector::requestConnection(const std::string& peer_id,
const DRing::DataTransferId& tid,
const std::function<void(PeerConnection*)>& connect_cb)
const std::function<void(PeerConnection*)>& connect_cb,
const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb,
const std::function<void()>& onChanneledCancelled)
{
const auto peer_h = dht::InfoHash(peer_id);
@ -776,16 +822,9 @@ DhtPeerConnector::requestConnection(const std::string& peer_id,
if (!acc) return;
auto addresses = acc->publicAddresses();
// Add local addresses
// XXX: is it really needed? use-case? a local TURN server?
//addresses.emplace_back(ip_utils::getLocalAddr(AF_INET));
//addresses.emplace_back(ip_utils::getLocalAddr(AF_INET6));
// TODO: bypass DHT devices lookup if connection already exist
acc->forEachDevice(
peer_h,
[this, addresses, connect_cb, tid](const dht::InfoHash& dev_h) {
[this, addresses, connect_cb, tid, channeledConnectedCb, onChanneledCancelled](const dht::InfoHash& dev_h) {
auto acc = pimpl_->account.lock();
if (!acc) return;
if (dev_h == acc->dht()->getId()) {
@ -793,6 +832,41 @@ DhtPeerConnector::requestConnection(const std::string& peer_id,
return;
}
acc->connectionManager().connectDevice(dev_h.toString(), "file://" + std::to_string(tid),
[this, tid, channeledConnectedCb, onChanneledCancelled, connect_cb](const std::shared_ptr<ChannelSocket>& channel) {
auto shared = pimpl_->account.lock();
if (!channel) {
onChanneledCancelled();
return;
}
if (!shared) return;
JAMI_INFO("New file channel for outgoing transfer with id(%lu)", tid);
auto outgoingFile = std::make_shared<ChanneledOutgoingTransfer>(channel);
{
std::lock_guard<std::mutex> lk(pimpl_->channeledOutgoingMtx_);
pimpl_->channeledOutgoing_.emplace(tid, outgoingFile);
}
channel->onShutdown([this, tid, onChanneledCancelled]() {
JAMI_INFO("Channel down for outgoing transfer with id(%lu)", tid);
onChanneledCancelled();
dht::ThreadPool::io().run([w=pimpl_->weak(), tid] {
auto shared = w.lock();
if (!shared) return;
// Cancel outgoing files
{
std::lock_guard<std::mutex> lk(shared->channeledOutgoingMtx_);
shared->channeledOutgoing_.erase(tid);
}
Manager::instance().dataTransfers->close(tid);
});
});
// Cancel via DHT because we will use the channeled path
connect_cb(nullptr);
channeledConnectedCb(outgoingFile);
});
acc->findCertificate(
dev_h,
[this, dev_h, addresses, connect_cb, tid] (const std::shared_ptr<dht::crypto::Certificate>& cert) {
@ -800,17 +874,66 @@ DhtPeerConnector::requestConnection(const std::string& peer_id,
});
},
[this, peer_h, connect_cb, accId = acc->getAccountID()](bool found) {
[this, peer_h, connect_cb, onChanneledCancelled, accId = acc->getAccountID()](bool found) {
if (!found) {
JAMI_WARN() << accId << "[CNX] aborted, no devices for " << peer_h;
connect_cb(nullptr);
onChanneledCancelled();
}
});
}
void
DhtPeerConnector::closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid) {
DhtPeerConnector::closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid)
{
pimpl_->cancel(peer_id, tid);
pimpl_->cancelChanneled(tid);
}
bool
DhtPeerConnector::onIncomingChannelRequest(const DRing::DataTransferId& tid)
{
std::lock_guard<std::mutex> lk(pimpl_->incomingTransfersMtx_);
if (pimpl_->incomingTransfers_.find(tid) != pimpl_->incomingTransfers_.end()) {
JAMI_INFO("Incoming transfer request with id(%lu) is already treated via DHT", tid);
return false;
}
pimpl_->incomingTransfers_.emplace(tid);
JAMI_INFO("Incoming transfer request with id(%lu)", tid);
return true;
}
void
DhtPeerConnector::onIncomingConnection(const std::string& peer_id, const DRing::DataTransferId& tid, const std::shared_ptr<ChannelSocket>& channel)
{
if (!channel) return;
auto acc = pimpl_->account.lock();
if (!acc) return;
auto incomingFile = std::make_unique<ChanneledIncomingTransfer>(channel, std::make_shared<FtpServer>(acc->getAccountID(), peer_id, tid));
{
std::lock_guard<std::mutex> lk(pimpl_->channeledIncomingMtx_);
pimpl_->channeledIncoming_.emplace(tid, std::move(incomingFile));
}
channel->onShutdown([this, tid]() {
JAMI_INFO("Channel down for incoming transfer with id(%lu)", tid);
dht::ThreadPool::io().run([w=pimpl_->weak(), tid] {
auto shared = w.lock();
if (!shared) return;
// Cancel incoming files
DRing::DataTransferId internalId = 0;
{
std::lock_guard<std::mutex> lk(shared->channeledIncomingMtx_);
auto it = shared->channeledIncoming_.find(tid);
if (it != shared->channeledIncoming_.end())
internalId = it->second->id();
shared->channeledIncoming_.erase(tid);
}
if (internalId != 0) {
Manager::instance().dataTransfers->close(internalId);
}
});
});
}
} // namespace jami

View File

@ -22,6 +22,7 @@
#pragma once
#include "dring/datatransfer_interface.h"
#include "channeled_transfers.h"
#include <string>
#include <memory>
@ -38,9 +39,13 @@ public:
~DhtPeerConnector();
void onDhtConnected(const std::string& device_id);
void requestConnection(const std::string& peer_id, const DRing::DataTransferId& tid, const std::function<void(PeerConnection*)>& connect_cb);
void requestConnection(const std::string& peer_id, const DRing::DataTransferId& tid,
const std::function<void(PeerConnection*)>& connect_cb,
const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb,
const std::function<void()>& onChanneledCancelled);
void closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid);
bool onIncomingChannelRequest(const DRing::DataTransferId& tid);
void onIncomingConnection(const std::string& peer_id, const DRing::DataTransferId& tid, const std::shared_ptr<ChannelSocket>& channel);
private:
DhtPeerConnector() = delete;

View File

@ -16,6 +16,7 @@ libjami_sources = files(
'hooks/urlhook.cpp',
'im/instant_messaging.cpp',
'im/message_engine.cpp',
'jamidht/channeled_transfers.cpp',
'jamidht/eth/libdevcore/Common.cpp',
'jamidht/eth/libdevcore/CommonData.cpp',
'jamidht/eth/libdevcore/FixedHash.cpp',

View File

@ -73,6 +73,9 @@ public:
(void)length;
return false;
};
virtual void setOnRecv(std::function<void(std::vector<uint8_t>&&)>&&) {
// Not implemented
}
};
//==============================================================================