36 auto* non_atomic_batches = batches_.load();
37 if (non_atomic_batches !=
nullptr) {
38 for (uint64_t i = 0; i < num_batches_; ++i) {
39 delete non_atomic_batches[i];
41 delete[] non_atomic_batches;
50 auto bidx = index / kBatchSize;
51 if (bidx >= num_batches_) {
53 std::max(index,
static_cast<uint32_t
>(num_batches_ * kBatchSize * 1.1));
57 auto el_idx = index % kBatchSize;
58 (*batches_.load()[bidx])[el_idx] = value;
66 auto bidx = index / kBatchSize;
67 if (bidx >= num_batches_) {
68 Log::Fatal(
"ThreadSafeAgentUidMap::operator[]",
69 Concat(
"AgentUid out of range access: AgentUid: ", uid,
70 ", ThreadSafeAgentUidMap max index ",
71 num_batches_ * kBatchSize));
75 auto el_idx = index % kBatchSize;
76 return (*batches_.load()[bidx])[el_idx];
80 return num_batches_ * kBatchSize;
85 auto new_num_batches = new_size / kBatchSize + 1;
86 std::lock_guard<Spinlock> guard(lock_);
87 if (new_num_batches >= num_batches_) {
88 auto bcopy =
new Batch*[new_num_batches];
89 auto** non_atomic_batches = batches_.load();
90 for (uint64_t i = 0; i < new_num_batches; ++i) {
91 if (i < num_batches_) {
92 bcopy[i] = non_atomic_batches[i];
94 bcopy[i] =
new Batch();
95 bcopy[i]->reserve(kBatchSize);
98 batches_.exchange(bcopy);
99 old_copies_.push_back(non_atomic_batches);
100 num_batches_ = new_num_batches;
105 for (
auto& entry : old_copies_) {
106 if (entry !=
nullptr) {
114 const std::shared_ptr<ThreadSafeAgentUidMap>& map)
127 const std::vector<ExecutionContext*>& all_exec_ctxts) {
134 const std::vector<ExecutionContext*>& all_exec_ctxts) {
143 const std::vector<ExecutionContext*>& all_exec_ctxts) {}
146 const std::vector<ExecutionContext*>& all_exec_ctxts) {}
153 if (param->thread_safety_mechanism ==
154 Param::ThreadSafetyMechanism::kUserSpecified) {
172 locks_.push_back(aptr->GetLock());
174 for (uint64_t i = 0; i <
locks_.size(); ++i) {
196 for (
int i =
locks_.size() - 1; i >= 0; --i) {
202 for (
auto& op : operations) {
205 for (
int i =
locks_.size() - 1; i >= 0; --i) {
208 }
else if (param->thread_safety_mechanism ==
209 Param::ThreadSafetyMechanism::kAutomatic) {
210 auto* nb_mutex_builder = env->GetNeighborMutexBuilder();
211 auto* mutex = nb_mutex_builder->GetMutex(agent->
GetBoxIdx());
212 std::lock_guard<decltype(*mutex)> guard(*mutex);
215 for (
auto* op : operations) {
218 }
else if (param->thread_safety_mechanism ==
219 Param::ThreadSafetyMechanism::kNone) {
222 for (
auto* op : operations) {
226 Log::Fatal(
"InPlaceExecutionContext::Execute",
227 "Invalid value for parameter thread_safety_mechanism: ",
228 param->thread_safety_mechanism);
238 real_t query_squared_radius)
const {
256 auto for_each =
L2F([&](
Agent* agent) { lambda(agent); });
257 env->ForEachNeighbor(for_each, query, criteria);
266 if (pair.second < squared_radius) {
267 lambda(pair.first, pair.second);
282 if (param->cache_neighbors) {
283 neighbor_cache_.push_back(std::make_pair(agent, squared_distance));
285 lambda(agent, squared_distance);
287 env->ForEachNeighbor(for_each, query, squared_radius);
294 lambda(agent, squared_distance);
302 auto* rm = sim->GetResourceManager();
303 auto* agent = rm->GetAgent(uid);
304 if (agent !=
nullptr) {
320 const std::vector<ExecutionContext*>& all_exec_ctxts) {
326 auto* ctxt = bdm_static_cast<InPlaceExecutionContext*>(all_exec_ctxts[tid]);
328 thread_offsets[tid] = new_agent_per_numa[nid];
329 new_agent_per_numa[nid] += ctxt->new_agents_.size();
336 for (
unsigned n = 0; n < new_agent_per_numa.size(); n++) {
337 numa_offsets[n] = rm->GrowAgentContainer(new_agent_per_numa[n], n);
341 #pragma omp parallel for schedule(static, 1)
343 auto* ctxt = bdm_static_cast<InPlaceExecutionContext*>(all_exec_ctxts[i]);
345 uint64_t offset = thread_offsets[i] + numa_offsets[nid];
346 rm->AddAgents(nid, offset, ctxt->new_agents_);
347 ctxt->new_agents_.clear();
357 const std::vector<ExecutionContext*>& all_exec_ctxts) {
360 auto num_removals = 0;
362 auto* ctxt = bdm_static_cast<InPlaceExecutionContext*>(all_exec_ctxts[i]);
363 all_remove[i] = &ctxt->remove_;
364 num_removals += ctxt->remove_.size();
367 if (num_removals != 0) {
372 auto* ctxt = bdm_static_cast<InPlaceExecutionContext*>(all_exec_ctxts[i]);
373 ctxt->remove_.clear();