35 namespace experimental {
39 void MultiSimulationManager::Log(
string s) {
40 Log::Info(
"MultiSimulationManager",
"[M]: ", s);
// Master-side constructor.
// @param ws             world size; availability_ is sized to ws entries, one
//                       per rank (ForAllWorkers visits ranks 1 .. ws-1).
// @param default_params parameters used by Start() when dispatching work.
// @param simulate       callback used to run a simulation locally when there
//                       are no worker ranks (worldsize_ == 1 in Start()).
// NOTE(review): the original lines after the resize (49-51) are missing from
// this copy; confirm nothing else was initialized there.
43 MultiSimulationManager::MultiSimulationManager(
44 int ws, Param *default_params,
45 std::function<
void(Param *, TimeSeries *)> simulate)
46 : worldsize_(ws), default_params_(default_params), simulate_(simulate) {
47 Log(
"Started Master process");
// One availability slot per rank; resize value-initializes the new entries.
48 availability_.resize(ws);
// Writes the collected per-rank timings to "timing_results.csv": a header row
// followed by one row per entry of timings_, with columns worker_id,
// simulation_runtime ("SIMULATE" value) and mpi_runtime ("MPI_CALL" value).
// NOTE(review): several original lines are missing from this copy, including
// the declaration of `myfile` and of the `worker` counter written in the first
// column (and, presumably, its increment). Restore them before building.
52 void MultiSimulationManager::WriteTimingsToFile() {
54 myfile.open(
"timing_results.csv");
// CSV header row.
56 myfile <<
"worker_id,simulation_runtime,mpi_runtime" << std::endl;
// One row per recorded TimingAggregator.
57 for (
auto &t : timings_) {
58 myfile << worker <<
",";
59 myfile << t[
"SIMULATE"] <<
",";
60 myfile << t[
"MPI_CALL"] << std::endl;
64 Log(
"Timing results of all workers have been written to "
65 "timing_results.csv.");
68 MultiSimulationManager::~MultiSimulationManager() {
69 Log(
"Completed all tasks");
72 void MultiSimulationManager::IngestData(
const std::string &data_file) {}
75 void MultiSimulationManager::RecordTiming(
int worker, TimingAggregator *agg) {
76 timings_[worker] = *agg;
// Sends a zero-length message tagged Tag::kKill to every rank visited by
// ForAllWorkers. Each send is timed under "MPI_CALL" in ta_.
// NOTE(review): original line 82 and the closing lines are missing from this
// copy.
80 void MultiSimulationManager::KillAllWorkers() {
81 ForAllWorkers([&](
int worker) {
83 Timing t_mpi(
"MPI_CALL", &ta_);
// Empty payload: only the tag carries information (Worker::Start switches on
// status.MPI_TAG).
84 MPI_Send(
nullptr, 0, MPI_INT, worker, Tag::kKill, MPI_COMM_WORLD);
// Collects one TimingAggregator per worker. For each ForAllWorkers iteration it:
//   1. receives an int size with tag Tag::kKill from any source;
//   2. receives the serialized aggregator from the rank that sent that size;
//   3. stores a copy via RecordTiming under that rank.
// The ranks may answer in any order because of MPI_ANY_SOURCE.
// NOTE(review): the declarations of `size` and `status` and the remaining
// arguments of the first MPI_Recv are missing from this copy.
90 void MultiSimulationManager::GetTimingsFromWorkers() {
91 ForAllWorkers([&](
int worker) {
95 Timing t_mpi(
"MPI_CALL", &ta_);
96 MPI_Recv(&size, 1, MPI_INT, MPI_ANY_SOURCE, Tag::kKill, MPI_COMM_WORLD,
99 TimingAggregator *agg = MPI_Recv_Obj_ROOT<TimingAggregator>(
100 size, status.MPI_SOURCE, Tag::kKill);
// NOTE(review): RecordTiming copies *agg; `agg` itself is not freed in the
// visible lines. Confirm whether MPI_Recv_Obj_ROOT hands ownership to the
// caller (possible leak).
101 RecordTiming(status.MPI_SOURCE, agg);
// Master main routine, as visible in this copy:
//   - times the whole run under "TOTAL";
//   - joins an MPI barrier with the workers;
//   - marks every worker available;
//   - runs experiments through `dispatch_experiment`;
//   - gathers worker timings, records its own under kMaster, and writes the
//     CSV.
// NOTE(review): many lines of this function are missing from this copy
// (declarations, braces, the selection of `algorithm`, the return statement).
// These comments describe only the visible statements.
105 int MultiSimulationManager::Start() {
107 Timing t_tot(
"TOTAL", &ta_);
110 MPI_Barrier(MPI_COMM_WORLD);
// Tail of a call (head not visible) that marks each worker as available.
115 [&](
int worker) { ChangeStatusWorker(worker, Status::kAvail); });
// Runs one experiment for final_params and writes the outcome into *result.
// L2F wraps the lambda, presumably into the callable type the optimization
// algorithm expects (TODO confirm).
117 auto dispatch_experiment =
118 L2F([&](Param *final_params, TimeSeries *result) {
// Only one rank: no workers exist, so simulate in this process.
121 if (worldsize_ == 1) {
122 simulate_(final_params, result);
// Otherwise take a free worker. -1 means none is free, so poll every 100 ms.
124 auto worker = GetFirstAvailableWorker();
127 while (worker == -1) {
128 std::this_thread::sleep_for(std::chrono::milliseconds(100));
129 worker = GetFirstAvailableWorker();
// Ship the parameters to the chosen worker.
134 Timing t_mpi(
"MPI_CALL", &ta_);
135 MPI_Send_Obj_ROOT(final_params, worker, Tag::kTask);
// Receive the result size, then the serialized TimeSeries, from that worker.
142 MPI_Recv(&size, 1, MPI_INT, worker, Tag::kResult, MPI_COMM_WORLD,
144 string msg =
"Receiving results from worker " +
145 to_string(status.MPI_SOURCE);
147 Timing t_mpi(
"MPI_CALL", &ta_);
148 TimeSeries *tmp_result =
149 MPI_Recv_Obj_ROOT<TimeSeries>(size, worker, Tag::kResult);
150 msg =
"Successfully received results from worker " +
151 to_string(status.MPI_SOURCE);
// NOTE(review): tmp_result is copied into *result and not freed in the
// visible lines. Confirm ownership (possible leak).
153 *result = *tmp_result;
// The worker that answered is free again.
157 ChangeStatusWorker(status.MPI_SOURCE, Status::kAvail);
// Optimization settings taken from the default parameters.
164 OptimizationParam *opt_params = default_params_->Get<OptimizationParam>();
// Let the optimization algorithm drive dispatch_experiment (how `algorithm`
// is obtained is not visible).
168 (*algorithm)(dispatch_experiment, default_params_);
// Presumably the fallback path: a single run with the default parameters.
// NOTE(review): the TimeSeries allocated here is not freed in the visible lines.
170 dispatch_experiment(default_params_,
new TimeSeries());
// Collect timings from every worker, add the master's own, and write them out.
174 GetTimingsFromWorkers();
178 RecordTiming(kMaster, &ta_);
181 WriteTimingsToFile();
// Finds the first entry of availability_ equal to `true`, marks that rank busy
// via ChangeStatusWorker, and yields its index. Start() treats -1 as "no worker
// available".
// NOTE(review): the initialization of `ret` and the return statement are
// missing from this copy.
// NOTE(review): the find + ChangeStatusWorker pair is not atomic in the visible
// lines. If this can run concurrently, confirm a lock guards it.
188 int MultiSimulationManager::GetFirstAvailableWorker() {
192 auto it = std::find(begin(availability_), end(availability_),
true);
193 if (it != end(availability_)) {
// Index of the free slot, i.e. the worker's rank.
194 ret = std::distance(begin(availability_), it);
195 ChangeStatusWorker(ret, Status::kBusy);
// Records status `s` for rank `worker` in availability_.
// A human-readable message is built in `msg`.
// NOTE(review): original line 205, where `msg` would be used, is missing from
// this copy.
202 void MultiSimulationManager::ChangeStatusWorker(
int worker, Status s) {
203 std::stringstream msg;
204 msg <<
"Changing status of [W" << worker <<
"] to " << s;
206 availability_[worker] = s;
// Iterates over ranks 1 .. worldsize_-1. The first rank is skipped, presumably
// because it is kMaster (TODO confirm).
// NOTE(review): the loop body (presumably `lambda(i)`) is missing from this copy.
211 void MultiSimulationManager::ForAllWorkers(
212 const std::function<
void(
int w)> &lambda) {
213 for (
int i = 1; i < worldsize_; i++) {
// Worker-side constructor: stores this process's rank and the simulate callback
// used by Worker::Start for each received task.
219 Worker::Worker(
int myrank, std::function<
void(Param *, TimeSeries *)> simulate)
220 : myrank_(myrank), simulate_(simulate) {
// NOTE(review): original lines 221-224 are missing. What follows appears to
// belong to the Worker destructor (its header is not visible). It builds a
// shutdown message containing the number of completed tasks.
225 string msg =
"Stopped (Completed " + to_string(task_count_) +
" tasks)";
// Worker main routine, as visible in this copy:
//   - times the run under "TOTAL" and joins the MPI barrier;
//   - receives an int size from kMaster with any tag;
//   - dispatches on status.MPI_TAG.
// Visible branches:
//   - Tag::kTask: receive a Param, simulate it, count the task, and send the
//     TimeSeries back with Tag::kResult;
//   - a branch (presumably the kill case) that sends ta_ to kMaster with
//     Tag::kKill;
//   - a fallback that logs an unknown tag.
// NOTE(review): the receive loop, case labels, declarations (size, status,
// result), braces and return are missing from this copy.
229 int Worker::Start() {
230 Timing tot(
"TOTAL", &ta_);
233 MPI_Barrier(MPI_COMM_WORLD);
// Size of the next message; its tag decides what to do with it.
241 Timing t(
"MPI_CALL", &ta_);
242 MPI_Recv(&size, 1, MPI_INT, kMaster, MPI_ANY_TAG, MPI_COMM_WORLD,
247 switch (status.MPI_TAG) {
// Task: receive the parameters for this run.
// NOTE(review): whether `params` is freed afterwards is not visible here.
249 Param *params =
nullptr;
251 Timing t(
"MPI_CALL", &ta_);
252 params = MPI_Recv_Obj_ROOT<Param>(size, kMaster, Tag::kTask);
// Run the simulation; time spent is recorded under "SIMULATE".
256 Timing sim(
"SIMULATE", &ta_);
257 simulate_(params, &result);
259 IncrementTaskCount();
// Return the result to the master.
261 Timing t(
"MPI_CALL", &ta_);
262 Log(
"Sending back results");
263 MPI_Send_Obj_ROOT(&result, kMaster, Tag::kResult);
// Send this worker's timings to the master. The tag matches the Tag::kKill
// receives in MultiSimulationManager::GetTimingsFromWorkers.
269 MPI_Send_Obj_ROOT<TimingAggregator>(&ta_, kMaster, Tag::kKill);
// Fallback for any other tag.
272 Log(
"Received unknown message tag. Stopping...");
278 void Worker::IncrementTaskCount() { task_count_++; }