BioDynaMo  v1.05.119-a4ff3934
reduce.h
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------
2 //
3 // Copyright (C) 2021 CERN & University of Surrey for the benefit of the
4 // BioDynaMo collaboration. All Rights Reserved.
5 //
6 // Licensed under the Apache License, Version 2.0 (the "License");
7 // you may not use this file except in compliance with the License.
8 //
9 // See the LICENSE file distributed with this work for details.
10 // See the NOTICE file distributed with this work for additional information
11 // regarding copyright ownership.
12 //
13 // -----------------------------------------------------------------------------
14 
15 #ifndef CORE_ANALYSIS_REDUCE_H_
16 #define CORE_ANALYSIS_REDUCE_H_
17 
18 #include <array>
19 #include <vector>
20 
21 #include "core/agent/agent.h"
23 #include "core/functor.h"
25 #include "core/resource_manager.h"
26 #include "core/util/thread_info.h"
27 
28 namespace bdm {
29 namespace experimental {
30 
31 // -----------------------------------------------------------------------------
37 template <typename TResult>
38 struct Reducer : public Functor<void, Agent*> {
39  ~Reducer() override = default;
40  virtual TResult GetResult() = 0;
42  virtual void Reset() = 0;
43  virtual Reducer* NewCopy() const = 0;
45 };
46 
47 // -----------------------------------------------------------------------------
75 template <typename T, typename TResult = T>
76 class GenericReducer : public Reducer<TResult> {
77  public:
79 
80  GenericReducer(void(agent_function)(Agent*, T*),
81  T (*reduce_partial_results)(const SharedData<T>&),
82  bool (*filter)(Agent*) = nullptr,
83  TResult (*post_process)(TResult) = nullptr)
84  : agent_function_(agent_function),
85  reduce_partial_results_(reduce_partial_results),
86  filter_(filter),
87  post_process_(post_process) {
88  Reset();
89  tl_results_.resize(ThreadInfo::GetInstance()->GetMaxThreads());
90  for (auto& el : tl_results_) {
91  el = T();
92  }
93  }
94 
95  virtual ~GenericReducer() = default;
96 
97  void operator()(Agent* agent) override {
98  if (!filter_ || (filter_(agent))) {
99  auto tid = ThreadInfo::GetInstance()->GetMyThreadId();
100  agent_function_(agent, &(tl_results_[tid]));
101  }
102  }
103 
104  void Reset() override {
105  tl_results_.resize(ThreadInfo::GetInstance()->GetMaxThreads());
106  for (auto& el : tl_results_) {
107  el = T();
108  }
109  }
110 
111  TResult GetResult() override {
112  auto combined = static_cast<TResult>(reduce_partial_results_(tl_results_));
113  if (post_process_) {
114  return post_process_(combined);
115  }
116  return combined;
117  }
118 
119  Reducer<TResult>* NewCopy() const override {
120  return new GenericReducer(*this);
121  }
122 
123  private:
125  void (*agent_function_)(Agent*, T*) = nullptr;
126  T (*reduce_partial_results_)(const SharedData<T>&) = nullptr;
127  bool (*filter_)(Agent*) = nullptr;
128  TResult (*post_process_)(TResult) = nullptr;
130 };
131 
// rootcling must see the custom streamer below when it generates the
// dictionary, whereas the interpreter must not see it.
#if (!defined(__CLING__) || defined(__ROOTCLING__)) && defined(USE_DICT)

// ROOT cannot stream function pointers by default, hence this hand-written
// streamer: after the regular class buffer, each of the four function
// pointers is stored as a 64-bit integer (its address) and converted back on
// reading.
// NOTE(review): a stored address is presumably only meaningful to a process
// in which the functions live at the same addresses -- confirm intended use.
template <typename T, typename TResult>
inline void GenericReducer<T, TResult>::Streamer(TBuffer& R__b) {
  if (!R__b.IsReading()) {
    // Writing: class buffer first, then the pointers in declaration order.
    R__b.WriteClassBuffer(GenericReducer::Class(), this);
    R__b.WriteLong64(reinterpret_cast<Long64_t>(this->agent_function_));
    R__b.WriteLong64(
        reinterpret_cast<Long64_t>(this->reduce_partial_results_));
    R__b.WriteLong64(reinterpret_cast<Long64_t>(this->filter_));
    R__b.WriteLong64(reinterpret_cast<Long64_t>(this->post_process_));
    return;
  }

  // Reading: must consume the values in exactly the order they were written.
  R__b.ReadClassBuffer(GenericReducer::Class(), this);
  Long64_t address;
  R__b.ReadLong64(address);
  this->agent_function_ = reinterpret_cast<void (*)(Agent*, T*)>(address);
  R__b.ReadLong64(address);
  this->reduce_partial_results_ =
      reinterpret_cast<T (*)(const SharedData<T>&)>(address);
  R__b.ReadLong64(address);
  this->filter_ = reinterpret_cast<bool (*)(Agent*)>(address);
  R__b.ReadLong64(address);
  this->post_process_ = reinterpret_cast<TResult (*)(TResult)>(address);
}

#endif  // (!defined(__CLING__) || defined(__ROOTCLING__)) && defined(USE_DICT)
166 
182 template <typename T>
183 inline T Reduce(Simulation* sim, Functor<void, Agent*, T*>& agent_functor,
184  Functor<T, const SharedData<T>&>& reduce_partial_results,
185  Functor<bool, Agent*>* filter = nullptr) {
186  // The thread-local (partial) results
187  SharedData<T> tl_results;
188  // initialize thread local data
189  tl_results.resize(ThreadInfo::GetInstance()->GetMaxThreads());
190  for (auto& el : tl_results) {
191  el = T();
192  }
193 
194  // reduce
195  // execute agent functor in parallel
196  auto actual_agent_func = L2F([&](Agent* agent, AgentHandle) {
197  auto tid = ThreadInfo::GetInstance()->GetMyThreadId();
198  agent_functor(agent, &(tl_results[tid]));
199  });
200  auto* rm = sim->GetResourceManager();
201  rm->ForEachAgentParallel(actual_agent_func, filter);
202  // combine thread-local results
203  return reduce_partial_results(tl_results);
204 }
205 
206 // -----------------------------------------------------------------------------
234 template <typename TResult = uint64_t>
235 struct Counter : public Reducer<TResult> {
236  public:
238  Counter() { Reset(); }
239 
240  Counter(bool (*condition)(Agent*), TResult (*post_process)(TResult) = nullptr)
241  : condition_(condition), post_process_(post_process) {
242  Reset();
243  }
244 
245  virtual ~Counter() = default;
246 
247  void Reset() override {
248  tl_results_.resize(ThreadInfo::GetInstance()->GetMaxThreads());
249  for (auto& el : tl_results_) {
250  el = 0u;
251  }
252  }
253 
254  void operator()(Agent* agent) override {
255  if (condition_(agent)) {
256  auto tid = ThreadInfo::GetInstance()->GetMyThreadId();
257  this->tl_results_[tid]++;
258  }
259  }
260 
261  TResult GetResult() override {
263  auto combined = static_cast<TResult>(sum(this->tl_results_));
264  if (post_process_) {
265  return post_process_(combined);
266  }
267  return combined;
268  }
269 
270  Reducer<TResult>* NewCopy() const override { return new Counter(*this); }
271 
272  private:
274  bool (*condition_)(Agent*) = nullptr;
275  TResult (*post_process_)(TResult) = nullptr;
277 };
278 
// rootcling must see the custom streamer below when it generates the
// dictionary, whereas the interpreter must not see it.
#if (!defined(__CLING__) || defined(__ROOTCLING__)) && defined(USE_DICT)

// ROOT cannot stream function pointers by default, hence this hand-written
// streamer: after the regular class buffer, both function pointers are stored
// as 64-bit integers (their addresses) and converted back on reading.
// NOTE(review): a stored address is presumably only meaningful to a process
// in which the functions live at the same addresses -- confirm intended use.
template <typename TResult>
inline void Counter<TResult>::Streamer(TBuffer& R__b) {
  if (!R__b.IsReading()) {
    // Writing: class buffer first, then condition and post-processing.
    R__b.WriteClassBuffer(Counter::Class(), this);
    R__b.WriteLong64(reinterpret_cast<Long64_t>(this->condition_));
    R__b.WriteLong64(reinterpret_cast<Long64_t>(this->post_process_));
    return;
  }

  // Reading: must consume the values in exactly the order they were written.
  R__b.ReadClassBuffer(Counter::Class(), this);
  Long64_t address;
  R__b.ReadLong64(address);
  this->condition_ = reinterpret_cast<bool (*)(Agent*)>(address);
  R__b.ReadLong64(address);
  this->post_process_ = reinterpret_cast<TResult (*)(TResult)>(address);
}

#endif  // (!defined(__CLING__) || defined(__ROOTCLING__)) && defined(USE_DICT)
304 
317 inline uint64_t Count(Simulation* sim, Functor<bool, Agent*>& condition,
318  Functor<bool, Agent*>* filter = nullptr) {
319  // The thread-local (partial) results
320  SharedData<uint64_t> tl_results;
321  // initialize thread local data
322  tl_results.resize(ThreadInfo::GetInstance()->GetMaxThreads());
323  for (auto& el : tl_results) {
324  el = 0;
325  }
326 
327  // reduce
328  // execute agent functor in parallel
329  auto actual_agent_func = L2F([&](Agent* agent, AgentHandle) {
330  auto tid = ThreadInfo::GetInstance()->GetMyThreadId();
331  if (condition(agent)) {
332  tl_results[tid]++;
333  }
334  });
335  auto* rm = sim->GetResourceManager();
336  rm->ForEachAgentParallel(actual_agent_func, filter);
337  // combine thread-local results
339  return sum(tl_results);
340 }
341 
342 } // namespace experimental
343 } // namespace bdm
344 
345 #endif // CORE_ANALYSIS_REDUCE_H_
reduction_op.h
bdm::experimental::Counter::post_process_
TResult(* post_process_)(TResult)
Definition: reduce.h:275
bdm::experimental::Counter::~Counter
virtual ~Counter()=default
bdm::experimental::GenericReducer::post_process_
TResult(* post_process_)(TResult)
Definition: reduce.h:128
bdm::ThreadInfo::GetInstance
static ThreadInfo * GetInstance()
Definition: thread_info.cc:21
bdm::SharedData
The SharedData class avoids false sharing between threads.
Definition: shared_data.h:48
bdm
Definition: agent.cc:39
bdm::experimental::GenericReducer::GetResult
TResult GetResult() override
Definition: reduce.h:111
bdm::experimental::GenericReducer::agent_function_
void(* agent_function_)(Agent *, T *)
Definition: reduce.h:125
thread_info.h
bdm::experimental::Reducer::~Reducer
~Reducer() override=default
bdm::experimental::GenericReducer::tl_results_
SharedData< T > tl_results_
Definition: reduce.h:124
bdm::experimental::Counter::GetResult
TResult GetResult() override
Definition: reduce.h:261
bdm::experimental::Counter::Reset
void Reset() override
Resets the internal state between calculations.
Definition: reduce.h:247
BDM_CLASS_DEF_OVERRIDE
#define BDM_CLASS_DEF_OVERRIDE(class_name, class_version_id)
Definition: root.h:202
bdm::experimental::Reduce
T Reduce(Simulation *sim, Functor< void, Agent *, T * > &agent_functor, Functor< T, const SharedData< T > & > &reduce_partial_results, Functor< bool, Agent * > *filter=nullptr)
Definition: reduce.h:183
bdm::SumReduction
Definition: reduction_op.h:87
bdm::experimental::GenericReducer::operator()
void operator()(Agent *agent) override
Definition: reduce.h:97
bdm::SharedData::resize
void resize(size_t new_size)
Resize the SharedData.data_ vector.
Definition: shared_data.h:74
bdm::L2F
LambdaFunctor< decltype(&TLambda::operator())> L2F(const TLambda &l)
Definition: functor.h:110
bdm::Agent
Contains code required by all agents.
Definition: agent.h:79
bdm::experimental::GenericReducer::GenericReducer
GenericReducer()
Definition: reduce.h:78
bdm::experimental::GenericReducer::GenericReducer
GenericReducer(void(agent_function)(Agent *, T *), T(*reduce_partial_results)(const SharedData< T > &), bool(*filter)(Agent *)=nullptr, TResult(*post_process)(TResult)=nullptr)
Definition: reduce.h:80
bdm::Functor
Definition: functor.h:24
bdm::experimental::Reducer
Definition: reduce.h:38
bdm::experimental::GenericReducer
Definition: reduce.h:76
BDM_CLASS_DEF
#define BDM_CLASS_DEF(class_name, class_version_id)
Forward all calls to BDM_NULL_CLASS_DEF.
Definition: root.h:198
bdm::Simulation::GetResourceManager
ResourceManager * GetResourceManager()
Returns the ResourceManager instance.
Definition: simulation.cc:244
bdm::experimental::GenericReducer::~GenericReducer
virtual ~GenericReducer()=default
agent.h
bdm::experimental::Count
uint64_t Count(Simulation *sim, Functor< bool, Agent * > &condition, Functor< bool, Agent * > *filter=nullptr)
Definition: reduce.h:317
bdm::experimental::Counter::tl_results_
SharedData< uint64_t > tl_results_
Definition: reduce.h:273
bdm::experimental::Reducer::NewCopy
virtual Reducer * NewCopy() const =0
bdm::experimental::Counter::condition_
bool(* condition_)(Agent *)
Definition: reduce.h:274
bdm::experimental::Counter
Definition: reduce.h:235
bdm::experimental::Counter::Counter
Counter(bool(*condition)(Agent *), TResult(*post_process)(TResult)=nullptr)
Definition: reduce.h:240
bdm::experimental::GenericReducer::Reset
void Reset() override
Resets the internal state between calculations.
Definition: reduce.h:104
shared_data.h
bdm::experimental::GenericReducer::reduce_partial_results_
T(* reduce_partial_results_)(const SharedData< T > &)
Definition: reduce.h:126
bdm::ResourceManager::ForEachAgentParallel
virtual void ForEachAgentParallel(Functor< void, Agent * > &function, Functor< bool, Agent * > *filter=nullptr)
Definition: resource_manager.cc:92
bdm::experimental::Reducer::Reset
virtual void Reset()=0
Resets the internal state between calculations.
bdm::experimental::Counter::Counter
Counter()
Required for IO.
Definition: reduce.h:238
resource_manager.h
bdm::experimental::Counter::NewCopy
Reducer< TResult > * NewCopy() const override
Definition: reduce.h:270
bdm::experimental::Counter::operator()
void operator()(Agent *agent) override
Definition: reduce.h:254
bdm::experimental::GenericReducer::NewCopy
Reducer< TResult > * NewCopy() const override
Definition: reduce.h:119
bdm::ThreadInfo::GetMyThreadId
int GetMyThreadId() const
Definition: thread_info.h:39
bdm::experimental::Reducer::GetResult
virtual TResult GetResult()=0
bdm::experimental::GenericReducer::filter_
bool(* filter_)(Agent *)
Definition: reduce.h:127
functor.h
bdm::Simulation
Definition: simulation.h:50
bdm::AgentHandle
Definition: agent_handle.h:29