swarm: only request a connection when peer is detected if a sync is needed

fetched check the devices who has fetched since the last commit. With this file
we can detect if a device needs to fetch and if it's the case we start a connection.
cacheSipConnection will sync the conversations when called. This avoids the
daemon to connect too all contacts

Change-Id: Id85db09c3c0a6aa44dd48b3bbee5ed7e0a5d6b84
This commit is contained in:
Sébastien Blin
2021-06-11 12:23:28 -04:00
parent a21f1d8ce9
commit 8a753f0463
4 changed files with 109 additions and 128 deletions

View File

@ -119,9 +119,7 @@ public:
if (!repository_) {
throw std::logic_error("Couldn't create repository");
}
if (auto shared = account_.lock())
transferManager_ = std::make_shared<TransferManager>(shared->getAccountID(),
repository_->id());
init();
}
Impl(const std::weak_ptr<JamiAccount>& account, const std::string& conversationId)
@ -131,9 +129,7 @@ public:
if (!repository_) {
throw std::logic_error("Couldn't create repository");
}
if (auto shared = account_.lock())
transferManager_ = std::make_shared<TransferManager>(shared->getAccountID(),
repository_->id());
init();
}
Impl(const std::weak_ptr<JamiAccount>& account,
@ -151,10 +147,22 @@ public:
}
throw std::logic_error("Couldn't clone repository");
}
if (auto shared = account_.lock())
init();
}
void init()
{
if (auto shared = account_.lock()) {
transferManager_ = std::make_shared<TransferManager>(shared->getAccountID(),
repository_->id());
conversationDataPath_ = fileutils::get_data_dir() + DIR_SEPARATOR_STR
+ shared->getAccountID() + DIR_SEPARATOR_STR
+ "conversation_data" + DIR_SEPARATOR_STR + repository_->id();
fetchedPath_ = fileutils::get_data_dir() + DIR_SEPARATOR_STR + "fetched";
loadFetched();
}
}
~Impl() = default;
bool isAdmin() const;
@ -233,6 +241,25 @@ public:
}
}
void loadFetched()
{
try {
// read file
auto file = fileutils::loadFile(fetchedPath_);
// load values
msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
std::lock_guard<std::mutex> lk {fetchedDevicesMtx_};
oh.get().convert(fetchedDevices_);
} catch (const std::exception& e) {
return;
}
}
void saveFetched()
{
std::ofstream file(fetchedPath_, std::ios::trunc | std::ios::binary);
msgpack::pack(file, fetchedDevices_);
}
std::unique_ptr<ConversationRepository> repository_;
std::weak_ptr<JamiAccount> account_;
std::atomic_bool isRemoving_ {false};
@ -248,6 +275,10 @@ public:
std::set<std::string> fetchingRemotes_ {}; // store current remote in fetch
std::deque<std::tuple<std::string, std::string, OnPullCb>> pullcbs_ {};
std::shared_ptr<TransferManager> transferManager_ {};
std::string conversationDataPath_ {};
std::string fetchedPath_ {};
std::mutex fetchedDevicesMtx_ {};
std::set<std::string> fetchedDevices_ {};
};
bool
@ -610,6 +641,7 @@ Conversation::sendMessage(const Json::Value& value,
std::unique_lock<std::mutex> lk(sthis->pimpl_->writeMtx_);
auto commit = sthis->pimpl_->repository_->commitMessage(
Json::writeString(wbuilder, value));
sthis->clearFetched();
sthis->pimpl_->announce(commit);
lk.unlock();
if (cb)
@ -1005,4 +1037,27 @@ Conversation::downloadFile(const std::string& interactionId,
return true;
}
void
Conversation::clearFetched()
{
std::lock_guard<std::mutex> lk(pimpl_->fetchedDevicesMtx_);
pimpl_->fetchedDevices_.clear();
pimpl_->saveFetched();
}
bool
Conversation::needsFetch(const std::string& deviceId) const
{
std::lock_guard<std::mutex> lk(pimpl_->fetchedDevicesMtx_);
return pimpl_->fetchedDevices_.find(deviceId) != pimpl_->fetchedDevices_.end();
}
void
Conversation::hasFetched(const std::string& deviceId)
{
std::lock_guard<std::mutex> lk(pimpl_->fetchedDevicesMtx_);
pimpl_->fetchedDevices_.emplace(deviceId);
pimpl_->saveFetched();
}
} // namespace jami

