Create os specific thread implementation

Change-Id: I267d6cb021a75713c28a7bbf29384da13d2a7217
This commit is contained in:
Mateusz Jablonski
2018-05-22 17:23:39 +02:00
committed by sys_ocldev
parent c104db1d5e
commit 09e4dab4f6
19 changed files with 256 additions and 59 deletions

View File

@ -22,6 +22,7 @@
#include "runtime/event/async_events_handler.h"
#include "runtime/event/event.h"
#include "runtime/os_interface/os_thread.h"
#include <iterator>
namespace OCLRT {
@ -68,29 +69,31 @@ Event *AsyncEventsHandler::processList() {
return sleepCandidate;
}
void AsyncEventsHandler::asyncProcess() {
std::unique_lock<std::mutex> lock(asyncMtx, std::defer_lock);
void *AsyncEventsHandler::asyncProcess(void *arg) {
auto self = reinterpret_cast<AsyncEventsHandler *>(arg);
std::unique_lock<std::mutex> lock(self->asyncMtx, std::defer_lock);
Event *sleepCandidate = nullptr;
while (true) {
lock.lock();
transferRegisterList();
if (!allowAsyncProcess) {
processList();
releaseEvents();
self->transferRegisterList();
if (!self->allowAsyncProcess) {
self->processList();
self->releaseEvents();
break;
}
if (list.empty()) {
asyncCond.wait(lock);
if (self->list.empty()) {
self->asyncCond.wait(lock);
}
lock.unlock();
sleepCandidate = processList();
sleepCandidate = self->processList();
if (sleepCandidate) {
sleepCandidate->wait(true, true);
}
std::this_thread::yield();
}
return nullptr;
}
void AsyncEventsHandler::closeThread() {
@ -108,7 +111,7 @@ void AsyncEventsHandler::openThread() {
if (!thread.get()) {
DEBUG_BREAK_IF(allowAsyncProcess);
allowAsyncProcess = true;
thread.reset(new std::thread([this] { asyncProcess(); }));
thread = Thread::create(asyncProcess, reinterpret_cast<void *>(this));
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, Intel Corporation
* Copyright (c) 2017 - 2018, Intel Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
@ -23,13 +23,13 @@
#pragma once
#include <vector>
#include <memory>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
namespace OCLRT {
class Event;
class Thread;
class AsyncEventsHandler {
public:
@ -40,7 +40,7 @@ class AsyncEventsHandler {
protected:
Event *processList();
void asyncProcess();
static void *asyncProcess(void *arg);
void releaseEvents();
MOCKABLE_VIRTUAL void openThread();
MOCKABLE_VIRTUAL void transferRegisterList();
@ -48,7 +48,7 @@ class AsyncEventsHandler {
std::vector<Event *> list;
std::vector<Event *> pendingList;
std::unique_ptr<std::thread> thread;
std::unique_ptr<Thread> thread;
std::mutex asyncMtx;
std::condition_variable asyncCond;
std::atomic<bool> allowAsyncProcess;

View File

@ -22,6 +22,7 @@
#include "runtime/memory_manager/deferred_deleter.h"
#include "runtime/memory_manager/deferrable_deletion.h"
#include "runtime/os_interface/os_thread.h"
namespace OCLRT {
DeferredDeleter::DeferredDeleter() {
@ -46,8 +47,7 @@ void DeferredDeleter::stop() {
// Wait for the working job to exit
worker->join();
// Delete working thread
delete worker;
worker = nullptr;
worker.reset();
}
drain(false);
}
@ -87,7 +87,7 @@ void DeferredDeleter::ensureThread() {
if (worker != nullptr) {
return;
}
worker = new std::thread(run, this);
worker = Thread::create(run, reinterpret_cast<void *>(this));
}
bool DeferredDeleter::areElementsReleased() {
@ -98,7 +98,8 @@ bool DeferredDeleter::shouldStop() {
return !doWorkInBackground;
}
void DeferredDeleter::run(DeferredDeleter *self) {
void *DeferredDeleter::run(void *arg) {
auto self = reinterpret_cast<DeferredDeleter *>(arg);
std::unique_lock<std::mutex> lock(self->queueMutex);
// Mark that working thread really started
self->doWorkInBackground = true;
@ -114,6 +115,7 @@ void DeferredDeleter::run(DeferredDeleter *self) {
// Check whether working thread should be stopped
} while (!self->shouldStop());
lock.unlock();
return nullptr;
}
void DeferredDeleter::drain(bool blocking) {

View File

@ -25,11 +25,11 @@
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <utility>
namespace OCLRT {
class DeferrableDeletion;
class Thread;
class DeferredDeleter {
public:
DeferredDeleter();
@ -54,11 +54,11 @@ class DeferredDeleter {
MOCKABLE_VIRTUAL bool areElementsReleased();
MOCKABLE_VIRTUAL bool shouldStop();
static void run(DeferredDeleter *self);
static void *run(void *);
std::atomic<bool> doWorkInBackground;
std::atomic<int> elementsToRelease;
std::thread *worker = nullptr;
std::unique_ptr<Thread> worker;
int32_t numClients = 0;
IDList<DeferrableDeletion, true> queue;
std::mutex queueMutex;

View File

@ -28,6 +28,7 @@ set(RUNTIME_SRCS_OS_INTERFACE_BASE
${CMAKE_CURRENT_SOURCE_DIR}/os_inc_base.h
${CMAKE_CURRENT_SOURCE_DIR}/os_interface.h
${CMAKE_CURRENT_SOURCE_DIR}/os_library.h
${CMAKE_CURRENT_SOURCE_DIR}/os_thread.h
${CMAKE_CURRENT_SOURCE_DIR}/os_time.cpp
${CMAKE_CURRENT_SOURCE_DIR}/os_time.h
${CMAKE_CURRENT_SOURCE_DIR}/performance_counters.cpp

View File

@ -45,6 +45,8 @@ set(RUNTIME_SRCS_OS_INTERFACE_LINUX
${CMAKE_CURRENT_SOURCE_DIR}/drm_null_device.h
${CMAKE_CURRENT_SOURCE_DIR}/hw_info_config.cpp
${CMAKE_CURRENT_SOURCE_DIR}/linux_inc.cpp
${CMAKE_CURRENT_SOURCE_DIR}/os_thread_linux.cpp
${CMAKE_CURRENT_SOURCE_DIR}/os_thread_linux.h
${CMAKE_CURRENT_SOURCE_DIR}/os_inc.h
${CMAKE_CURRENT_SOURCE_DIR}/os_interface.cpp
${CMAKE_CURRENT_SOURCE_DIR}/os_interface.h

View File

@ -25,16 +25,16 @@
#include <queue>
#include <stdio.h>
#include "runtime/helpers/aligned_memory.h"
#include "drm_buffer_object.h"
#include "drm_command_stream.h"
#include "drm_gem_close_worker.h"
#include "drm_memory_manager.h"
#include "runtime/os_interface/linux/drm_buffer_object.h"
#include "runtime/os_interface/linux/drm_command_stream.h"
#include "runtime/os_interface/linux/drm_gem_close_worker.h"
#include "runtime/os_interface/linux/drm_memory_manager.h"
#include "runtime/os_interface/os_thread.h"
namespace OCLRT {
DrmGemCloseWorker::DrmGemCloseWorker(DrmMemoryManager &memoryManager) : active(true), thread(nullptr), workCount(0), memoryManager(memoryManager),
workerDone(false) {
thread = new std::thread(&DrmGemCloseWorker::worker, this);
DrmGemCloseWorker::DrmGemCloseWorker(DrmMemoryManager &memoryManager) : memoryManager(memoryManager) {
thread = Thread::create(worker, reinterpret_cast<void *>(this));
}
void DrmGemCloseWorker::closeThread() {
@ -44,8 +44,7 @@ void DrmGemCloseWorker::closeThread() {
}
thread->join();
delete thread;
thread = nullptr;
thread.reset();
}
}
@ -80,40 +79,42 @@ inline void DrmGemCloseWorker::close(BufferObject *bo) {
workCount--;
}
void DrmGemCloseWorker::worker() {
void *DrmGemCloseWorker::worker(void *arg) {
DrmGemCloseWorker *self = reinterpret_cast<DrmGemCloseWorker *>(arg);
BufferObject *workItem = nullptr;
std::queue<BufferObject *> localQueue;
std::unique_lock<std::mutex> lock(closeWorkerMutex);
std::unique_lock<std::mutex> lock(self->closeWorkerMutex);
lock.unlock();
while (active) {
while (self->active) {
lock.lock();
workItem = nullptr;
while (queue.empty() && active) {
condition.wait(lock);
while (self->queue.empty() && self->active) {
self->condition.wait(lock);
}
if (!queue.empty()) {
localQueue.swap(queue);
if (!self->queue.empty()) {
localQueue.swap(self->queue);
}
lock.unlock();
while (!localQueue.empty()) {
workItem = localQueue.front();
localQueue.pop();
close(workItem);
self->close(workItem);
}
}
lock.lock();
while (!queue.empty()) {
workItem = queue.front();
queue.pop();
close(workItem);
while (!self->queue.empty()) {
workItem = self->queue.front();
self->queue.pop();
self->close(workItem);
}
lock.unlock();
workerDone.store(true);
self->workerDone.store(true);
return nullptr;
}
}

View File

@ -24,7 +24,6 @@
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <map>
#include <set>
#include <queue>
@ -33,6 +32,7 @@
namespace OCLRT {
class DrmMemoryManager;
class BufferObject;
class Thread;
enum gemCloseWorkerMode {
gemCloseWorkerInactive,
@ -55,18 +55,18 @@ class DrmGemCloseWorker {
protected:
void close(BufferObject *workItem);
void closeThread();
void worker();
bool active;
static void *worker(void *arg);
bool active = true;
std::thread *thread;
std::unique_ptr<Thread> thread;
std::queue<BufferObject *> queue;
std::atomic<uint32_t> workCount;
std::atomic<uint32_t> workCount{0};
DrmMemoryManager &memoryManager;
std::mutex closeWorkerMutex;
std::condition_variable condition;
std::atomic<bool> workerDone;
std::atomic<bool> workerDone{false};
};
}

View File

@ -0,0 +1,37 @@
/*
* Copyright (c) 2018, Intel Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
#include "runtime/os_interface/linux/os_thread_linux.h"
namespace OCLRT {
ThreadLinux::ThreadLinux(pthread_t threadId) : threadId(threadId){};
std::unique_ptr<Thread> Thread::create(void *(*func)(void *), void *arg) {
pthread_t threadId;
pthread_create(&threadId, nullptr, func, arg);
return std::unique_ptr<Thread>(new ThreadLinux(threadId));
}
void ThreadLinux::join() {
pthread_join(threadId, nullptr);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright (c) 2018, Intel Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
#include "runtime/os_interface/os_thread.h"
#include <thread>
namespace OCLRT {
class ThreadLinux : public Thread {
public:
ThreadLinux(pthread_t threadId);
void join() override;
protected:
pthread_t threadId;
};
}

View File

@ -0,0 +1,33 @@
/*
* Copyright (c) 2018, Intel Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
#pragma once
#include <memory>
namespace OCLRT {
class Thread {
public:
static std::unique_ptr<Thread> create(void *(*func)(void *), void *arg);
virtual void join() = 0;
virtual ~Thread() = default;
};
}

View File

@ -47,6 +47,8 @@ set(RUNTIME_SRCS_OS_INTERFACE_WINDOWS
${CMAKE_CURRENT_SOURCE_DIR}/os_library.cpp
${CMAKE_CURRENT_SOURCE_DIR}/os_library.h
${CMAKE_CURRENT_SOURCE_DIR}/os_socket.h
${CMAKE_CURRENT_SOURCE_DIR}/os_thread_win.cpp
${CMAKE_CURRENT_SOURCE_DIR}/os_thread_win.h
${CMAKE_CURRENT_SOURCE_DIR}/os_time.cpp
${CMAKE_CURRENT_SOURCE_DIR}/os_time.h
${CMAKE_CURRENT_SOURCE_DIR}/performance_counters_win.cpp

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2018, Intel Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
#include "runtime/os_interface/windows/os_thread_win.h"
namespace OCLRT {
ThreadWin::ThreadWin(std::thread *thread) {
this->thread.reset(thread);
};
std::unique_ptr<Thread> Thread::create(void *(*func)(void *), void *arg) {
return std::unique_ptr<Thread>(new ThreadWin(new std::thread(func, arg)));
}
void ThreadWin::join() {
thread->join();
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright (c) 2018, Intel Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
#include "runtime/os_interface/os_thread.h"
#include <thread>
namespace OCLRT {
class ThreadWin : public Thread {
public:
ThreadWin(std::thread *thread);
void join() override;
protected:
std::unique_ptr<std::thread> thread;
};
}

View File

@ -221,7 +221,7 @@ TEST_F(AsyncEventsHandlerTests, givenEventsNotHandledByHandlderWhenAsyncExecutio
EXPECT_EQ(3, event1->getRefInternalCount());
EXPECT_EQ(3, event2->getRefInternalCount());
handler->allowAsyncProcess.store(false);
handler->asyncProcess(); // enter and exit because of allowAsyncProcess == false
MockHandler::asyncProcess(reinterpret_cast<void *>(handler.get())); // enter and exit because of allowAsyncProcess == false
EXPECT_EQ(2, event1->getRefInternalCount());
EXPECT_EQ(2, event2->getRefInternalCount());
EXPECT_TRUE(handler->peekIsListEmpty());
@ -363,7 +363,7 @@ TEST_F(AsyncEventsHandlerTests, givenSleepCandidateWhenProcessedThenCallWaitWith
EXPECT_CALL(*event1, wait(true, true)).Times(1).WillOnce(Invoke(unsetAsyncFlag));
handler->asyncProcess();
MockHandler::asyncProcess(reinterpret_cast<void *>(handler.get()));
event1->setStatus(CL_COMPLETE);
}
@ -374,7 +374,7 @@ TEST_F(AsyncEventsHandlerTests, asyncProcessCallsProcessListBeforeReturning) {
handler->registerEvent(event);
handler->allowAsyncProcess.store(false);
handler->asyncProcess();
MockHandler::asyncProcess(reinterpret_cast<void *>(handler.get()));
EXPECT_TRUE(handler->peekIsListEmpty());
EXPECT_EQ(1, event->getRefInternalCount());

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, Intel Corporation
* Copyright (c) 2017 - 2018, Intel Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
@ -25,7 +25,6 @@
#include "runtime/event/async_events_handler.h"
#include <vector>
#include <memory>
#include <thread>
#include <atomic>
#include <mutex>
#include <iterator>
@ -42,7 +41,7 @@ class MockHandler : public AsyncEventsHandler {
~MockHandler() override {
if (!allowThreadCreating) {
asyncProcess(); // process once for cleanup
asyncProcess(this); // process once for cleanup
}
}

View File

@ -22,6 +22,7 @@
#include "unit_tests/mocks/mock_deferred_deleter.h"
#include "runtime/memory_manager/deferrable_deletion.h"
#include "runtime/os_interface/os_thread.h"
#include "gtest/gtest.h"
namespace OCLRT {
@ -109,8 +110,8 @@ bool MockDeferredDeleter::baseShouldStop() {
return DeferredDeleter::shouldStop();
}
std::thread *MockDeferredDeleter::getThreadHandle() {
return worker;
Thread *MockDeferredDeleter::getThreadHandle() {
return worker.get();
}
std::unique_ptr<DeferredDeleter> createDeferredDeleter() {
@ -118,7 +119,7 @@ std::unique_ptr<DeferredDeleter> createDeferredDeleter() {
}
void MockDeferredDeleter::runThread() {
worker = new std::thread(run, this);
worker = Thread::create(run, reinterpret_cast<void *>(this));
}
void MockDeferredDeleter::forceStop() {

View File

@ -62,7 +62,7 @@ class MockDeferredDeleter : public DeferredDeleter {
bool baseShouldStop();
std::thread *getThreadHandle();
Thread *getThreadHandle();
void runThread();

View File

@ -44,6 +44,16 @@ set(IGDRCL_SRCS_offline_compiler_tests
${CLOC_SRCS_LIB}
)
if(WIN32)
list(APPEND IGDRCL_SRCS_offline_compiler_tests
${IGDRCL_SOURCE_DIR}/runtime/os_interface/windows/os_thread_win.cpp
)
else()
list(APPEND IGDRCL_SRCS_offline_compiler_tests
${IGDRCL_SOURCE_DIR}/runtime/os_interface/linux/os_thread_linux.cpp
)
endif()
link_directories(${CMAKE_RUNTIME_OUTPUT_DIRECTORY})
add_executable(cloc_tests ${IGDRCL_SRCS_offline_compiler_tests})