Revert "fix: use condition variables instead of busy waits in worker threads"

This reverts commit db0b4a616c.

Signed-off-by: Compute-Runtime-Validation <compute-runtime-validation@intel.com>
This commit is contained in:
Compute-Runtime-Validation
2025-10-11 03:04:39 +02:00
committed by Compute-Runtime-Automation
parent c45f3ecc8a
commit 244dd9b0b4
18 changed files with 137 additions and 201 deletions

View File

@@ -653,6 +653,13 @@ void CommandStreamReceiver::downloadAllocation(GraphicsAllocation &gfxAllocation
}
}
// Asks the execution environment's direct submission controller to start
// controlling. Returns without doing anything when the controller pointer is null.
void CommandStreamReceiver::startControllingDirectSubmissions() {
    auto directSubmissionController = this->executionEnvironment.directSubmissionController.get();
    if (!directSubmissionController) {
        return;
    }
    directSubmissionController->startControlling();
}
bool CommandStreamReceiver::enqueueWaitForPagingFence(uint64_t pagingFenceValue) {
auto controller = this->executionEnvironment.directSubmissionController.get();
if (this->isAnyDirectSubmissionEnabled() && controller) {

View File

@@ -348,6 +348,8 @@ class CommandStreamReceiver : NEO::NonCopyableAndNonMovableClass {
uint32_t getRootDeviceIndex() const { return rootDeviceIndex; }
MOCKABLE_VIRTUAL void startControllingDirectSubmissions();
// True when isDirectSubmissionEnabled() or isBlitterDirectSubmissionEnabled()
// reports true; the blitter query is only made when the first one is false.
MOCKABLE_VIRTUAL bool isAnyDirectSubmissionEnabled() const {
    if (this->isDirectSubmissionEnabled()) {
        return true;
    }
    return isBlitterDirectSubmissionEnabled();
}

View File

@@ -1302,7 +1302,7 @@ SubmissionStatus CommandStreamReceiverHw<GfxFamily>::flushSmallTask(LinearStream
this->latestSentTaskCount = taskCount + 1;
auto submissionStatus = flushHandler(batchBuffer, getResidencyAllocations());
if (submissionStatus == SubmissionStatus::success) {
++taskCount;
taskCount++;
}
return submissionStatus;
}
@@ -1465,6 +1465,7 @@ inline bool CommandStreamReceiverHw<GfxFamily>::initDirectSubmission() {
if (directSubmissionController) {
directSubmissionController->registerDirectSubmission(this);
}
this->startControllingDirectSubmissions();
if (this->isUpdateTagFromWaitEnabled()) {
this->overrideDispatchPolicy(DispatchMode::immediateDispatch);
}
@@ -1477,7 +1478,6 @@ inline bool CommandStreamReceiverHw<GfxFamily>::initDirectSubmission() {
}
}
}
return ret;
}

View File

@@ -62,49 +62,49 @@ void DirectSubmissionController::startThread() {
}
// Stops the controlling thread: clears both loop flags, notifies the condition
// variable, then joins and releases the thread object if one exists.
void DirectSubmissionController::stopThread() {
// Clear the flags before notifying so a woken thread sees the updated values.
runControlling.store(false);
keepControlling.store(false);
{
// Notify under condVarMutex; presumably this wakes the controlling thread if it
// is blocked on condVar (NOTE(review): confirm against the thread routine).
std::lock_guard<std::mutex> lock(condVarMutex);
condVar.notify_one();
}
// The thread object may be absent; join and reset only when it is present.
if (directSubmissionControllingThread) {
directSubmissionControllingThread->join();
directSubmissionControllingThread.reset();
}
}
void DirectSubmissionController::startControlling() {
this->runControlling.store(true);
}
void *DirectSubmissionController::controlDirectSubmissionsState(void *self) {
auto controller = reinterpret_cast<DirectSubmissionController *>(self);
controller->timeSinceLastCheck = controller->getCpuTimestamp();
controller->lastHangCheckTime = std::chrono::high_resolution_clock::now();
while (controller->keepControlling.load()) {
while (!controller->runControlling.load()) {
if (!controller->keepControlling.load()) {
return nullptr;
}
std::unique_lock<std::mutex> lock(controller->condVarMutex);
controller->wait(lock);
controller->handlePagingFenceRequests(lock, true);
if (controller->sleep(lock)) { // Paging Fence Request
controller->handlePagingFenceRequests(lock, true);
} else { // Timeout
lock.unlock();
controller->checkNewSubmissions();
controller->handlePagingFenceRequests(lock, false);
auto isControllerNotified = controller->sleep(lock);
if (isControllerNotified) {
controller->handlePagingFenceRequests(lock, false);
}
}
return nullptr;
}
controller->timeSinceLastCheck = controller->getCpuTimestamp();
controller->lastHangCheckTime = std::chrono::high_resolution_clock::now();
while (true) {
if (!controller->keepControlling.load()) {
return nullptr;
}
std::unique_lock<std::mutex> lock(controller->condVarMutex);
controller->handlePagingFenceRequests(lock, true);
void DirectSubmissionController::wait(std::unique_lock<std::mutex> &lock) {
inDeepSleep = true;
condVar.wait(lock, [&]() { return !keepControlling.load() || !pagingFenceRequests.empty() || activeSubmissionsCount > 0; });
inDeepSleep = false;
}
void DirectSubmissionController::notifyNewSubmission() {
++activeSubmissionsCount;
if (inDeepSleep) {
std::lock_guard<std::mutex> lock(condVarMutex);
condVar.notify_one();
auto isControllerNotified = controller->sleep(lock);
if (isControllerNotified) {
controller->handlePagingFenceRequests(lock, true);
}
lock.unlock();
controller->checkNewSubmissions();
}
}
@@ -141,7 +141,6 @@ void DirectSubmissionController::checkNewSubmissions() {
csr->stopDirectSubmission(false, false);
state.isStopped = true;
shouldRecalculateTimeout = true;
--activeSubmissionsCount;
}
state.taskCount = csr->peekTaskCount();
} else {
@@ -279,13 +278,13 @@ void DirectSubmissionController::recalculateTimeout() {
}
void DirectSubmissionController::enqueueWaitForPagingFence(CommandStreamReceiver *csr, uint64_t pagingFenceValue) {
std::lock_guard lock(condVarMutex);
std::lock_guard lock(this->condVarMutex);
pagingFenceRequests.push({csr, pagingFenceValue});
condVar.notify_one();
}
void DirectSubmissionController::drainPagingFenceQueue() {
std::lock_guard lock(condVarMutex);
std::lock_guard lock(this->condVarMutex);
while (!pagingFenceRequests.empty()) {
auto request = pagingFenceRequests.front();

View File

@@ -61,13 +61,13 @@ class DirectSubmissionController {
void unregisterDirectSubmission(CommandStreamReceiver *csr);
void startThread();
void startControlling();
void stopThread();
static bool isSupported();
void enqueueWaitForPagingFence(CommandStreamReceiver *csr, uint64_t pagingFenceValue);
void drainPagingFenceQueue();
void notifyNewSubmission();
protected:
struct DirectSubmissionState {
@@ -95,11 +95,10 @@ class DirectSubmissionController {
};
static void *controlDirectSubmissionsState(void *self);
MOCKABLE_VIRTUAL void checkNewSubmissions();
void checkNewSubmissions();
bool isDirectSubmissionIdle(CommandStreamReceiver *csr, std::unique_lock<std::recursive_mutex> &csrLock);
bool isCopyEngineOnDeviceIdle(uint32_t rootDeviceIndex, std::optional<TaskCountType> &bcsTaskCount);
MOCKABLE_VIRTUAL bool sleep(std::unique_lock<std::mutex> &lock);
MOCKABLE_VIRTUAL void wait(std::unique_lock<std::mutex> &lock);
MOCKABLE_VIRTUAL SteadyClock::time_point getCpuTimestamp();
MOCKABLE_VIRTUAL void overrideDirectSubmissionTimeouts(const ProductHelper &productHelper);
@@ -108,7 +107,7 @@ class DirectSubmissionController {
void updateLastSubmittedThrottle(QueueThrottle throttle);
size_t getTimeoutParamsMapKey(QueueThrottle throttle, bool acLineStatus);
MOCKABLE_VIRTUAL void handlePagingFenceRequests(std::unique_lock<std::mutex> &lock, bool checkForNewSubmissions);
void handlePagingFenceRequests(std::unique_lock<std::mutex> &lock, bool checkForNewSubmissions);
MOCKABLE_VIRTUAL TimeoutElapsedMode timeoutElapsed();
std::chrono::microseconds getSleepValue() const { return std::chrono::microseconds(this->timeout / this->bcsTimeoutDivisor); }
@@ -119,8 +118,7 @@ class DirectSubmissionController {
std::unique_ptr<Thread> directSubmissionControllingThread;
std::atomic_bool keepControlling = true;
std::atomic_bool inDeepSleep = false;
std::atomic_uint activeSubmissionsCount = 0;
std::atomic_bool runControlling = false;
SteadyClock::time_point timeSinceLastCheck{};
SteadyClock::time_point lastTerminateCpuTimestamp{};

View File

@@ -10,7 +10,6 @@
#include "shared/source/command_stream/submissions_aggregator.h"
#include "shared/source/debug_settings/debug_settings_manager.h"
#include "shared/source/device/device.h"
#include "shared/source/direct_submission/direct_submission_controller.h"
#include "shared/source/direct_submission/direct_submission_hw.h"
#include "shared/source/direct_submission/relaxed_ordering_helper.h"
#include "shared/source/execution_environment/execution_environment.h"
@@ -589,9 +588,6 @@ template <typename GfxFamily, typename Dispatcher>
bool DirectSubmissionHw<GfxFamily, Dispatcher>::submitCommandBufferToGpu(bool needStart, uint64_t gpuAddress, size_t size, bool needWait, const ResidencyContainer *allocationsForResidency) {
if (needStart) {
this->ringStart = this->submit(gpuAddress, size, allocationsForResidency);
if (auto controller = rootDeviceEnvironment.executionEnvironment.directSubmissionController.get()) {
controller->notifyNewSubmission();
}
return this->ringStart;
} else {
if (needWait) {

View File

@@ -17,4 +17,4 @@ bool DirectSubmissionController::sleep(std::unique_lock<std::mutex> &lock) {
// No-op implementation: productHelper is ignored and no member is modified.
void DirectSubmissionController::overrideDirectSubmissionTimeouts(const ProductHelper &productHelper) {
}
} // namespace NEO
} // namespace NEO

View File

@@ -28,4 +28,4 @@ void DirectSubmissionController::overrideDirectSubmissionTimeouts(const ProductH
this->maxTimeout = std::chrono::microseconds(maxTimeoutUs);
}
} // namespace NEO
} // namespace NEO

View File

@@ -67,7 +67,7 @@ bool SVMAllocsManager::SvmAllocationCache::insert(size_t size, void *ptr, SvmAll
return false;
}
std::unique_lock<std::mutex> lock(this->mtx);
std::lock_guard<std::mutex> lock(this->mtx);
if (svmData->device ? svmData->device->shouldLimitAllocationsReuse() : memoryManager->shouldLimitAllocationsReuse()) {
return false;
}
@@ -99,11 +99,8 @@ bool SVMAllocsManager::SvmAllocationCache::insert(size_t size, void *ptr, SvmAll
}
svmData->isSavedForReuse = true;
allocations.emplace(std::lower_bound(allocations.begin(), allocations.end(), size), size, ptr, svmData, waitForCompletion);
empty = false;
if (auto usmReuseCleaner = this->memoryManager->peekExecutionEnvironment().unifiedMemoryReuseCleaner.get()) {
lock.unlock();
usmReuseCleaner->startThread();
usmReuseCleaner->notifySvmAllocationsCacheUpdate();
if (memoryManager->peekExecutionEnvironment().unifiedMemoryReuseCleaner) {
memoryManager->peekExecutionEnvironment().unifiedMemoryReuseCleaner->startThread();
}
}
if (enablePerformanceLogging) {
@@ -113,7 +110,6 @@ bool SVMAllocsManager::SvmAllocationCache::insert(size_t size, void *ptr, SvmAll
.operationType = CacheOperationType::insert,
.isSuccess = isSuccess});
}
return isSuccess;
}
@@ -185,7 +181,6 @@ void *SVMAllocsManager::SvmAllocationCache::get(size_t size, const UnifiedMemory
svmAllocsManager->reinsertToAllocsForIndirectAccess(*allocationIter->svmData);
}
allocations.erase(allocationIter);
empty = allocations.empty();
return allocationPtr;
}
}
@@ -220,7 +215,6 @@ void SVMAllocsManager::SvmAllocationCache::trim() {
svmAllocsManager->freeSVMAllocImpl(cachedAllocationInfo.allocation, FreePolicyType::blocking, cachedAllocationInfo.svmData);
}
this->allocations.clear();
empty = true;
}
void SVMAllocsManager::SvmAllocationCache::cleanup() {
@@ -305,7 +299,6 @@ void SVMAllocsManager::SvmAllocationCache::trimOldAllocs(std::chrono::high_resol
if (trimAll) {
std::erase_if(allocations, SvmCacheAllocationInfo::isMarkedForDelete);
}
empty = allocations.empty();
}
SvmAllocationData *SVMAllocsManager::MapBasedAllocationTracker::get(const void *ptr) {

View File

@@ -221,7 +221,6 @@ class SVMAllocsManager {
static bool allocUtilizationAllows(size_t requestedSize, size_t reuseCandidateSize);
static bool alignmentAllows(void *ptr, size_t alignment);
bool isInUse(SvmCacheAllocationInfo &cacheAllocInfo);
bool isEmpty() { return empty; };
void *get(size_t size, const UnifiedMemoryProperties &unifiedMemoryProperties);
void trim();
void trimOldAllocs(std::chrono::high_resolution_clock::time_point trimTimePoint, bool trimAll);
@@ -235,7 +234,6 @@ class SVMAllocsManager {
MemoryManager *memoryManager = nullptr;
bool enablePerformanceLogging = false;
bool requireUpdatingAllocsForIndirectAccess = false;
std::atomic_bool empty = true;
};
enum class FreePolicyType : uint32_t {

View File

@@ -24,10 +24,7 @@ UnifiedMemoryReuseCleaner::~UnifiedMemoryReuseCleaner() {
void UnifiedMemoryReuseCleaner::stopThread() {
keepCleaning.store(false);
{
std::lock_guard<std::mutex> lock(condVarMutex);
condVar.notify_one();
}
runCleaning.store(false);
if (unifiedMemoryReuseCleanerThread) {
unifiedMemoryReuseCleanerThread->join();
unifiedMemoryReuseCleanerThread.reset();
@@ -36,24 +33,26 @@ void UnifiedMemoryReuseCleaner::stopThread() {
void *UnifiedMemoryReuseCleaner::cleanUnifiedMemoryReuse(void *self) {
auto cleaner = reinterpret_cast<UnifiedMemoryReuseCleaner *>(self);
while (cleaner->keepCleaning.load()) {
std::unique_lock lock(cleaner->condVarMutex);
cleaner->wait(lock);
lock.unlock();
cleaner->trimOldInCaches();
while (!cleaner->runCleaning.load()) {
if (!cleaner->keepCleaning.load()) {
return nullptr;
}
NEO::sleep(sleepTime);
}
return nullptr;
}
void UnifiedMemoryReuseCleaner::notifySvmAllocationsCacheUpdate() {
std::lock_guard<std::mutex> lock(condVarMutex);
condVar.notify_one();
while (true) {
if (!cleaner->keepCleaning.load()) {
return nullptr;
}
NEO::sleep(sleepTime);
cleaner->trimOldInCaches();
}
}
// Appends the cache to svmAllocationCaches and then calls startCleaning(),
// holding svmAllocationCachesMutex for both steps.
void UnifiedMemoryReuseCleaner::registerSvmAllocationCache(SvmAllocationCache *cache) {
    std::lock_guard<std::mutex> cachesGuard(svmAllocationCachesMutex);
    svmAllocationCaches.push_back(cache);
    startCleaning();
}
void UnifiedMemoryReuseCleaner::unregisterSvmAllocationCache(SvmAllocationCache *cache) {
@@ -86,4 +85,4 @@ void UnifiedMemoryReuseCleaner::startThread() {
});
}
} // namespace NEO
} // namespace NEO

View File

@@ -11,7 +11,6 @@
#include "shared/source/memory_manager/unified_memory_manager.h"
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>
@@ -34,16 +33,9 @@ class UnifiedMemoryReuseCleaner : NEO::NonCopyableAndNonMovableClass {
void registerSvmAllocationCache(SvmAllocationCache *cache);
void unregisterSvmAllocationCache(SvmAllocationCache *cache);
MOCKABLE_VIRTUAL void wait(std::unique_lock<std::mutex> &lock) {
condVar.wait(lock, [&]() { return !keepCleaning.load() || !isEmpty(); });
}
MOCKABLE_VIRTUAL bool isEmpty() {
std::unique_lock<std::mutex> lock(svmAllocationCachesMutex);
return std::all_of(svmAllocationCaches.begin(), svmAllocationCaches.end(), [](const auto &it) { return it->isEmpty(); });
}
void notifySvmAllocationsCacheUpdate();
protected:
void startCleaning() { runCleaning.store(true); };
static void *cleanUnifiedMemoryReuse(void *self);
MOCKABLE_VIRTUAL void trimOldInCaches();
std::unique_ptr<Thread> unifiedMemoryReuseCleanerThread;
@@ -52,8 +44,7 @@ class UnifiedMemoryReuseCleaner : NEO::NonCopyableAndNonMovableClass {
std::mutex svmAllocationCachesMutex;
std::once_flag startThreadOnce;
std::mutex condVarMutex;
std::condition_variable condVar;
std::atomic_bool runCleaning = false;
std::atomic_bool keepCleaning = true;
bool trimAllAllocations = false;

View File

@@ -11,6 +11,7 @@ namespace NEO {
struct MockUnifiedMemoryReuseCleaner : public UnifiedMemoryReuseCleaner {
public:
using UnifiedMemoryReuseCleaner::keepCleaning;
using UnifiedMemoryReuseCleaner::runCleaning;
using UnifiedMemoryReuseCleaner::svmAllocationCaches;
using UnifiedMemoryReuseCleaner::UnifiedMemoryReuseCleaner;
using UnifiedMemoryReuseCleaner::unifiedMemoryReuseCleanerThread;
@@ -19,8 +20,6 @@ struct MockUnifiedMemoryReuseCleaner : public UnifiedMemoryReuseCleaner {
trimOldInCachesCalled = true;
if (callBaseTrimOldInCaches) {
UnifiedMemoryReuseCleaner::trimOldInCaches();
} else {
clearCaches();
}
}
void startThread() override {
@@ -29,18 +28,9 @@ struct MockUnifiedMemoryReuseCleaner : public UnifiedMemoryReuseCleaner {
UnifiedMemoryReuseCleaner::startThread();
}
};
void wait(std::unique_lock<std::mutex> &lock) override {
waitOnConditionVar.store(true);
UnifiedMemoryReuseCleaner::wait(lock);
};
void clearCaches() {
std::lock_guard<std::mutex> lock(svmAllocationCachesMutex);
svmAllocationCaches.clear();
}
std::atomic_bool trimOldInCachesCalled = false;
std::atomic_bool waitOnConditionVar = false;
bool trimOldInCachesCalled = false;
bool startThreadCalled = false;
bool callBaseStartThread = false;
bool callBaseTrimOldInCaches = true;
};
} // namespace NEO
} // namespace NEO

View File

@@ -1168,7 +1168,12 @@ class CommandStreamReceiverHwDirectSubmissionMock : public CommandStreamReceiver
return CommandStreamReceiverHw<Type>::obtainUniqueOwnership();
}
// Test override: only records that the call happened; the base implementation is not invoked.
void startControllingDirectSubmissions() override {
startControllingDirectSubmissionsCalled = true;
}
uint32_t recursiveLockCounter = 0;
bool startControllingDirectSubmissionsCalled = false;
};
HWTEST_F(InitDirectSubmissionTest, whenCallInitDirectSubmissionAgainThenItIsNotReinitialized) {
@@ -1199,7 +1204,7 @@ HWTEST_F(InitDirectSubmissionTest, whenCallInitDirectSubmissionAgainThenItIsNotR
csr.reset();
}
HWTEST_F(InitDirectSubmissionTest, whenCallInitDirectSubmissionThenObtainLock) {
HWTEST_F(InitDirectSubmissionTest, whenCallInitDirectSubmissionThenObtainLockAndInitController) {
auto csr = std::make_unique<CommandStreamReceiverHwDirectSubmissionMock<FamilyType>>(*device->executionEnvironment, device->getRootDeviceIndex(), device->getDeviceBitfield());
std::unique_ptr<OsContext> osContext(OsContext::create(device->getExecutionEnvironment()->rootDeviceEnvironments[0]->osInterface.get(), device->getRootDeviceIndex(), 0,
EngineDescriptorHelper::getDefaultDescriptor({aub_stream::ENGINE_RCS, EngineUsage::regular},
@@ -1213,6 +1218,7 @@ HWTEST_F(InitDirectSubmissionTest, whenCallInitDirectSubmissionThenObtainLock) {
csr->initializeTagAllocation();
csr->initDirectSubmission();
EXPECT_EQ(1u, csr->recursiveLockCounter);
EXPECT_TRUE(csr->startControllingDirectSubmissionsCalled);
csr.reset();
}

View File

@@ -19,7 +19,6 @@ struct DirectSubmissionControllerMock : public DirectSubmissionController {
using DirectSubmissionController::directSubmissionsMutex;
using DirectSubmissionController::getSleepValue;
using DirectSubmissionController::handlePagingFenceRequests;
using DirectSubmissionController::inDeepSleep;
using DirectSubmissionController::isCopyEngineOnDeviceIdle;
using DirectSubmissionController::isCsrsContextGroupIdleDetectionEnabled;
using DirectSubmissionController::isDirectSubmissionIdle;
@@ -43,16 +42,6 @@ struct DirectSubmissionControllerMock : public DirectSubmissionController {
}
}
void handlePagingFenceRequests(std::unique_lock<std::mutex> &lock, bool checkForNewSubmissions) override {
handlePagingFenceRequestsCalled.store(true);
DirectSubmissionController::handlePagingFenceRequests(lock, checkForNewSubmissions);
}
void checkNewSubmissions() override {
checkNewSubmissionCalled.store(true);
DirectSubmissionController::checkNewSubmissions();
}
// Override that returns the value of cpuTimestamp rather than calling the base implementation.
SteadyClock::time_point getCpuTimestamp() override {
return cpuTimestamp;
}
@@ -70,8 +59,6 @@ struct DirectSubmissionControllerMock : public DirectSubmissionController {
std::atomic<bool> sleepReturnValue{false};
std::atomic<TimeoutElapsedMode> timeoutElapsedReturnValue{TimeoutElapsedMode::notElapsed};
std::atomic<bool> timeoutElapsedCallBase{false};
std::atomic<bool> checkNewSubmissionCalled{false};
std::atomic<bool> handlePagingFenceRequestsCalled{false};
bool callBaseSleepMethod = false;
};
} // namespace NEO
} // namespace NEO

View File

@@ -127,12 +127,16 @@ struct MockWddmCsr : public WddmCommandStreamReceiver<GfxFamily> {
}
return ret;
}
// Test override: sets directSubmissionControllerStarted and does not call the base implementation.
void startControllingDirectSubmissions() override {
directSubmissionControllerStarted = true;
}
uint32_t flushCalledCount = 0;
std::unique_ptr<CommandBuffer> recordedCommandBuffer;
bool callParentInitDirectSubmission = true;
bool initBlitterDirectSubmission = false;
uint32_t fillReusableAllocationsListCalled = 0;
bool directSubmissionControllerStarted = false;
};
class WddmCommandStreamMockGdiTest : public ::testing::Test {
@@ -1178,6 +1182,7 @@ HWTEST_TEMPLATED_F(WddmCommandStreamMockGdiTest, givenDirectSubmissionEnabledOnR
EXPECT_TRUE(ret);
EXPECT_TRUE(csr->isDirectSubmissionEnabled());
EXPECT_FALSE(csr->isBlitterDirectSubmissionEnabled());
EXPECT_FALSE(mockCsr->directSubmissionControllerStarted);
GraphicsAllocation *commandBuffer = memoryManager->allocateGraphicsMemoryWithProperties(MockAllocationProperties{csr->getRootDeviceIndex(), MemoryConstants::pageSize});
ASSERT_NE(nullptr, commandBuffer);
@@ -1224,6 +1229,7 @@ HWTEST_TEMPLATED_F(WddmCommandStreamMockGdiTest, givenDirectSubmissionEnabledOnB
EXPECT_TRUE(ret);
EXPECT_FALSE(csr->isDirectSubmissionEnabled());
EXPECT_TRUE(csr->isBlitterDirectSubmissionEnabled());
EXPECT_FALSE(mockCsr->directSubmissionControllerStarted);
GraphicsAllocation *commandBuffer = memoryManager->allocateGraphicsMemoryWithProperties(MockAllocationProperties{csr->getRootDeviceIndex(), MemoryConstants::pageSize});
ASSERT_NE(nullptr, commandBuffer);