View File

@ -291,6 +291,22 @@ public:
std::size_t start = 0,
std::size_t end = 0);
/**
* Reset fetched informations
*/
void clearFetched();
/**
* Check if a device has fetched last commit
* @param deviceId
*/
bool needsFetch(const std::string& deviceId) const;
/**
* Store informations about who fetch or not. This simplify sync (sync when a device without the
* last fetch is detected)
* @param deviceId
*/
void hasFetched(const std::string& deviceId);
private:
std::shared_ptr<Conversation> shared()
{

View File

@ -1951,7 +1951,20 @@ JamiAccount::trackPresence(const dht::InfoHash& h, BuddyInfo& buddy)
if (not expired) {
// Retry messages every time a new device announce its presence
sthis->messageEngine_.onPeerOnline(h.toString());
sthis->requestSIPConnection(h.toString(), dev.dev);
auto deviceId = dev.dev.toString();
auto needsSync = false;
{
std::unique_lock<std::mutex> lk(sthis->conversationsMtx_);
for (const auto& [_, conv] : sthis->conversations_) {
if (conv->isMember(deviceId, false) && conv->needsFetch(deviceId)) {
needsSync = true;
break;
}
}
}
if (needsSync)
sthis->requestSIPConnection(h.toString(),
dev.dev); // Both sides will sync conversations
}
if (isConnected and not wasConnected) {
sthis->onTrackedBuddyOnline(h);
@ -2416,11 +2429,21 @@ JamiAccount::doRegister_()
deviceId.to_c_str(),
channel->channel());
auto gs = std::make_unique<GitServer>(accountId, conversationId, channel);
gs->setOnFetched([w = weak(), conversationId](const std::string&) {
gs->setOnFetched([w = weak(), conversationId, deviceId](const std::string&) {
auto shared = w.lock();
if (!shared)
return;
shared->removeRepository(conversationId, true);
auto remove = false;
{
std::unique_lock<std::mutex> lk(shared->conversationsMtx_);
auto it = shared->conversations_.find(conversationId);
if (it != shared->conversations_.end() && it->second) {
remove = it->second->isRemoving();
it->second->hasFetched(deviceId.toString());
}
}
if (remove)
shared->removeRepository(conversationId, true);
});
const dht::Value::Id serverId = ValueIdDist()(rand);
{
@ -2523,56 +2546,10 @@ JamiAccount::doRegister_()
if (!dhtPeerConnector_)
dhtPeerConnector_ = std::make_unique<DhtPeerConnector>(*this);
{
std::lock_guard<std::mutex> lock(buddyInfoMtx);
for (auto& buddy : trackedBuddies_) {
buddy.second.devices_cnt = 0;
trackPresence(buddy.first, buddy.second);
}
}
if (needsConvSync_.exchange(false)) {
// Clone malformed conversations if needed
// (no-sync with others as onPeerOnline will do the job)
auto info = accountManager_->getInfo();
if (!info)
return;
std::lock_guard<std::mutex> lk(conversationsMtx_);
for (const auto& c : info->conversations) {
if (!c.removed) {
auto it = conversations_.find(c.id);
if (it == conversations_.end()) {
std::shared_ptr<std::atomic_bool> willClone
= std::make_shared<std::atomic_bool>(false);
for (const auto& member : c.members) {
if (member != getUsername()) {
// Try to clone from first other members device found
// NOTE: rescehdule this in a few seconds, to let the time
// to the peers to discover the device if it's the first time
// we create the device
Manager::instance().scheduleTaskIn(
[w = weak(), member, convId = c.id, willClone]() {
if (auto shared = w.lock())
shared->accountManager_->forEachDevice(
dht::InfoHash(member),
[w, convId, member, willClone](
const dht::InfoHash& dev) {
if (willClone->exchange(true))
return;
auto shared = w.lock();
if (!shared)
return;
shared->cloneConversation(dev.toString(),
member,
convId);
});
},
std::chrono::seconds(5));
}
}
}
}
}
std::lock_guard<std::mutex> lock(buddyInfoMtx);
for (auto& buddy : trackedBuddies_) {
buddy.second.devices_cnt = 0;
trackPresence(buddy.first, buddy.second);
}
} catch (const std::exception& e) {
JAMI_ERR("Error registering DHT account: %s", e.what());
@ -2626,10 +2603,8 @@ JamiAccount::doUnregister(std::function<void(bool)> released_cb)
// Stop all current p2p connections if account is disabled
// Else, we let the system managing if the co is down or not
if (not isEnabled()) {
needsConvSync_ = true;
if (not isEnabled())
shutdownConnections();
}
// Release current upnp mapping if any.
if (upnpCtrl_ and dhtUpnpMapping_.isValid()) {
@ -3463,67 +3438,6 @@ JamiAccount::sendTextMessage(const std::string& to,
std::chrono::minutes(1));
}
void
JamiAccount::sendSIPMessageToDevice(const std::string& to,
const DeviceId& deviceId,
const std::map<std::string, std::string>& payloads)
{
std::lock_guard<std::mutex> lk(sipConnsMtx_);
sip_utils::register_thread();
for (auto it = sipConns_.begin(); it != sipConns_.end();) {
auto& [key, value] = *it;
if (key.first == to && key.second != deviceId) {
++it;
continue;
}
auto& conn = value.back();
auto& channel = conn.channel;
// Set input token into callback
std::unique_ptr<TextMessageCtx> ctx {std::make_unique<TextMessageCtx>()};
ctx->acc = weak();
ctx->to = to;
ctx->deviceId = key.second;
ctx->channel = channel;
try {
auto res = sendSIPMessage(
conn, to, ctx.release(), {}, payloads, [](void* token, pjsip_event* event) {
std::unique_ptr<TextMessageCtx> c {(TextMessageCtx*) token};
auto code = event->body.tsx_state.tsx->status_code;
auto acc = c->acc.lock();
if (not acc)
return;
if (code == PJSIP_SC_OK) {
std::unique_lock<std::mutex> l(c->confirmation->lock);
c->confirmation->replied = true;
l.unlock();
if (!c->onlyConnected)
acc->messageEngine_.onMessageSent(c->to, c->id, true);
} else {
JAMI_WARN("Timeout when send a message, close current connection");
acc->shutdownSIPConnection(c->channel, c->to, c->deviceId);
}
});
if (!res) {
++it;
continue;
}
break;
} catch (const std::runtime_error& ex) {
JAMI_WARN("%s", ex.what());
++it;
// Remove connection in incorrect state
shutdownSIPConnection(channel, to, key.second);
continue;
}
++it;
}
}
void
JamiAccount::onIsComposing(const std::string& conversationId,
const std::string& peer,

View File

@ -340,9 +340,6 @@ public:
const std::map<std::string, std::string>& payloads) override;
void sendInstantMessage(const std::string& convId,
const std::map<std::string, std::string>& msg) override;
void sendSIPMessageToDevice(const std::string& to,
const DeviceId& deviceId,
const std::map<std::string, std::string>& payloads);
void onIsComposing(const std::string& conversationId,
const std::string& peer,
bool isWriting) override;
@ -1032,7 +1029,6 @@ private:
void syncWith(const std::string& deviceId, const std::shared_ptr<ChannelSocket>& socket);
void syncInfos(const std::shared_ptr<ChannelSocket>& socket);
void syncWithConnected();
std::atomic_bool needsConvSync_ {true};
/**
* Remove a repository and all files