diff --git a/src/linkgraph/linkgraphjob.cpp b/src/linkgraph/linkgraphjob.cpp index 0b60edfbb6..ce3c182e7f 100644 --- a/src/linkgraph/linkgraphjob.cpp +++ b/src/linkgraph/linkgraphjob.cpp @@ -48,7 +48,6 @@ LinkGraphJob::LinkGraphJob(const LinkGraph &orig, uint duration_multiplier) : * This is on purpose. */ link_graph(orig), settings(_settings_game.linkgraph), - thread(NULL), join_date_ticks(GetLinkGraphJobJoinDateTicks(duration_multiplier)), start_date_ticks((_date * DAY_TICKS) + _date_fract), job_completed(false) @@ -66,23 +65,9 @@ void LinkGraphJob::EraseFlows(NodeID from) } } -/** - * Spawn a thread if possible and run the link graph job in the thread. If - * that's not possible run the job right now in the current thread. - */ -void LinkGraphJob::SpawnThread() +void LinkGraphJob::SetJobGroup(std::shared_ptr group) { - if (!ThreadObject::New(&(LinkGraphSchedule::Run), this, &this->thread, "ottd:linkgraph")) { - this->thread = NULL; - /* Of course this will hang a bit. - * On the other hand, if you want to play games which make this hang noticably - * on a platform without threads then you'll probably get other problems first. - * OK: - * If someone comes and tells me that this hangs for him/her, I'll implement a - * smaller grained "Step" method for all handlers and add some more ticks where - * "Step" is called. No problem in principle. */ - LinkGraphSchedule::Run(this); - } + this->group = std::move(group); } /** @@ -90,10 +75,9 @@ void LinkGraphJob::SpawnThread() */ void LinkGraphJob::JoinThread() { - if (this->thread != NULL) { - this->thread->Join(); - delete this->thread; - this->thread = NULL; + if (this->group != nullptr) { + this->group->JoinThread(); + this->group.reset(); } } diff --git a/src/linkgraph/linkgraphjob.h b/src/linkgraph/linkgraphjob.h index 8b36d21c01..c5935d405a 100644 --- a/src/linkgraph/linkgraphjob.h +++ b/src/linkgraph/linkgraphjob.h @@ -15,9 +15,11 @@ #include "../thread/thread.h" #include "linkgraph.h" #include +#include class LinkGraphJob; class Path; +class LinkGraphJobGroup; typedef std::list PathList; /** Type of the pool for link graph jobs. */ @@ -56,11 +58,12 @@ private: friend const SaveLoad *GetLinkGraphJobDesc(); friend void GetLinkGraphJobDayLengthScaleAfterLoad(LinkGraphJob *lgj); friend class LinkGraphSchedule; + friend class LinkGraphJobGroup; protected: const LinkGraph link_graph; ///< Link graph to by analyzed. Is copied when job is started and mustn't be modified later. + std::shared_ptr group; ///< JOb group thread the job is running in or NULL if it's running in the main thread. const LinkGraphSettings settings; ///< Copy of _settings_game.linkgraph at spawn time. - ThreadObject *thread; ///< Thread the job is running in or NULL if it's running in the main thread. DateTicks join_date_ticks; ///< Date when the job is to be joined. DateTicks start_date_ticks; ///< Date when the job was started. NodeAnnotationVector nodes; ///< Extra node data necessary for link graph calculation. @@ -69,7 +72,7 @@ protected: void EraseFlows(NodeID from); void JoinThread(); - void SpawnThread(); + void SetJobGroup(std::shared_ptr group); public: @@ -269,7 +272,7 @@ public: * Bare constructor, only for save/load. link_graph, join_date and actually * settings have to be brutally const-casted in order to populate them. */ - LinkGraphJob() : settings(_settings_game.linkgraph), thread(NULL), + LinkGraphJob() : settings(_settings_game.linkgraph), join_date_ticks(INVALID_DATE), start_date_ticks(INVALID_DATE), job_completed(false) {} LinkGraphJob(const LinkGraph &orig, uint duration_multiplier); diff --git a/src/linkgraph/linkgraphschedule.cpp b/src/linkgraph/linkgraphschedule.cpp index 939c9f3422..ea729f4f59 100644 --- a/src/linkgraph/linkgraphschedule.cpp +++ b/src/linkgraph/linkgraphschedule.cpp @@ -64,6 +64,7 @@ void LinkGraphSchedule::SpawnNext() uint scaling = FindLastBit(total_cost); uint cost_budget = total_cost / scaling; uint used_budget = 0; + std::vector jobs_to_execute; while (used_budget < cost_budget && !this->schedule.empty()) { LinkGraph *lg = this->schedule.front(); assert(lg == LinkGraph::Get(lg->index)); @@ -73,7 +74,7 @@ void LinkGraphSchedule::SpawnNext() if (LinkGraphJob::CanAllocateItem()) { uint duration_multiplier = CeilDiv(scaling * cost, total_cost); std::unique_ptr job(new LinkGraphJob(*lg, duration_multiplier)); - job->SpawnThread(); // todo + jobs_to_execute.emplace_back(job.get(), cost); if (this->running.empty() || job->JoinDateTicks() >= this->running.back()->JoinDateTicks()) { this->running.push_back(std::move(job)); DEBUG(linkgraph, 3, "LinkGraphSchedule::SpawnNext(): Running job: id: %u, nodes: %u, cost: %u, duration_multiplier: %u", @@ -94,6 +95,8 @@ void LinkGraphSchedule::SpawnNext() this->schedule.splice(this->schedule.end(), schedule_to_back); + LinkGraphJobGroup::ExecuteJobSet(std::move(jobs_to_execute)); + DEBUG(linkgraph, 2, "LinkGraphSchedule::SpawnNext(): Linkgraph job totals: cost: %u, budget: %u, scaling: %u, scheduled: %zu, running: %zu", total_cost, cost_budget, scaling, this->schedule.size(), this->running.size()); } @@ -169,9 +172,11 @@ void LinkGraphSchedule::JoinNext() */ void LinkGraphSchedule::SpawnAll() { + std::vector jobs_to_execute; for (JobList::iterator i = this->running.begin(); i != this->running.end(); ++i) { - (*i)->SpawnThread(); + jobs_to_execute.emplace_back(i->get()); } + LinkGraphJobGroup::ExecuteJobSet(std::move(jobs_to_execute)); } /** @@ -223,6 +228,82 @@ LinkGraphSchedule::~LinkGraphSchedule() } } +LinkGraphJobGroup::LinkGraphJobGroup(constructor_token token, std::vector jobs) : + jobs(std::move(jobs)) { } + +void LinkGraphJobGroup::SpawnThread() { + ThreadObject *t = nullptr; + + /** + * Spawn a thread if possible and run the link graph job in the thread. If + * that's not possible run the job right now in the current thread. + */ + if (ThreadObject::New(&(LinkGraphJobGroup::Run), this, &t, "ottd:linkgraph")) { + this->thread.reset(t); + for (auto &it : this->jobs) { + it->SetJobGroup(this->shared_from_this()); + } + } else { + this->thread.reset(); + /* Of course this will hang a bit. + * On the other hand, if you want to play games which make this hang noticably + * on a platform without threads then you'll probably get other problems first. + * OK: + * If someone comes and tells me that this hangs for him/her, I'll implement a + * smaller grained "Step" method for all handlers and add some more ticks where + * "Step" is called. No problem in principle. */ + LinkGraphJobGroup::Run(this); + } +} + +void LinkGraphJobGroup::JoinThread() { + if (!this->thread || this->joined_thread) return; + this->thread->Join(); + this->joined_thread = true; +} + +/** + * Run all jobs for the given LinkGraphJobGroup. This method is tailored to + * ThreadObject::New. + * @param j Pointer to a LinkGraphJobGroup. + */ +/* static */ void LinkGraphJobGroup::Run(void *group) +{ + LinkGraphJobGroup *job_group = (LinkGraphJobGroup *)group; + for (LinkGraphJob *job : job_group->jobs) { + LinkGraphSchedule::Run(job); + } +} + +/* static */ void LinkGraphJobGroup::ExecuteJobSet(std::vector jobs) { + const uint thread_budget = 200000; + + std::sort(jobs.begin(), jobs.end(), [](const JobInfo &a, const JobInfo &b) { + return a.cost_estimate < b.cost_estimate; + }); + + std::vector bucket; + uint bucket_cost = 0; + auto flush_bucket = [&]() { + if (!bucket_cost) return; + DEBUG(linkgraph, 2, "LinkGraphJobGroup::ExecuteJobSet: Creating Job Group: jobs: %zu, cost: %u", bucket.size(), bucket_cost); + auto group = std::make_shared(constructor_token(), std::move(bucket)); + group->SpawnThread(); + bucket_cost = 0; + bucket.clear(); + }; + + for (JobInfo &it : jobs) { + if (bucket_cost && (bucket_cost + it.cost_estimate > thread_budget)) flush_bucket(); + bucket.push_back(it.job); + bucket_cost += it.cost_estimate; + } + flush_bucket(); +} + +LinkGraphJobGroup::JobInfo::JobInfo(LinkGraphJob *job) : + job(job), cost_estimate(job->Graph().CalculateCostEstimate()) { } + /** * Pause the game if on the next _date_fract tick, we would do a join with the next * link graph job, but it is still running. diff --git a/src/linkgraph/linkgraphschedule.h b/src/linkgraph/linkgraphschedule.h index 51369a8de6..9a9d7f17af 100644 --- a/src/linkgraph/linkgraphschedule.h +++ b/src/linkgraph/linkgraphschedule.h @@ -12,6 +12,7 @@ #ifndef LINKGRAPHSCHEDULE_H #define LINKGRAPHSCHEDULE_H +#include "../thread/thread.h" #include "linkgraph.h" #include @@ -80,4 +81,33 @@ public: void Unqueue(LinkGraph *lg) { this->schedule.remove(lg); } }; +class LinkGraphJobGroup : public std::enable_shared_from_this { + friend LinkGraphJob; + +private: + bool joined_thread = false; ///< True if thread has already been joined + std::unique_ptr thread; ///< Thread the job group is running in or NULL if it's running in the main thread. + const std::vector jobs; ///< The set of jobs in this job set + +private: + struct constructor_token { }; + static void Run(void *group); + void SpawnThread(); + void JoinThread(); + +public: + LinkGraphJobGroup(constructor_token token, std::vector jobs); + + struct JobInfo { + LinkGraphJob * job; + uint cost_estimate; + + JobInfo(LinkGraphJob *job); + JobInfo(LinkGraphJob *job, uint cost_estimate) : + job(job), cost_estimate(cost_estimate) { } + }; + + static void ExecuteJobSet(std::vector jobs); +}; + #endif /* LINKGRAPHSCHEDULE_H */