scheduled_executor: Add tracepoint

For complete execution context, the executor has now a name associated with it
and filename/linum must be passed along with the jobs.

Change-Id: I91e872d6c0710dc3c90352974daba5d559d930c9
This commit is contained in:
Olivier Dion
2021-12-10 13:06:25 -05:00
committed by Sébastien Blin
parent 8b761ef35a
commit 1f1e6014b4
5 changed files with 142 additions and 41 deletions

View File

@ -366,7 +366,7 @@ struct Manager::ManagerPimpl
std::thread ioContextRunner_;
/** Main scheduler */
ScheduledExecutor scheduler_;
ScheduledExecutor scheduler_ {"manager"};
std::atomic_bool autoAnswer_ {false};

View File

@ -22,8 +22,11 @@
namespace jami {
ScheduledExecutor::ScheduledExecutor()
: running_(std::make_shared<std::atomic<bool>>(true))
std::atomic<uint64_t> task_cookie = {0};
ScheduledExecutor::ScheduledExecutor(const std::string& name)
: name_(name)
, running_(std::make_shared<std::atomic<bool>>(true))
, thread_([this, is_running = running_] {
// The thread needs its own reference of `running_` in case the
// scheduler is destroyed within the thread because of a job
@ -61,34 +64,40 @@ ScheduledExecutor::stop()
}
void
ScheduledExecutor::run(Job&& job)
ScheduledExecutor::run(std::function<void()>&& job,
const char* filename, uint32_t linum)
{
{
std::lock_guard<std::mutex> lock(jobLock_);
auto now = clock::now();
jobs_[now].emplace_back(std::move(job));
jobs_[now].emplace_back(std::move(job), filename, linum);
}
cv_.notify_all();
}
std::shared_ptr<Task>
ScheduledExecutor::schedule(Job&& job, time_point t)
ScheduledExecutor::schedule(std::function<void()>&& job, time_point t,
const char* filename, uint32_t linum)
{
auto ret = std::make_shared<Task>(std::move(job));
auto ret = std::make_shared<Task>(std::move(job), filename, linum);
schedule(ret, t);
return ret;
}
std::shared_ptr<Task>
ScheduledExecutor::scheduleIn(Job&& job, duration dt)
ScheduledExecutor::scheduleIn(std::function<void()>&& job, duration dt,
const char* filename, uint32_t linum)
{
return schedule(std::move(job), clock::now() + dt);
return schedule(std::move(job), clock::now() + dt,
filename, linum);
}
std::shared_ptr<RepeatedTask>
ScheduledExecutor::scheduleAtFixedRate(RepeatedJob&& job, duration dt)
ScheduledExecutor::scheduleAtFixedRate(std::function<bool()>&& job,
duration dt,
const char* filename, uint32_t linum)
{
auto ret = std::make_shared<RepeatedTask>(std::move(job));
auto ret = std::make_shared<RepeatedTask>(std::move(job), filename, linum);
reschedule(ret, clock::now(), dt);
return ret;
}
@ -97,9 +106,9 @@ void
ScheduledExecutor::reschedule(std::shared_ptr<RepeatedTask> task, time_point t, duration dt)
{
schedule(std::make_shared<Task>([this, task = std::move(task), t, dt]() mutable {
if (task->run())
if (task->run(name_.c_str()))
reschedule(std::move(task), t + dt, dt);
}),
}, task->job().filename, task->job().linum),
t);
}
@ -108,7 +117,8 @@ ScheduledExecutor::schedule(std::shared_ptr<Task> task, time_point t)
{
{
std::lock_guard<std::mutex> lock(jobLock_);
jobs_[t].emplace_back([task = std::move(task)] { task->run(); });
jobs_[t].emplace_back([task = std::move(task), this] { task->run(name_.c_str()); },
task->job().filename, task->job().linum);
}
cv_.notify_all();
}
@ -134,7 +144,7 @@ ScheduledExecutor::loop()
}
for (auto& job : jobs) {
try {
job();
job.fn();
} catch (const std::exception& e) {
JAMI_ERR("Exception running job: %s", e.what());
}

View File

@ -32,13 +32,53 @@
#include "noncopyable.h"
#include "tracepoint.h"
#include "trace-tools.h"
namespace jami {
extern std::atomic<uint64_t> task_cookie;
/**
* A runnable function
*/
using Job = std::function<void()>;
using RepeatedJob = std::function<bool()>;
struct Job {
Job(std::function<void()>&& f, const char* file, uint32_t l)
: fn(std::move(f))
, filename(file)
, linum(l) { }
std::function<void()> fn;
const char* filename;
uint32_t linum;
inline operator bool() const {
return static_cast<bool>(fn);
}
void reset() {
fn = {};
}
};
struct RepeatedJob {
RepeatedJob(std::function<bool()>&& f, const char* file, uint32_t l)
: fn(std::move(f))
, filename(file)
, linum(l) { }
std::function<bool()> fn;
const char* filename;
uint32_t linum;
inline operator bool() {
return static_cast<bool>(fn);
}
void reset() {
fn = {};
}
};
/**
* A Job that can be disposed
@ -46,19 +86,34 @@ using RepeatedJob = std::function<bool()>;
class Task
{
public:
Task(Job&& j)
: job_(std::move(j))
{}
void run()
Task(std::function<void()>&& fn, const char* filename, uint32_t linum)
: job_(std::move(fn), filename, linum)
, cookie_(task_cookie++) { }
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
void run(const char* executor_name)
{
if (job_)
job_();
if (job_.fn) {
jami_tracepoint(scheduled_executor_task_begin,
executor_name,
job_.filename, job_.linum,
cookie_);
job_.fn();
jami_tracepoint(scheduled_executor_task_end,
cookie_);
}
}
void cancel() { job_ = {}; }
#pragma GCC pop
void cancel() { job_.reset(); }
bool isCancelled() const { return !job_; }
Job& job() { return job_; }
private:
Job job_;
uint64_t cookie_;
};
/**
@ -67,32 +122,58 @@ private:
class RepeatedTask
{
public:
RepeatedTask(RepeatedJob&& j)
: job_(std::move(j))
{}
bool run()
RepeatedTask(std::function<bool()>&& fn, const char* filename,
uint32_t linum)
: job_(std::move(fn), filename, linum)
, cookie_(task_cookie++) { }
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
bool run(const char* executor_name)
{
bool cont;
std::lock_guard<std::mutex> l(lock_);
if (cancel_.load() or (job_ and not job_())) {
cancel_.store(true);
job_ = {};
if (not cancel_.load() and job_.fn) {
jami_tracepoint(scheduled_executor_task_begin,
executor_name,
job_.filename, job_.linum,
cookie_);
cont = job_.fn();
jami_tracepoint(scheduled_executor_task_end,
cookie_);
} else {
cont = false;
}
return (bool) job_;
if (not cont) {
job_.reset();
}
return static_cast<bool>(job_);
}
#pragma GCC pop
void cancel() { cancel_.store(true); }
void destroy()
{
cancel();
std::lock_guard<std::mutex> l(lock_);
job_ = {};
job_.reset();
}
bool isCancelled() const { return cancel_.load(); }
RepeatedJob& job() { return job_; }
private:
NON_COPYABLE(RepeatedTask);
mutable std::mutex lock_;
RepeatedJob job_;
mutable std::mutex lock_;
std::atomic_bool cancel_ {false};
uint64_t cookie_;
};
class ScheduledExecutor
@ -102,28 +183,37 @@ public:
using time_point = clock::time_point;
using duration = clock::duration;
ScheduledExecutor();
ScheduledExecutor(const std::string& name_);
~ScheduledExecutor();
/**
* Schedule job to be run ASAP
*/
void run(Job&& job);
void run(std::function<void()>&& job,
const char* filename=CURRENT_FILENAME(),
uint32_t linum=CURRENT_LINE());
/**
* Schedule job to be run at time t
*/
std::shared_ptr<Task> schedule(Job&& job, time_point t);
std::shared_ptr<Task> schedule(std::function<void()>&& job, time_point t,
const char* filename=CURRENT_FILENAME(),
uint32_t linum=CURRENT_LINE());
/**
* Schedule job to be run after delay dt
*/
std::shared_ptr<Task> scheduleIn(Job&& job, duration dt);
std::shared_ptr<Task> scheduleIn(std::function<void()>&& job, duration dt,
const char* filename=CURRENT_FILENAME(),
uint32_t linum=CURRENT_LINE());
/**
* Schedule job to be run every dt, starting now.
*/
std::shared_ptr<RepeatedTask> scheduleAtFixedRate(RepeatedJob&& job, duration dt);
std::shared_ptr<RepeatedTask> scheduleAtFixedRate(std::function<bool()>&& job,
duration dt,
const char* filename=CURRENT_FILENAME(),
uint32_t linum=CURRENT_LINE());
/**
* Stop the scheduler, can't be reversed
@ -137,6 +227,7 @@ private:
void schedule(std::shared_ptr<Task>, time_point t);
void reschedule(std::shared_ptr<RepeatedTask>, time_point t, duration dt);
std::string name_;
std::shared_ptr<std::atomic<bool>> running_;
std::map<time_point, std::vector<Job>> jobs_ {};
std::mutex jobLock_ {};

View File

@ -155,7 +155,7 @@ private:
// Data members
std::shared_ptr<PMPIGD> igd_;
natpmp_t natpmpHdl_;
ScheduledExecutor natpmpScheduler_ {};
ScheduledExecutor natpmpScheduler_ {"natpmp"};
std::shared_ptr<Task> searchForIgdTimer_ {};
unsigned int igdSearchCounter_ {0};
UpnpMappingObserver* observer_ {nullptr};

View File

@ -236,7 +236,7 @@ private:
std::weak_ptr<PUPnP> weak() { return std::static_pointer_cast<PUPnP>(shared_from_this()); }
// Execution queue to run lib upnp actions
ScheduledExecutor pupnpScheduler_ {};
ScheduledExecutor pupnpScheduler_ {"pupnp"};
// Initialization status.
std::atomic_bool initialized_ {false};