ConversationModule: use accountManager directly

ConversationModule is guaranteed to be provided a loaded accountManager
at all times with always the same deviceId.
This allows to bypass the account and the configurationmMutex during loading,
and avoids consistency issues.

GitLab: #952
Change-Id: I1068e7eb559f8be27bf34817abb3f2898de3318e
This commit is contained in:
Adrien Béraud
2024-03-26 14:37:48 -04:00
committed by François-Simon Fauteux-Chapleau
parent 5de9d9eee3
commit 4e84cc5de7
4 changed files with 108 additions and 99 deletions

View File

@ -146,7 +146,7 @@ public:
if (!repository_) {
throw std::logic_error("Couldn't create repository");
}
init(*account);
init(account);
}
Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
@ -159,7 +159,7 @@ public:
if (!repository_) {
throw std::logic_error("Couldn't create repository");
}
init(*account);
init(account);
}
Impl(const std::shared_ptr<JamiAccount>& account,
@ -188,10 +188,10 @@ public:
activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
for (const auto& c : repository_->convCommitsToMap(commits))
updateActiveCalls(c);
init(*account);
init(account);
}
void init(JamiAccount& account) {
void init(const std::shared_ptr<JamiAccount>& account) {
ioContext_ = Manager::instance().ioContext();
fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_);
swarmManager_
@ -203,7 +203,7 @@ public:
}
return false;
});
swarmManager_->setMobility(account.isMobile());
swarmManager_->setMobility(account->isMobile());
transferManager_
= std::make_shared<TransferManager>(accountId_,
"",
@ -219,7 +219,7 @@ public:
hostedCallsPath_ = conversationDataPath_ / ConversationMapKeys::HOSTED_CALLS;
loadActiveCalls();
loadStatus();
typers_ = std::make_shared<Typers>(shared, repository_->id());
typers_ = std::make_shared<Typers>(account, repository_->id());
}
const std::string& toString() const

View File

