BioDynaMo  v1.05.119-a4ff3934
in_place_exec_ctxt.cc
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------
2 //
3 // Copyright (C) 2021 CERN & University of Surrey for the benefit of the
4 // BioDynaMo collaboration. All Rights Reserved.
5 //
6 // Licensed under the Apache License, Version 2.0 (the "License");
7 // you may not use this file except in compliance with the License.
8 //
9 // See the LICENSE file distributed with this work for details.
10 // See the NOTICE file distributed with this work for additional information
11 // regarding copyright ownership.
12 //
13 // -----------------------------------------------------------------------------
14 
16 
17 #include <algorithm>
18 #include <mutex>
19 #include <utility>
20 
21 #include "core/agent/agent.h"
23 #include "core/functor.h"
24 #include "core/resource_manager.h"
25 #include "core/scheduler.h"
26 
27 namespace bdm {
28 
30  : batches_(nullptr) {
// Member-initializer list: the batch-table pointer starts out as nullptr.
// NOTE(review): listing lines 29 (the constructor head) and 31 (one body
// statement) are absent from this extract; consult the repository before
// relying on this block. TODO confirm what the missing body line does.
32 }
33 
35  DeleteOldCopies();
36  auto* non_atomic_batches = batches_.load();
37  if (non_atomic_batches != nullptr) {
38  for (uint64_t i = 0; i < num_batches_; ++i) {
39  delete non_atomic_batches[i];
40  }
41  delete[] non_atomic_batches;
42  }
43 }
44 
46  const AgentUid& uid,
48  value) {
49  auto index = uid.GetIndex();
50  auto bidx = index / kBatchSize;
51  if (bidx >= num_batches_) {
52  auto new_size =
53  std::max(index, static_cast<uint32_t>(num_batches_ * kBatchSize * 1.1));
54  Resize(new_size);
55  }
56 
57  auto el_idx = index % kBatchSize;
58  (*batches_.load()[bidx])[el_idx] = value;
59 }
60 
63  const AgentUid& uid) const {
// Read-only lookup of the element stored for `uid`, addressed by
// uid.GetIndex(): batch = index / kBatchSize, slot = index % kBatchSize.
// NOTE(review): listing lines 61-62 (return type and function name) and 64
// are absent from this extract; `kDefault` used below is presumably declared
// on the missing line 64 - TODO confirm.
65  auto index = uid.GetIndex();
66  auto bidx = index / kBatchSize;
// An index beyond the allocated batches is reported through Log::Fatal.
67  if (bidx >= num_batches_) {
68  Log::Fatal("ThreadSafeAgentUidMap::operator[]",
69  Concat("AgentUid out of range access: AgentUid: ", uid,
70  ", ThreadSafeAgentUidMap max index ",
71  num_batches_ * kBatchSize));
// Fallback value in case control returns from Log::Fatal.
72  return kDefault;
73  }
74 
75  auto el_idx = index % kBatchSize;
76  return (*batches_.load()[bidx])[el_idx];
77 }
78 
80  return num_batches_ * kBatchSize;
81 }
82 
85  auto new_num_batches = new_size / kBatchSize + 1;
86  std::lock_guard<Spinlock> guard(lock_);
87  if (new_num_batches >= num_batches_) {
88  auto bcopy = new Batch*[new_num_batches];
89  auto** non_atomic_batches = batches_.load();
90  for (uint64_t i = 0; i < new_num_batches; ++i) {
91  if (i < num_batches_) {
92  bcopy[i] = non_atomic_batches[i];
93  } else {
94  bcopy[i] = new Batch();
95  bcopy[i]->reserve(kBatchSize);
96  }
97  }
98  batches_.exchange(bcopy);
99  old_copies_.push_back(non_atomic_batches);
100  num_batches_ = new_num_batches;
101  }
102 }
103 
105  for (auto& entry : old_copies_) {
106  if (entry != nullptr) {
107  delete[] entry;
108  }
109  }
110  old_copies_.clear();
111 }
112 
114  const std::shared_ptr<ThreadSafeAgentUidMap>& map)
115  : new_agent_map_(map), tinfo_(ThreadInfo::GetInstance()) {
// Stores the shared uid -> agent lookup map and the ThreadInfo instance, then
// pre-allocates room for 1000 new-agent pointers.
// NOTE(review): listing lines 113 (constructor head) and 117 (one body
// statement) are absent from this extract - TODO confirm against the
// repository what line 117 initializes.
116  new_agents_.reserve(1e3);
118 }
119 
121  for (auto* agent : new_agents_) {
122  delete agent;
123  }
124 }
125 
127  const std::vector<ExecutionContext*>& all_exec_ctxts) {
128  // first iteration might have uncommitted changes
129  AddAgentsToRm(all_exec_ctxts);
130  RemoveAgentsFromRm(all_exec_ctxts);
131 }
132 
134  const std::vector<ExecutionContext*>& all_exec_ctxts) {
135  AddAgentsToRm(all_exec_ctxts);
136  RemoveAgentsFromRm(all_exec_ctxts);
137 
139  rm->EndOfIteration();
140 }
141 
143  const std::vector<ExecutionContext*>& all_exec_ctxts) {}
144 
146  const std::vector<ExecutionContext*>& all_exec_ctxts) {}
147 
149  Agent* agent, AgentHandle ah, const std::vector<Operation*>& operations) {
// Runs every operation in `operations` on `agent`, using the locking scheme
// selected by param->thread_safety_mechanism (kUserSpecified, kAutomatic or
// kNone). `ah` is not used in the lines visible here.
// NOTE(review): listing lines 148 (function head), 159, 177, 193, 201, 214
// and 221 are absent from this extract, so the block is incomplete as shown;
// the notes below mark each gap. Consult the repository before editing.
150  auto* env = Simulation::GetActive()->GetEnvironment();
151  auto* param = Simulation::GetActive()->GetParam();
152 
153  if (param->thread_safety_mechanism ==
154  Param::ThreadSafetyMechanism::kUserSpecified) {
// Retry loop: acquire the locks of all agents in the critical region and
// verify afterwards that the region did not change in the meantime.
155  while (true) {
156  critical_region_.clear();
157  critical_region_2_.clear();
158  locks_.clear();
// NOTE(review): line 159 is missing; presumably it fills critical_region_
// (Agent::CriticalRegion is cross-referenced by this file) - TODO confirm.
160  // Sort such that the locks further down are acquired in a sorted order
161  // This technique avoids deadlocks.
162  std::sort(critical_region_.begin(), critical_region_.end());
163  // Remove all AgentPointers which correspond to a nullptr.
164  while (critical_region_.size() && critical_region_.back() == nullptr) {
165  critical_region_.pop_back();
166  }
167  // Remove all duplicate entries
168  critical_region_.erase(
169  std::unique(critical_region_.begin(), critical_region_.end()),
170  critical_region_.end());
// Collect one lock per agent in the region, then acquire them in that order.
171  for (auto aptr : critical_region_) {
172  locks_.push_back(aptr->GetLock());
173  }
174  for (uint64_t i = 0; i < locks_.size(); ++i) {
175  locks_[i]->lock();
176  }
// NOTE(review): line 177 is missing; presumably it fills critical_region_2_
// the same way as line 159 - TODO confirm.
178  // Sort such that the locks further down are acquired in a sorted order
179  // This technique avoids deadlocks.
180  std::sort(critical_region_2_.begin(), critical_region_2_.end());
181  // Remove all AgentPointers which correspond to a nullptr.
182  while (critical_region_2_.size() &&
183  critical_region_2_.back() == nullptr) {
184  critical_region_2_.pop_back();
185  }
186  // Remove all duplicate entries
187  critical_region_2_.erase(
188  std::unique(critical_region_2_.begin(), critical_region_2_.end()),
189  critical_region_2_.end());
190  // if the critical regions are not the same, then another thread
191  // changed it before the locks were acquired. In this case we have to
192  // try again. Otherwise we can leave the while loop.
// NOTE(review): line 193 (the condition guarding the `break` below) is
// missing; per the comment above it presumably compares the two regions.
194  break;
195  }
// Regions differed: release the locks in reverse order and try again.
196  for (int i = locks_.size() - 1; i >= 0; --i) {
197  locks_[i]->unlock();
198  }
199  }
// Locks are held here: reset the neighbor cache and run the operations.
200  neighbor_cache_.clear();
// NOTE(review): line 201 is missing from this extract.
202  for (auto& op : operations) {
203  (*op)(agent);
204  }
// Release the locks in reverse order of acquisition.
205  for (int i = locks_.size() - 1; i >= 0; --i) {
206  locks_[i]->unlock();
207  }
208  } else if (param->thread_safety_mechanism ==
209  Param::ThreadSafetyMechanism::kAutomatic) {
// Automatic mode: hold the mutex that the environment's neighbor mutex
// builder returns for the agent's box while the operations run.
210  auto* nb_mutex_builder = env->GetNeighborMutexBuilder();
211  auto* mutex = nb_mutex_builder->GetMutex(agent->GetBoxIdx());
212  std::lock_guard<decltype(*mutex)> guard(*mutex);
213  neighbor_cache_.clear();
// NOTE(review): line 214 is missing from this extract.
215  for (auto* op : operations) {
216  (*op)(agent);
217  }
218  } else if (param->thread_safety_mechanism ==
219  Param::ThreadSafetyMechanism::kNone) {
// No locking at all: just reset the neighbor cache and run the operations.
220  neighbor_cache_.clear();
// NOTE(review): line 221 is missing from this extract.
222  for (auto* op : operations) {
223  (*op)(agent);
224  }
225  } else {
// Any other parameter value is reported as a fatal error.
226  Log::Fatal("InPlaceExecutionContext::Execute",
227  "Invalid value for parameter thread_safety_mechanism: ",
228  param->thread_safety_mechanism);
229  }
230 }
231 
233  new_agents_.push_back(new_agent);
234  new_agent_map_->Insert(new_agent->GetUid(), new_agent);
235 }
236 
238  real_t query_squared_radius) const {
239  if (!cache_neighbors_) {
240  return false;
241  }
242 
243  // A neighbor cache is valid when the cached search radius was greater or
244  // equal than the current search radius (i.e. all the agents within the
245  // current neighbor cache are at least in the cache).
246  // The agents that are between the cached and the current search radius must
247  // be filtered out hereafter
248  return query_squared_radius <= cached_squared_search_radius_;
249 }
250 
252  const Agent& query,
253  void* criteria) {
254  // forward call to env and populate cache
255  auto* env = Simulation::GetActive()->GetEnvironment();
256  auto for_each = L2F([&](Agent* agent) { lambda(agent); });
257  env->ForEachNeighbor(for_each, query, criteria);
258 }
259 
261  Functor<void, Agent*, real_t>& lambda, const Agent& query,
262  real_t squared_radius) {
263  // use values in cache
264  if (IsNeighborCacheValid(squared_radius)) {
265  for (auto& pair : neighbor_cache_) {
266  if (pair.second < squared_radius) {
267  lambda(pair.first, pair.second);
268  }
269  }
270  return;
271  }
272 
273  auto* param = Simulation::GetActive()->GetParam();
274  auto* env = Simulation::GetActive()->GetEnvironment();
275 
276  // Store the search radius to check validity of cache in consecutive use of
277  // ForEachNeighbor
278  cached_squared_search_radius_ = squared_radius;
279 
280  // Populate the cache and execute the lambda for each neighbor
281  auto for_each = L2F([&](Agent* agent, real_t squared_distance) {
282  if (param->cache_neighbors) {
283  neighbor_cache_.push_back(std::make_pair(agent, squared_distance));
284  }
285  lambda(agent, squared_distance);
286  });
287  env->ForEachNeighbor(for_each, query, squared_radius);
288 }
289 
291  Functor<void, Agent*, real_t>& lambda, const Real3& query_position,
292  real_t squared_radius) {
293  auto for_each = L2F([&](Agent* agent, real_t squared_distance) {
294  lambda(agent, squared_distance);
295  });
296  auto* env = Simulation::GetActive()->GetEnvironment();
297  env->ForEachNeighbor(for_each, query_position, squared_radius);
298 }
299 
301  auto* sim = Simulation::GetActive();
302  auto* rm = sim->GetResourceManager();
303  auto* agent = rm->GetAgent(uid);
304  if (agent != nullptr) {
305  return agent;
306  }
307 
308  return (*new_agent_map_)[uid];
309 }
310 
312  return GetAgent(uid);
313 }
314 
316  remove_.push_back(uid);
317 }
318 
320  const std::vector<ExecutionContext*>& all_exec_ctxts) {
321  // group execution contexts by numa domain
322  std::vector<uint64_t> new_agent_per_numa(tinfo_->GetNumaNodes());
323  std::vector<uint64_t> thread_offsets(tinfo_->GetMaxThreads());
324 
325  for (int tid = 0; tid < tinfo_->GetMaxThreads(); ++tid) {
326  auto* ctxt = bdm_static_cast<InPlaceExecutionContext*>(all_exec_ctxts[tid]);
327  int nid = tinfo_->GetNumaNode(tid);
328  thread_offsets[tid] = new_agent_per_numa[nid];
329  new_agent_per_numa[nid] += ctxt->new_agents_.size();
330  }
331 
332  // reserve enough memory in ResourceManager
333  std::vector<uint64_t> numa_offsets(tinfo_->GetNumaNodes());
335  rm->ResizeAgentUidMap();
336  for (unsigned n = 0; n < new_agent_per_numa.size(); n++) {
337  numa_offsets[n] = rm->GrowAgentContainer(new_agent_per_numa[n], n);
338  }
339 
340 // add new_agents_ to the ResourceManager in parallel
341 #pragma omp parallel for schedule(static, 1)
342  for (int i = 0; i < tinfo_->GetMaxThreads(); i++) {
343  auto* ctxt = bdm_static_cast<InPlaceExecutionContext*>(all_exec_ctxts[i]);
344  int nid = tinfo_->GetNumaNode(i);
345  uint64_t offset = thread_offsets[i] + numa_offsets[nid];
346  rm->AddAgents(nid, offset, ctxt->new_agents_);
347  ctxt->new_agents_.clear();
348  }
349 
350  new_agent_map_->DeleteOldCopies();
351  if (rm->GetNumAgents() > new_agent_map_->Size()) {
352  new_agent_map_->Resize(rm->GetNumAgents() * 1.5);
353  }
354 }
355 
357  const std::vector<ExecutionContext*>& all_exec_ctxts) {
358  std::vector<decltype(remove_)*> all_remove(tinfo_->GetMaxThreads());
359 
360  auto num_removals = 0;
361  for (int i = 0; i < tinfo_->GetMaxThreads(); i++) {
362  auto* ctxt = bdm_static_cast<InPlaceExecutionContext*>(all_exec_ctxts[i]);
363  all_remove[i] = &ctxt->remove_;
364  num_removals += ctxt->remove_.size();
365  }
366 
367  if (num_removals != 0) {
369  rm->RemoveAgents(all_remove);
370 
371  for (int i = 0; i < tinfo_->GetMaxThreads(); i++) {
372  auto* ctxt = bdm_static_cast<InPlaceExecutionContext*>(all_exec_ctxts[i]);
373  ctxt->remove_.clear();
374  }
375  }
376 }
377 // TODO(lukas) Add tests for caching mechanism in ForEachNeighbor*
378 
379 } // namespace bdm
in_place_exec_ctxt.h
bdm::InPlaceExecutionContext::new_agents_
std::vector< Agent * > new_agents_
Pointer to new agents.
Definition: in_place_exec_ctxt.h:145
bdm::InPlaceExecutionContext::remove_
std::vector< AgentUid > remove_
Definition: in_place_exec_ctxt.h:149
bdm::InPlaceExecutionContext::AddAgent
void AddAgent(Agent *new_agent) override
Adds the agent to the simulation (threadsafe, takes ownership). Note that we avoid the use of smart p...
Definition: in_place_exec_ctxt.cc:232
bdm::ResourceManager::EndOfIteration
virtual void EndOfIteration()
Definition: resource_manager.h:426
bdm::InPlaceExecutionContext::AddAgentsToRm
virtual void AddAgentsToRm(const std::vector< ExecutionContext * > &all_exec_ctxts)
Definition: in_place_exec_ctxt.cc:319
bdm::InPlaceExecutionContext::ThreadSafeAgentUidMap::Resize
void Resize(uint64_t new_size)
Definition: in_place_exec_ctxt.cc:83
bdm::ResourceManager::ResizeAgentUidMap
void ResizeAgentUidMap()
Definition: resource_manager.h:414
bdm
Definition: agent.cc:39
bdm::ResourceManager::RemoveAgents
void RemoveAgents(const std::vector< std::vector< AgentUid > * > &uids)
Definition: resource_manager.cc:339
bdm::InPlaceExecutionContext::cache_neighbors_
bool cache_neighbors_
Cache the value of Param::cache_neighbors.
Definition: in_place_exec_ctxt.h:158
bdm::ThreadInfo::GetNumaNodes
int GetNumaNodes() const
Returns the number of NUMA nodes on this machine.
Definition: thread_info.h:48
bdm::InPlaceExecutionContext::ThreadSafeAgentUidMap::operator[]
const value_type & operator[](const AgentUid &key) const
Definition: in_place_exec_ctxt.cc:62
bdm::real_t
double real_t
Definition: real_t.h:21
bdm::Param::cache_neighbors
bool cache_neighbors
Definition: param.h:480
scheduler.h
bdm::Agent::GetUid
const AgentUid & GetUid() const
Definition: agent.cc:123
bdm::InPlaceExecutionContext::cached_squared_search_radius_
real_t cached_squared_search_radius_
The radius that was used to cache neighbors in neighbor_cache_
Definition: in_place_exec_ctxt.h:156
bdm::InPlaceExecutionContext::tinfo_
ThreadInfo * tinfo_
Definition: in_place_exec_ctxt.h:142
bdm::InPlaceExecutionContext::Execute
void Execute(Agent *agent, AgentHandle ah, const std::vector< Operation * > &operations) override
Definition: in_place_exec_ctxt.cc:148
bdm::L2F
LambdaFunctor< decltype(&TLambda::operator())> L2F(const TLambda &l)
Definition: functor.h:110
bdm::Agent
Contains code required by all agents.
Definition: agent.h:79
bdm::InPlaceExecutionContext::SetupIterationAll
void SetupIterationAll(const std::vector< ExecutionContext * > &all_exec_ctxts) override
Definition: in_place_exec_ctxt.cc:126
bdm::InPlaceExecutionContext::ThreadSafeAgentUidMap::Batch
std::vector< value_type > Batch
Definition: in_place_exec_ctxt.h:58
bdm::InPlaceExecutionContext::ThreadSafeAgentUidMap::DeleteOldCopies
void DeleteOldCopies()
Definition: in_place_exec_ctxt.cc:104
bdm::InPlaceExecutionContext::InPlaceExecutionContext
InPlaceExecutionContext(const std::shared_ptr< ThreadSafeAgentUidMap > &map)
Definition: in_place_exec_ctxt.cc:113
bdm::InPlaceExecutionContext::TearDownIterationAll
void TearDownIterationAll(const std::vector< ExecutionContext * > &all_exec_ctxts) override
Definition: in_place_exec_ctxt.cc:133
bdm::InPlaceExecutionContext::neighbor_cache_
std::vector< std::pair< Agent *, real_t > > neighbor_cache_
Definition: in_place_exec_ctxt.h:154
bdm::Functor< void, Agent * >
bdm::InPlaceExecutionContext::ThreadSafeAgentUidMap::ThreadSafeAgentUidMap
ThreadSafeAgentUidMap()
Definition: in_place_exec_ctxt.cc:29
bdm::InPlaceExecutionContext::ForEachNeighbor
void ForEachNeighbor(Functor< void, Agent * > &lambda, const Agent &query, void *criteria) override
Definition: in_place_exec_ctxt.cc:251
bdm::Environment::ForEachNeighbor
virtual void ForEachNeighbor(Functor< void, Agent *, real_t > &lambda, const Agent &query, real_t squared_radius)=0
bdm::Simulation::GetResourceManager
ResourceManager * GetResourceManager()
Returns the ResourceManager instance.
Definition: simulation.cc:244
bdm::InPlaceExecutionContext::ThreadSafeAgentUidMap::Insert
void Insert(const AgentUid &uid, const value_type &value)
Definition: in_place_exec_ctxt.cc:45
bdm::InPlaceExecutionContext::ThreadSafeAgentUidMap::kBatchSize
constexpr static uint64_t kBatchSize
Definition: in_place_exec_ctxt.h:69
bdm::InPlaceExecutionContext::IsNeighborCacheValid
bool IsNeighborCacheValid(real_t query_squared_radius) const
Definition: in_place_exec_ctxt.cc:237
bdm::AgentUid::GetIndex
Index_t GetIndex() const
Definition: agent_uid.h:42
bdm::InPlaceExecutionContext::TearDownAgentOpsAll
void TearDownAgentOpsAll(const std::vector< ExecutionContext * > &all_exec_ctxts) override
Definition: in_place_exec_ctxt.cc:145
bdm::InPlaceExecutionContext::~InPlaceExecutionContext
~InPlaceExecutionContext() override
Definition: in_place_exec_ctxt.cc:120
bdm::Concat
std::string Concat(const Args &... parts)
Concatenates all arguments into a string. Equivalent to streaming all arguments into a stringstream a...
Definition: string.h:70
agent.h
bdm::Log::Fatal
static void Fatal(const std::string &location, const Args &... parts)
Prints fatal error message.
Definition: log.h:115
bdm::InPlaceExecutionContext::ThreadSafeAgentUidMap::Size
uint64_t Size() const
Definition: in_place_exec_ctxt.cc:79
bdm::InPlaceExecutionContext::RemoveAgent
void RemoveAgent(const AgentUid &uid) override
Definition: in_place_exec_ctxt.cc:315
bdm::Agent::GetBoxIdx
uint32_t GetBoxIdx() const
Definition: agent.cc:125
bdm::InPlaceExecutionContext::ThreadSafeAgentUidMap::~ThreadSafeAgentUidMap
~ThreadSafeAgentUidMap()
Definition: in_place_exec_ctxt.cc:34
bdm::InPlaceExecutionContext::critical_region_
std::vector< AgentPointer<> > critical_region_
Used to determine which agents must not be updated from different threads.
Definition: in_place_exec_ctxt.h:173
bdm::AgentUid
Definition: agent_uid.h:25
bdm::InPlaceExecutionContext::locks_
std::vector< Spinlock * > locks_
Definition: in_place_exec_ctxt.h:177
bdm::Simulation::GetEnvironment
Environment * GetEnvironment()
Definition: simulation.cc:260
environment.h
bdm::Simulation::GetParam
const Param * GetParam() const
Returns the simulation parameters.
Definition: simulation.cc:254
bdm::ThreadInfo::GetMaxThreads
int GetMaxThreads() const
Return the maximum number of threads.
Definition: thread_info.h:66
bdm::InPlaceExecutionContext::critical_region_2_
std::vector< AgentPointer<> > critical_region_2_
Used to determine which agents must not be updated from different threads.
Definition: in_place_exec_ctxt.h:175
bdm::MathArray< real_t, 3 >
bdm::InPlaceExecutionContext::GetAgent
Agent * GetAgent(const AgentUid &uid) override
Definition: in_place_exec_ctxt.cc:300
resource_manager.h
bdm::Simulation::GetActive
static Simulation * GetActive()
This function returns the currently active Simulation.
Definition: simulation.cc:68
bdm::InPlaceExecutionContext::GetConstAgent
const Agent * GetConstAgent(const AgentUid &uid) override
Definition: in_place_exec_ctxt.cc:311
bdm::InPlaceExecutionContext::RemoveAgentsFromRm
virtual void RemoveAgentsFromRm(const std::vector< ExecutionContext * > &all_exec_ctxts)
Definition: in_place_exec_ctxt.cc:356
bdm::InPlaceExecutionContext::new_agent_map_
std::shared_ptr< ThreadSafeAgentUidMap > new_agent_map_
Lookup table AgentUid -> AgentPointer for new created agents.
Definition: in_place_exec_ctxt.h:140
functor.h
bdm::ThreadInfo
This class stores information about each thread. (e.g. to which NUMA node it belongs to....
Definition: thread_info.h:31
bdm::ThreadInfo::GetNumaNode
int GetNumaNode(int omp_thread_id) const
Returns the numa node the given openmp thread is bound to.
Definition: thread_info.h:51
bdm::Agent::CriticalRegion
virtual void CriticalRegion(std::vector< AgentPointer<>> *aptrs)
Definition: agent.h:170
bdm::AgentHandle
Definition: agent_handle.h:29
bdm::InPlaceExecutionContext::SetupAgentOpsAll
void SetupAgentOpsAll(const std::vector< ExecutionContext * > &all_exec_ctxts) override
Definition: in_place_exec_ctxt.cc:142