Add load filter to perform savegame decompression in a separate thread
This commit is contained in:
@@ -59,6 +59,14 @@
|
|||||||
#include <deque>
|
#include <deque>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
#include "../thread.h"
|
||||||
|
#include <mutex>
|
||||||
|
#include <condition_variable>
|
||||||
|
#if defined(__MINGW32__)
|
||||||
|
#include "../3rdparty/mingw-std-threads/mingw.mutex.h"
|
||||||
|
#include "../3rdparty/mingw-std-threads/mingw.condition_variable.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
#include "../safeguards.h"
|
#include "../safeguards.h"
|
||||||
|
|
||||||
extern const SaveLoadVersion SAVEGAME_VERSION = (SaveLoadVersion)(SL_MAX_VERSION - 1); ///< Current savegame version of OpenTTD.
|
extern const SaveLoadVersion SAVEGAME_VERSION = (SaveLoadVersion)(SL_MAX_VERSION - 1); ///< Current savegame version of OpenTTD.
|
||||||
@@ -2818,6 +2826,98 @@ SaveOrLoadResult SaveWithFilter(SaveFilter *writer, bool threaded)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ThreadedLoadFilter : LoadFilter {
|
||||||
|
static const size_t BUFFER_COUNT = 4;
|
||||||
|
|
||||||
|
std::mutex mutex;
|
||||||
|
std::condition_variable full_cv;
|
||||||
|
std::condition_variable empty_cv;
|
||||||
|
uint first_ready = 0;
|
||||||
|
uint count_ready = 0;
|
||||||
|
size_t read_offsets[BUFFER_COUNT];
|
||||||
|
size_t read_counts[BUFFER_COUNT];
|
||||||
|
byte read_buf[MEMORY_CHUNK_SIZE * BUFFER_COUNT]; ///< Buffers for reading from source.
|
||||||
|
bool no_thread = false;
|
||||||
|
|
||||||
|
std::thread read_thread;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialise this filter.
|
||||||
|
* @param chain The next filter in this chain.
|
||||||
|
*/
|
||||||
|
ThreadedLoadFilter(LoadFilter *chain) : LoadFilter(chain)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(this->mutex);
|
||||||
|
if (!StartNewThread(&this->read_thread, "ottd:loadgame", &ThreadedLoadFilter::RunThread, this)) {
|
||||||
|
DEBUG(sl, 1, "Failed to start load read thread, reading non-threaded");
|
||||||
|
this->no_thread = true;
|
||||||
|
} else {
|
||||||
|
DEBUG(sl, 2, "Started load read thread");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Clean everything up. */
|
||||||
|
~ThreadedLoadFilter()
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(this->mutex);
|
||||||
|
this->no_thread = true;
|
||||||
|
lk.unlock();
|
||||||
|
this->empty_cv.notify_all();
|
||||||
|
this->full_cv.notify_all();
|
||||||
|
if (this->read_thread.joinable()) {
|
||||||
|
this->read_thread.join();
|
||||||
|
DEBUG(sl, 2, "Joined load read thread");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void RunThread(ThreadedLoadFilter *self)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(self->mutex);
|
||||||
|
while (!self->no_thread) {
|
||||||
|
if (self->count_ready == BUFFER_COUNT) {
|
||||||
|
self->full_cv.wait(lk);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint buf = (self->first_ready + self->count_ready) % BUFFER_COUNT;
|
||||||
|
lk.unlock();
|
||||||
|
size_t read = self->chain->Read(self->read_buf + (buf * MEMORY_CHUNK_SIZE), MEMORY_CHUNK_SIZE);
|
||||||
|
lk.lock();
|
||||||
|
self->read_offsets[buf] = 0;
|
||||||
|
self->read_counts[buf] = read;
|
||||||
|
self->count_ready++;
|
||||||
|
if (self->count_ready == 1) self->empty_cv.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t Read(byte *buf, size_t size) override
|
||||||
|
{
|
||||||
|
if (this->no_thread) return this->chain->Read(buf, size);
|
||||||
|
|
||||||
|
size_t read = 0;
|
||||||
|
std::unique_lock<std::mutex> lk(this->mutex);
|
||||||
|
while (read < size) {
|
||||||
|
if (this->count_ready == 0) {
|
||||||
|
this->empty_cv.wait(lk);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t to_read = std::min<size_t>(size - read, read_counts[this->first_ready]);
|
||||||
|
if (to_read == 0) break;
|
||||||
|
memcpy(buf + read, this->read_buf + (this->first_ready * MEMORY_CHUNK_SIZE) + read_offsets[this->first_ready], to_read);
|
||||||
|
read += to_read;
|
||||||
|
read_offsets[this->first_ready] += to_read;
|
||||||
|
read_counts[this->first_ready] -= to_read;
|
||||||
|
if (read_counts[this->first_ready] == 0) {
|
||||||
|
this->first_ready = (this->first_ready + 1) % BUFFER_COUNT;
|
||||||
|
this->count_ready--;
|
||||||
|
if (this->count_ready == BUFFER_COUNT - 1) this->full_cv.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return read;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Actually perform the loading of a "non-old" savegame.
|
* Actually perform the loading of a "non-old" savegame.
|
||||||
* @param reader The filter to read the savegame from.
|
* @param reader The filter to read the savegame from.
|
||||||
@@ -2897,6 +2997,7 @@ static SaveOrLoadResult DoLoad(LoadFilter *reader, bool load_check)
|
|||||||
}
|
}
|
||||||
|
|
||||||
_sl.lf = fmt->init_load(_sl.lf);
|
_sl.lf = fmt->init_load(_sl.lf);
|
||||||
|
_sl.lf = new ThreadedLoadFilter(_sl.lf);
|
||||||
_sl.reader = new ReadBuffer(_sl.lf);
|
_sl.reader = new ReadBuffer(_sl.lf);
|
||||||
_next_offs = 0;
|
_next_offs = 0;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user