@ -122,7 +122,8 @@ struct SyncedConversation
class ConversationModule::Impl : public std::enable_shared_from_this<Impl>
{
public:
Impl(std::weak_ptr<JamiAccount>&& account,
Impl(std::shared_ptr<JamiAccount>&& account,
std::shared_ptr<AccountManager>&& accountManager,
NeedsSyncingCb&& needsSyncingCb,
SengMsgCb&& sendMsgCb,
NeedSocketCb&& onNeedSocket,
@ -367,6 +368,7 @@ public:
}
std::weak_ptr<JamiAccount> account_;
std::shared_ptr<AccountManager> accountManager_;
NeedsSyncingCb needsSyncingCb_;
SengMsgCb sendMsgCb_;
NeedSocketCb onNeedSocket_;
@ -449,7 +451,8 @@ public:
}
};
ConversationModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account,
ConversationModule::Impl::Impl(std::shared_ptr<JamiAccount>&& account,
std::shared_ptr<AccountManager>&& accountManager,
NeedsSyncingCb&& needsSyncingCb,
SengMsgCb&& sendMsgCb,
NeedSocketCb&& onNeedSocket,
@ -457,20 +460,20 @@ ConversationModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account,
UpdateConvReq&& updateConvReqCb,
OneToOneRecvCb&& oneToOneRecvCb)
: account_(account)
, accountManager_(accountManager)
, needsSyncingCb_(needsSyncingCb)
, sendMsgCb_(sendMsgCb)
, onNeedSocket_(onNeedSocket)
, onNeedSwarmSocket_(onNeedSwarmSocket)
, updateConvReqCb_(updateConvReqCb)
, oneToOneRecvCb_(oneToOneRecvCb)
, accountId_(account->getAccountID())
{
if (auto shared = account.lock()) {
accountId_ = shared->getAccountID();
deviceId_ = shared->currentDeviceId();
if (auto accm = shared->accountManager())
if (const auto* info = accm->getInfo())
username_ = info->accountId;
}
if (auto accm = account->accountManager())
if (const auto* info = accm->getInfo()) {
deviceId_ = info->deviceId;
username_ = info->accountId;
}
conversationsRequests_ = convRequests(accountId_);
loadMetadatas();
}
@ -507,7 +510,7 @@ ConversationModule::Impl::cloneConversation(const std::string& deviceId,
onNeedSocket_(
conv->info.id,
deviceId,
[=](const auto& channel) {
[w = weak(), conv, deviceId](const auto& channel) {
std::lock_guard lk(conv->mtx);
if (conv->pending && !conv->pending->ready) {
if (channel) {
@ -516,7 +519,7 @@ ConversationModule::Impl::cloneConversation(const std::string& deviceId,
conv->pending->socket = channel;
if (!conv->pending->cloning) {
conv->pending->cloning = true;
dht::ThreadPool::io().run([w = weak(),
dht::ThreadPool::io().run([w,
convId = conv->info.id,
deviceId = conv->pending->deviceId]() {
if (auto sthis = w.lock())
@ -686,9 +689,7 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer,
}
}
if (shared->syncCnt.fetch_sub(1) == 1) {
if (auto account = shared->account_.lock())
emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(
account->getAccountID().c_str());
emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(shared->accountId_);
}
},
commitId);
@ -725,9 +726,13 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat
if (!acc)
return;
std::vector<DeviceId> kd;
for (const auto& [id, _] : acc->getKnownDevices())
kd.emplace_back(id);
{
std::unique_lock lk(conversationsMtx_);
const auto& devices = accountManager_->getKnownDevices();
kd.reserve(devices.size());
for (const auto& [id, _] : devices)
kd.emplace_back(id);
}
auto conv = getConversation(conversationId);
if (!conv)
return;
@ -844,17 +849,13 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat
auto askForProfile = isOneOne;
if (!isOneOne) {
// If not 1:1 only download profiles from self (to avoid non checked files)
if (auto acc = account_.lock()) {
auto cert = acc->certStore().getCertificate(deviceId);
askForProfile = cert && cert->issuer
&& cert->issuer->getId().toString() == username_;
}
auto cert = acc->certStore().getCertificate(deviceId);
askForProfile = cert && cert->issuer
&& cert->issuer->getId().toString() == username_;
}
if (askForProfile) {
if (auto acc = account_.lock()) {
for (const auto& member : conversation->memberUris(username_)) {
acc->askForProfile(conversationId, deviceId, member);
}
for (const auto& member : conversation->memberUris(username_)) {
acc->askForProfile(conversationId, deviceId, member);
}
}
} catch (const std::exception& e) {
@ -887,10 +888,7 @@ ConversationModule::Impl::getRequest(const std::string& id) const
std::string
ConversationModule::Impl::getOneToOneConversation(const std::string& uri) const noexcept
{
auto acc = account_.lock();
if (!acc)
return {};
auto details = acc->getContactDetails(uri);
auto details = accountManager_->getContactDetails(uri);
auto itRemoved = details.find("removed");
// If contact is removed there is no conversation
if (itRemoved != details.end() && itRemoved->second != "0") {
@ -955,13 +953,11 @@ ConversationModule::Impl::removeRepositoryImpl(SyncedConversation& conv, bool sy
try {
if (conv.conversation->mode() == ConversationMode::ONE_TO_ONE) {
for (const auto& member : conv.conversation->getInitialMembers()) {
auto account = account_.lock();
if (member != account->getUsername()) {
if (member != username_) {
// Note: this can happen while re-adding a contact.
// In this case, check that we are removing the linked conversation.
if (conv.info.id == getOneToOneConversation(member)) {
if (auto am = account->accountManager())
am->removeContactConversation(member);
accountManager_->removeContactConversation(member);
}
}
}
@ -1092,12 +1088,10 @@ ConversationModule::Impl::sendMessageNotification(Conversation& conversation,
auto members = conversation.memberUris(username_, {MemberRole::BANNED});
std::vector<std::string> connectedMembers;
// print all members
if (auto acc = account_.lock()) {
for (const auto& device : devices) {
auto cert = acc->certStore().getCertificate(device.toString());
if (cert && cert->issuer)
connectedMembers.emplace_back(cert->issuer->getId().toString());
}
for (const auto& device : devices) {
auto cert = acc->certStore().getCertificate(device.toString());
if (cert && cert->issuer)
connectedMembers.emplace_back(cert->issuer->getId().toString());
}
std::sort(std::begin(connectedMembers), std::end(connectedMembers));
std::set_difference(members.begin(),
@ -1320,9 +1314,7 @@ ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConv
onNeedSocket_(
conversationId,
deviceId,
[sthis = shared_from_this(), conv, conversationId, oldConvId, deviceId](
const auto& channel) {
auto acc = sthis->account_.lock();
[wthis=weak_from_this(), conv, conversationId, oldConvId, deviceId](const auto& channel) {
std::lock_guard lk(conv->mtx);
if (conv->pending && !conv->pending->ready) {
conv->pending->removeId = oldConvId;
@ -1332,15 +1324,15 @@ ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConv
conv->pending->socket = channel;
if (!conv->pending->cloning) {
conv->pending->cloning = true;
dht::ThreadPool::io().run([w = sthis->weak(),
conversationId,
deviceId = conv->pending->deviceId]() {
if (auto sthis = w.lock())
dht::ThreadPool::io().run([wthis,
conversationId,
deviceId = conv->pending->deviceId]() {
if (auto sthis = wthis.lock())
sthis->handlePendingConversation(conversationId, deviceId);
});
}
return true;
} else {
} else if (auto sthis = wthis.lock()) {
conv->stopFetch(deviceId);
JAMI_WARNING("Clone failed. Re-clone in {}s", conv->fallbackTimer.count());
conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
@ -1379,10 +1371,13 @@ void
ConversationModule::Impl::bootstrap(const std::string& convId)
{
std::vector<DeviceId> kd;
if (auto acc = account_.lock())
for (const auto& [id, _] : acc->getKnownDevices())
{
std::unique_lock lk(conversationsMtx_);
const auto& devices = accountManager_->getKnownDevices();
kd.reserve(devices.size());
for (const auto& [id, _] : devices)
kd.emplace_back(id);
}
auto bootstrap = [&](auto& conv) {
if (conv) {
#ifdef LIBJAMI_TESTABLE
@ -1427,9 +1422,8 @@ ConversationModule::Impl::cloneConversationFrom(const std::string& conversationI
const std::string& uri,
const std::string& oldConvId)
{
auto acc = account_.lock();
auto memberHash = dht::InfoHash(uri);
if (!acc || !memberHash) {
if (!memberHash) {
JAMI_WARNING("Invalid member detected: {}", uri);
return;
}
@ -1439,15 +1433,16 @@ ConversationModule::Impl::cloneConversationFrom(const std::string& conversationI
conv->info.created = std::time(nullptr);
conv->info.members.emplace(username_);
conv->info.members.emplace(uri);
acc->forEachDevice(memberHash,
[w = weak(), conv, conversationId, oldConvId](
const std::shared_ptr<dht::crypto::PublicKey>& pk) {
auto sthis = w.lock();
auto deviceId = pk->getLongId().toString();
if (!sthis or deviceId == sthis->deviceId_)
return;
sthis->cloneConversationFrom(conv, deviceId, oldConvId);
});
accountManager_->forEachDevice(
memberHash,
[w = weak(), conv, conversationId, oldConvId](
const std::shared_ptr<dht::crypto::PublicKey>& pk) {
auto sthis = w.lock();
auto deviceId = pk->getLongId().toString();
if (!sthis or deviceId == sthis->deviceId_)
return;
sthis->cloneConversationFrom(conv, deviceId, oldConvId);
});
addConvInfo(conv->info);
}
@ -1490,7 +1485,8 @@ ConversationModule::saveConvInfosToPath(const std::filesystem::path& path,
////////////////////////////////////////////////////////////////
ConversationModule::ConversationModule(std::weak_ptr<JamiAccount>&& account,
ConversationModule::ConversationModule(std::shared_ptr<JamiAccount> account,
std::shared_ptr<AccountManager> accountManager,
NeedsSyncingCb&& needsSyncingCb,
SengMsgCb&& sendMsgCb,
NeedSocketCb&& onNeedSocket,
@ -1499,6 +1495,7 @@ ConversationModule::ConversationModule(std::weak_ptr<JamiAccount>&& account,
OneToOneRecvCb&& oneToOneRecvCb,
bool autoLoadConversations)
: pimpl_ {std::make_unique<Impl>(std::move(account),
std::move(accountManager),
std::move(needsSyncingCb),
std::move(sendMsgCb),
std::move(onNeedSocket),
@ -1511,6 +1508,13 @@ ConversationModule::ConversationModule(std::weak_ptr<JamiAccount>&& account,
}
}
void
ConversationModule::setAccountManager(std::shared_ptr<AccountManager> accountManager)
{
std::unique_lock lk(pimpl_->conversationsMtx_);
pimpl_->accountManager_ = accountManager;
}
#ifdef LIBJAMI_TESTABLE
void
ConversationModule::onBootstrapStatus(
@ -1532,9 +1536,8 @@ ConversationModule::loadConversations()
auto conversationsRepositories = dhtnet::fileutils::readDirectory(
fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
auto contacts = acc->getContacts(
true); // Avoid to lock configurationMtx while conv Mtx is locked
std::unique_lock lk(pimpl_->conversationsMtx_);
auto contacts = pimpl_->accountManager_->getContacts(true); // Avoid to lock configurationMtx while conv Mtx is locked
std::unique_lock ilk(pimpl_->convInfosMtx_);
pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
pimpl_->conversations_.clear();
@ -1546,19 +1549,18 @@ ConversationModule::loadConversations()
std::mutex toRmMtx;
std::set<std::string> toRm;
std::mutex convMtx;
std::atomic_int convNb;
size_t convNb;
std::vector<std::map<std::string, std::string>> contacts;
std::vector<std::tuple<std::string, std::string, std::string>> updateContactConv;
};
auto ctx = std::make_shared<Ctx>();
ctx->convNb = conversationsRepositories.empty() ? 0 : conversationsRepositories.size();
ctx->convNb = conversationsRepositories.size();
ctx->contacts = std::move(contacts);
for (auto repository : conversationsRepositories) {
dht::ThreadPool::io().run([this, ctx, repository, acc] {
for (auto&& r : conversationsRepositories) {
dht::ThreadPool::io().run([this, ctx, repository=std::move(r), acc] {
try {
auto sconv = std::make_shared<SyncedConversation>(repository);
auto conv = std::make_shared<Conversation>(acc, repository);
conv->onMessageStatusChanged([this, repository](const auto& status) {
auto msg = std::make_shared<SyncMsg>();
@ -1593,7 +1595,7 @@ ConversationModule::loadConversations()
ctx->cv.notify_all();
return;
}
std::string convFromDetails = itContact->at("conversationId");
const std::string& convFromDetails = itContact->at("conversationId");
auto removed = std::stoul(itContact->at("removed"));
auto added = std::stoul(itContact->at("added"));
auto isRemoved = removed > added;
@ -1668,8 +1670,8 @@ ConversationModule::loadConversations()
});
}
std::unique_lock lkCv {ctx->cvMtx};
ctx->cv.wait(lkCv, [&] { return ctx->convNb.load() == 0; });
std::unique_lock lkCv(ctx->cvMtx);
ctx->cv.wait(lkCv, [&] { return ctx->convNb == 0; });
// Prune any invalid conversations without members and
// set the removed flag if needed
@ -1903,11 +1905,10 @@ void
ConversationModule::onConversationRequest(const std::string& from, const Json::Value& value)
{
ConversationRequest req(value);
auto acc = pimpl_->account_.lock();
auto isOneToOne = req.isOneToOne();
std::string oldConv;
if (acc && isOneToOne) {
oldConv = getOneToOneConversation(from);
if (isOneToOne) {
oldConv = pimpl_->getOneToOneConversation(from);
}
std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
JAMI_DEBUG("[Account {}] Receive a new conversation request for conversation {} from {}",
@ -2058,9 +2059,8 @@ ConversationModule::startConversation(ConversationMode mode, const dht::InfoHash
conversationId),
kd);
} catch (const std::exception& e) {
JAMI_ERR("[Account %s] Error while generating a conversation %s",
pimpl_->accountId_.c_str(),
e.what());
JAMI_ERROR("[Account {}] Error while generating a conversation {}",
pimpl_->accountId_, e.what());
return {};
}
auto convId = conversation->id();
@ -2370,11 +2370,8 @@ ConversationModule::syncConversations(const std::string& peer, const std::string
pimpl_->fetchNewCommits(peer, deviceId, cid);
for (const auto& cid : toClone)
pimpl_->cloneConversation(deviceId, peer, cid);
if (pimpl_->syncCnt.load() == 0) {
if (auto acc = pimpl_->account_.lock())
emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(
acc->getAccountID().c_str());
}
if (pimpl_->syncCnt.load() == 0)
emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(pimpl_->accountId_);
}
void
@ -2781,12 +2778,8 @@ ConversationModule::isBanned(const std::string& convId, const std::string& uri)
return conv->conversation->isBanned(uri);
}
// If 1:1 we check the certificate status
if (auto acc = pimpl_->account_.lock()) {
if (auto am = acc->accountManager())
return am->getCertificateStatus(uri)
== dhtnet::tls::TrustStore::PermissionStatus::BANNED;
}
return true;
std::lock_guard lk(pimpl_->conversationsMtx_);
return pimpl_->accountManager_->getCertificateStatus(uri) == dhtnet::tls::TrustStore::PermissionStatus::BANNED;
}
void

View File

@ -72,7 +72,8 @@ using OneToOneRecvCb = std::function<void(const std::string&, const std::string&
class ConversationModule
{
public:
ConversationModule(std::weak_ptr<JamiAccount>&& account,
ConversationModule(std::shared_ptr<JamiAccount> account,
std::shared_ptr<AccountManager> accountManager,
NeedsSyncingCb&& needsSyncingCb,
SengMsgCb&& sendMsgCb,
NeedSocketCb&& onNeedSocket,
@ -82,6 +83,8 @@ public:
bool autoLoadConversations = true);
~ConversationModule() = default;
void setAccountManager(std::shared_ptr<AccountManager> accountManager);
/**
* Refresh informations about conversations
*/

View File

@ -1219,6 +1219,7 @@ JamiAccount::loadAccount(const std::string& archive_password_scheme,
const auto& conf = config();
try {
auto oldIdentity = id_.first ? id_.first->getPublicKey().getLongId() : DeviceId();
if (conf.managerUri.empty()) {
accountManager_ = std::make_shared<ArchiveAccountManager>(
getPath(),
@ -1246,8 +1247,15 @@ JamiAccount::loadAccount(const std::string& archive_password_scheme,
id_ = std::move(id);
config_->username = info->accountId;
JAMI_WARNING("[Account {:s}] loaded account identity", getAccountID());
if (info->identity.first->getPublicKey().getLongId() != oldIdentity) {
JAMI_WARNING("[Account {:s}] identity changed", getAccountID());
std::lock_guard lk(moduleMtx_);
convModule_.reset();
} else {
convModule_->setAccountManager(accountManager_);
}
convModule(); // Init conv module
if (not isEnabled()) {
convModule(); // Init conv module
setRegistrationState(RegistrationState::UNREGISTERED);
}
} else if (isEnabled()) {
@ -1349,6 +1357,10 @@ JamiAccount::loadAccount(const std::string& archive_password_scheme,
conf.fromMap(config);
});
id_ = std::move(id);
{
std::lock_guard lk(moduleMtx_);
convModule_.reset();
}
if (migrating) {
Migration::setState(getAccountID(), Migration::State::SUCCESS);
}
@ -2150,7 +2162,8 @@ JamiAccount::convModule(bool noCreation)
std::lock_guard lk(moduleMtx_);
if (!convModule_) {
convModule_ = std::make_unique<ConversationModule>(
weak(),
shared(),
accountManager_,
[this](auto&& syncMsg) {
dht::ThreadPool::io().run([w = weak(), syncMsg] {
if (auto shared = w.lock()) {