Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
ucp_internal.h
Go to the documentation of this file.
1/*
2 * Copyright 2025 NVIDIA Corporation
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18// UCP network module internals
19
20#ifndef UCP_INTERNAL_H
21#define UCP_INTERNAL_H
22
23#include "realm/mem_impl.h"
24#include "realm/activemsg.h"
25#include "realm/atomics.h"
26#include "realm/mutex.h"
28
29#ifdef REALM_USE_CUDA
32#endif
33
36#include "realm/ucx/spinlock.h"
38#include "realm/ucx/ucc_comm.h"
39#include <ucp/api/ucp.h>
40
41#include <memory>
42#include <vector>
43#include <unordered_map>
44#include <unordered_set>
45
46namespace Realm {
47 namespace UCP {
48
55
63
64 struct CompList;
65 struct RemoteComp;
66 struct UCPRDMAInfo;
67 struct Request;
68
// Header carried with UCP active messages: UCPMessageImpl embeds one as its
// last member (ucp_msg_hdr), and am_msg_recv_data_ready() receives a
// pointer to one on the receive side.
// The struct is packed so its byte layout has no padding; realm_hdr marks
// the position where variable-length data (presumably the Realm-level
// message header) follows in the same buffer -- TODO confirm.
// NOTE(review): __attribute__((packed)) and zero-length arrays are
// GCC/Clang extensions, not standard C++. Packing also means pointer
// members such as remote_comp may be unaligned; avoid taking their
// addresses (GCC warns with -Waddress-of-packed-member).
69 struct UCPMsgHdr {
// Message checksum; presumably computed/verified only when
// UCPInternal::Config::crc_check is set -- TODO confirm.
70 uint32_t crc;
// Active-message id (same type as the msgid given to UCPMessageImpl).
72 unsigned short msgid;
// Sender-side completion object; presumably echoed back to the sender so
// am_remote_comp_handler() can fire it -- TODO confirm.
73 RemoteComp *remote_comp;
76#ifdef REALM_USE_CUDA
// CUDA builds only; presumably the sender's GPU device index.
77 int src_dev_index;
78#endif
// Start of the trailing variable-length data (zero-length array).
79 char realm_hdr[0];
80 } __attribute__((packed)); // gcc-specific
81
83 public:
85 ~UCPPoller() = default;
// Progresses the registered workers until `work_until` is reached.
// Presumably overrides a background-work-item do_work(TimeLimit), where the
// bool result reports whether more work remains -- TODO confirm.
89 bool do_work(TimeLimit work_until);
// Adds a worker to the set polled by do_work(). Stored as a raw pointer;
// ownership presumably stays with UCPInternal (see destroy_workers()).
90 void add_worker(UCPWorker *worker);
91
92 private:
// Workers progressed by do_work().
93 std::vector<UCPWorker *> workers;
94 Mutex shutdown_mutex;
95 // set and cleared inside mutex, but tested outside
// (atomic, so the unlocked read is race-free)
96 atomic<bool> shutdown_flag;
97 Mutex::CondVar shutdown_cond;
98 // poll_notify_flag (below) is set and cleared inside this mutex, but tested outside
99 Mutex poll_notify_mutex;
100 atomic<bool> poll_notify_flag;
101 Mutex::CondVar poll_notify_cond;
102 };
103
105 public:
106 friend class UCPMessageImpl;
// Tunables for the UCP network module. An instance is passed to
// UCPInternal::init(), and UCPInternal stores one in its `config` member.
// Descriptions marked "presumably" are inferred from member names only.
107 struct Config {
// presumably: register/bind host memory segments with UCP -- TODO confirm
109 bool bind_hostmem{true};
112 int prog_boff_max{4}; // progress thread maximum backoff
// presumably: enable leak checking in the internal memory pools
115 bool mpool_leakcheck{false};
// presumably: compute/verify UCPMsgHdr::crc per message -- TODO confirm
116 bool crc_check{true};
// presumably: allocate header (hbuf_get) and payload (pbuf_get) buffers
// with malloc instead of from a pool -- TODO confirm
117 bool hbuf_malloc{false};
118 bool pbuf_malloc{false};
// presumably: use UCX worker wakeup (event-driven progress) rather than
// pure polling -- TODO confirm
119 bool use_wakeup{false};
121 size_t fp_max{2 << 10 /* 2K */}; // fast path max message size
// Payload-buffer (pbuf) pool sizing. pbuf_max_count defaults to SIZE_MAX,
// i.e. effectively no cap on the number of buffers.
122 size_t pbuf_max_size{8 << 10 /* 8K */};
123 size_t pbuf_max_chunk_size{4 << 20 /* 4M */};
124 size_t pbuf_max_count{SIZE_MAX};
125 size_t pbuf_init_count{1024};
126 size_t pbuf_mp_thresh{2 << 10 /* 2K */};
127 size_t mmp_max_obj_size{2 << 10 /* 2K */}; // malloc mpool max object size
// Limit on in-flight requests (a count, not bytes); presumably compared
// against UCPInternal::outstanding_reqs by is_congested() -- TODO confirm.
128 size_t outstanding_reqs_limit{1 << 17 /* 128K (count) */};
// Kept as a string; UCPInternal also has a size_t ib_seg_size, presumably
// the parsed value -- TODO confirm accepted syntax.
129 std::string ib_seg_size;
// presumably: NIC list forwarded to UCX for host-memory contexts
130 std::string host_nics;
// String form of the host zero-copy threshold; UCPInternal also has a
// size_t zcopy_thresh_host (presumably the parsed value).
131 std::string zcopy_thresh_host;
// presumably: UCX transport list for host-memory contexts
132 std::string tls_host;
133#ifdef REALM_USE_CUDA
// CUDA-build counterparts of the host settings above, presumably applied
// to device-memory contexts -- TODO confirm.
134 bool bind_cudamem{true};
135 std::string gpu_nics;
136 std::string zcopy_thresh_dev;
137 std::string tls_dev;
138#endif
139 };
140
143 bool bootstrap();
144 bool init(const Config &config);
145 void finalize();
146 void attach(std::vector<NetworkSegment *> &segments);
147 void detach(std::vector<NetworkSegment *> &segments);
148 void get_shared_peers(Realm::NodeSet &shared_peers);
149 void barrier();
150 void broadcast(NodeID root, const void *val_in, void *val_out, size_t bytes);
151 void gather(NodeID root, const void *val_in, void *vals_out, size_t bytes);
152 void allgather(const char *val_in, size_t bytes, std::vector<char> &vals_out,
153 size_t *lengths);
154 void allgatherv(const char *val_in, size_t bytes, std::vector<char> &vals_out,
155 std::vector<size_t> &lengths);
157 bool check_for_quiescence(size_t sampled_receive_count);
158 size_t recommended_max_payload(const void *data, const NetworkSegment *src_segment,
159 const RemoteAddress *dest_payload_addr,
160 bool with_congestion, size_t header_size);
161
162 bool get_ucp_ep(const UCPWorker *worker, NodeID target,
163 const UCPRDMAInfo *rdma_info, ucp_ep_h *ep) const;
164
167
168 void *hbuf_get(UCPWorker *worker, size_t size);
169 void hbuf_release(UCPWorker *worker, void *buf);
170
171 void *pbuf_get(UCPWorker *worker, size_t size);
172 void pbuf_release(UCPWorker *worker, void *buf);
173
174 void notify_msg_sent(uint64_t count);
175
176 const UCPContext *get_context(const NetworkSegment *segment) const;
177 // the public interface exposes the tx worker only
178 UCPWorker *get_tx_worker(const UCPContext *context, uint8_t priority) const;
179
180 size_t num_eps(const UCPContext &context) const;
181
182 protected:
183 UCPModule *module;
185
186 private:
// Pairs the owning UCPInternal with one UCPWorker. The static AM callbacks
// (am_msg_recv_handler, am_rdma_msg_recv_handler, am_remote_comp_handler)
// receive only an opaque `void *arg`, so a pointer to one of these is
// presumably what gets registered as that arg -- TODO confirm.
// Instances live in a std::list (am_handlers_args), whose elements keep
// stable addresses as the list grows, as such a pointer would require.
187 struct AmHandlersArgs {
188 UCPInternal *internal;
189 UCPWorker *worker;
190 };
191
// Worker sets for one UCP context; stored as the value type of WorkersMap,
// keyed by const UCPContext *. get_tx_workers()/get_rx_workers() return
// these vectors by const reference, and get_tx_worker()/get_rx_worker()
// pick one by priority (presumably by index -- TODO confirm).
// Raw pointers; presumably created in create_workers() and released in
// destroy_workers().
192 struct Workers {
193 std::vector<UCPWorker *> tx_workers;
194 std::vector<UCPWorker *> rx_workers;
195 };
196
197#ifdef REALM_UCX_DYNAMIC_LOAD
198 bool resolve_ucp_api_fnptrs();
199#endif
200
201#ifdef REALM_USE_CUDA
202 bool init_ucp_contexts(const std::unordered_set<Realm::Cuda::GPU *> &gpus);
203#else
204 bool init_ucp_contexts();
205#endif
206
207 bool create_workers();
208 void destroy_workers();
209 size_t get_num_workers();
210 bool set_am_handlers();
211 bool create_eps(uint8_t priority);
212 bool create_eps();
213 bool create_pollers();
214 const UCPContext *get_context_host() const;
215#if defined(REALM_USE_CUDA)
216 const UCPContext *get_context_device(int dev_index) const;
217#endif
218 const std::vector<UCPWorker *> &get_tx_workers(const UCPContext *context) const;
219 const std::vector<UCPWorker *> &get_rx_workers(const UCPContext *context) const;
220 UCPWorker *get_rx_worker(const UCPContext *context, uint8_t priority) const;
221 bool is_congested();
222 bool add_rdma_info(NetworkSegment *segment, const UCPContext *context,
223 ucp_mem_h mem_h);
224 void add_rdma_info_odr(NetworkSegment *segment, const UCPContext *context);
225 bool am_msg_recv_data_ready(UCPInternal *internal, UCPWorker *worker,
226 const UCPMsgHdr *ucp_msg_hdr, size_t header_size,
227 void *payload, size_t payload_size, int payload_mode);
228 static ucs_status_t am_remote_comp_handler(void *arg, const void *header,
229 size_t header_size, void *data,
230 size_t data_size,
231 const ucp_am_recv_param_t *param);
232 static void am_send_reply_comp_handler(void *request, ucs_status_t status,
233 void *user_data);
234 static void am_realm_comp_handler(NodeID sender,
237 static void am_rndv_recv_data_handler(void *request, ucs_status_t status,
238 size_t length, void *user_data);
239 static ucs_status_t am_msg_recv_handler(void *arg, const void *header,
240 size_t header_size, void *payload,
241 size_t payload_size,
242 const ucp_am_recv_param_t *param);
243 static ucs_status_t am_rdma_msg_recv_handler(void *arg, const void *header,
244 size_t header_size, void *payload,
245 size_t payload_size,
246 const ucp_am_recv_param_t *param);
247
248 bool compute_shared_ranks();
249 using WorkersMap = std::unordered_map<const UCPContext *, Workers>;
250 using AttachMap = std::unordered_map<const UCPContext *, std::vector<ucp_mem_h>>;
251
252#ifdef REALM_UCX_DYNAMIC_LOAD
253 void *libucp{nullptr};
254#endif
255 bool initialized_boot{false};
256 bool initialized_ucp{false};
257 Config config;
258 bootstrap_handle_t boot_handle;
259 std::unique_ptr<ucc::UCCComm> ucc_comm{nullptr};
260 int num_shared_ranks{0};
261 std::vector<NodeID> shared_ranks;
262 std::list<UCPContext> ucp_contexts;
263#ifdef REALM_USE_CUDA
264 std::unordered_map<int, UCPContext *> dev_ctx_map;
265#endif
266 WorkersMap workers;
267 std::list<UCPPoller> pollers;
268 std::list<AmHandlersArgs> am_handlers_args;
269 AttachMap attach_mem_hs;
270 atomic<uint64_t> total_msg_sent;
271 atomic<uint64_t> total_msg_received;
272 atomic<uint64_t> total_rcomp_sent;
273 atomic<uint64_t> total_rcomp_received;
274 atomic<uint64_t> outstanding_reqs;
275 MPool *rcba_mp;
276 SpinLock rcba_mp_spinlock;
277 size_t ib_seg_size;
278 size_t zcopy_thresh_host;
279 };
280
282 public:
283 UCPMessageImpl(UCPInternal *internal, NodeID target, unsigned short msgid,
284 size_t header_size, size_t max_payload_size,
285 const void *src_payload_addr, size_t src_payload_lines,
286 size_t src_payload_line_stride, const NetworkSegment *_src_segment,
287 const RemoteAddress *_dest_payload_addr, size_t storage_size);
288
289 UCPMessageImpl(UCPInternal *internal, const NodeSet &targets, unsigned short msgid,
290 size_t header_size, size_t max_payload_size,
291 const void *src_payload_addr, size_t src_payload_lines,
292 size_t src_payload_line_stride, size_t storage_size);
293
295
296 virtual void *add_local_completion(size_t size);
297 virtual void *add_remote_completion(size_t size);
298
299 virtual void commit(size_t act_payload_size);
300 virtual void cancel();
301
302 private:
303 bool set_inline_payload_base();
304 bool commit_with_rma(ucp_ep_h ep);
305 bool commit_unicast(size_t act_payload_size);
306 bool commit_multicast(size_t act_payload_size);
307 bool send_fast_path(ucp_ep_h ep, size_t act_payload_size);
308 bool send_slow_path(ucp_ep_h ep, size_t act_payload_size, uint32_t flags);
309 Request *make_request(size_t act_payload_size);
310 static void cleanup_request(Request *req, UCPInternal *internal);
311 static bool send_request(Request *req, unsigned am_id);
312 static void am_local_failure_handler(Request *req, UCPInternal *internal);
313 static void am_local_comp_handler(void *request, ucs_status_t status,
314 void *user_data);
315 static void am_put_comp_handler(void *request, ucs_status_t status,
316 void *user_data);
317 static void am_put_flush_comp_handler(void *request, ucs_status_t status,
318 void *user_data);
319
320 UCPInternal *internal;
321 UCPWorker *worker;
322 NodeID target;
323 NodeSet targets;
324 const void *src_payload_addr;
325 size_t src_payload_lines;
326 size_t src_payload_line_stride;
327 size_t header_size;
328 PayloadBaseType payload_base_type;
329 UCPRDMAInfo *dest_payload_rdma_info{nullptr};
330 CompList *local_comp{nullptr};
331 RemoteComp *remote_comp{nullptr};
332 bool is_multicast{false};
333 ucs_memory_type_t memtype;
334
335 UCPMsgHdr ucp_msg_hdr;
336 // nothing should be added after 'ucp_msg_hdr'
337 };
338
340 public:
341 UCPRemoteMemoryCommon(const ByteArray &rdma_info_ba);
342
344
345 virtual bool get_remote_addr(off_t offset, RemoteAddress &remote_addr);
346
347 private:
348 UCPRDMAInfo *rdma_info;
349 size_t rdma_info_size;
350 };
351
352 /* A block of memory on a remote process
353 * Parent class RemoteMemory has node id in me.memory_owner_node()
354 */
356 public:
357 UCPRemoteMemory(RuntimeImpl *_runtime_impl, Memory me, size_t size,
358 Memory::Kind kind, const ByteArray &rdma_info_ba,
359 UCPInternal *internal);
360
361 virtual void get_bytes(off_t offset, void *dst, size_t size);
362 virtual void put_bytes(off_t offset, const void *src, size_t size);
363 virtual bool get_remote_addr(off_t offset, RemoteAddress &remote_addr);
364 };
365
366 // Intermediate buffer memory
368 public:
370 const ByteArray &rdma_info_ba, UCPInternal *internal);
371
372 virtual bool get_remote_addr(off_t offset, RemoteAddress &remote_addr);
373 };
374
375 }; // namespace UCP
376
377}; // namespace Realm
378
379#endif
Definition activemsg.h:205
Definition bgwork.h:129
Definition bytearray.h:53
Definition ib_memory.h:30
uintptr_t CallbackData
Definition activemsg.h:351
Memory me
Definition mem_impl.h:194
size_t size
Definition mem_impl.h:195
MemoryKind kind
Definition mem_impl.h:196
Definition memory.h:33
Kind
Definition memory.h:59
Definition network.h:262
Definition nodeset.h:117
Definition mem_impl.h:535
Definition channel.h:103
Definition runtime_impl.h:264
Definition timers.h:129
Definition ucp_module.h:31
Definition ucp_context.h:152
Definition ucp_internal.h:367
UCPIBMemory(RuntimeImpl *_runtime_impl, Memory me, size_t size, Memory::Kind kind, const ByteArray &rdma_info_ba, UCPInternal *internal)
virtual bool get_remote_addr(off_t offset, RemoteAddress &remote_addr)
Definition ucp_internal.h:104
void allgather(const char *val_in, size_t bytes, std::vector< char > &vals_out, size_t *lengths)
void allgatherv(const char *val_in, size_t bytes, std::vector< char > &vals_out, std::vector< size_t > &lengths)
void detach(std::vector< NetworkSegment * > &segments)
bool check_for_quiescence(size_t sampled_receive_count)
UCPInternal(Realm::UCPModule *_module, Realm::RuntimeImpl *_runtime)
void notify_msg_sent(uint64_t count)
Request * request_get(UCPWorker *worker)
UCPModule *RuntimeImpl * runtime
Definition ucp_internal.h:184
void broadcast(NodeID root, const void *val_in, void *val_out, size_t bytes)
UCPWorker * get_tx_worker(const UCPContext *context, uint8_t priority) const
void get_shared_peers(Realm::NodeSet &shared_peers)
size_t sample_messages_received_count()
size_t num_eps(const UCPContext &context) const
void * hbuf_get(UCPWorker *worker, size_t size)
void * pbuf_get(UCPWorker *worker, size_t size)
void hbuf_release(UCPWorker *worker, void *buf)
const UCPContext * get_context(const NetworkSegment *segment) const
void pbuf_release(UCPWorker *worker, void *buf)
size_t recommended_max_payload(const void *data, const NetworkSegment *src_segment, const RemoteAddress *dest_payload_addr, bool with_congestion, size_t header_size)
void request_release(Request *req)
bool init(const Config &config)
void attach(std::vector< NetworkSegment * > &segments)
bool get_ucp_ep(const UCPWorker *worker, NodeID target, const UCPRDMAInfo *rdma_info, ucp_ep_h *ep) const
void gather(NodeID root, const void *val_in, void *vals_out, size_t bytes)
Definition ucp_internal.h:281
virtual void commit(size_t act_payload_size)
UCPMessageImpl(UCPInternal *internal, NodeID target, unsigned short msgid, size_t header_size, size_t max_payload_size, const void *src_payload_addr, size_t src_payload_lines, size_t src_payload_line_stride, const NetworkSegment *_src_segment, const RemoteAddress *_dest_payload_addr, size_t storage_size)
virtual void * add_local_completion(size_t size)
UCPMessageImpl(UCPInternal *internal, const NodeSet &targets, unsigned short msgid, size_t header_size, size_t max_payload_size, const void *src_payload_addr, size_t src_payload_lines, size_t src_payload_line_stride, size_t storage_size)
virtual void * add_remote_completion(size_t size)
Definition ucp_internal.h:82
bool do_work(TimeLimit work_until)
void add_worker(UCPWorker *worker)
Definition ucp_internal.h:339
virtual bool get_remote_addr(off_t offset, RemoteAddress &remote_addr)
UCPRemoteMemoryCommon(const ByteArray &rdma_info_ba)
Definition ucp_internal.h:355
virtual bool get_remote_addr(off_t offset, RemoteAddress &remote_addr)
UCPRemoteMemory(RuntimeImpl *_runtime_impl, Memory me, size_t size, Memory::Kind kind, const ByteArray &rdma_info_ba, UCPInternal *internal)
virtual void get_bytes(off_t offset, void *dst, size_t size)
virtual void put_bytes(off_t offset, const void *src, size_t size)
Definition ucp_context.h:40
Definition mutex.h:325
Definition mutex.h:223
Definition atomics.h:31
PayloadBaseType
Definition ucp_internal.h:57
@ PAYLOAD_BASE_EXTERNAL
Definition ucp_internal.h:60
@ PAYLOAD_BASE_LAST
Definition ucp_internal.h:61
@ PAYLOAD_BASE_INTERNAL
Definition ucp_internal.h:59
@ PAYLOAD_BASE_INLINE
Definition ucp_internal.h:58
Realm::UCP::UCPPoller __attribute__
AmWithRemoteAddrMode
Definition ucp_internal.h:50
@ AM_WITH_REMOTE_ADDR_MODE_PUT
Definition ucp_internal.h:52
@ AM_WITH_REMOTE_ADDR_MODE_AUTO
Definition ucp_internal.h:51
@ AM_WITH_REMOTE_ADDR_MODE_AM
Definition ucp_internal.h:53
Definition activemsg.h:38
int NodeID
Definition nodeset.h:40
Definition network.h:46
Definition ucp_internal.h:107
int rdesc_rel_max
Definition ucp_internal.h:114
bool use_wakeup
Definition ucp_internal.h:119
std::string zcopy_thresh_host
Definition ucp_internal.h:131
size_t outstanding_reqs_limit
Definition ucp_internal.h:128
std::string host_nics
Definition ucp_internal.h:130
std::string ib_seg_size
Definition ucp_internal.h:129
size_t pbuf_max_size
Definition ucp_internal.h:122
int num_priorities
Definition ucp_internal.h:111
int prog_itr_max
Definition ucp_internal.h:113
size_t pbuf_mp_thresh
Definition ucp_internal.h:126
size_t pbuf_init_count
Definition ucp_internal.h:125
bool pbuf_malloc
Definition ucp_internal.h:118
size_t pbuf_max_count
Definition ucp_internal.h:124
size_t fp_max
Definition ucp_internal.h:121
size_t priority_size_max
Definition ucp_internal.h:120
std::string tls_host
Definition ucp_internal.h:132
size_t pbuf_max_chunk_size
Definition ucp_internal.h:123
int prog_boff_max
Definition ucp_internal.h:112
bool bind_hostmem
Definition ucp_internal.h:109
size_t mmp_max_obj_size
Definition ucp_internal.h:127
bool hbuf_malloc
Definition ucp_internal.h:117
bool mpool_leakcheck
Definition ucp_internal.h:115
AmWithRemoteAddrMode am_wra_mode
Definition ucp_internal.h:108
int pollers_max
Definition ucp_internal.h:110
bool crc_check
Definition ucp_internal.h:116
Definition ucp_internal.h:69
char realm_hdr[0]
Definition ucp_internal.h:79
uint32_t crc
Definition ucp_internal.h:70
size_t rdma_payload_size
Definition ucp_internal.h:75
unsigned short msgid
Definition ucp_internal.h:72
RemoteComp * remote_comp
Definition ucp_internal.h:73
NodeID src
Definition ucp_internal.h:71
void * rdma_payload_addr
Definition ucp_internal.h:74
Definition bootstrap.h:32
NodeID src
Definition ucp_internal.h:1
unsigned short msgid
Definition ucp_internal.h:2