diff --git a/MSVC/ring-daemon.vcxproj b/MSVC/ring-daemon.vcxproj
index e29eb765f..89c807d78 100644
--- a/MSVC/ring-daemon.vcxproj
+++ b/MSVC/ring-daemon.vcxproj
@@ -830,7 +830,6 @@
-
@@ -993,7 +992,6 @@
-
diff --git a/src/Makefile.am b/src/Makefile.am
index 73cca67a2..0076e0880 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -101,14 +101,12 @@ libring_la_SOURCES = \
fileutils.cpp \
archiver.cpp \
threadloop.cpp \
- thread_pool.cpp \
ip_utils.h \
ip_utils.cpp \
utf8_utils.cpp \
ice_transport.cpp \
ice_transport.h \
threadloop.h \
- thread_pool.h \
conference.h \
account_factory.h \
call_factory.h \
diff --git a/src/im/message_engine.cpp b/src/im/message_engine.cpp
index c0cfea9af..5301e34e9 100644
--- a/src/im/message_engine.cpp
+++ b/src/im/message_engine.cpp
@@ -20,12 +20,12 @@
#include "message_engine.h"
#include "sip/sipaccountbase.h"
#include "manager.h"
-#include "thread_pool.h"
#include "fileutils.h"
#include "client/ring_signal.h"
#include "dring/account_const.h"
+#include
#include
#include
@@ -252,7 +252,7 @@ MessageEngine::save_() const
root[c.first] = std::move(peerRoot);
}
// Save asynchronously
- ThreadPool::instance().run([path = savePath_,
+ dht::ThreadPool::computation().run([path = savePath_,
root = std::move(root),
accountID = account_.getAccountID(),
messageNum = messages_.size()]
diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp
index 1c695b24c..24b87a70f 100644
--- a/src/jamidht/jamiaccount.cpp
+++ b/src/jamidht/jamiaccount.cpp
@@ -32,7 +32,6 @@
#include "accountarchive.h"
#include "ringcontact.h"
#include "configkeys.h"
-#include "thread_pool.h"
#include "sip/sdp.h"
#include "sip/sipvoiplink.h"
@@ -69,6 +68,7 @@
#include "libdevcrypto/Common.h"
#include "base64.h"
+#include
#include
#include
@@ -1064,7 +1064,7 @@ generatePIN(size_t length = 8)
void
JamiAccount::addDevice(const std::string& password)
{
- ThreadPool::instance().run([this_=shared(), password]() {
+ dht::ThreadPool::computation().run([this_=shared(), password]() {
std::vector key;
dht::InfoHash loc;
std::string pin_str;
@@ -1137,7 +1137,7 @@ bool
JamiAccount::revokeDevice(const std::string& password, const std::string& device)
{
// shared_ptr of future
- auto fa = ThreadPool::instance().getShared(
+ auto fa = dht::ThreadPool::computation().getShared(
[this, password] { return readArchive(password); });
findCertificate(dht::InfoHash(device),
[this,fa=std::move(fa),password,device](const std::shared_ptr& crt) mutable
@@ -1199,7 +1199,7 @@ JamiAccount::loadAccountFromFile(const std::string& archive_path, const std::str
{
setRegistrationState(RegistrationState::INITIALIZING);
auto accountId = getAccountID();
- ThreadPool::instance().run([w=weak(), archive_password, archive_path, accountId]{
+ dht::ThreadPool::computation().run([w=weak(), archive_password, archive_path, accountId]{
AccountArchive archive;
try {
archive = AccountArchive(archive_path, archive_password);
@@ -1304,8 +1304,8 @@ JamiAccount::loadAccountFromDHT(const std::string& archive_password, const std::
}
};
- ThreadPool::instance().run(std::bind(search, true, state_old));
- ThreadPool::instance().run(std::bind(search, false, state_new));
+ dht::ThreadPool::computation().run(std::bind(search, true, state_old));
+ dht::ThreadPool::computation().run(std::bind(search, false, state_new));
}
void
@@ -1313,11 +1313,11 @@ JamiAccount::createAccount(const std::string& archive_password, dht::crypto::Ide
{
JAMI_WARN("[Account %s] creating new account", getAccountID().c_str());
setRegistrationState(RegistrationState::INITIALIZING);
- ThreadPool::instance().run([sthis=shared(), archive_password, migrate]() mutable {
+ dht::ThreadPool::computation().run([sthis=shared(), archive_password, migrate]() mutable {
AccountArchive a;
auto& this_ = *sthis;
- auto future_keypair = ThreadPool::instance().get(std::bind(&dev::KeyPair::create));
+ auto future_keypair = dht::ThreadPool::computation().get(std::bind(&dev::KeyPair::create));
try {
if (migrate.first and migrate.second) {
JAMI_WARN("[Account %s] converting certificate from old ring account %s",
@@ -2733,7 +2733,7 @@ JamiAccount::loadTreatedMessages()
void
JamiAccount::saveTreatedMessages() const
{
- ThreadPool::instance().run([w = weak()](){
+ dht::ThreadPool::io().run([w = weak()](){
if (auto sthis = w.lock()) {
auto& this_ = *sthis;
std::lock_guard lock(this_.messageMutex_);
@@ -2963,7 +2963,7 @@ JamiAccount::generateDhParams()
{
//make sure cachePath_ is writable
fileutils::check_dir(cachePath_.c_str(), 0700);
- dhParams_ = ThreadPool::instance().get(std::bind(loadDhParams, cachePath_ + DIR_SEPARATOR_STR "dhParams"));
+ dhParams_ = dht::ThreadPool::computation().get(std::bind(loadDhParams, cachePath_ + DIR_SEPARATOR_STR "dhParams"));
}
MatchRank
@@ -3353,17 +3353,19 @@ JamiAccount::igdChanged()
if (not dht_.isRunning())
return;
if (upnp_) {
- std::thread{[s = shared(), oldPort = static_cast(dhtPortUsed_)] {
- auto& this_ = *s;
- if (not this_.mapPortUPnP())
- JAMI_WARN("UPnP: Could not map DHT port");
- auto newPort = static_cast(this_.dhtPortUsed_);
- if (oldPort != newPort) {
- JAMI_WARN("DHT port changed: restarting network");
- this_.doRegister_();
- } else
- this_.dht_.connectivityChanged();
- }}.detach();
+ dht::ThreadPool::io().run([w = weak(), oldPort = static_cast(dhtPortUsed_)] {
+ if (auto s = w.lock()) {
+ auto& this_ = *s;
+ if (not this_.mapPortUPnP())
+ JAMI_WARN("UPnP: Could not map DHT port");
+ auto newPort = static_cast(this_.dhtPortUsed_);
+ if (oldPort != newPort) {
+ JAMI_WARN("DHT port changed: restarting network");
+ this_.doRegister_();
+ } else
+ this_.dht_.connectivityChanged();
+ }
+ });
} else
dht_.connectivityChanged();
}
diff --git a/src/jamidht/namedirectory.cpp b/src/jamidht/namedirectory.cpp
index 5344e0bd1..e392e4695 100644
--- a/src/jamidht/namedirectory.cpp
+++ b/src/jamidht/namedirectory.cpp
@@ -20,10 +20,10 @@
#include "logger.h"
#include "string_utils.h"
-#include "thread_pool.h"
#include "fileutils.h"
#include "base64.h"
+#include
#include
#include
#include
@@ -166,7 +166,7 @@ void NameDirectory::lookupAddress(const std::string& addr, LookupCallback cb)
}).share();
// avoid blocking on future destruction
- ThreadPool::instance().run([ret](){ ret.get(); });
+ dht::ThreadPool::io().run([ret](){ ret.get(); });
} catch (const std::exception& e) {
JAMI_ERR("Error when performing address lookup: %s", e.what());
cb("", Response::error);
@@ -265,7 +265,7 @@ void NameDirectory::lookupName(const std::string& n, LookupCallback cb)
}).share();
// avoid blocking on future destruction
- ThreadPool::instance().run([ret](){ ret.get(); });
+ dht::ThreadPool::io().run([ret](){ ret.get(); });
} catch (const std::exception& e) {
JAMI_ERR("Error when performing name lookup: %s", e.what());
cb("", Response::error);
@@ -367,8 +367,8 @@ void NameDirectory::registerName(const std::string& addr, const std::string& n,
}
}, params).share();
- // avoid blocking on future destruction
- ThreadPool::instance().run([ret](){ ret.get(); });
+ // avoid blocking on future destruction
+ dht::ThreadPool::io().run([ret](){ ret.get(); });
} catch (const std::exception& e) {
JAMI_ERR("Error when performing name registration: %s", e.what());
cb(RegistrationResponse::error);
diff --git a/src/manager.cpp b/src/manager.cpp
index 6f5a1c620..5d9cb51ea 100644
--- a/src/manager.cpp
+++ b/src/manager.cpp
@@ -34,7 +34,6 @@
#include "logger.h"
#include "account_schema.h"
-#include "thread_pool.h"
#include "fileutils.h"
#include "map_utils.h"
@@ -81,6 +80,8 @@ using random_device = dht::crypto::random_device;
#include "data_transfer.h"
+#include
+
#ifndef WIN32
#include
#include
@@ -823,9 +824,10 @@ Manager::finish() noexcept
// Flush remaining tasks (free lambda' with capture)
pimpl_->scheduler_.stop();
+ dht::ThreadPool::io().join();
+ dht::ThreadPool::computation().join();
pj_shutdown();
- ThreadPool::instance().join();
} catch (const VoipLinkException &err) {
JAMI_ERR("%s", err.what());
}
@@ -2835,7 +2837,7 @@ Manager::loadAccountMap(const YAML::Node& node)
continue;
}
remaining++;
- ThreadPool::instance().run([
+ dht::ThreadPool::computation().run([
this, dir,
&cv, &remaining, &lock,
configFile = accountBaseDir + DIR_SEPARATOR_STR + dir + DIR_SEPARATOR_STR + "config.yml"
diff --git a/src/media/media_recorder.cpp b/src/media/media_recorder.cpp
index 72627eac6..f275fb343 100644
--- a/src/media/media_recorder.cpp
+++ b/src/media/media_recorder.cpp
@@ -25,12 +25,13 @@
#include "media_io_handle.h"
#include "media_recorder.h"
#include "system_codec_container.h"
-#include "thread_pool.h"
#include "video/filter_transpose.h"
#ifdef RING_ACCEL
#include "video/accel.h"
#endif
+#include
+
#include
#include
#include
@@ -153,7 +154,7 @@ MediaRecorder::startRecording()
if (initRecord() >= 0) {
isRecording_ = true;
// start thread after isRecording_ is set to true
- ThreadPool::instance().run([rec = shared_from_this()] {
+ dht::ThreadPool::computation().run([rec = shared_from_this()] {
while (rec->isRecording()) {
rec->filterAndEncode(rec->videoFilter_.get(), rec->videoIdx_);
rec->filterAndEncode(rec->audioFilter_.get(), rec->audioIdx_);
diff --git a/src/security/certstore.cpp b/src/security/certstore.cpp
index 291e6afd2..05d5efbf6 100644
--- a/src/security/certstore.cpp
+++ b/src/security/certstore.cpp
@@ -22,10 +22,11 @@
#include "client/ring_signal.h"
-#include "thread_pool.h"
#include "fileutils.h"
#include "logger.h"
+#include
+
#include
#include
@@ -202,7 +203,7 @@ readCertificates(const std::string& path, const std::string& crl_path)
void
CertificateStore::pinCertificatePath(const std::string& path, std::function&)> cb)
{
- ThreadPool::instance().run([&, path, cb]() {
+ dht::ThreadPool::computation().run([&, path, cb]() {
auto certs = readCertificates(path, crlPath_);
std::vector ids;
std::vector> scerts;
diff --git a/src/sip/sipcall.cpp b/src/sip/sipcall.cpp
index b2bdb8b38..7a24820a8 100644
--- a/src/sip/sipcall.cpp
+++ b/src/sip/sipcall.cpp
@@ -40,7 +40,6 @@
#include "dring/media_const.h"
#include "client/ring_signal.h"
#include "ice_transport.h"
-#include "thread_pool.h"
#ifdef ENABLE_VIDEO
#include "client/videomanager.h"
@@ -52,6 +51,7 @@
#include "errno.h"
#include
+#include
namespace jami {
@@ -800,7 +800,7 @@ void
SIPCall::sendKeyframe()
{
#ifdef ENABLE_VIDEO
- ThreadPool::instance().run([w = weak()] {
+ dht::ThreadPool::computation().run([w = weak()] {
if (auto sthis = w.lock()) {
JAMI_DBG("handling picture fast update request");
sthis->getVideoRtp().forceKeyFrame();
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
deleted file mode 100644
index ead9435c6..000000000
--- a/src/thread_pool.cpp
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright (C) 2016-2019 Savoir-faire Linux Inc.
- *
- * Author: Adrien Béraud
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-#include "thread_pool.h"
-#include "logger.h"
-
-#include
-#include
-
-#include // fix windows compiler bug
-
-namespace jami {
-
-struct ThreadPool::ThreadState
-{
- std::thread thread {};
- std::atomic_bool run {true};
-};
-
-ThreadPool::ThreadPool()
- : maxThreads_(std::max(std::thread::hardware_concurrency(), 4))
-{
- threads_.reserve(maxThreads_);
-}
-
-ThreadPool::~ThreadPool()
-{
- join();
-}
-
-void
-ThreadPool::run(std::function&& cb)
-{
- std::unique_lock l(lock_);
-
- // launch new thread if necessary
- if (not readyThreads_ && threads_.size() < maxThreads_) {
- threads_.emplace_back(new ThreadState());
- auto& t = *threads_.back();
- t.thread = std::thread([&]() {
- while (t.run) {
- std::function task;
-
- // pick task from queue
- {
- std::unique_lock l(lock_);
- readyThreads_++;
- cv_.wait(l, [&](){
- return not t.run or not tasks_.empty();
- });
- readyThreads_--;
- if (not t.run)
- break;
- task = std::move(tasks_.front());
- tasks_.pop();
- }
-
- // run task
- try {
- if (task)
- task();
- } catch (const std::exception& e) {
- JAMI_ERR("Exception running task: %s", e.what());
- }
- }
- });
- }
-
- // push task to queue
- tasks_.emplace(std::move(cb));
-
- // notify thread
- l.unlock();
- cv_.notify_one();
-}
-
-void
-ThreadPool::join()
-{
- for (auto& t : threads_)
- t->run = false;
- cv_.notify_all();
- for (auto& t : threads_)
- t->thread.join();
- threads_.clear();
-}
-
-}
diff --git a/src/thread_pool.h b/src/thread_pool.h
deleted file mode 100644
index 701231212..000000000
--- a/src/thread_pool.h
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2016-2019 Savoir-faire Linux Inc.
- *
- * Author: Adrien Béraud
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-#pragma once
-
-#include
-#include
-#include
-#include
-#include
-
-namespace jami {
-
-class ThreadPool {
-public:
- static ThreadPool& instance() {
- static ThreadPool pool;
- return pool;
- }
-
- ThreadPool();
- ~ThreadPool();
-
- void run(std::function&& cb);
-
- template
- std::future get(std::function&& cb) {
- auto ret = std::make_shared>();
- run(std::bind([=](std::function& mcb) mutable {
- ret->set_value(mcb());
- }, std::move(cb)));
- return ret->get_future();
- }
- template
- std::shared_ptr> getShared(std::function&& cb) {
- return std::make_shared>(get(std::move(cb)));
- }
-
- void join();
-
-private:
- struct ThreadState;
- std::queue> tasks_ {};
- std::vector> threads_;
- unsigned readyThreads_ {0};
- std::mutex lock_ {};
- std::condition_variable cv_ {};
-
- const unsigned maxThreads_;
-};
-
-}