diff --git a/src/saveload/saveload.cpp b/src/saveload/saveload.cpp index 790a671499..acbfed22e8 100644 --- a/src/saveload/saveload.cpp +++ b/src/saveload/saveload.cpp @@ -59,6 +59,14 @@ #include #include +#include "../thread.h" +#include +#include +#if defined(__MINGW32__) +#include "../3rdparty/mingw-std-threads/mingw.mutex.h" +#include "../3rdparty/mingw-std-threads/mingw.condition_variable.h" +#endif + #include "../safeguards.h" extern const SaveLoadVersion SAVEGAME_VERSION = (SaveLoadVersion)(SL_MAX_VERSION - 1); ///< Current savegame version of OpenTTD. @@ -2818,6 +2826,98 @@ SaveOrLoadResult SaveWithFilter(SaveFilter *writer, bool threaded) } } +struct ThreadedLoadFilter : LoadFilter { + static const size_t BUFFER_COUNT = 4; + + std::mutex mutex; + std::condition_variable full_cv; + std::condition_variable empty_cv; + uint first_ready = 0; + uint count_ready = 0; + size_t read_offsets[BUFFER_COUNT]; + size_t read_counts[BUFFER_COUNT]; + byte read_buf[MEMORY_CHUNK_SIZE * BUFFER_COUNT]; ///< Buffers for reading from source. + bool no_thread = false; + + std::thread read_thread; + + /** + * Initialise this filter. + * @param chain The next filter in this chain. + */ + ThreadedLoadFilter(LoadFilter *chain) : LoadFilter(chain) + { + std::unique_lock lk(this->mutex); + if (!StartNewThread(&this->read_thread, "ottd:loadgame", &ThreadedLoadFilter::RunThread, this)) { + DEBUG(sl, 1, "Failed to start load read thread, reading non-threaded"); + this->no_thread = true; + } else { + DEBUG(sl, 2, "Started load read thread"); + } + } + + /** Clean everything up. */ + ~ThreadedLoadFilter() + { + std::unique_lock lk(this->mutex); + this->no_thread = true; + lk.unlock(); + this->empty_cv.notify_all(); + this->full_cv.notify_all(); + if (this->read_thread.joinable()) { + this->read_thread.join(); + DEBUG(sl, 2, "Joined load read thread"); + } + } + + static void RunThread(ThreadedLoadFilter *self) + { + std::unique_lock lk(self->mutex); + while (!self->no_thread) { + if (self->count_ready == BUFFER_COUNT) { + self->full_cv.wait(lk); + continue; + } + + uint buf = (self->first_ready + self->count_ready) % BUFFER_COUNT; + lk.unlock(); + size_t read = self->chain->Read(self->read_buf + (buf * MEMORY_CHUNK_SIZE), MEMORY_CHUNK_SIZE); + lk.lock(); + self->read_offsets[buf] = 0; + self->read_counts[buf] = read; + self->count_ready++; + if (self->count_ready == 1) self->empty_cv.notify_one(); + } + } + + size_t Read(byte *buf, size_t size) override + { + if (this->no_thread) return this->chain->Read(buf, size); + + size_t read = 0; + std::unique_lock lk(this->mutex); + while (read < size) { + if (this->count_ready == 0) { + this->empty_cv.wait(lk); + continue; + } + + size_t to_read = std::min(size - read, read_counts[this->first_ready]); + if (to_read == 0) break; + memcpy(buf + read, this->read_buf + (this->first_ready * MEMORY_CHUNK_SIZE) + read_offsets[this->first_ready], to_read); + read += to_read; + read_offsets[this->first_ready] += to_read; + read_counts[this->first_ready] -= to_read; + if (read_counts[this->first_ready] == 0) { + this->first_ready = (this->first_ready + 1) % BUFFER_COUNT; + this->count_ready--; + if (this->count_ready == BUFFER_COUNT - 1) this->full_cv.notify_one(); + } + } + return read; + } +}; + /** * Actually perform the loading of a "non-old" savegame. * @param reader The filter to read the savegame from. @@ -2897,6 +2997,7 @@ static SaveOrLoadResult DoLoad(LoadFilter *reader, bool load_check) } _sl.lf = fmt->init_load(_sl.lf); + _sl.lf = new ThreadedLoadFilter(_sl.lf); _sl.reader = new ReadBuffer(_sl.lf); _next_offs = 0;