Linkgraph: Support running multiple jobs per spawned thread.

This commit is contained in:
Jonathan G Rennison
2016-10-31 00:21:09 +00:00
parent 84e61b690a
commit 1cede8431f
4 changed files with 124 additions and 26 deletions

View File

@@ -48,7 +48,6 @@ LinkGraphJob::LinkGraphJob(const LinkGraph &orig, uint duration_multiplier) :
* This is on purpose. */ * This is on purpose. */
link_graph(orig), link_graph(orig),
settings(_settings_game.linkgraph), settings(_settings_game.linkgraph),
thread(NULL),
join_date_ticks(GetLinkGraphJobJoinDateTicks(duration_multiplier)), join_date_ticks(GetLinkGraphJobJoinDateTicks(duration_multiplier)),
start_date_ticks((_date * DAY_TICKS) + _date_fract), start_date_ticks((_date * DAY_TICKS) + _date_fract),
job_completed(false) job_completed(false)
@@ -66,23 +65,9 @@ void LinkGraphJob::EraseFlows(NodeID from)
} }
} }
/** void LinkGraphJob::SetJobGroup(std::shared_ptr<LinkGraphJobGroup> group)
* Spawn a thread if possible and run the link graph job in the thread. If
* that's not possible run the job right now in the current thread.
*/
void LinkGraphJob::SpawnThread()
{ {
if (!ThreadObject::New(&(LinkGraphSchedule::Run), this, &this->thread, "ottd:linkgraph")) { this->group = std::move(group);
this->thread = NULL;
/* Of course this will hang a bit.
* On the other hand, if you want to play games which make this hang noticably
* on a platform without threads then you'll probably get other problems first.
* OK:
* If someone comes and tells me that this hangs for him/her, I'll implement a
* smaller grained "Step" method for all handlers and add some more ticks where
* "Step" is called. No problem in principle. */
LinkGraphSchedule::Run(this);
}
} }
/** /**
@@ -90,10 +75,9 @@ void LinkGraphJob::SpawnThread()
*/ */
void LinkGraphJob::JoinThread() void LinkGraphJob::JoinThread()
{ {
if (this->thread != NULL) { if (this->group != nullptr) {
this->thread->Join(); this->group->JoinThread();
delete this->thread; this->group.reset();
this->thread = NULL;
} }
} }

View File

@@ -15,9 +15,11 @@
#include "../thread/thread.h" #include "../thread/thread.h"
#include "linkgraph.h" #include "linkgraph.h"
#include <list> #include <list>
#include <memory>
class LinkGraphJob; class LinkGraphJob;
class Path; class Path;
class LinkGraphJobGroup;
typedef std::list<Path *> PathList; typedef std::list<Path *> PathList;
/** Type of the pool for link graph jobs. */ /** Type of the pool for link graph jobs. */
@@ -56,11 +58,12 @@ private:
friend const SaveLoad *GetLinkGraphJobDesc(); friend const SaveLoad *GetLinkGraphJobDesc();
friend void GetLinkGraphJobDayLengthScaleAfterLoad(LinkGraphJob *lgj); friend void GetLinkGraphJobDayLengthScaleAfterLoad(LinkGraphJob *lgj);
friend class LinkGraphSchedule; friend class LinkGraphSchedule;
friend class LinkGraphJobGroup;
protected: protected:
const LinkGraph link_graph; ///< Link graph to by analyzed. Is copied when job is started and mustn't be modified later. const LinkGraph link_graph; ///< Link graph to by analyzed. Is copied when job is started and mustn't be modified later.
std::shared_ptr<LinkGraphJobGroup> group; ///< JOb group thread the job is running in or NULL if it's running in the main thread.
const LinkGraphSettings settings; ///< Copy of _settings_game.linkgraph at spawn time. const LinkGraphSettings settings; ///< Copy of _settings_game.linkgraph at spawn time.
ThreadObject *thread; ///< Thread the job is running in or NULL if it's running in the main thread.
DateTicks join_date_ticks; ///< Date when the job is to be joined. DateTicks join_date_ticks; ///< Date when the job is to be joined.
DateTicks start_date_ticks; ///< Date when the job was started. DateTicks start_date_ticks; ///< Date when the job was started.
NodeAnnotationVector nodes; ///< Extra node data necessary for link graph calculation. NodeAnnotationVector nodes; ///< Extra node data necessary for link graph calculation.
@@ -69,7 +72,7 @@ protected:
void EraseFlows(NodeID from); void EraseFlows(NodeID from);
void JoinThread(); void JoinThread();
void SpawnThread(); void SetJobGroup(std::shared_ptr<LinkGraphJobGroup> group);
public: public:
@@ -269,7 +272,7 @@ public:
* Bare constructor, only for save/load. link_graph, join_date and actually * Bare constructor, only for save/load. link_graph, join_date and actually
* settings have to be brutally const-casted in order to populate them. * settings have to be brutally const-casted in order to populate them.
*/ */
LinkGraphJob() : settings(_settings_game.linkgraph), thread(NULL), LinkGraphJob() : settings(_settings_game.linkgraph),
join_date_ticks(INVALID_DATE), start_date_ticks(INVALID_DATE), job_completed(false) {} join_date_ticks(INVALID_DATE), start_date_ticks(INVALID_DATE), job_completed(false) {}
LinkGraphJob(const LinkGraph &orig, uint duration_multiplier); LinkGraphJob(const LinkGraph &orig, uint duration_multiplier);

