39#include <ucp/api/ucp.h>
43#include <unordered_map>
44#include <unordered_set>
93 std::vector<UCPWorker *> workers;
99 Mutex poll_notify_mutex;
134 bool bind_cudamem{
true};
135 std::string gpu_nics;
136 std::string zcopy_thresh_dev;
146 void attach(std::vector<NetworkSegment *> &segments);
147 void detach(std::vector<NetworkSegment *> &segments);
151 void gather(
NodeID root,
const void *val_in,
void *vals_out,
size_t bytes);
152 void allgather(
const char *val_in,
size_t bytes, std::vector<char> &vals_out,
154 void allgatherv(
const char *val_in,
size_t bytes, std::vector<char> &vals_out,
155 std::vector<size_t> &lengths);
160 bool with_congestion,
size_t header_size);
163 const UCPRDMAInfo *rdma_info, ucp_ep_h *ep)
const;
187 struct AmHandlersArgs {
193 std::vector<UCPWorker *> tx_workers;
194 std::vector<UCPWorker *> rx_workers;
197#ifdef REALM_UCX_DYNAMIC_LOAD
198 bool resolve_ucp_api_fnptrs();
202 bool init_ucp_contexts(
const std::unordered_set<Realm::Cuda::GPU *> &gpus);
204 bool init_ucp_contexts();
207 bool create_workers();
208 void destroy_workers();
209 size_t get_num_workers();
210 bool set_am_handlers();
211 bool create_eps(uint8_t priority);
213 bool create_pollers();
214 const UCPContext *get_context_host()
const;
215#if defined(REALM_USE_CUDA)
216 const UCPContext *get_context_device(
int dev_index)
const;
218 const std::vector<UCPWorker *> &get_tx_workers(
const UCPContext *context)
const;
219 const std::vector<UCPWorker *> &get_rx_workers(
const UCPContext *context)
const;
220 UCPWorker *get_rx_worker(
const UCPContext *context, uint8_t priority)
const;
222 bool add_rdma_info(
NetworkSegment *segment,
const UCPContext *context,
224 void add_rdma_info_odr(
NetworkSegment *segment,
const UCPContext *context);
225 bool am_msg_recv_data_ready(UCPInternal *internal, UCPWorker *worker,
226 const UCPMsgHdr *ucp_msg_hdr,
size_t header_size,
227 void *payload,
size_t payload_size,
int payload_mode);
228 static ucs_status_t am_remote_comp_handler(
void *arg,
const void *header,
229 size_t header_size,
void *data,
231 const ucp_am_recv_param_t *param);
232 static void am_send_reply_comp_handler(
void *request, ucs_status_t status,
234 static void am_realm_comp_handler(
NodeID sender,
237 static void am_rndv_recv_data_handler(
void *request, ucs_status_t status,
238 size_t length,
void *user_data);
239 static ucs_status_t am_msg_recv_handler(
void *arg,
const void *header,
240 size_t header_size,
void *payload,
242 const ucp_am_recv_param_t *param);
243 static ucs_status_t am_rdma_msg_recv_handler(
void *arg,
const void *header,
244 size_t header_size,
void *payload,
246 const ucp_am_recv_param_t *param);
248 bool compute_shared_ranks();
249 using WorkersMap = std::unordered_map<const UCPContext *, Workers>;
250 using AttachMap = std::unordered_map<const UCPContext *, std::vector<ucp_mem_h>>;
252#ifdef REALM_UCX_DYNAMIC_LOAD
253 void *libucp{
nullptr};
255 bool initialized_boot{
false};
256 bool initialized_ucp{
false};
259 std::unique_ptr<ucc::UCCComm> ucc_comm{
nullptr};
260 int num_shared_ranks{0};
261 std::vector<NodeID> shared_ranks;
262 std::list<UCPContext> ucp_contexts;
264 std::unordered_map<int, UCPContext *> dev_ctx_map;
267 std::list<UCPPoller> pollers;
268 std::list<AmHandlersArgs> am_handlers_args;
269 AttachMap attach_mem_hs;
270 atomic<uint64_t> total_msg_sent;
271 atomic<uint64_t> total_msg_received;
272 atomic<uint64_t> total_rcomp_sent;
273 atomic<uint64_t> total_rcomp_received;
274 atomic<uint64_t> outstanding_reqs;
276 SpinLock rcba_mp_spinlock;
278 size_t zcopy_thresh_host;
284 size_t header_size,
size_t max_payload_size,
285 const void *src_payload_addr,
size_t src_payload_lines,
286 size_t src_payload_line_stride,
const NetworkSegment *_src_segment,
287 const RemoteAddress *_dest_payload_addr,
size_t storage_size);
290 size_t header_size,
size_t max_payload_size,
291 const void *src_payload_addr,
size_t src_payload_lines,
292 size_t src_payload_line_stride,
size_t storage_size);
299 virtual void commit(
size_t act_payload_size);
303 bool set_inline_payload_base();
304 bool commit_with_rma(ucp_ep_h ep);
305 bool commit_unicast(
size_t act_payload_size);
306 bool commit_multicast(
size_t act_payload_size);
307 bool send_fast_path(ucp_ep_h ep,
size_t act_payload_size);
308 bool send_slow_path(ucp_ep_h ep,
size_t act_payload_size, uint32_t flags);
309 Request *make_request(
size_t act_payload_size);
311 static bool send_request(
Request *req,
unsigned am_id);
313 static void am_local_comp_handler(
void *request, ucs_status_t status,
315 static void am_put_comp_handler(
void *request, ucs_status_t status,
317 static void am_put_flush_comp_handler(
void *request, ucs_status_t status,
324 const void *src_payload_addr;
325 size_t src_payload_lines;
326 size_t src_payload_line_stride;
329 UCPRDMAInfo *dest_payload_rdma_info{
nullptr};
330 CompList *local_comp{
nullptr};
331 RemoteComp *remote_comp{
nullptr};
332 bool is_multicast{
false};
333 ucs_memory_type_t memtype;
335 UCPMsgHdr ucp_msg_hdr;
348 UCPRDMAInfo *rdma_info;
349 size_t rdma_info_size;
Definition activemsg.h:205
Definition bytearray.h:53
Definition ib_memory.h:30
uintptr_t CallbackData
Definition activemsg.h:351
Memory me
Definition mem_impl.h:194
size_t size
Definition mem_impl.h:195
MemoryKind kind
Definition mem_impl.h:196
Kind
Definition memory.h:59
Definition mem_impl.h:535
Definition runtime_impl.h:264
Definition ucp_module.h:31
Definition ucp_context.h:152
Definition ucp_internal.h:367
UCPIBMemory(RuntimeImpl *_runtime_impl, Memory me, size_t size, Memory::Kind kind, const ByteArray &rdma_info_ba, UCPInternal *internal)
virtual bool get_remote_addr(off_t offset, RemoteAddress &remote_addr)
Definition ucp_internal.h:104
void allgather(const char *val_in, size_t bytes, std::vector< char > &vals_out, size_t *lengths)
void allgatherv(const char *val_in, size_t bytes, std::vector< char > &vals_out, std::vector< size_t > &lengths)
void detach(std::vector< NetworkSegment * > &segments)
bool check_for_quiescence(size_t sampled_receive_count)
UCPInternal(Realm::UCPModule *_module, Realm::RuntimeImpl *_runtime)
void notify_msg_sent(uint64_t count)
Request * request_get(UCPWorker *worker)
UCPModule *RuntimeImpl * runtime
Definition ucp_internal.h:184
void broadcast(NodeID root, const void *val_in, void *val_out, size_t bytes)
UCPWorker * get_tx_worker(const UCPContext *context, uint8_t priority) const
void get_shared_peers(Realm::NodeSet &shared_peers)
size_t sample_messages_received_count()
size_t num_eps(const UCPContext &context) const
void * hbuf_get(UCPWorker *worker, size_t size)
void * pbuf_get(UCPWorker *worker, size_t size)
void hbuf_release(UCPWorker *worker, void *buf)
const UCPContext * get_context(const NetworkSegment *segment) const
void pbuf_release(UCPWorker *worker, void *buf)
size_t recommended_max_payload(const void *data, const NetworkSegment *src_segment, const RemoteAddress *dest_payload_addr, bool with_congestion, size_t header_size)
void request_release(Request *req)
bool init(const Config &config)
void attach(std::vector< NetworkSegment * > &segments)
bool get_ucp_ep(const UCPWorker *worker, NodeID target, const UCPRDMAInfo *rdma_info, ucp_ep_h *ep) const
void gather(NodeID root, const void *val_in, void *vals_out, size_t bytes)
Definition ucp_internal.h:281
virtual ~UCPMessageImpl()
virtual void commit(size_t act_payload_size)
UCPMessageImpl(UCPInternal *internal, NodeID target, unsigned short msgid, size_t header_size, size_t max_payload_size, const void *src_payload_addr, size_t src_payload_lines, size_t src_payload_line_stride, const NetworkSegment *_src_segment, const RemoteAddress *_dest_payload_addr, size_t storage_size)
virtual void * add_local_completion(size_t size)
UCPMessageImpl(UCPInternal *internal, const NodeSet &targets, unsigned short msgid, size_t header_size, size_t max_payload_size, const void *src_payload_addr, size_t src_payload_lines, size_t src_payload_line_stride, size_t storage_size)
virtual void * add_remote_completion(size_t size)
Definition ucp_internal.h:82
bool do_work(TimeLimit work_until)
void add_worker(UCPWorker *worker)
Definition ucp_internal.h:339
virtual ~UCPRemoteMemoryCommon()
virtual bool get_remote_addr(off_t offset, RemoteAddress &remote_addr)
UCPRemoteMemoryCommon(const ByteArray &rdma_info_ba)
Definition ucp_internal.h:355
virtual bool get_remote_addr(off_t offset, RemoteAddress &remote_addr)
UCPRemoteMemory(RuntimeImpl *_runtime_impl, Memory me, size_t size, Memory::Kind kind, const ByteArray &rdma_info_ba, UCPInternal *internal)
virtual void get_bytes(off_t offset, void *dst, size_t size)
virtual void put_bytes(off_t offset, const void *src, size_t size)
Definition ucp_context.h:40
PayloadBaseType
Definition ucp_internal.h:57
@ PAYLOAD_BASE_EXTERNAL
Definition ucp_internal.h:60
@ PAYLOAD_BASE_LAST
Definition ucp_internal.h:61
@ PAYLOAD_BASE_INTERNAL
Definition ucp_internal.h:59
@ PAYLOAD_BASE_INLINE
Definition ucp_internal.h:58
Realm::UCP::UCPPoller __attribute__
AmWithRemoteAddrMode
Definition ucp_internal.h:50
@ AM_WITH_REMOTE_ADDR_MODE_PUT
Definition ucp_internal.h:52
@ AM_WITH_REMOTE_ADDR_MODE_AUTO
Definition ucp_internal.h:51
@ AM_WITH_REMOTE_ADDR_MODE_AM
Definition ucp_internal.h:53
Definition activemsg.h:38
int NodeID
Definition nodeset.h:40
Definition ucp_internal.h:107
int rdesc_rel_max
Definition ucp_internal.h:114
bool use_wakeup
Definition ucp_internal.h:119
std::string zcopy_thresh_host
Definition ucp_internal.h:131
size_t outstanding_reqs_limit
Definition ucp_internal.h:128
std::string host_nics
Definition ucp_internal.h:130
std::string ib_seg_size
Definition ucp_internal.h:129
size_t pbuf_max_size
Definition ucp_internal.h:122
int num_priorities
Definition ucp_internal.h:111
int prog_itr_max
Definition ucp_internal.h:113
size_t pbuf_mp_thresh
Definition ucp_internal.h:126
size_t pbuf_init_count
Definition ucp_internal.h:125
bool pbuf_malloc
Definition ucp_internal.h:118
size_t pbuf_max_count
Definition ucp_internal.h:124
size_t fp_max
Definition ucp_internal.h:121
size_t priority_size_max
Definition ucp_internal.h:120
std::string tls_host
Definition ucp_internal.h:132
size_t pbuf_max_chunk_size
Definition ucp_internal.h:123
int prog_boff_max
Definition ucp_internal.h:112
bool bind_hostmem
Definition ucp_internal.h:109
size_t mmp_max_obj_size
Definition ucp_internal.h:127
bool hbuf_malloc
Definition ucp_internal.h:117
bool mpool_leakcheck
Definition ucp_internal.h:115
AmWithRemoteAddrMode am_wra_mode
Definition ucp_internal.h:108
int pollers_max
Definition ucp_internal.h:110
bool crc_check
Definition ucp_internal.h:116
Definition ucp_internal.h:69
char realm_hdr[0]
Definition ucp_internal.h:79
uint32_t crc
Definition ucp_internal.h:70
size_t rdma_payload_size
Definition ucp_internal.h:75
unsigned short msgid
Definition ucp_internal.h:72
RemoteComp * remote_comp
Definition ucp_internal.h:73
NodeID src
Definition ucp_internal.h:71
void * rdma_payload_addr
Definition ucp_internal.h:74
Definition bootstrap.h:32
NodeID src
Definition ucp_internal.h:1
unsigned short msgid
Definition ucp_internal.h:2