Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
ucp_context.h
Go to the documentation of this file.
1/*
2 * Copyright 2025 NVIDIA Corporation
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef UCP_CONTEXT_H
19#define UCP_CONTEXT_H
20
21#include "realm/ucx/ucp_utils.h"
22#include "realm/ucx/mpool.h"
23#include "realm/ucx/spinlock.h"
24
25#include "realm/atomics.h"
26#include "realm/mutex.h"
27#ifdef REALM_USE_CUDA
29#endif
30
31#include <ucp/api/ucp.h>
32#include <unordered_map>
33#include <queue>
34
35namespace Realm {
36 namespace UCP {
37
38 class UCPContext;
39
// UCPWorker: holds a ucp_worker_h handle (`worker`) together with the
// endpoint table, memory pools, spinlocks and queues declared in the private
// section below, and exposes send/progress entry points on top of them.
// NOTE(review): this listing omits some original lines. Where the
// cross-reference index at the end of the page identifies what was omitted,
// that is called out in the comments below.
40 class UCPWorker {
41 public:
// NOTE(review): original lines 42-47 are not shown here. The index at the end
// of the page places `enum Type` at line 43 with enumerators WORKER_TX (44),
// WORKER_RX (45) and WORKER_LAST (46). `Type` is used by the constructor and
// by the `type` member below.
// Kind of operation carried by a Request.
// NOTE(review): the enumerators (original lines 50-52) are not shown here; the
// index lists AM_SEND (50), PUT (51) and EP_FLUSH (52).
48 enum OpType
49 {
53 };
// Descriptor for one operation handed to submit_req().
// NOTE(review): three fields are not shown in this listing; the index lists
// `OpType op_type` (line 55), `size_t payload_size` (line 60) and, inside the
// `am` struct, `size_t header_size` (line 68).
54 struct Request {
56 ucp_ep_h ep;
57 uint32_t flags;
58 void *args;
59 void *payload;
61 ucs_memory_type_t memtype;
62 ucp_send_nbx_callback_t cb;
63
// Operation-specific fields. `am` and `rma` are members of an anonymous
// union, so they share storage and only one of them is meaningful at a time.
// NOTE(review): presumably `am` pairs with AM_SEND and `rma` with PUT --
// confirm against submit_req() in the implementation file.
64 union {
65 struct {
66 unsigned id;
67 void *header;
69 } am;
70
71 struct {
72 ucp_rkey_h rkey;
73 uint64_t remote_addr;
74 } rma;
75 };
76
77 // Should be allocated/freed only through get/release methods because
78 // it must always have UCP-request-size bytes of space before itself.
// Construction and destruction are deleted to enforce the rule above:
// a Request can never be created as a local, a member, or via new/delete.
79 Request() = delete;
80 ~Request() = delete;
81 };
82
// Constructor. Several parameters share their names with private members
// declared further down (context, type, am_alignment, use_wakeup,
// prog_boff_max, prog_itr_max, rdesc_rel_max, thread_mode, pbuf_max_size).
// NOTE(review): how the remaining parameters (user_req_*, pbuf_max_chunk_size,
// pbuf_*_count, mmp_max_obj_size, leak_check) are consumed is not visible in
// this header -- presumably they configure request_mp / pbuf_mp / mmp.
83 UCPWorker(const UCPContext *context, Type type, size_t am_alignment,
84 bool use_wakeup,
85 unsigned prog_boff_max /*progress thread maximum backoff*/,
86 int prog_itr_max, int rdesc_rel_max, ucs_thread_mode_t thread_mode,
87 size_t user_req_size, size_t user_req_alignment, size_t pbuf_max_size,
88 size_t pbuf_max_chunk_size, size_t pbuf_max_count, size_t pbuf_init_count,
89 size_t mmp_max_obj_size, bool leak_check);
// NOTE(review): original line 90 is not shown in this listing (presumably the
// destructor declaration -- confirm against the real header).
// Lifecycle. init() reports its outcome as bool; finalize() returns nothing.
91 bool init();
92 void finalize();
// Endpoint table access. ep_get() is const and delivers its result through
// the `ep` out-parameter.
// NOTE(review): presumably both return false on failure / unknown endpoint.
93 bool ep_add(int target, ucp_address_t *addr, int remote_dev_index);
94 bool ep_get(int target, int remote_dev_index, ucp_ep_h *ep) const;
// Paired get/release accessors for requests, buffers sized by `size`, and
// `mmp` objects.
// NOTE(review): presumably backed by request_mp, pbuf_mp and mmp respectively,
// each guarded by the matching spinlock member -- confirm in the .cc file.
95 void *request_get();
96 void request_release(void *req);
97 void *pbuf_get(size_t size);
98 void pbuf_release(void *buf);
99 void *mmp_get(size_t size);
100 void mmp_release(void *buf);
// Trivial accessors: each returns the stored member unchanged.
101 size_t get_max_am_header() const { return max_am_header; }
102 const UCPContext *get_context() const { return context; }
103 ucp_worker_h get_ucp_worker() const { return worker; }
// Active-message handler registration, progress, and send entry points.
104 bool set_am_handler(unsigned am_id, ucp_am_recv_callback_t cb, void *args);
105 bool progress();
106 void return_am_rdesc(void *rdesc);
107 bool am_send_fast_path(ucp_ep_h ep, unsigned am_id, const void *header,
108 size_t header_size, const void *payload, size_t payload_size,
109 ucs_memory_type_t memtype);
110 bool submit_req(Request *req);
111 size_t num_eps() const;
112
113 private:
// Internal helpers.
// NOTE(review): progress_with_wakeup() / progress_without_wakeup() are
// presumably the two variants selected by `use_wakeup` -- confirm.
114 bool setup_worker_efd();
115 bool ep_close(ucp_ep_h ep);
116 bool needs_progress() const;
117 bool progress_with_wakeup();
118 bool progress_without_wakeup();
// Static chunk allocate/release functions taking an opaque `arg`.
// NOTE(review): presumably callbacks supplied to pbuf_mp.
119 static void *pbuf_chunk_alloc(size_t size, void *arg);
120 static void pbuf_chunk_release(void *chunk, void *arg);
121
// State. Only members written with a brace initializer below have an in-class
// default; the others (e.g. worker, worker_efd, the pool pointers) must be set
// elsewhere, e.g. by the constructor or init(), which are not visible here.
122 const UCPContext *context;
123 ucp_worker_h worker;
124 bool initialized{false};
125 bool have_residual_events{false};
126 int worker_efd;
127 Type type;
128 size_t ucp_req_size;
129 size_t am_alignment;
130 bool use_wakeup;
131 unsigned prog_boff_max;
132 int prog_itr_max;
133 int rdesc_rel_max;
134 ucs_thread_mode_t thread_mode;
135 size_t pbuf_max_size;
136 MPool *request_mp;
137 MPool *pbuf_mp;
138 VMPool *mmp;
139 SpinLock req_mp_spinlock;
140 SpinLock pbuf_mp_spinlock;
141 std::queue<void *> am_rdesc_q;
142 SpinLock am_rdesc_q_spinlock;
143 SpinLock mmp_spinlock;
// Maps a void* key to a ucp_mem_h.
// NOTE(review): presumably keyed by pbuf chunk address -- confirm.
144 std::unordered_map<void *, ucp_mem_h> pbuf_mp_mem_hs;
// Two-level int -> int -> ucp_ep_h table.
// NOTE(review): presumably indexed by target, then remote_dev_index, matching
// the ep_add()/ep_get() parameters -- confirm.
145 std::unordered_map<int, std::unordered_map<int, ucp_ep_h>> eps;
146 size_t max_am_header;
// Atomic 64-bit counters, both starting at zero.
// NOTE(review): meaning not visible in this header -- confirm in the .cc file.
147 atomic<uint64_t> scount{0};
148 atomic<uint64_t> pcount{0};
149 unsigned prog_boff_count{0};
150 };
151
// Body of class UCPContext: holds a ucp_context_h handle and its queried
// attributes (`context_attr`), and offers memory map/unmap helpers.
// NOTE(review): the class header (original line 152, per the cross-reference
// index at the end of the page) and original line 155 are not shown in this
// listing; line 155 is presumably the destructor declaration -- confirm.
153 public:
// `ep_nums_est` defaults to -1 and is stored in the member of the same name.
// NOTE(review): the constructor is callable with a single int and is not
// marked `explicit`, so an int converts implicitly to UCPContext -- confirm
// that this is intended.
154 UCPContext(int ep_nums_est = -1);
// init() takes a string -> string map.
// NOTE(review): presumably environment-variable style settings, going by the
// name `ev_map` -- confirm.
// NOTE(review): std::string is used here but <string> is not among the
// includes visible in this listing; presumably it arrives transitively.
156 bool init(const std::unordered_map<std::string, std::string> &ev_map);
157 void finalize();
// Const helpers that report success as bool. mem_map() delivers the resulting
// handle through `mem_h_ptr`.
// NOTE(review): presumably thin wrappers over ucp_mem_map / ucp_mem_unmap.
158 bool mem_map(const ucp_mem_map_params_t *params, ucp_mem_h *mem_h_ptr) const;
159 bool mem_unmap(ucp_mem_h mem_h) const;
// Trivial accessors: the raw handle, and two fields read from context_attr.
160 ucp_context_h get_ucp_context() const { return context; }
161 size_t get_ucp_req_size() const { return context_attr.request_size; }
162 uint64_t supported_memtypes() const { return context_attr.memory_types; }
163
// Public data member, present only in CUDA builds; defaults to nullptr.
164#ifdef REALM_USE_CUDA
165 Cuda::GPU *gpu{nullptr};
166#endif
167 private:
168 bool initialized{false};
// No in-class initializer: `context` and `context_attr` must be filled in
// elsewhere (presumably by init()) before the accessors above are meaningful.
169 ucp_context_h context;
170 ucp_context_attr_t context_attr;
// Same two-level int -> int -> ucp_ep_h shape as the `eps` member of UCPWorker.
// NOTE(review): no member function visible in this header reads or writes it.
171 std::unordered_map<int, std::unordered_map<int, ucp_ep_h>> eps;
172 int ep_nums_est;
173 };
174
175 }; // namespace UCP
176
177}; // namespace Realm
178
179#endif
Definition cuda_internal.h:392
Definition codedesc.h:38
Definition ucp_context.h:152
size_t get_ucp_req_size() const
Definition ucp_context.h:161
ucp_context_h get_ucp_context() const
Definition ucp_context.h:160
uint64_t supported_memtypes() const
Definition ucp_context.h:162
bool init(const std::unordered_map< std::string, std::string > &ev_map)
bool mem_unmap(ucp_mem_h mem_h) const
UCPContext(int ep_nums_est=-1)
bool mem_map(const ucp_mem_map_params_t *params, ucp_mem_h *mem_h_ptr) const
Definition ucp_context.h:40
bool set_am_handler(unsigned am_id, ucp_am_recv_callback_t cb, void *args)
OpType
Definition ucp_context.h:49
@ EP_FLUSH
Definition ucp_context.h:52
@ AM_SEND
Definition ucp_context.h:50
@ PUT
Definition ucp_context.h:51
void mmp_release(void *buf)
Type
Definition ucp_context.h:43
@ WORKER_RX
Definition ucp_context.h:45
@ WORKER_TX
Definition ucp_context.h:44
@ WORKER_LAST
Definition ucp_context.h:46
const UCPContext * get_context() const
Definition ucp_context.h:102
void * pbuf_get(size_t size)
bool am_send_fast_path(ucp_ep_h ep, unsigned am_id, const void *header, size_t header_size, const void *payload, size_t payload_size, ucs_memory_type_t memtype)
bool submit_req(Request *req)
ucp_worker_h get_ucp_worker() const
Definition ucp_context.h:103
void * mmp_get(size_t size)
bool ep_add(int target, ucp_address_t *addr, int remote_dev_index)
UCPWorker(const UCPContext *context, Type type, size_t am_alignment, bool use_wakeup, unsigned prog_boff_max, int prog_itr_max, int rdesc_rel_max, ucs_thread_mode_t thread_mode, size_t user_req_size, size_t user_req_alignment, size_t pbuf_max_size, size_t pbuf_max_chunk_size, size_t pbuf_max_count, size_t pbuf_init_count, size_t mmp_max_obj_size, bool leak_check)
size_t num_eps() const
size_t get_max_am_header() const
Definition ucp_context.h:101
void request_release(void *req)
bool ep_get(int target, int remote_dev_index, ucp_ep_h *ep) const
void return_am_rdesc(void *rdesc)
void pbuf_release(void *buf)
Definition atomics.h:31
Definition activemsg.h:38
Definition ucp_context.h:54
unsigned id
Definition ucp_context.h:66
ucp_rkey_h rkey
Definition ucp_context.h:72
void * args
Definition ucp_context.h:58
ucs_memory_type_t memtype
Definition ucp_context.h:61
OpType op_type
Definition ucp_context.h:55
void * header
Definition ucp_context.h:67
ucp_ep_h ep
Definition ucp_context.h:56
struct Realm::UCP::UCPWorker::Request::@41::@44 rma
size_t payload_size
Definition ucp_context.h:60
struct Realm::UCP::UCPWorker::Request::@41::@43 am
uint64_t remote_addr
Definition ucp_context.h:73
size_t header_size
Definition ucp_context.h:68
ucp_send_nbx_callback_t cb
Definition ucp_context.h:62
uint32_t flags
Definition ucp_context.h:57
void * payload
Definition ucp_context.h:59