34 "Call to numa_available failed with return code: ", ret);
if (param->export_visualization || param->insitu_visualization) {
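// Cleanup (surrounding context elided in this listing): free every agent
// stored in the per-NUMA containers.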
for (auto& numa_agents : agents_) {
  for (auto* agent : numa_agents) {
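// Parallel iteration, statically scheduled: each OpenMP thread only touches
// the agents stored on its own NUMA node.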
auto tid = omp_get_thread_num();
auto& numa_agents = agents_[nid];
auto correction = numa_agents.size() % threads_in_numa == 0 ? 0 : 1;
auto chunk = numa_agents.size() / threads_in_numa + correction;
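// Example: 10 agents and 4 threads in this NUMA node give correction = 1 and
// chunk = 3, so the threads cover the ranges [0,3), [3,6), [6,9), [9,10).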
auto end = std::min(numa_agents.size(), start + chunk);
for (uint64_t i = start; i < end; ++i) {
  auto* a = numa_agents[i];
  if (!filter || (*filter)(a)) {
template <typename TFunctor>
chunk = chunk >= 1 ? chunk : 1;
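// Dynamic scheduling setup. OpenMP's built-in dynamic schedule cannot be used
// because threads pinned to different NUMA nodes operate on different
// containers, so chunks are handed out through per-thread atomic counters.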
auto max_threads = omp_get_max_threads();
std::vector<uint64_t> num_chunks_per_numa(numa_nodes);
for (int n = 0; n < numa_nodes; n++) {
  auto correction = agents_[n].size() % chunk == 0 ? 0 : 1;
  num_chunks_per_numa[n] = agents_[n].size() / chunk + correction;

std::vector<std::atomic<uint64_t>*> counters(max_threads, nullptr);
std::vector<uint64_t> max_counters(max_threads);
for (int thread_cnt = 0; thread_cnt < max_threads; thread_cnt++) {
  // The divisor here is elided in the listing; it is the number of threads
  // assigned to current_nid (written below as threads_in_numa).
  auto correction =
      num_chunks_per_numa[current_nid] % threads_in_numa == 0 ? 0 : 1;
  uint64_t num_chunks_per_thread =
      num_chunks_per_numa[current_nid] / threads_in_numa + correction;
  auto end = std::min(num_chunks_per_numa[current_nid],
                      start + num_chunks_per_thread);
  counters[thread_cnt] = new std::atomic<uint64_t>(start);
  max_counters[thread_cnt] = end;

auto tid = omp_get_thread_num();
auto p_max_threads = omp_get_max_threads();
auto p_chunk = chunk;
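// Work stealing: each thread first drains the chunk counter for its own NUMA
// node and thread id, then wraps around to the counters of the other threads
// and, after that, of the other NUMA nodes.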
for (int n = 0; n < p_numa_nodes; n++) {
  int current_nid = (nid + n) % p_numa_nodes;
  for (int thread_cnt = 0; thread_cnt < p_max_threads; thread_cnt++) {
    uint64_t current_tid = (tid + thread_cnt) % p_max_threads;

    auto& numa_agents = agents_[current_nid];
    uint64_t old_count = (*(counters[current_tid]))++;
    while (old_count < max_counters[current_tid]) {
      start = old_count * p_chunk;
      end = std::min(static_cast<uint64_t>(numa_agents.size()),
                     start + p_chunk);
      for (uint64_t i = start; i < end; ++i) {
        auto* a = numa_agents[i];
        if (!filter || (*filter)(a)) {
      old_count = (*(counters[current_tid]))++;
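// All chunk counters are exhausted; release the heap-allocated atomics.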
for (auto* counter : counters) {
std::vector<std::vector<Agent*>>& agents;
auto handle = it->Next();
auto* agent = agents[handle.GetNumaNode()][handle.GetElementIdx()];
auto* copy = agent->NewCopy();
if (param->plot_memory_layout) {
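// Rebalancing: compute how many agents each NUMA node should hold,
// proportional to the number of threads pinned to that node.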
std::vector<uint64_t> agent_per_numa(numa_nodes);
std::vector<uint64_t> agent_per_numa_cumm(numa_nodes);
uint64_t cummulative = 0;

for (int n = 1; n < numa_nodes; ++n) {
  uint64_t num_agents = GetNumAgents() * threads_in_numa / max_threads;
  agent_per_numa[n] = num_agents;
  cummulative += num_agents;
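// Node 0 presumably receives the remainder (that line is elided here); the
// loop below builds an exclusive prefix sum, so agent_per_numa_cumm[n] is
// the global index at which node n's agents begin.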
agent_per_numa_cumm[0] = 0;
for (int n = 1; n < numa_nodes; ++n) {
  agent_per_numa_cumm[n] =
      agent_per_numa_cumm[n - 1] + agent_per_numa[n - 1];
274 "Run on numa node failed. Return code: ", ret);
const bool minimize_memory = param->minimize_memory_while_rebalancing;
if (dest.capacity() < agent_per_numa[nid]) {
  dest.reserve(agent_per_numa[nid] * 1.5);
}
dest.resize(agent_per_numa[nid]);
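// Split this node's destination range statically among its threads, reusing
// the chunk arithmetic of the static scheduler above.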
auto correction = agent_per_numa[nid] % threads_in_numa == 0 ? 0 : 1;
auto chunk = agent_per_numa[nid] / threads_in_numa + correction;
auto end =
    std::min(agent_per_numa_cumm[nid] + agent_per_numa[nid], start + chunk);
lbi->CallHandleIteratorConsumer(start, end, f);
if (!minimize_memory) {
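// When memory use is not minimized while rebalancing, the original agents
// are still alive at this point and are deleted in bulk.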
  auto delete_functor = L2F([](Agent* agent) { delete agent; });
for (int n = 0; n < numa_nodes; n++) {
if (param->plot_memory_layout) {

if (param->plot_memory_layout) {
std::cout << *this << std::endl;
const std::vector<std::vector<AgentUid>*>& uids) {
std::vector<std::vector<uint64_t>> tbr_cum(numa_nodes);
for (auto& el : tbr_cum) {
  el.resize(uids.size() + 1);
std::vector<uint64_t> remove(numa_nodes);
std::vector<uint64_t> lowest(numa_nodes);
std::vector<SharedData<uint64_t>> start(numa_nodes);
std::vector<SharedData<uint64_t>> swaps_to_right(numa_nodes);
std::vector<SharedData<uint64_t>> swaps_to_left(numa_nodes);
std::set<AgentUid> toberemoved;
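// Removal step 1: count, per NUMA node and per uid vector, how many agents
// are to be removed (tbr_cum); these totals drive the partitioning below.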
#pragma omp parallel for schedule(static, 1)
for (uint64_t i = 0; i < uids.size(); ++i) {
  for (auto& uid : *uids[i]) {
    tbr_cum[ah.GetNumaNode()][i]++;
remove[nid] = tbr_cum[nid].back();
lowest[nid] = agents_[nid].size() - remove[nid];
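// lowest[nid] marks the cut: everything at index >= lowest[nid] will be
// dropped, so survivors located there must first be swapped below it.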
if (remove[nid] != 0) {
uint64_t start_init = 0;
uint64_t end_init = 0;
Partition(remove[nid], threads_in_numa, ntid, &start_init, &end_init);
for (uint64_t i = start_init; i < end_init; ++i) {
#pragma omp parallel for schedule(static, 1)
for (uint64_t i = 0; i < uids.size(); ++i) {
  std::vector<uint64_t> counts(numa_nodes);
  for (auto& uid : *uids[i]) {
    auto nid = ah.GetNumaNode();
    auto eidx = ah.GetElementIdx();
    toberemoved.insert(uid);
    if (eidx < lowest[nid]) {
if (remove[nid] != 0) {
  start[nid].resize(threads_in_numa);
  swaps_to_left[nid].resize(threads_in_numa + 1);
  swaps_to_right[nid].resize(threads_in_numa + 1);
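// Removal step 2: each thread compacts the swap candidates in its partition,
// counting how many entries move right and how many move left.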
if (remove[nid] != 0) {
  Partition(remove[nid], threads_in_numa, ntid, &start[nid][ntid], &end);
  for (uint64_t i = start[nid][ntid]; i < end; ++i) {
    // The head of this condition is elided in the listing; it checks a
    // recorded swap candidate against the uint64_t max sentinel:
    if (to_right[nid][i] != std::numeric_limits<uint64_t>::max()) {
      to_right[nid][start[nid][ntid] + swaps_to_right[nid][ntid]++] =
      not_to_left[nid][start[nid][ntid] + swaps_to_left[nid][ntid]++] =
if (remove[nid] != 0) {
      swaps_to_right[nid].size() - 1);
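// Removal step 3 (call heads elided): prefix sums over the per-thread swap
// counts turn them into global offsets, so right- and left-movers can be
// paired one-to-one.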
if (remove[nid] != 0) {
  uint64_t num_swaps = swaps_to_right[nid][threads_in_numa];
  if (num_swaps != 0) {
    uint64_t swap_start = 0;
    uint64_t swap_end = 0;
    Partition(num_swaps, threads_in_numa, ntid, &swap_start, &swap_end);
    if (swap_start < swap_end) {
      auto tr_block = BinarySearch(swap_start, swaps_to_right[nid], 0,
                                   swaps_to_right[nid].size() - 1);
      auto tl_block = BinarySearch(swap_start, swaps_to_left[nid], 0,
                                   swaps_to_left[nid].size() - 1);
      auto tr_block_swaps =
          swaps_to_right[nid][tr_block + 1] - swaps_to_right[nid][tr_block];
      auto tl_block_swaps =
          swaps_to_left[nid][tl_block + 1] - swaps_to_left[nid][tl_block];
      auto tr_block_idx = swap_start - swaps_to_right[nid][tr_block];
      auto tl_block_idx = swap_start - swaps_to_left[nid][tl_block];
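// For the s-th swap overall, tr_block/tl_block are the thread blocks in which
// the s-th right-mover and left-mover were recorded; *_block_idx is the
// offset inside those blocks.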
      for (uint64_t s = swap_start; s < swap_end; ++s) {
        auto tr_idx = start[nid][tr_block] + tr_block_idx;
        auto tl_idx = start[nid][tl_block] + tl_block_idx;
        assert(tl_eidx < agents_[nid].size());
        assert(tr_eidx < agents_[nid].size());
        auto* reordered = agents_[nid][tl_eidx];
        assert(toberemoved.find(agents_[nid][tl_eidx]->GetUid()) ==
               toberemoved.end());
        assert(toberemoved.find(agents_[nid][tr_eidx]->GetUid()) !=
               toberemoved.end());
        agents_[nid][tr_eidx] = reordered;
        if (swap_end - s > 1) {
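// More swaps remain: advance to the next non-empty block once the current
// block's swaps are exhausted.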
          if (tr_block_idx >= tr_block_swaps) {
            while (!tr_block_swaps) {
              tr_block_swaps = swaps_to_right[nid][tr_block + 1] -
                               swaps_to_right[nid][tr_block];
          if (tl_block_idx >= tl_block_swaps) {
            while (!tl_block_swaps) {
              tl_block_swaps = swaps_to_left[nid][tl_block + 1] -
                               swaps_to_left[nid][tl_block];
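// Removal step 4: every thread deletes its share of the doomed tail
// [lowest[nid], agents_[nid].size()) and releases the uids for reuse.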
if (remove[nid] != 0) {
  uint64_t start_del = 0;
  uint64_t end_del = 0;
  Partition(remove[nid], threads_in_numa, ntid, &start_del, &end_del);
  start_del += lowest[nid];
  end_del += lowest[nid];
  for (uint64_t i = start_del; i < end_del; ++i) {
    auto uid = agent->GetUid();
    assert(toberemoved.find(uid) != toberemoved.end());
    uid_generator->ReuseAgentUid(uid);
for (uint64_t n = 0; n < agents_.size(); ++n) {
return agents_[numa_node].capacity();