conference: identify mixed videos per streamId

Videos in the videoMixer_ were not easy to identify, moreover,
"sinkId" in the participants informations was the concatenation
of the confId and a URI, which is a problem if several devices
from the same account was present in the conference. Finally,
the active stream logic was dirty, with two different variables
used to identify the active stream in the mixer.

This patch introduces a streamId which is (callId_type_idx, e.g.
ca111412_video_0, ca111412_audio_2), so every video shown in the
conference are identified via a unique ID.
Active stream in the video mixer is identified by this ID, not
by the callId or pointer.

This should not change any behaviour, but prepare for multistream.

https://git.jami.net/savoirfairelinux/jami-project/-/issues/1429

Change-Id: I250dd31ad1ea92ed1fd1e94bec2f5abd311d2128
This commit is contained in:
Sébastien Blin
2022-05-02 08:49:47 -04:00
parent 13ceecdac4
commit ce645d4fd3
18 changed files with 225 additions and 224 deletions

View File

@ -578,14 +578,7 @@ setActiveStream(const std::string& accountId,
{
if (const auto account = jami::Manager::instance().getAccount<jami::JamiAccount>(accountId)) {
if (auto conf = account->getConference(confId)) {
// TODO, as for now the videoMixer doesn't have the streamId
if (deviceId == account->currentDeviceId() && accountUri == account->getUsername()) {
conf->setActiveStream("", state);
} else if (auto call = std::static_pointer_cast<jami::SIPCall>(
conf->getCallFromPeerID(accountUri))) {
conf->setActiveStream(call->getCallId(), state);
}
// conf->setActiveStream(streamId, state);
conf->setActiveStream(streamId, state);
} else if (auto call = account->getCall(confId)) {
if (call->conferenceProtocolVersion() == 1) {
Json::Value sinkVal;

View File

@ -109,14 +109,14 @@ Conference::Conference(const std::shared_ptr<Account>& account)
}
auto hostAdded = false;
// Handle participants showing their video
std::unique_lock<std::mutex> lk(shared->videoToCallMtx_);
for (const auto& info : infos) {
std::string uri {};
bool isLocalMuted = false;
std::string deviceId {};
auto active = false;
if (!info.id.empty()) {
if (auto call = std::dynamic_pointer_cast<SIPCall>(getCall(info.id))) {
if (!info.callId.empty()) {
std::string callId = info.callId;
if (auto call = std::dynamic_pointer_cast<SIPCall>(getCall(callId))) {
uri = call->getPeerNumber();
isLocalMuted = call->isPeerMuted();
if (auto* transport = call->getTransport())
@ -125,13 +125,12 @@ Conference::Conference(const std::shared_ptr<Account>& account)
std::string_view peerId = string_remove_suffix(uri, '@');
auto isModerator = shared->isModerator(peerId);
auto isHandRaised = shared->isHandRaised(deviceId);
auto isModeratorMuted = shared->isMuted(info.id);
auto sinkId = shared->getConfId() + peerId;
auto isModeratorMuted = shared->isMuted(callId);
if (auto videoMixer = shared->videoMixer_)
active = videoMixer->verifyActive(info.id); // TODO streamId
active = videoMixer->verifyActive(info.streamId);
newInfo.emplace_back(ParticipantInfo {std::move(uri),
deviceId,
std::move(sinkId),
std::move(info.streamId),
active,
info.x,
info.y,
@ -143,30 +142,31 @@ Conference::Conference(const std::shared_ptr<Account>& account)
isModerator,
isHandRaised});
} else {
auto it = shared->videoToCall_.find(info.source);
if (it == shared->videoToCall_.end())
it = shared->videoToCall_.emplace_hint(it, info.source, std::string());
// If not local
auto isModeratorMuted = false;
if (!it->second.empty()) {
// If not local
auto streamInfo = shared->videoMixer_->streamInfo(info.source);
std::string streamId = streamInfo.streamId;
if (!streamId.empty()) {
// Retrieve calls participants
// TODO: this is a first version, we assume that the peer is not
// a master of a conference and there is only one remote
// In the future, we should retrieve confInfo from the call
// To merge layouts informations
// TODO sinkId
auto isModeratorMuted = shared->isMuted(it->second);
isModeratorMuted = shared->isMuted(streamId);
if (auto videoMixer = shared->videoMixer_)
active = videoMixer->verifyActive(it->second);
if (auto call = std::dynamic_pointer_cast<SIPCall>(getCall(it->second))) {
active = videoMixer->verifyActive(streamId);
if (auto call = std::dynamic_pointer_cast<SIPCall>(getCall(streamInfo.callId))) {
uri = call->getPeerNumber();
isLocalMuted = call->isPeerMuted();
if (auto* transport = call->getTransport())
deviceId = transport->deviceId();
}
} else {
streamId = sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO);
if (auto videoMixer = shared->videoMixer_)
active = videoMixer->verifyActive(streamId);
}
std::string_view peerId = string_remove_suffix(uri, '@');
// TODO (another patch): use deviceId instead of peerId as specified in protocol
auto isModerator = shared->isModerator(peerId);
if (uri.empty() && !hostAdded) {
hostAdded = true;
@ -175,12 +175,9 @@ Conference::Conference(const std::shared_ptr<Account>& account)
isLocalMuted = shared->isMediaSourceMuted(MediaType::MEDIA_AUDIO);
}
auto isHandRaised = shared->isHandRaised(deviceId);
auto sinkId = shared->getConfId() + peerId;
if (auto videoMixer = shared->videoMixer_)
active |= videoMixer->verifyActive(info.source);
newInfo.emplace_back(ParticipantInfo {std::move(uri),
deviceId,
std::move(sinkId),
std::move(streamId),
active,
info.x,
info.y,
@ -197,7 +194,6 @@ Conference::Conference(const std::shared_ptr<Account>& account)
newInfo.h = videoMixer->getHeight();
newInfo.w = videoMixer->getWidth();
}
lk.unlock();
if (!hostAdded) {
ParticipantInfo pi;
pi.videoMuted = true;
@ -218,10 +214,8 @@ Conference::Conference(const std::shared_ptr<Account>& account)
});
parser_.onRaiseHand(
[&](const auto& deviceId, bool state) { setHandRaised(deviceId, state); });
parser_.onSetActiveStream([&](const auto& accountUri, const auto& deviceId, bool state) {
// TODO replace per streamId
if (auto call = getCallWith(accountUri, deviceId))
setHandRaised(call->getCallId(), state);
parser_.onSetActiveStream([&](const auto& streamId, bool state) {
setActiveStream(streamId, state);
});
parser_.onMuteStreamAudio
(
@ -616,11 +610,12 @@ Conference::handleMediaChangeRequest(const std::shared_ptr<Call>& call,
#ifdef ENABLE_VIDEO
// If the new media list has video, remove the participant from audioonlylist.
if (MediaAttribute::hasMediaType(MediaAttribute::buildMediaAttributesList(remoteMediaList,
false),
MediaType::MEDIA_VIDEO)) {
if (videoMixer_)
videoMixer_->removeAudioOnlySource(call->getCallId());
if (videoMixer_ && MediaAttribute::hasMediaType(
MediaAttribute::buildMediaAttributesList(remoteMediaList, false),
MediaType::MEDIA_VIDEO)) {
auto callId = call->getCallId();
videoMixer_->removeAudioOnlySource(callId,
std::string(sip_utils::streamId(callId, 0, MediaType::MEDIA_VIDEO)));
}
#endif
@ -708,10 +703,8 @@ Conference::addParticipant(const std::string& participant_id)
// In conference, if a participant joins with an audio only
// call, it must be listed in the audioonlylist.
auto mediaList = call->getMediaAttributeList();
if (not MediaAttribute::hasMediaType(mediaList, MediaType::MEDIA_VIDEO)) {
if (videoMixer_) {
videoMixer_->addAudioOnlySource(call->getCallId());
}
if (videoMixer_ && not MediaAttribute::hasMediaType(mediaList, MediaType::MEDIA_VIDEO)) {
videoMixer_->addAudioOnlySource(call->getCallId(), sip_utils::streamId(call->getCallId(), 0, MediaType::MEDIA_AUDIO));
}
call->enterConference(shared_from_this());
// Continue the recording for the conference if one participant was recording
@ -739,14 +732,11 @@ Conference::setActiveParticipant(const std::string& participant_id)
if (!videoMixer_)
return;
if (isHost(participant_id)) {
videoMixer_->addActiveHost();
videoMixer_->setActiveStream(sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO));
return;
}
if (auto call = getCallFromPeerID(participant_id)) {
if (auto videoRecv = call->getReceiveVideoFrameActiveWriter())
videoMixer_->setActiveStream(videoRecv.get());
else
videoMixer_->setActiveStream(call->getCallId());
videoMixer_->setActiveStream(sip_utils::streamId(call->getCallId(), 0, MediaType::MEDIA_VIDEO));
return;
}
@ -764,25 +754,13 @@ Conference::setActiveParticipant(const std::string& participant_id)
void
Conference::setActiveStream(const std::string& streamId, bool state)
{
// TODO BUG: for now activeStream is the callId, and should be the sink!
#ifdef ENABLE_VIDEO
if (!videoMixer_)
return;
if (state) {
// TODO remove
if (streamId.empty()) {
videoMixer_->addActiveHost();
} else if (auto call = getCall(streamId)) {
if (auto videoRecv = call->getReceiveVideoFrameActiveWriter())
videoMixer_->setActiveStream(videoRecv.get());
else
videoMixer_->setActiveStream(call->getCallId());
return;
}
// TODO videoMixer_->setActiveStream(sinkId);
} else {
if (state)
videoMixer_->setActiveStream(streamId);
else
videoMixer_->resetActiveStream();
}
#endif
}
@ -872,27 +850,6 @@ Conference::createSinks(const ConfInfo& infos)
sink),
confSinksMap_);
}
void
Conference::attachVideo(Observable<std::shared_ptr<MediaFrame>>* frame, const std::string& callId)
{
JAMI_DBG("[conf:%s] attaching video of call %s", id_.c_str(), callId.c_str());
std::lock_guard<std::mutex> lk(videoToCallMtx_);
videoToCall_.emplace(frame, callId);
frame->attach(videoMixer_.get());
}
void
Conference::detachVideo(Observable<std::shared_ptr<MediaFrame>>* frame)
{
std::lock_guard<std::mutex> lk(videoToCallMtx_);
auto it = videoToCall_.find(frame);
if (it != videoToCall_.end()) {
JAMI_DBG("[conf:%s] detaching video of call %s", id_.c_str(), it->second.c_str());
it->first->detach(videoMixer_.get());
videoToCall_.erase(it);
}
}
#endif
void
@ -911,8 +868,8 @@ Conference::removeParticipant(const std::string& participant_id)
#ifdef ENABLE_VIDEO
auto sinkId = getConfId() + peerId;
// Remove if active
// TODO if (videoMixer_->verifyActive(sinkId))
if (videoMixer_->verifyActive(participant_id))
// TODO all streams
if (videoMixer_->verifyActive(sip_utils::streamId(participant_id, 0, MediaType::MEDIA_VIDEO)))
videoMixer_->resetActiveStream();
call->exitConference();
if (call->isPeerRecording())
@ -1252,7 +1209,6 @@ Conference::setHandRaised(const std::string& deviceId, const bool& state)
if (auto* transport = call->getTransport())
callDeviceId = transport->deviceId();
if (deviceId == callDeviceId) {
JAMI_ERR() << "@@@X";
if (state and not isPeerRequiringAttention) {
JAMI_DBG("Raise %s hand", deviceId.c_str());
handsRaised_.emplace(deviceId);

View File

@ -335,8 +335,6 @@ public:
#ifdef ENABLE_VIDEO
void createSinks(const ConfInfo& infos);
void attachVideo(Observable<std::shared_ptr<MediaFrame>>* frame, const std::string& callId);
void detachVideo(Observable<std::shared_ptr<MediaFrame>>* frame);
std::shared_ptr<video::VideoMixer> getVideoMixer();
std::string getVideoInput() const { return hostVideoSource_.sourceUri_; }
#endif
@ -400,9 +398,6 @@ private:
ConfInfo confInfo_ {};
void sendConferenceInfos();
// We need to convert call to frame
std::mutex videoToCallMtx_;
std::map<Observable<std::shared_ptr<MediaFrame>>*, std::string> videoToCall_ {};
std::shared_ptr<RingBuffer> ghostRingBuffer_;
#ifdef ENABLE_VIDEO

View File

@ -158,9 +158,7 @@ ConfProtocolParser::parseV1()
mediaVal[ProtocolKeys::MUTEAUDIO].asBool());
}
if (mediaVal.isMember(ProtocolKeys::ACTIVE)) {
// TODO streamId
setActiveStream_(accountUri,
deviceId,
setActiveStream_(streamId,
mediaVal[ProtocolKeys::ACTIVE].asBool());
}
}

View File

@ -59,10 +59,7 @@ public:
{
raiseHand_ = std::move(cb);
}
/**
* @todo, replace the 2 strings per streamId
*/
void onSetActiveStream(std::function<void(const std::string&, const std::string&, bool)>&& cb)
void onSetActiveStream(std::function<void(const std::string&, bool)>&& cb)
{
setActiveStream_ = std::move(cb);
}
@ -125,7 +122,7 @@ private:
std::function<bool(std::string_view)> checkAuthorization_;
std::function<void(const std::string&, const std::string&)> hangupParticipant_;
std::function<void(const std::string&, bool)> raiseHand_;
std::function<void(const std::string&, const std::string&, bool)> setActiveStream_;
std::function<void(const std::string&, bool)> setActiveStream_;
std::function<void(const std::string&, const std::string&, const std::string&, bool)>
muteStreamAudio_;
std::function<void(const std::string&, const std::string&, const std::string&, bool)>

View File

@ -48,21 +48,21 @@
namespace jami {
AudioRtpSession::AudioRtpSession(const std::string& id)
: RtpSession(id, MediaType::MEDIA_AUDIO)
AudioRtpSession::AudioRtpSession(const std::string& callId, const std::string& streamId)
: RtpSession(callId, streamId, MediaType::MEDIA_AUDIO)
, rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
{
JAMI_DBG("Created Audio RTP session: %p - call Id %s", this, callID_.c_str());
JAMI_DBG("Created Audio RTP session: %p - call Id %s", this, callId_.c_str());
// don't move this into the initializer list or Cthulus will emerge
ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(callID_);
ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(callId_);
}
AudioRtpSession::~AudioRtpSession()
{
stop();
JAMI_DBG("Destroyed Audio RTP session: %p - call Id %s", this, callID_.c_str());
JAMI_DBG("Destroyed Audio RTP session: %p - call Id %s", this, callId_.c_str());
}
void
@ -90,7 +90,7 @@ AudioRtpSession::startSender()
audioInput_->detach(sender_.get());
// sender sets up input correctly, we just keep a reference in case startSender is called
audioInput_ = jami::getAudioInput(callID_);
audioInput_ = jami::getAudioInput(callId_);
audioInput_->setMuted(muteState_);
audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_);
auto newParams = audioInput_->switchInput(input_);
@ -117,7 +117,7 @@ AudioRtpSession::startSender()
sender_.reset();
socketPair_->stopSendOp(false);
sender_.reset(
new AudioSender(callID_, getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
new AudioSender(getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
} catch (const MediaEncoderException& e) {
JAMI_ERR("%s", e.what());
send_.enabled = false;
@ -160,7 +160,7 @@ AudioRtpSession::startReceiver()
JAMI_WARN("Restarting audio receiver");
auto accountAudioCodec = std::static_pointer_cast<AccountAudioCodecInfo>(receive_.codec);
receiveThread_.reset(new AudioReceiveThread(callID_,
receiveThread_.reset(new AudioReceiveThread(callId_,
accountAudioCodec->audioformat,
receive_.receiving_sdp,
mtu_));
@ -332,7 +332,7 @@ AudioRtpSession::initRecorder(std::shared_ptr<MediaRecorder>& rec)
{
if (receiveThread_)
receiveThread_->attach(rec->addStream(receiveThread_->getInfo()));
if (auto input = jami::getAudioInput(callID_))
if (auto input = jami::getAudioInput(callId_))
input->attach(rec->addStream(input->getInfo()));
}
@ -344,7 +344,7 @@ AudioRtpSession::deinitRecorder(std::shared_ptr<MediaRecorder>& rec)
receiveThread_->detach(ob);
}
}
if (auto input = jami::getAudioInput(callID_)) {
if (auto input = jami::getAudioInput(callId_)) {
if (auto ob = rec->getStream(input->getInfo().name)) {
input->detach(ob);
}

View File

@ -50,7 +50,7 @@ struct RTCPInfo
class AudioRtpSession : public RtpSession
{
public:
AudioRtpSession(const std::string& id);
AudioRtpSession(const std::string& callId, const std::string& streamId);
virtual ~AudioRtpSession();
void start(std::unique_ptr<IceSocket> rtp_sock, std::unique_ptr<IceSocket> rtcp_sock) override;

View File

@ -33,14 +33,12 @@
namespace jami {
AudioSender::AudioSender(const std::string& id,
const std::string& dest,
AudioSender::AudioSender(const std::string& dest,
const MediaDescription& args,
SocketPair& socketPair,
const uint16_t seqVal,
const uint16_t mtu)
: id_(id)
, dest_(dest)
: dest_(dest)
, args_(args)
, seqVal_(seqVal)
, mtu_(mtu)

View File

@ -37,8 +37,7 @@ class Resampler;
class AudioSender : public Observer<std::shared_ptr<MediaFrame>>
{
public:
AudioSender(const std::string& id,
const std::string& dest,
AudioSender(const std::string& dest,
const MediaDescription& args,
SocketPair& socketPair,
const uint16_t seqVal,
@ -56,7 +55,6 @@ private:
bool setup(SocketPair& socketPair);
std::string id_;
std::string dest_;
MediaDescription args_;
std::unique_ptr<MediaEncoder> audioEncoder_;

View File

@ -42,8 +42,10 @@ public:
// Media direction
enum class Direction { SEND, RECV };
RtpSession(const std::string& callID, MediaType type)
: callID_(callID)
// Note: callId is used for ring buffers and smarttools
RtpSession(const std::string& callId, const std::string& streamId, MediaType type)
: callId_(callId)
, streamId_(streamId)
, mediaType_(type)
{}
virtual ~RtpSession() {};
@ -82,9 +84,12 @@ public:
const IpAddr& getSendAddr() const { return send_.addr; };
const IpAddr& getRecvAddr() const { return receive_.addr; };
inline std::string streamId() const { return streamId_; }
protected:
std::recursive_mutex mutex_;
const std::string callID_;
const std::string callId_;
const std::string streamId_;
MediaType mediaType_;
std::unique_ptr<SocketPair> socketPair_;
std::string input_ {};

View File

@ -32,6 +32,7 @@
#ifdef RING_ACCEL
#include "accel.h"
#endif
#include "sip/sip_utils.h"
#include <cmath>
#include <unistd.h>
@ -88,8 +89,7 @@ VideoMixer::VideoMixer(const std::string& id, const std::string& localInput)
// Local video camera is the main participant
if (not localInput.empty())
videoLocal_ = getVideoInput(localInput);
if (videoLocal_)
videoLocal_->attach(this);
attachVideo(videoLocal_.get(), "", sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO));
loop_.start();
nextProcess_ = std::chrono::steady_clock::now();
@ -100,16 +100,10 @@ VideoMixer::~VideoMixer()
{
stop_sink();
if (videoLocal_) {
videoLocal_->detach(this);
// prefer to release it now than after the next join
videoLocal_.reset();
}
if (videoLocalSecondary_) {
videoLocalSecondary_->detach(this);
// prefer to release it now than after the next join
videoLocalSecondary_.reset();
}
detachVideo(videoLocal_.get());
videoLocal_.reset();
detachVideo(videoLocalSecondary_.get());
videoLocalSecondary_.reset();
loop_.join();
@ -139,8 +133,7 @@ VideoMixer::switchInput(const std::string& input)
// Re-attach videoInput to mixer
videoLocal_ = getVideoInput(input);
if (videoLocal_)
videoLocal_->attach(this);
attachVideo(videoLocal_.get(), "", sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO));
}
void
@ -164,9 +157,7 @@ VideoMixer::switchSecondaryInput(const std::string& input)
// Re-attach videoInput to mixer
videoLocalSecondary_ = getVideoInput(input);
if (videoLocalSecondary_) {
videoLocalSecondary_->attach(this);
}
attachVideo(videoLocalSecondary_.get(), "", sip_utils::streamId("", 1, MediaType::MEDIA_VIDEO));
}
void
@ -177,26 +168,9 @@ VideoMixer::stopInput()
}
}
void
VideoMixer::addActiveHost()
{
activeStream_ = "";
activeSource_ = videoLocalSecondary_ ? videoLocalSecondary_.get() : videoLocal_.get();
updateLayout();
}
void
VideoMixer::setActiveStream(Observable<std::shared_ptr<MediaFrame>>* ob)
{
activeStream_ = "";
activeSource_ = ob;
updateLayout();
}
void
VideoMixer::setActiveStream(const std::string& id)
{
activeSource_ = nullptr;
activeStream_ = id;
updateLayout();
}
@ -204,11 +178,43 @@ VideoMixer::setActiveStream(const std::string& id)
void
VideoMixer::updateLayout()
{
if (activeStream_ == "" && activeSource_ == nullptr)
if (activeStream_ == "")
currentLayout_ = Layout::GRID;
layoutUpdated_ += 1;
}
void
VideoMixer::attachVideo(Observable<std::shared_ptr<MediaFrame>>* frame, const std::string& callId, const std::string& streamId)
{
if (!frame) return;
JAMI_DBG("Attaching video with streamId %s", streamId.c_str());
std::lock_guard<std::mutex> lk(videoToStreamInfoMtx_);
videoToStreamInfo_.emplace(frame, StreamInfo {callId, streamId});
frame->attach(this);
}
void
VideoMixer::detachVideo(Observable<std::shared_ptr<MediaFrame>>* frame)
{
if (!frame)
return;
bool detach = false;
std::unique_lock<std::mutex> lk(videoToStreamInfoMtx_);
auto it = videoToStreamInfo_.find(frame);
if (it != videoToStreamInfo_.end()) {
JAMI_DBG("Detaching video of call %s", it->second.callId.c_str());
detach = true;
// Handle the case where the current shown source leave the conference
// Note, do not call resetActiveStream() to avoid multiple updates
if (verifyActive(it->second.streamId))
activeStream_ = {};
videoToStreamInfo_.erase(it);
}
lk.unlock();
if (detach)
frame->detach(this);
}
void
VideoMixer::attached(Observable<std::shared_ptr<MediaFrame>>* ob)
{
@ -230,9 +236,6 @@ VideoMixer::detached(Observable<std::shared_ptr<MediaFrame>>* ob)
for (const auto& x : sources_) {
if (x->source == ob) {
// Handle the case where the current shown source leave the conference
if (verifyActive(ob))
resetActiveStream();
JAMI_DBG("Remove source [%p]", x.get());
sources_.remove(x);
JAMI_DBG("Total sources: %lu", sources_.size());
@ -303,11 +306,10 @@ VideoMixer::process()
std::vector<SourceInfo> sourcesInfo;
sourcesInfo.reserve(sources_.size() + audioOnlySources_.size());
// add all audioonlysources
for (auto& id : audioOnlySources_) {
JAMI_ERR() << "@@@ " << id;
auto active = verifyActive(id);
for (auto& [callId, streamId] : audioOnlySources_) {
auto active = verifyActive(streamId);
if (currentLayout_ != Layout::ONE_BIG or active) {
sourcesInfo.emplace_back(SourceInfo {{}, 0, 0, 10, 10, false, id});
sourcesInfo.emplace_back(SourceInfo {{}, 0, 0, 10, 10, false, callId, streamId});
}
if (currentLayout_ == Layout::ONE_BIG and active)
successfullyRendered = true;
@ -318,7 +320,8 @@ VideoMixer::process()
if (!loop_.isRunning())
return;
auto activeSource = verifyActive(x->source);
auto sinfo = streamInfo(x->source);
auto activeSource = verifyActive(sinfo.streamId);
if (currentLayout_ != Layout::ONE_BIG or activeSource) {
// make rendered frame temporarily unavailable for update()
// to avoid concurrent access.
@ -385,8 +388,9 @@ VideoMixer::process()
layoutUpdated_ -= 1;
if (layoutUpdated_ == 0) {
for (auto& x : sources_) {
auto sinfo = streamInfo(x->source);
sourcesInfo.emplace_back(
SourceInfo {x->source, x->x, x->y, x->w, x->h, x->hasVideo, {}});
SourceInfo {x->source, x->x, x->y, x->w, x->h, x->hasVideo, sinfo.callId, sinfo.streamId});
}
if (onSourcesUpdated_)
onSourcesUpdated_(std::move(sourcesInfo));

View File

@ -37,6 +37,11 @@ namespace video {
class SinkClient;
struct StreamInfo {
std::string callId;
std::string streamId;
};
struct SourceInfo
{
Observable<std::shared_ptr<MediaFrame>>* source;
@ -45,7 +50,8 @@ struct SourceInfo
int w;
int h;
bool hasVideo;
std::string id;
std::string callId;
std::string streamId;
};
using OnSourcesUpdatedCb = std::function<void(std::vector<SourceInfo>&&)>;
@ -73,27 +79,18 @@ public:
void switchSecondaryInput(const std::string& input);
void stopInput();
void setActiveStream(Observable<std::shared_ptr<MediaFrame>>* ob);
void setActiveStream(const std::string& id);
void resetActiveStream()
{
activeStream_ = {};
activeSource_ = {};
updateLayout();
}
void addActiveHost();
// TODO group, we can only use a set of string to identify actives
bool verifyActive(const std::string& id)
{
return activeStream_ == id;
}
bool verifyActive(Observable<std::shared_ptr<MediaFrame>>* ob)
{
return activeSource_ == ob;
}
void setVideoLayout(Layout newLayout)
{
currentLayout_ = newLayout;
@ -114,18 +111,33 @@ public:
std::shared_ptr<SinkClient>& getSink() { return sink_; }
void addAudioOnlySource(const std::string& id)
void addAudioOnlySource(const std::string& callId, const std::string& streamId)
{
std::lock_guard<std::mutex> lk(audioOnlySourcesMtx_);
audioOnlySources_.insert(id);
std::unique_lock<std::mutex> lk(audioOnlySourcesMtx_);
audioOnlySources_.insert({callId, streamId});
lk.unlock();
updateLayout();
}
void removeAudioOnlySource(const std::string& id)
void removeAudioOnlySource(const std::string& callId, const std::string& streamId)
{
std::lock_guard<std::mutex> lk(audioOnlySourcesMtx_);
if (audioOnlySources_.erase(id))
std::unique_lock<std::mutex> lk(audioOnlySourcesMtx_);
if (audioOnlySources_.erase({callId, streamId})) {
lk.unlock();
updateLayout();
}
}
void attachVideo(Observable<std::shared_ptr<MediaFrame>>* frame, const std::string& callId, const std::string& streamId);
void detachVideo(Observable<std::shared_ptr<MediaFrame>>* frame);
StreamInfo streamInfo(Observable<std::shared_ptr<MediaFrame>>* frame) const
{
std::lock_guard<std::mutex> lk(videoToStreamInfoMtx_);
auto it = videoToStreamInfo_.find(frame);
if (it == videoToStreamInfo_.end())
return {};
return it->second;
}
private:
@ -162,11 +174,14 @@ private:
ThreadLoop loop_; // as to be last member
Layout currentLayout_ {Layout::GRID};
Observable<std::shared_ptr<MediaFrame>>* activeSource_ {nullptr};
std::list<std::unique_ptr<VideoMixerSource>> sources_;
// We need to convert call to frame
mutable std::mutex videoToStreamInfoMtx_ {};
std::map<Observable<std::shared_ptr<MediaFrame>>*, StreamInfo> videoToStreamInfo_ {};
std::mutex audioOnlySourcesMtx_;
std::set<std::string> audioOnlySources_;
std::set<std::pair<std::string, std::string>> audioOnlySources_;
std::string activeStream_ {};
std::atomic_int layoutUpdated_ {0};

View File

@ -59,15 +59,17 @@ constexpr auto EXPIRY_TIME_RTCP = std::chrono::seconds(2);
constexpr auto DELAY_AFTER_REMB_INC = std::chrono::seconds(1);
constexpr auto DELAY_AFTER_REMB_DEC = std::chrono::milliseconds(500);
VideoRtpSession::VideoRtpSession(const string& callID, const DeviceParams& localVideoParams)
: RtpSession(callID, MediaType::MEDIA_VIDEO)
VideoRtpSession::VideoRtpSession(const string& callId,
const string& streamId,
const DeviceParams& localVideoParams)
: RtpSession(callId, streamId, MediaType::MEDIA_VIDEO)
, localVideoParams_(localVideoParams)
, videoBitrateInfo_ {}
, rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
{
setupVideoBitrateInfo(); // reset bitrate
cc = std::make_unique<CongestionControl>();
JAMI_DBG("[%p] Video RTP session created for call %s", this, callID_.c_str());
JAMI_DBG("[%p] Video RTP session created for call %s", this, callId_.c_str());
}
VideoRtpSession::~VideoRtpSession()
@ -247,7 +249,7 @@ VideoRtpSession::startReceiver()
if (receiveThread_)
JAMI_WARN("[%p] Already has a receiver, restarting", this);
receiveThread_.reset(
new VideoReceiveThread(callID_, !conference_, receive_.receiving_sdp, mtu_));
new VideoReceiveThread(callId_, !conference_, receive_.receiving_sdp, mtu_));
// XXX keyframe requests can timeout if unanswered
receiveThread_->addIOContext(*socketPair_);
@ -255,23 +257,27 @@ VideoRtpSession::startReceiver()
receiveThread_->startLoop();
receiveThread_->setRequestKeyFrameCallback([this]() { cbKeyFrameRequest_(); });
receiveThread_->setRotation(rotation_.load());
if (videoMixer_) {
auto activeParticipant = videoMixer_->verifyActive(receiveThread_.get())
|| videoMixer_->verifyActive(callID_);
videoMixer_->removeAudioOnlySource(callID_);
if (activeParticipant)
videoMixer_->setActiveStream(receiveThread_.get());
if (videoMixer_ and conference_) {
// Note, this should be managed differently, this is a bit hacky
auto audioId = streamId_;
string_replace(audioId, "video", "audio");
auto activeStream = videoMixer_->verifyActive(audioId);
videoMixer_->removeAudioOnlySource(callId_, audioId);
if (activeStream)
videoMixer_->setActiveStream(streamId_);
}
} else {
JAMI_DBG("[%p] Video receiver disabled", this);
if (receiveThread_ and videoMixer_) {
auto activeParticipant = videoMixer_->verifyActive(receiveThread_.get())
|| videoMixer_->verifyActive(callID_);
videoMixer_->addAudioOnlySource(callID_);
if (receiveThread_ and videoMixer_ and conference_) {
// Note, this should be managed differently, this is a bit hacky
auto audioId_ = streamId_;
string_replace(audioId_, "video", "audio");
auto activeStream = videoMixer_->verifyActive(streamId_);
videoMixer_->addAudioOnlySource(callId_, audioId_);
receiveThread_->detach(videoMixer_.get());
if (activeParticipant)
videoMixer_->setActiveStream(callID_);
if (activeStream)
videoMixer_->setActiveStream(audioId_);
}
}
if (socketPair_)
@ -289,11 +295,13 @@ VideoRtpSession::stopReceiver()
return;
if (videoMixer_) {
auto activeParticipant = videoMixer_->verifyActive(receiveThread_.get()) || videoMixer_->verifyActive(callID_);
videoMixer_->addAudioOnlySource(callID_);
auto activeStream = videoMixer_->verifyActive(streamId_);
auto audioId = streamId_;
string_replace(audioId, "video", "audio");
videoMixer_->addAudioOnlySource(callId_, audioId);
receiveThread_->detach(videoMixer_.get());
if (activeParticipant)
videoMixer_->setActiveStream(callID_);
if (activeStream)
videoMixer_->setActiveStream(audioId);
}
// We need to disable the read operation, otherwise the
@ -463,7 +471,7 @@ VideoRtpSession::setupConferenceVideoPipeline(Conference& conference, Direction
JAMI_DBG("[%p] Setup video sender pipeline on conference %s for call %s",
this,
conference.getConfId().c_str(),
callID_.c_str());
callId_.c_str());
videoMixer_ = conference.getVideoMixer();
if (sender_) {
// Swap sender from local video to conference video mixer
@ -478,10 +486,11 @@ VideoRtpSession::setupConferenceVideoPipeline(Conference& conference, Direction
JAMI_DBG("[%p] Setup video receiver pipeline on conference %s for call %s",
this,
conference.getConfId().c_str(),
callID_.c_str());
callId_.c_str());
if (receiveThread_) {
receiveThread_->stopSink();
conference.attachVideo(receiveThread_.get(), callID_);
if (videoMixer_)
videoMixer_->attachVideo(receiveThread_.get(), callId_, streamId_);
} else {
JAMI_WARN("[%p] no receiver", this);
}
@ -545,11 +554,11 @@ VideoRtpSession::exitConference()
videoMixer_->detach(sender_.get());
if (receiveThread_) {
auto activetParticipant = videoMixer_->verifyActive(receiveThread_.get());
conference_->detachVideo(receiveThread_.get());
auto activeStream = videoMixer_->verifyActive(streamId_);
videoMixer_->detachVideo(receiveThread_.get());
receiveThread_->startSink();
if (activetParticipant)
videoMixer_->setActiveStream(callID_);
if (activeStream)
videoMixer_->setActiveStream(streamId_);
}
videoMixer_.reset();

View File

@ -70,7 +70,7 @@ class VideoRtpSession : public RtpSession
public:
using BaseType = RtpSession;
VideoRtpSession(const std::string& callID, const DeviceParams& localVideoParams);
VideoRtpSession(const std::string& callId, const std::string& streamId, const DeviceParams& localVideoParams);
~VideoRtpSession();
void setRequestKeyFrameCallback(std::function<void(void)> cb);

View File

@ -293,6 +293,14 @@ sip_strerror(pj_status_t code)
return std::string {ret.ptr, ret.ptr + ret.slen};
}
std::string
streamId(const std::string& callId, uint32_t idx, MediaType mt)
{
if (callId.empty())
return fmt::format("host_{}_{}", (mt == MediaType::MEDIA_VIDEO ? "video" : "audio"), idx);
return fmt::format("{}_{}_{}", callId, (mt == MediaType::MEDIA_VIDEO ? "video" : "audio"), idx);
}
void
sockaddr_to_host_port(pj_pool_t* pool, pjsip_host_port* host_port, const pj_sockaddr* addr)
{

View File

@ -143,6 +143,9 @@ as_view(const pj_str_t& str) noexcept
return {str.ptr, (size_t) str.slen};
}
std::string
streamId(const std::string& callId, uint32_t idx, MediaType mt);
// PJSIP dialog locking in RAII way
// Usage: declare local variable like this: sip_utils::PJDialogLock lock {dialog};
// The lock is kept until the local variable is deleted

View File

@ -175,12 +175,28 @@ SIPCall::createRtpSession(RtpStream& stream)
if (not stream.mediaAttribute_)
throw std::runtime_error("Missing media attribute");
// Find idx of this stream as we can share several audio/videos
auto streamIdx = [&]() {
auto idx = 0;
for (const auto& st : rtpStreams_) {
if (st.mediaAttribute_->label_ == stream.mediaAttribute_->label_)
return idx;
if (st.mediaAttribute_->type_ == stream.mediaAttribute_->type_)
idx++;
}
return -1;
};
// To get audio_0 ; video_0
auto idx = streamIdx();
auto streamId = sip_utils::streamId(id_, idx, stream.mediaAttribute_->type_);
if (stream.mediaAttribute_->type_ == MediaType::MEDIA_AUDIO) {
stream.rtpSession_ = std::make_shared<AudioRtpSession>(id_);
stream.rtpSession_ = std::make_shared<AudioRtpSession>(id_,
streamId);
}
#ifdef ENABLE_VIDEO
else if (stream.mediaAttribute_->type_ == MediaType::MEDIA_VIDEO) {
stream.rtpSession_ = std::make_shared<video::VideoRtpSession>(id_, getVideoSettings());
stream.rtpSession_ = std::make_shared<video::VideoRtpSession>(id_, streamId, getVideoSettings());
std::static_pointer_cast<video::VideoRtpSession>(stream.rtpSession_)->setRotation(rotation_);
}
#endif
@ -1044,11 +1060,7 @@ SIPCall::hangup(int reason)
// Stop all RTP streams
stopAllMedia();
if (auto conf = getConference()) {
if (auto mixer = conf->getVideoMixer()) {
mixer->removeAudioOnlySource(getCallId());
}
}
detachAudioFromConference();
setState(Call::ConnectionState::DISCONNECTED, reason);
dht::ThreadPool::io().run([w = weak()] {
if (auto shared = w.lock())
@ -1056,6 +1068,20 @@ SIPCall::hangup(int reason)
});
}
void
SIPCall::detachAudioFromConference()
{
if (auto conf = getConference()) {
if (auto mixer = conf->getVideoMixer()) {
for (const auto& stream : rtpStreams_) {
if (stream.mediaAttribute_->type_ == MediaType::MEDIA_AUDIO) {
mixer->removeAudioOnlySource(getCallId(), stream.rtpSession_->streamId());
}
}
}
}
}
void
SIPCall::refuse()
{
@ -1404,12 +1430,7 @@ SIPCall::peerHungup()
if (inviteSession_)
terminateSipSession(PJSIP_SC_NOT_FOUND);
if (auto conf = getConference()) {
if (auto mixer = conf->getVideoMixer()) {
mixer->removeAudioOnlySource(getCallId());
}
}
detachAudioFromConference();
Call::peerHungup();
}

View File

@ -485,6 +485,7 @@ private:
{"v:remote", false}};
void resetMediaReady();
void detachAudioFromConference();
std::mutex setupSuccessMutex_;
#ifdef ENABLE_VIDEO