libkvikio  23.12.00
parallel_operation.hpp
1 /*
2  * Copyright (c) 2021-2022, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <cassert>
19 #include <future>
20 #include <numeric>
21 #include <system_error>
22 #include <utility>
23 #include <vector>
24 
25 #include <kvikio/defaults.hpp>
26 #include <kvikio/error.hpp>
27 #include <kvikio/utils.hpp>
28 
29 namespace kvikio {
30 
43 template <typename F, typename T>
44 std::future<std::size_t> parallel_io(F op,
45  T buf,
46  std::size_t size,
47  std::size_t file_offset,
48  std::size_t task_size,
49  std::size_t devPtr_offset)
50 {
51  if (task_size == 0) { throw std::invalid_argument("`task_size` cannot be zero"); }
52 
53  // Single-task guard
54  if (task_size >= size || page_size >= size) {
55  return defaults::thread_pool().submit(op, buf, size, file_offset, devPtr_offset);
56  }
57 
58  // We know an upper bound of the total number of tasks
59  std::vector<std::future<std::size_t>> tasks;
60  tasks.reserve(size / task_size + 2);
61 
62  // 1) Submit `task_size` sized tasks
63  while (size >= task_size) {
64  tasks.push_back(defaults::thread_pool().submit(op, buf, task_size, file_offset, devPtr_offset));
65  file_offset += task_size;
66  devPtr_offset += task_size;
67  size -= task_size;
68  }
69 
70  // 2) Submit a task for the remainder
71  if (size > 0) {
72  tasks.push_back(defaults::thread_pool().submit(op, buf, size, file_offset, devPtr_offset));
73  }
74 
75  // Finally, we sum the result of all tasks.
76  auto gather_tasks = [](std::vector<std::future<std::size_t>>&& tasks) -> std::size_t {
77  std::size_t ret = 0;
78  for (auto& task : tasks) {
79  ret += task.get();
80  }
81  return ret;
82  };
83  return std::async(std::launch::deferred, gather_tasks, std::move(tasks));
84 }
85 
86 } // namespace kvikio
static kvikio::third_party::thread_pool & thread_pool()
Get the default thread pool.
Definition: defaults.hpp:169
std::future< bool > submit(const F &task, const A &... args)
Submit a function with zero or more arguments and no return value into the task queue, and get an std::future<bool> that will be set to true upon completion of the task.