Add a generic worker thread pool job executor mechanism
This commit is contained in:
@@ -539,6 +539,8 @@ add_files(
|
|||||||
window_func.h
|
window_func.h
|
||||||
window_gui.h
|
window_gui.h
|
||||||
window_type.h
|
window_type.h
|
||||||
|
worker_thread.cpp
|
||||||
|
worker_thread.h
|
||||||
zoom_func.h
|
zoom_func.h
|
||||||
zoom_type.h
|
zoom_type.h
|
||||||
zoning.h
|
zoning.h
|
||||||
|
@@ -84,6 +84,7 @@
|
|||||||
#include "debug_desync.h"
|
#include "debug_desync.h"
|
||||||
#include "event_logs.h"
|
#include "event_logs.h"
|
||||||
#include "tunnelbridge.h"
|
#include "tunnelbridge.h"
|
||||||
|
#include "worker_thread.h"
|
||||||
|
|
||||||
#include "linkgraph/linkgraphschedule.h"
|
#include "linkgraph/linkgraphschedule.h"
|
||||||
#include "tracerestrict.h"
|
#include "tracerestrict.h"
|
||||||
@@ -991,8 +992,12 @@ int openttd_main(int argc, char *argv[])
|
|||||||
/* ScanNewGRFFiles now has control over the scanner. */
|
/* ScanNewGRFFiles now has control over the scanner. */
|
||||||
RequestNewGRFScan(scanner.release());
|
RequestNewGRFScan(scanner.release());
|
||||||
|
|
||||||
|
_general_worker_pool.Start("ottd:worker", 8);
|
||||||
|
|
||||||
VideoDriver::GetInstance()->MainLoop();
|
VideoDriver::GetInstance()->MainLoop();
|
||||||
|
|
||||||
|
_general_worker_pool.Stop();
|
||||||
|
|
||||||
WaitTillSaved();
|
WaitTillSaved();
|
||||||
|
|
||||||
/* only save config if we have to */
|
/* only save config if we have to */
|
||||||
|
82
src/worker_thread.cpp
Normal file
82
src/worker_thread.cpp
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
/*
|
||||||
|
* This file is part of OpenTTD.
|
||||||
|
* OpenTTD is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, version 2.
|
||||||
|
* OpenTTD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
* See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with OpenTTD. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** @file worker_thread.cpp Worker thread pool utility. */
|
||||||
|
|
||||||
|
#include "stdafx.h"
|
||||||
|
#include "worker_thread.h"
|
||||||
|
#include "thread.h"
|
||||||
|
|
||||||
|
#include "safeguards.h"
|
||||||
|
|
||||||
|
WorkerThreadPool _general_worker_pool;
|
||||||
|
|
||||||
|
void WorkerThreadPool::Start(const char *thread_name, uint max_workers)
|
||||||
|
{
|
||||||
|
uint cpus = std::thread::hardware_concurrency();
|
||||||
|
if (cpus <= 1) return;
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lk(this->lock);
|
||||||
|
|
||||||
|
this->exit = false;
|
||||||
|
|
||||||
|
uint worker_target = std::min<uint>(max_workers, cpus);
|
||||||
|
if (this->workers >= worker_target) return;
|
||||||
|
|
||||||
|
uint new_workers = worker_target - this->workers;
|
||||||
|
|
||||||
|
for (uint i = 0; i < new_workers; i++) {
|
||||||
|
this->workers++;
|
||||||
|
if (!StartNewThread(nullptr, thread_name, &WorkerThreadPool::Run, this)) {
|
||||||
|
this->workers--;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void WorkerThreadPool::Stop()
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(this->lock);
|
||||||
|
this->exit = true;
|
||||||
|
this->empty_cv.notify_all();
|
||||||
|
this->done_cv.wait(lk, [this]() { return this->workers == 0; });
|
||||||
|
}
|
||||||
|
|
||||||
|
void WorkerThreadPool::EnqueueJob(WorkerJobFunc *func, void *data1, void *data2, void *data3)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(this->lock);
|
||||||
|
if (this->workers == 0) {
|
||||||
|
/* Just execute it here and now */
|
||||||
|
lk.unlock();
|
||||||
|
func(data1, data2, data3);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
bool notify = this->jobs.empty();
|
||||||
|
this->jobs.push({ func, data1, data2, data3 });
|
||||||
|
lk.unlock();
|
||||||
|
if (notify) this->empty_cv.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
void WorkerThreadPool::Run(WorkerThreadPool *pool)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(pool->lock);
|
||||||
|
while (!pool->exit || !pool->jobs.empty()) {
|
||||||
|
if (pool->jobs.empty()) {
|
||||||
|
pool->empty_cv.wait(lk);
|
||||||
|
} else {
|
||||||
|
WorkerJob job = pool->jobs.front();
|
||||||
|
pool->jobs.pop();
|
||||||
|
lk.unlock();
|
||||||
|
job.func(job.data1, job.data2, job.data3);
|
||||||
|
lk.lock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pool->workers--;
|
||||||
|
if (pool->workers == 0) {
|
||||||
|
pool->done_cv.notify_all();
|
||||||
|
}
|
||||||
|
}
|
55
src/worker_thread.h
Normal file
55
src/worker_thread.h
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
/*
|
||||||
|
* This file is part of OpenTTD.
|
||||||
|
* OpenTTD is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, version 2.
|
||||||
|
* OpenTTD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
* See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with OpenTTD. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** @file worker_thread.h Worker thread pool utility. */
|
||||||
|
|
||||||
|
#ifndef WORKER_THREAD_H
|
||||||
|
#define WORKER_THREAD_H
|
||||||
|
|
||||||
|
#include <queue>
|
||||||
|
#include <mutex>
|
||||||
|
#include <condition_variable>
|
||||||
|
#if defined(__MINGW32__)
|
||||||
|
#include "3rdparty/mingw-std-threads/mingw.mutex.h"
|
||||||
|
#include "3rdparty/mingw-std-threads/mingw.condition_variable.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef void WorkerJobFunc(void *, void *, void *);
|
||||||
|
|
||||||
|
struct WorkerThreadPool {
|
||||||
|
private:
|
||||||
|
struct WorkerJob {
|
||||||
|
WorkerJobFunc *func;
|
||||||
|
void *data1;
|
||||||
|
void *data2;
|
||||||
|
void *data3;
|
||||||
|
};
|
||||||
|
|
||||||
|
uint workers = 0;
|
||||||
|
bool exit = false;
|
||||||
|
std::mutex lock;
|
||||||
|
std::queue<WorkerJob> jobs;
|
||||||
|
std::condition_variable empty_cv;
|
||||||
|
std::condition_variable done_cv;
|
||||||
|
|
||||||
|
static void Run(WorkerThreadPool *pool);
|
||||||
|
|
||||||
|
public:
|
||||||
|
|
||||||
|
void Start(const char *thread_name, uint max_workers);
|
||||||
|
void Stop();
|
||||||
|
void EnqueueJob(WorkerJobFunc *func, void *data1 = nullptr, void *data2 = nullptr, void *data3 = nullptr);
|
||||||
|
|
||||||
|
~WorkerThreadPool()
|
||||||
|
{
|
||||||
|
this->Stop();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
extern WorkerThreadPool _general_worker_pool;
|
||||||
|
|
||||||
|
#endif /* WORKER_THREAD_H */
|
Reference in New Issue
Block a user