View File

@@ -64,6 +64,7 @@ void LinkGraphSchedule::SpawnNext()
uint scaling = FindLastBit(total_cost); uint scaling = FindLastBit(total_cost);
uint cost_budget = total_cost / scaling; uint cost_budget = total_cost / scaling;
uint used_budget = 0; uint used_budget = 0;
std::vector<LinkGraphJobGroup::JobInfo> jobs_to_execute;
while (used_budget < cost_budget && !this->schedule.empty()) { while (used_budget < cost_budget && !this->schedule.empty()) {
LinkGraph *lg = this->schedule.front(); LinkGraph *lg = this->schedule.front();
assert(lg == LinkGraph::Get(lg->index)); assert(lg == LinkGraph::Get(lg->index));
@@ -73,7 +74,7 @@ void LinkGraphSchedule::SpawnNext()
if (LinkGraphJob::CanAllocateItem()) { if (LinkGraphJob::CanAllocateItem()) {
uint duration_multiplier = CeilDiv(scaling * cost, total_cost); uint duration_multiplier = CeilDiv(scaling * cost, total_cost);
std::unique_ptr<LinkGraphJob> job(new LinkGraphJob(*lg, duration_multiplier)); std::unique_ptr<LinkGraphJob> job(new LinkGraphJob(*lg, duration_multiplier));
job->SpawnThread(); // todo jobs_to_execute.emplace_back(job.get(), cost);
if (this->running.empty() || job->JoinDateTicks() >= this->running.back()->JoinDateTicks()) { if (this->running.empty() || job->JoinDateTicks() >= this->running.back()->JoinDateTicks()) {
this->running.push_back(std::move(job)); this->running.push_back(std::move(job));
DEBUG(linkgraph, 3, "LinkGraphSchedule::SpawnNext(): Running job: id: %u, nodes: %u, cost: %u, duration_multiplier: %u", DEBUG(linkgraph, 3, "LinkGraphSchedule::SpawnNext(): Running job: id: %u, nodes: %u, cost: %u, duration_multiplier: %u",
@@ -94,6 +95,8 @@ void LinkGraphSchedule::SpawnNext()
this->schedule.splice(this->schedule.end(), schedule_to_back); this->schedule.splice(this->schedule.end(), schedule_to_back);
LinkGraphJobGroup::ExecuteJobSet(std::move(jobs_to_execute));
DEBUG(linkgraph, 2, "LinkGraphSchedule::SpawnNext(): Linkgraph job totals: cost: %u, budget: %u, scaling: %u, scheduled: %zu, running: %zu", DEBUG(linkgraph, 2, "LinkGraphSchedule::SpawnNext(): Linkgraph job totals: cost: %u, budget: %u, scaling: %u, scheduled: %zu, running: %zu",
total_cost, cost_budget, scaling, this->schedule.size(), this->running.size()); total_cost, cost_budget, scaling, this->schedule.size(), this->running.size());
} }
@@ -169,9 +172,11 @@ void LinkGraphSchedule::JoinNext()
*/ */
void LinkGraphSchedule::SpawnAll() void LinkGraphSchedule::SpawnAll()
{ {
std::vector<LinkGraphJobGroup::JobInfo> jobs_to_execute;
for (JobList::iterator i = this->running.begin(); i != this->running.end(); ++i) { for (JobList::iterator i = this->running.begin(); i != this->running.end(); ++i) {
(*i)->SpawnThread(); jobs_to_execute.emplace_back(i->get());
} }
LinkGraphJobGroup::ExecuteJobSet(std::move(jobs_to_execute));
} }
/** /**
@@ -223,6 +228,82 @@ LinkGraphSchedule::~LinkGraphSchedule()
} }
} }
LinkGraphJobGroup::LinkGraphJobGroup(constructor_token token, std::vector<LinkGraphJob *> jobs) :
jobs(std::move(jobs)) { }
void LinkGraphJobGroup::SpawnThread() {
ThreadObject *t = nullptr;
/**
* Spawn a thread if possible and run the link graph job in the thread. If
* that's not possible run the job right now in the current thread.
*/
if (ThreadObject::New(&(LinkGraphJobGroup::Run), this, &t, "ottd:linkgraph")) {
this->thread.reset(t);
for (auto &it : this->jobs) {
it->SetJobGroup(this->shared_from_this());
}
} else {
this->thread.reset();
/* Of course this will hang a bit.
* On the other hand, if you want to play games which make this hang noticably
* on a platform without threads then you'll probably get other problems first.
* OK:
* If someone comes and tells me that this hangs for him/her, I'll implement a
* smaller grained "Step" method for all handlers and add some more ticks where
* "Step" is called. No problem in principle. */
LinkGraphJobGroup::Run(this);
}
}
void LinkGraphJobGroup::JoinThread() {
if (!this->thread || this->joined_thread) return;
this->thread->Join();
this->joined_thread = true;
}
/**
* Run all jobs for the given LinkGraphJobGroup. This method is tailored to
* ThreadObject::New.
* @param j Pointer to a LinkGraphJobGroup.
*/
/* static */ void LinkGraphJobGroup::Run(void *group)
{
LinkGraphJobGroup *job_group = (LinkGraphJobGroup *)group;
for (LinkGraphJob *job : job_group->jobs) {
LinkGraphSchedule::Run(job);
}
}
/* static */ void LinkGraphJobGroup::ExecuteJobSet(std::vector<JobInfo> jobs) {
const uint thread_budget = 200000;
std::sort(jobs.begin(), jobs.end(), [](const JobInfo &a, const JobInfo &b) {
return a.cost_estimate < b.cost_estimate;
});
std::vector<LinkGraphJob *> bucket;
uint bucket_cost = 0;
auto flush_bucket = [&]() {
if (!bucket_cost) return;
DEBUG(linkgraph, 2, "LinkGraphJobGroup::ExecuteJobSet: Creating Job Group: jobs: %zu, cost: %u", bucket.size(), bucket_cost);
auto group = std::make_shared<LinkGraphJobGroup>(constructor_token(), std::move(bucket));
group->SpawnThread();
bucket_cost = 0;
bucket.clear();
};
for (JobInfo &it : jobs) {
if (bucket_cost && (bucket_cost + it.cost_estimate > thread_budget)) flush_bucket();
bucket.push_back(it.job);
bucket_cost += it.cost_estimate;
}
flush_bucket();
}
LinkGraphJobGroup::JobInfo::JobInfo(LinkGraphJob *job) :
job(job), cost_estimate(job->Graph().CalculateCostEstimate()) { }
/** /**
* Pause the game if on the next _date_fract tick, we would do a join with the next * Pause the game if on the next _date_fract tick, we would do a join with the next
* link graph job, but it is still running. * link graph job, but it is still running.

View File

@@ -12,6 +12,7 @@
#ifndef LINKGRAPHSCHEDULE_H #ifndef LINKGRAPHSCHEDULE_H
#define LINKGRAPHSCHEDULE_H #define LINKGRAPHSCHEDULE_H
#include "../thread/thread.h"
#include "linkgraph.h" #include "linkgraph.h"
#include <memory> #include <memory>
@@ -80,4 +81,33 @@ public:
void Unqueue(LinkGraph *lg) { this->schedule.remove(lg); } void Unqueue(LinkGraph *lg) { this->schedule.remove(lg); }
}; };
class LinkGraphJobGroup : public std::enable_shared_from_this<LinkGraphJobGroup> {
friend LinkGraphJob;
private:
bool joined_thread = false; ///< True if thread has already been joined
std::unique_ptr<ThreadObject> thread; ///< Thread the job group is running in or NULL if it's running in the main thread.
const std::vector<LinkGraphJob *> jobs; ///< The set of jobs in this job set
private:
struct constructor_token { };
static void Run(void *group);
void SpawnThread();
void JoinThread();
public:
LinkGraphJobGroup(constructor_token token, std::vector<LinkGraphJob *> jobs);
struct JobInfo {
LinkGraphJob * job;
uint cost_estimate;
JobInfo(LinkGraphJob *job);
JobInfo(LinkGraphJob *job, uint cost_estimate) :
job(job), cost_estimate(cost_estimate) { }
};
static void ExecuteJobSet(std::vector<JobInfo> jobs);
};
#endif /* LINKGRAPHSCHEDULE_H */ #endif /* LINKGRAPHSCHEDULE_H */