conference: fix crashes on leave

This patch fix several problems:
+ participants_ must be protected and not iterated while accessed
+ getVideoMixer() do not generate a video mixer, this is only
managed at one place.
+ Remove setConfId, enter/exitConf are here for this and this
is causing the videoMixer to not be destroyed in time, because
exitConference() was called after setConfId(""). This was causing
crashes because the videoMixer will update for the last participant
with the error: "The call is not bound to any conference"

GitLab: #660
Change-Id: Ic60bc7377b0315f7e2906ab03a7653381436180c
This commit is contained in:
Sébastien Blin
2021-11-09 11:22:21 -05:00
parent 390601fd32
commit 41a7ef1970
7 changed files with 170 additions and 154 deletions

View File

@ -135,8 +135,6 @@ public:
*/
const std::string& getConfId() const { return confID_; }
void setConfId(const std::string& id) { confID_ = id; }
std::weak_ptr<Account> getAccount() const { return account_; }
std::string getAccountId() const;

View File

@ -66,13 +66,13 @@ Conference::Conference(bool enableVideo)
if (not videoEnabled_)
return;
getVideoMixer()->setOnSourcesUpdated([this](std::vector<video::SourceInfo>&& infos) {
videoMixer_ = std::make_shared<video::VideoMixer>(id_);
videoMixer_->setOnSourcesUpdated([this](std::vector<video::SourceInfo>&& infos) {
runOnMainThread([w = weak(), infos = std::move(infos)] {
auto shared = w.lock();
if (!shared)
return;
ConfInfo newInfo;
auto subCalls = shared->participants_;
auto hostAdded = false;
// Handle participants showing their video
std::unique_lock<std::mutex> lk(shared->videoToCallMtx_);
@ -95,9 +95,8 @@ Conference::Conference(bool enableVideo)
}
}
auto active = false;
if (auto videoMixer = shared->getVideoMixer())
if (auto videoMixer = shared->videoMixer_)
active = info.source == videoMixer->getActiveParticipant();
subCalls.erase(it->second);
std::string_view peerId = string_remove_suffix(uri, '@');
auto isModerator = shared->isModerator(peerId);
if (uri.empty()) {
@ -122,7 +121,7 @@ Conference::Conference(bool enableVideo)
isModerator,
isHandRaised});
}
if (auto videoMixer = shared->getVideoMixer()) {
if (auto videoMixer = shared->videoMixer_) {
newInfo.h = videoMixer->getHeight();
newInfo.w = videoMixer->getWidth();
}
@ -143,34 +142,30 @@ Conference::Conference(bool enableVideo)
Conference::~Conference()
{
JAMI_INFO("Destroy conference %s", id_.c_str());
#ifdef ENABLE_VIDEO
for (const auto& participant_id : participants_) {
if (auto call = getCall(participant_id)) {
call->exitConference();
// Reset distant callInfo
call->resetConfInfo();
// Trigger the SIP negotiation to update the resolution for the remaining call
// ideally this sould be done without renegotiation
call->switchInput(
Manager::instance().getVideoManager().videoDeviceMonitor.getMRLForDefaultDevice());
foreachCall([&](auto call) {
call->exitConference();
// Reset distant callInfo
call->resetConfInfo();
// Trigger the SIP negotiation to update the resolution for the remaining call
// ideally this sould be done without renegotiation
call->switchInput(
Manager::instance().getVideoManager().videoDeviceMonitor.getMRLForDefaultDevice());
// Continue the recording for the call if the conference was recorded
if (this->isRecording()) {
JAMI_DBG("Stop recording for conf %s", getConfID().c_str());
this->toggleRecording();
if (not call->isRecording()) {
JAMI_DBG("Conference was recorded, start recording for conf %s",
call->getCallId().c_str());
call->toggleRecording();
}
// Continue the recording for the call if the conference was recorded
if (isRecording()) {
JAMI_DBG("Stop recording for conf %s", getConfID().c_str());
toggleRecording();
if (not call->isRecording()) {
JAMI_DBG("Conference was recorded, start recording for conf %s",
call->getCallId().c_str());
call->toggleRecording();
}
// Notify that the remaining peer is still recording after conference
if (call->isPeerRecording())
call->peerRecording(true);
}
}
// Notify that the remaining peer is still recording after conference
if (call->isPeerRecording())
call->peerRecording(true);
});
for (auto it = confSinksMap_.begin(); it != confSinksMap_.end();) {
if (videoMixer_)
videoMixer_->detach(it->second.get());
@ -414,71 +409,75 @@ Conference::addParticipant(const std::string& participant_id)
{
JAMI_DBG("Adding call %s to conference %s", participant_id.c_str(), id_.c_str());
if (participants_.insert(participant_id).second) {
// Check if participant was muted before conference
if (auto call = getCall(participant_id)) {
if (call->isPeerMuted()) {
participantsMuted_.emplace(string_remove_suffix(call->getPeerNumber(), '@'));
}
{
std::lock_guard<std::mutex> lk(participantsMtx_);
if (!participants_.insert(participant_id).second)
return;
}
// When a call joins a conference, the control if the media
// source sates (mainly mute/un-mute states) will be handled
// by the conference.
takeOverMediaSourceControl(participant_id);
// Check if participant was muted before conference
if (auto call = getCall(participant_id)) {
if (call->isPeerMuted()) {
participantsMuted_.emplace(string_remove_suffix(call->getPeerNumber(), '@'));
}
if (auto call = getCall(participant_id)) {
auto w = call->getAccount();
auto account = w.lock();
if (account) {
// Add defined moderators for the account link to the call
for (const auto& mod : account->getDefaultModerators()) {
moderators_.emplace(mod);
}
// When a call joins a conference, the control if the media
// source sates (mainly mute/un-mute states) will be handled
// by the conference.
takeOverMediaSourceControl(participant_id);
}
// Check for localModeratorsEnabled preference
if (account->isLocalModeratorsEnabled() && not localModAdded_) {
auto accounts = jami::Manager::instance().getAllAccounts<JamiAccount>();
for (const auto& account : accounts) {
moderators_.emplace(account->getUsername());
}
localModAdded_ = true;
}
if (auto call = getCall(participant_id)) {
auto w = call->getAccount();
auto account = w.lock();
if (account) {
// Add defined moderators for the account link to the call
for (const auto& mod : account->getDefaultModerators()) {
moderators_.emplace(mod);
}
// Check for allModeratorEnabled preference
if (account->isAllModerators()) {
moderators_.emplace(string_remove_suffix(call->getPeerNumber(), '@'));
// Check for localModeratorsEnabled preference
if (account->isLocalModeratorsEnabled() && not localModAdded_) {
auto accounts = jami::Manager::instance().getAllAccounts<JamiAccount>();
for (const auto& account : accounts) {
moderators_.emplace(account->getUsername());
}
localModAdded_ = true;
}
// Check for allModeratorEnabled preference
if (account->isAllModerators()) {
moderators_.emplace(string_remove_suffix(call->getPeerNumber(), '@'));
}
}
}
#ifdef ENABLE_VIDEO
if (auto call = getCall(participant_id)) {
// In conference, all participants need to have video session
// (with a sink) in order to display the participant info in
// the layout. So, if a participant joins with an audio only
// call, a dummy video stream is added to the call.
auto mediaList = call->getMediaAttributeList();
if (not MediaAttribute::hasMediaType(mediaList, MediaType::MEDIA_VIDEO)) {
call->addDummyVideoRtpSession();
if (auto call = getCall(participant_id)) {
// In conference, all participants need to have video session
// (with a sink) in order to display the participant info in
// the layout. So, if a participant joins with an audio only
// call, a dummy video stream is added to the call.
auto mediaList = call->getMediaAttributeList();
if (not MediaAttribute::hasMediaType(mediaList, MediaType::MEDIA_VIDEO)) {
call->addDummyVideoRtpSession();
}
call->enterConference(getConfID());
// Continue the recording for the conference if one participant was recording
if (call->isRecording()) {
JAMI_DBG("Stop recording for call %s", call->getCallId().c_str());
call->toggleRecording();
if (not this->isRecording()) {
JAMI_DBG("One participant was recording, start recording for conference %s",
getConfID().c_str());
this->toggleRecording();
}
call->enterConference(getConfID());
// Continue the recording for the conference if one participant was recording
if (call->isRecording()) {
JAMI_DBG("Stop recording for call %s", call->getCallId().c_str());
call->toggleRecording();
if (not this->isRecording()) {
JAMI_DBG("One participant was recording, start recording for conference %s",
getConfID().c_str());
this->toggleRecording();
}
}
} else
JAMI_ERR("no call associate to participant %s", participant_id.c_str());
}
} else
JAMI_ERR("no call associate to participant %s", participant_id.c_str());
#endif // ENABLE_VIDEO
#ifdef ENABLE_PLUGIN
createConfAVStreams();
createConfAVStreams();
#endif
}
}
void
@ -511,16 +510,16 @@ Conference::setLayout(int layout)
{
switch (layout) {
case 0:
getVideoMixer()->setVideoLayout(video::Layout::GRID);
videoMixer_->setVideoLayout(video::Layout::GRID);
// The layout shouldn't have an active participant
if (videoMixer_->getActiveParticipant())
videoMixer_->setActiveParticipant(nullptr);
break;
case 1:
getVideoMixer()->setVideoLayout(video::Layout::ONE_BIG_WITH_SMALL);
videoMixer_->setVideoLayout(video::Layout::ONE_BIG_WITH_SMALL);
break;
case 2:
getVideoMixer()->setVideoLayout(video::Layout::ONE_BIG);
videoMixer_->setVideoLayout(video::Layout::ONE_BIG);
break;
default:
break;
@ -553,23 +552,21 @@ void
Conference::sendConferenceInfos()
{
// Inform calls that the layout has changed
for (const auto& participant_id : participants_) {
foreachCall([&](auto call) {
// Produce specific JSON for each participant (2 separate accounts can host ...
// a conference on a same device, the conference is not link to one account).
if (auto call = getCall(participant_id)) {
auto w = call->getAccount();
auto account = w.lock();
if (!account)
continue;
auto w = call->getAccount();
auto account = w.lock();
if (!account)
return;
dht::ThreadPool::io().run(
[call,
confInfo = getConfInfoHostUri(account->getUsername() + "@ring.dht",
call->getPeerNumber())] {
call->sendConfInfo(confInfo.toString());
});
}
}
dht::ThreadPool::io().run(
[call,
confInfo = getConfInfoHostUri(account->getUsername() + "@ring.dht",
call->getPeerNumber())] {
call->sendConfInfo(confInfo.toString());
});
});
auto confInfo = getConfInfoHostUri("", "");
createSinks(confInfo);
@ -600,7 +597,7 @@ Conference::attachVideo(Observable<std::shared_ptr<MediaFrame>>* frame, const st
{
std::lock_guard<std::mutex> lk(videoToCallMtx_);
videoToCall_.emplace(frame, callId);
frame->attach(getVideoMixer().get());
frame->attach(videoMixer_.get());
}
void
@ -609,7 +606,7 @@ Conference::detachVideo(Observable<std::shared_ptr<MediaFrame>>* frame)
std::lock_guard<std::mutex> lk(videoToCallMtx_);
auto it = videoToCall_.find(frame);
if (it != videoToCall_.end()) {
it->first->detach(getVideoMixer().get());
it->first->detach(videoMixer_.get());
videoToCall_.erase(it);
}
}
@ -617,16 +614,19 @@ Conference::detachVideo(Observable<std::shared_ptr<MediaFrame>>* frame)
void
Conference::removeParticipant(const std::string& participant_id)
{
if (participants_.erase(participant_id)) {
if (auto call = getCall(participant_id)) {
participantsMuted_.erase(std::string(string_remove_suffix(call->getPeerNumber(), '@')));
handsRaised_.erase(std::string(string_remove_suffix(call->getPeerNumber(), '@')));
{
std::lock_guard<std::mutex> lk(participantsMtx_);
if (!participants_.erase(participant_id))
return;
}
if (auto call = getCall(participant_id)) {
participantsMuted_.erase(std::string(string_remove_suffix(call->getPeerNumber(), '@')));
handsRaised_.erase(std::string(string_remove_suffix(call->getPeerNumber(), '@')));
#ifdef ENABLE_VIDEO
call->exitConference();
if (call->isPeerRecording())
call->peerRecording(false);
call->exitConference();
if (call->isPeerRecording())
call->peerRecording(false);
#endif // ENABLE_VIDEO
}
}
}
@ -652,12 +652,10 @@ Conference::attachLocalParticipant()
rbPool.flush(RingBufferPool::DEFAULT_ID);
#ifdef ENABLE_VIDEO
if (isVideoEnabled()) {
if (auto mixer = getVideoMixer()) {
mixer->switchInput(mediaInput_);
if (not mediaSecondaryInput_.empty())
mixer->switchSecondaryInput(mediaSecondaryInput_);
}
if (videoMixer_) {
videoMixer_->switchInput(mediaInput_);
if (not mediaSecondaryInput_.empty())
videoMixer_->switchSecondaryInput(mediaSecondaryInput_);
}
#endif
setMediaSourceState(MediaType::MEDIA_AUDIO, false);
@ -677,16 +675,13 @@ Conference::detachLocalParticipant()
JAMI_INFO("Detach local participant from conference %s", id_.c_str());
if (getState() == State::ACTIVE_ATTACHED) {
for (const auto& p : participants_) {
Manager::instance().getRingBufferPool().unBindCallID(getCall(p)->getCallId(),
foreachCall([&](auto call) {
Manager::instance().getRingBufferPool().unBindCallID(call->getCallId(),
RingBufferPool::DEFAULT_ID);
}
});
#ifdef ENABLE_VIDEO
if (isVideoEnabled()) {
if (auto mixer = getVideoMixer()) {
mixer->stopInput();
}
}
if (videoMixer_)
videoMixer_->stopInput();
#endif
setMediaSourceState(MediaType::MEDIA_AUDIO, true);
setMediaSourceState(MediaType::MEDIA_VIDEO, true);
@ -706,7 +701,7 @@ Conference::bindParticipant(const std::string& participant_id)
auto& rbPool = Manager::instance().getRingBufferPool();
for (const auto& item : participants_) {
for (const auto& item : getParticipantList()) {
if (participant_id != item) {
// Do not attach muted participants
if (auto call = Manager::instance().getCallFromCallID(item)) {
@ -744,7 +739,7 @@ Conference::bindHost()
auto& rbPool = Manager::instance().getRingBufferPool();
for (const auto& item : participants_) {
for (const auto& item : getParticipantList()) {
if (auto call = Manager::instance().getCallFromCallID(item)) {
if (isMuted(string_remove_suffix(call->getPeerNumber(), '@')))
continue;
@ -761,9 +756,10 @@ Conference::unbindHost()
Manager::instance().getRingBufferPool().unBindAllHalfDuplexOut(RingBufferPool::DEFAULT_ID);
}
const ParticipantSet&
ParticipantSet
Conference::getParticipantList() const
{
std::lock_guard<std::mutex> lk(participantsMtx_);
return participants_;
}
@ -777,11 +773,7 @@ Conference::toggleRecording()
deinitRecorder(recorder_);
// Notify each participant
for (const auto& participant_id : participants_) {
if (auto call = getCall(participant_id)) {
call->updateRecState(newState);
}
}
foreachCall([&](auto call) { call->updateRecState(newState); });
return Recordable::toggleRecording();
}
@ -804,7 +796,7 @@ Conference::switchInput(const std::string& input)
if (not isVideoEnabled())
return;
if (auto mixer = getVideoMixer()) {
if (auto mixer = videoMixer_) {
mixer->switchInput(input);
#ifdef ENABLE_PLUGIN
// Preview
@ -823,8 +815,8 @@ Conference::switchSecondaryInput(const std::string& input)
{
#ifdef ENABLE_VIDEO
mediaSecondaryInput_ = input;
if (isVideoEnabled()) {
getVideoMixer()->switchSecondaryInput(input);
if (videoMixer_) {
videoMixer_->switchSecondaryInput(input);
}
#endif
}
@ -839,8 +831,6 @@ Conference::isVideoEnabled() const
std::shared_ptr<video::VideoMixer>
Conference::getVideoMixer()
{
if (!videoMixer_)
videoMixer_.reset(new video::VideoMixer(id_));
return videoMixer_;
}
#endif
@ -965,7 +955,7 @@ Conference::setHandRaised(const std::string& participant_id, const bool& state)
}
return;
} else {
for (const auto& p : participants_) {
for (const auto& p : getParticipantList()) {
if (auto call = getCall(p)) {
auto isPeerRequiringAttention = isHandRaised(participant_id);
if (participant_id == string_remove_suffix(call->getPeerNumber(), '@')) {
@ -989,7 +979,7 @@ Conference::setHandRaised(const std::string& participant_id, const bool& state)
void
Conference::setModerator(const std::string& participant_id, const bool& state)
{
for (const auto& p : participants_) {
for (const auto& p : getParticipantList()) {
if (auto call = getCall(p)) {
auto isPeerModerator = isModerator(participant_id);
if (participant_id == string_remove_suffix(call->getPeerNumber(), '@')) {
@ -1029,6 +1019,14 @@ Conference::updateHandsRaised()
sendConferenceInfos();
}
void
Conference::foreachCall(const std::function<void(const std::shared_ptr<Call>& call)>& cb)
{
for (const auto& p : getParticipantList())
if (auto call = getCall(p))
cb(call);
}
bool
Conference::isMuted(std::string_view uri) const
{
@ -1155,7 +1153,7 @@ Conference::isHost(std::string_view uri) const
// Check if the URI is a local URI (AccountID) for at least one of the subcall
// (a local URI can be in the call with another device)
for (const auto& p : participants_) {
for (const auto& p : getParticipantList()) {
if (auto call = getCall(p)) {
if (auto account = call->getAccount().lock()) {
if (account->getUsername() == uri)
@ -1240,12 +1238,12 @@ Conference::muteLocalHost(bool is_muted, const std::string& mediaType)
}
setMediaSourceState(MediaType::MEDIA_VIDEO, is_muted);
if (is_muted) {
if (auto mixer = getVideoMixer()) {
if (auto mixer = videoMixer_) {
JAMI_DBG("Muting local video source");
mixer->stopInput();
}
} else {
if (auto mixer = getVideoMixer()) {
if (auto mixer = videoMixer_) {
JAMI_DBG("Un-muting local video source");
switchInput(mediaInput_);
}
@ -1326,11 +1324,11 @@ Conference::mergeConfInfo(ConfInfo& newInfo, const std::string& peerURI)
updateNeeded = true;
}
// Send confInfo only if needed to avoid loops
if (updateNeeded and isVideoEnabled()) {
if (updateNeeded and videoMixer_) {
// Trigger the layout update in the mixer because the frame resolution may
// change from participant to conference and cause a mismatch between
// confInfo layout and rendering layout.
getVideoMixer()->updateLayout();
videoMixer_->updateLayout();
}
}
@ -1349,7 +1347,7 @@ Conference::findHostforRemoteParticipant(std::string_view uri)
std::shared_ptr<Call>
Conference::getCallFromPeerID(std::string_view peerID)
{
for (const auto& p : participants_) {
for (const auto& p : getParticipantList()) {
auto call = getCall(p);
if (call && string_remove_suffix(call->getPeerNumber(), '@') == peerID) {
return call;

View File

@ -299,7 +299,7 @@ public:
/**
* Get the participant list for this conference
*/
const ParticipantSet& getParticipantList() const;
ParticipantSet getParticipantList() const;
/**
* Start/stop recording toggle
@ -353,8 +353,11 @@ private:
void updateModerators();
void updateHandsRaised();
void foreachCall(const std::function<void(const std::shared_ptr<Call>& call)>& cb);
std::string id_;
State confState_ {State::ACTIVE_ATTACHED};
mutable std::mutex participantsMtx_ {};
ParticipantSet participants_;
bool videoEnabled_ {false};

View File

@ -563,7 +563,6 @@ Manager::ManagerPimpl::processRemainingParticipants(Conference& conf)
if (account->isRendezVous())
return;
call->setConfId("");
if (current_call_id != conf.getConfID())
base_.onHoldCall(call->getCallId());
else
@ -693,7 +692,6 @@ Manager::ManagerPimpl::bindCallToConference(Call& call, Conference& conf)
base_.getRingBufferPool().unBindAll(call_id);
conf.addParticipant(call_id);
call.setConfId(conf_id);
if (state == "HOLD") {
conf.bindParticipant(call_id);
@ -1042,7 +1040,6 @@ Manager::outgoingCall(const std::string& account_id,
stopTone();
pimpl_->switchCall(call);
call->setConfId(conf_id);
return call_id;
}
@ -1078,7 +1075,6 @@ Manager::outgoingCall(const std::string& account_id,
stopTone();
pimpl_->switchCall(call);
call->setConfId(conf_id);
return call_id;
}
@ -1798,7 +1794,6 @@ Manager::removeParticipant(const std::string& call_id)
}
conf->removeParticipant(call_id);
call->setConfId("");
removeAudio(*call);
@ -1839,7 +1834,6 @@ Manager::joinConference(const std::string& conf_id1, const std::string& conf_id2
}
conf->removeParticipant(p);
call->setConfId("");
removeAudio(*call);
}
// Remove conf1

View File

@ -158,7 +158,7 @@ VideoRtpSession::startSender()
sender_.reset();
socketPair_->stopSendOp(false);
MediaStream ms
= !conference_
= !videoMixer_
? MediaStream("video sender",
AV_PIX_FMT_YUV420P,
1 / static_cast<rational<int>>(localVideoParams_.framerate),
@ -166,7 +166,7 @@ VideoRtpSession::startSender()
localVideoParams_.height,
send_.bitrate,
static_cast<rational<int>>(localVideoParams_.framerate))
: conference_->getVideoMixer()->getStream("Video Sender");
: videoMixer_->getStream("Video Sender");
sender_.reset(
new VideoSender(getRemoteRtpUri(), ms, send_, *socketPair_, initSeqVal_ + 1, mtu_));
if (changeOrientationCallback_)

View File

@ -2802,6 +2802,7 @@ void
SIPCall::enterConference(const std::string& confId)
{
JAMI_DBG("[call:%s] Entering conference [%s]", getCallId().c_str(), confId.c_str());
confID_ = confId;
#ifdef ENABLE_VIDEO
auto conf = Manager::instance().getConferenceFromID(confId);
@ -2851,6 +2852,7 @@ SIPCall::exitConference()
#ifdef ENABLE_PLUGIN
createCallAVStreams();
#endif
confID_ = "";
}
std::shared_ptr<Observable<std::shared_ptr<MediaFrame>>>

View File

@ -76,6 +76,7 @@ private:
void testCreateParticipantsSinks();
void testMuteStatusAfterRemove();
void testHandsUp();
void testPeerLeaveConference();
CPPUNIT_TEST_SUITE(ConferenceTest);
CPPUNIT_TEST(testGetConference);
@ -84,6 +85,7 @@ private:
CPPUNIT_TEST(testCreateParticipantsSinks);
CPPUNIT_TEST(testMuteStatusAfterRemove);
CPPUNIT_TEST(testHandsUp);
CPPUNIT_TEST(testPeerLeaveConference);
CPPUNIT_TEST_SUITE_END();
// Common parts
@ -471,6 +473,25 @@ ConferenceTest::testHandsUp()
DRing::unregisterSignalHandlers();
}
void
ConferenceTest::testPeerLeaveConference()
{
registerSignalHandlers();
auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId);
auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId);
auto bobUri = bobAccount->getUsername();
startConference();
Manager::instance().hangupCall(bobCall.callId);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(20), [&] {
return bobCall.state == "OVER" && confId.empty();
}));
DRing::unregisterSignalHandlers();
}
} // namespace test
} // namespace jami