BioDynaMo  v1.05.119-a4ff3934
multi_simulation_manager.cc
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------
2 //
3 // Copyright (C) 2021 CERN & University of Surrey for the benefit of the
4 // BioDynaMo collaboration. All Rights Reserved.
5 //
6 // Licensed under the Apache License, Version 2.0 (the "License");
7 // you may not use this file except in compliance with the License.
8 //
9 // See the LICENSE file distributed with this work for details.
10 // See the NOTICE file distributed with this work for additional information
11 // regarding copyright ownership.
12 //
13 // -----------------------------------------------------------------------------
14 
15 #ifdef USE_MPI
16 
17 #include <thread>
18 
19 #include "mpi.h"
20 
21 #include "core/functor.h"
25 #include "core/scheduler.h"
26 #include "core/util/timing.h"
27 
28 using std::cout;
29 using std::endl;
30 using std::string;
31 using std::to_string;
32 using std::vector;
33 
34 namespace bdm {
35 namespace experimental {
36 
39 void MultiSimulationManager::Log(string s) {
40  Log::Info("MultiSimulationManager", "[M]: ", s);
41 }
42 
43 MultiSimulationManager::MultiSimulationManager(
44  int ws, Param *default_params,
45  std::function<void(Param *, TimeSeries *)> simulate)
46  : worldsize_(ws), default_params_(default_params), simulate_(simulate) {
47  Log("Started Master process");
48  availability_.resize(ws);
49  timings_.resize(ws);
50 }
51 
52 void MultiSimulationManager::WriteTimingsToFile() {
53  std::ofstream myfile;
54  myfile.open("timing_results.csv");
55  int worker = 0;
56  myfile << "worker_id,simulation_runtime,mpi_runtime" << std::endl;
57  for (auto &t : timings_) {
58  myfile << worker << ",";
59  myfile << t["SIMULATE"] << ",";
60  myfile << t["MPI_CALL"] << std::endl;
61  worker++;
62  }
63  myfile.close();
64  Log("Timing results of all workers have been written to "
65  "timing_results.csv.");
66 }
67 
68 MultiSimulationManager::~MultiSimulationManager() {
69  Log("Completed all tasks");
70 }
71 
72 void MultiSimulationManager::IngestData(const std::string &data_file) {}
73 
74 // Copy the timing results of the specified worker
75 void MultiSimulationManager::RecordTiming(int worker, TimingAggregator *agg) {
76  timings_[worker] = *agg;
77 }
78 
79 // Send kill message to all workers
80 void MultiSimulationManager::KillAllWorkers() {
81  ForAllWorkers([&](int worker) {
82  {
83  Timing t_mpi("MPI_CALL", &ta_);
84  MPI_Send(nullptr, 0, MPI_INT, worker, Tag::kKill, MPI_COMM_WORLD);
85  }
86  });
87 }
88 
89 // Receive timing objects of all workers
90 void MultiSimulationManager::GetTimingsFromWorkers() {
91  ForAllWorkers([&](int worker) {
92  int size;
93  MPI_Status status;
94  {
95  Timing t_mpi("MPI_CALL", &ta_);
96  MPI_Recv(&size, 1, MPI_INT, MPI_ANY_SOURCE, Tag::kKill, MPI_COMM_WORLD,
97  &status);
98  }
99  TimingAggregator *agg = MPI_Recv_Obj_ROOT<TimingAggregator>(
100  size, status.MPI_SOURCE, Tag::kKill);
101  RecordTiming(status.MPI_SOURCE, agg);
102  });
103 }
104 
105 int MultiSimulationManager::Start() {
106  {
107  Timing t_tot("TOTAL", &ta_);
108 
109  // Wait for all workers to reach this barrier
110  MPI_Barrier(MPI_COMM_WORLD);
111 
112  // Change status of all workers to 'available' after barrier has been
113  // reached
114  ForAllWorkers(
115  [&](int worker) { ChangeStatusWorker(worker, Status::kAvail); });
116 
117  auto dispatch_experiment =
118  L2F([&](Param *final_params, TimeSeries *result) {
119  // If there is only one MPI process, the master performs the
120  // simulation
121  if (worldsize_ == 1) {
122  simulate_(final_params, result);
123  } else { // Otherwise we dispatch the work to the worker(s)
124  auto worker = GetFirstAvailableWorker();
125 
126  // If there is no available worker, wait for one to finish
127  while (worker == -1) {
128  std::this_thread::sleep_for(std::chrono::milliseconds(100));
129  worker = GetFirstAvailableWorker();
130  }
131 
132  // Send parameters to worker
133  {
134  Timing t_mpi("MPI_CALL", &ta_);
135  MPI_Send_Obj_ROOT(final_params, worker, Tag::kTask);
136  }
137 
138  // Wait for results
139  MPI_Status status;
140  {
141  int size;
142  MPI_Recv(&size, 1, MPI_INT, worker, Tag::kResult, MPI_COMM_WORLD,
143  &status);
144  string msg = "Receiving results from worker " +
145  to_string(status.MPI_SOURCE);
146  Log(msg);
147  Timing t_mpi("MPI_CALL", &ta_);
148  TimeSeries *tmp_result =
149  MPI_Recv_Obj_ROOT<TimeSeries>(size, worker, Tag::kResult);
150  msg = "Successfully received results from worker " +
151  to_string(status.MPI_SOURCE);
152  Log(msg);
153  *result = *tmp_result;
154  delete tmp_result;
155  }
156 
157  ChangeStatusWorker(status.MPI_SOURCE, Status::kAvail);
158  }
159  });
160 
161  // From default_params read out the OptimizationParam section to
162  // determine the algorithm type: e.g. ParameterSweep, Differential
163  // Evolution, Particle Swarm Optimization
164  OptimizationParam *opt_params = default_params_->Get<OptimizationParam>();
165  auto algorithm = CreateOptimizationAlgorithm(opt_params);
166 
167  if (algorithm) {
168  (*algorithm)(dispatch_experiment, default_params_);
169  } else {
170  dispatch_experiment(default_params_, new TimeSeries());
171  }
172 
173  KillAllWorkers();
174  GetTimingsFromWorkers();
175  }
176 
177  // Record master's timing
178  RecordTiming(kMaster, &ta_);
179 
180  // Write all timing info to file
181  WriteTimingsToFile();
182 
183  return 0;
184 }
185 
186 // Returns the ID of the first available worker in the list. Returns -1 if
187 // there is no available worker.
188 int MultiSimulationManager::GetFirstAvailableWorker() {
189  int ret = -1;
190 #pragma omp critical
191  {
192  auto it = std::find(begin(availability_), end(availability_), true);
193  if (it != end(availability_)) {
194  ret = std::distance(begin(availability_), it);
195  ChangeStatusWorker(ret, Status::kBusy);
196  }
197  }
198  return ret;
199 }
200 
201 // Changes the status of a worker
202 void MultiSimulationManager::ChangeStatusWorker(int worker, Status s) {
203  std::stringstream msg;
204  msg << "Changing status of [W" << worker << "] to " << s;
205  Log(msg.str());
206  availability_[worker] = s;
207 }
208 
209 // Executes the specified function for all workers. Starting from index 1,
210 // because 0 is the master's ID.
211 void MultiSimulationManager::ForAllWorkers(
212  const std::function<void(int w)> &lambda) {
213  for (int i = 1; i < worldsize_; i++) {
214  lambda(i);
215  }
216 }
217 
219 Worker::Worker(int myrank, std::function<void(Param *, TimeSeries *)> simulate)
220  : myrank_(myrank), simulate_(simulate) {
221  Log("Started");
222 }
223 
224 Worker::~Worker() {
225  string msg = "Stopped (Completed " + to_string(task_count_) + " tasks)";
226  Log(msg);
227 }
228 
229 int Worker::Start() {
230  Timing tot("TOTAL", &ta_);
231 
232  // Wait for all MPI processes to reach this barrier
233  MPI_Barrier(MPI_COMM_WORLD);
234 
235  while (true) {
236  MPI_Status status;
237  int size;
238  // Receive the command type. If the command is a task, we use the `size`
239  // argument as the size of the object we're about to receive
240  {
241  Timing t("MPI_CALL", &ta_);
242  MPI_Recv(&size, 1, MPI_INT, kMaster, MPI_ANY_TAG, MPI_COMM_WORLD,
243  &status);
244  }
245 
246  // The tag tells us what kind of message we received from Master
247  switch (status.MPI_TAG) {
248  case Tag::kTask: {
249  Param *params = nullptr;
250  {
251  Timing t("MPI_CALL", &ta_);
252  params = MPI_Recv_Obj_ROOT<Param>(size, kMaster, Tag::kTask);
253  }
254  TimeSeries result;
255  {
256  Timing sim("SIMULATE", &ta_);
257  simulate_(params, &result);
258  }
259  IncrementTaskCount();
260  {
261  Timing t("MPI_CALL", &ta_);
262  Log("Sending back results");
263  MPI_Send_Obj_ROOT(&result, kMaster, Tag::kResult);
264  }
265  break;
266  }
267  case Tag::kKill:
268  // Send back the timing results to the master for writing to file
269  MPI_Send_Obj_ROOT<TimingAggregator>(&ta_, kMaster, Tag::kKill);
270  return 0;
271  default:
272  Log("Received unknown message tag. Stopping...");
273  return 1;
274  }
275  }
276 }
277 
278 void Worker::IncrementTaskCount() { task_count_++; }
279 
280 } // namespace experimental
281 } // namespace bdm
282 
283 #endif // USE_MPI
timing.h
bdm
Definition: agent.cc:39
optimization_param.h
mpi_helper.h
scheduler.h
bdm::L2F
LambdaFunctor< decltype(&TLambda::operator())> L2F(const TLambda &l)
Definition: functor.h:110
bdm::Log::Info
static void Info(const std::string &location, const Args &... parts)
Prints information message.
Definition: log.h:55
multi_simulation_manager.h
functor.h
bdm::experimental::CreateOptimizationAlgorithm
Algorithm * CreateOptimizationAlgorithm(OptimizationParam *opt_params)
Definition: algorithm_registry.h:71