Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
gasnetex_internal.h
Go to the documentation of this file.
1/*
2 * Copyright 2025 Stanford University, NVIDIA Corporation
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18// GASNet-EX network module internals
19
20#ifndef GASNETEX_INTERNAL_H
21#define GASNETEX_INTERNAL_H
22
24
25#ifdef REALM_USE_GASNETEX_WRAPPER
26// Disable the definitions and importing of symbols we're not going to use
27#define GEX_NO_PROTOTYPES 1
28#endif
29
31
32#include "realm/bgwork.h"
33#include "realm/atomics.h"
34#include "realm/activemsg.h"
35#include "realm/lists.h"
36
37namespace Realm {
38
39#define REALM_GEX_MODULE_VERSION 20240301
40
42
43 // rdma pointers need to identify which endpoint they belong to
48
49 class OutbufManager;
50
52 public:
54
61
63
65
77
78 static const int MAX_PACKETS = 256;
79
80 bool pktbuf_reserve(size_t bytes, int &pktidx, uintptr_t &offset);
82 uintptr_t pktbuf_get_offset(int pktidx);
83 bool pktbuf_commit(int pktidx, PktType pkttype, bool update_realbuf);
84
85 protected:
86 friend class OutbufManager;
87 friend class XmitSrcDestPair;
88
89 void set_state(State new_state);
90
94 uintptr_t baseptr;
95 size_t size;
96
100
102
103 // dbuf reservations are NOT thread-safe - external serialization is used
106
107 // pbuf reservations are NOT thread-safe - external serialization is used
108 atomic<int> pktbuf_total_packets; // unsynchronized read ok
110 size_t pktbuf_pkt_ends[MAX_PACKETS]; // stores where a packet _ends_
111
112 // pbuf commits are lock-free
115
116 // pbuf consumption is NOT thread-safe - one pusher at a time
120 };
121
123 public:
125 : md(_md)
126 {}
127
128 void operator()() const { md->dec_usecount(); }
129
130 protected:
132 };
133
135 public:
138
139 void init(size_t _outbuf_count, size_t _outbuf_size, uintptr_t _baseptr);
140
142 bool new_endpoint);
144
145 virtual bool do_work(TimeLimit work_until);
146
147 protected:
152 // we manage the copying of overflow bufs back to real outbufs - track
153 // both pending overflow bufs and reserved outbufs and request bgwork
154 // time when we've got at least one of each
156 // In order to detect the case of misconfigured objcounts given the
157 // number of endpoints we count how many different endpoints we
158 // need to allocate buffers for, if it is more than the total number
159 // of buffers then we fail immediately since we'll never succeed
160 // TODO: Remove this once we insert a slow insertion pathway
161 // from dynamically allocated output buffers that are not
162 // registered with GASNet
163 // https://github.com/StanfordLegion/realm/issues/239
168 };
169
171
173 public:
175
176 void *add_local_completion(size_t bytes, bool late_ok = false);
177 void *add_remote_completion(size_t bytes);
178
179 // marks ready and returns true if non-empty, else resets and returns false
180 bool mark_ready(unsigned exp_local, unsigned exp_remote);
181
184
185 // these two calls can be concurrent, which complicates the determination of
186 // who can free the entry
189
190 int index;
193 // these three bits need to be in a single atomic location
194 static const unsigned LOCAL_PENDING_BIT = 1;
195 static const unsigned REMOTE_PENDING_BIT = 2;
196 static const unsigned READY_BIT = 4;
200
201 static const size_t TOTAL_CAPACITY = 256;
205 Storage_aligned storage;
206 };
207
209 static const size_t LOG2_GROUPSIZE = 8; // 256 per group
210
212 };
213
215 public:
218
221
223 void invoke_completions(PendingCompletion *comp, bool do_local, bool do_remote);
224
226
228
229 protected:
230 // NOTE: we stuff completion IDs, message IDs and 2 more bits into a
231 // 32-bit word, so we're limited to 2^(30-12) = 256K completions
232 static const size_t LOG2_MAXGROUPS = 10;
233
234 // protects pops from the free list (to avoid A-B-A problem), but NOT pushes
237 Realm::atomic<int> num_groups; // number of groups currently allocated
240 size_t pending_soft_limit; // try to stall traffic above this threshold
241 };
242
243 template <typename T, unsigned CHUNK_SIZE>
245 public:
248
250 void free_obj(T *obj);
251
252 protected:
253 // we'll store a pointer back to the chunk header just past each object
254 // each object needs enough space for a pointer back to the chunk header
255 struct Chunk;
256 struct WithPtr {
257 typedef char Storage_unaligned[sizeof(T)];
259 union {
260 Storage_aligned raw_storage;
261 uintptr_t nextptr;
262 };
264 };
270
272 uintptr_t free_head;
273 uintptr_t *free_tail;
277 };
278
279 struct PendingPutHeader;
280
282 protected:
283 // should not be directly allocated
284 template <typename T, unsigned CHUNK_SIZE>
285 friend class ChunkedRecycler;
288
289 public:
291 {
293 STRAT_SHORT_IMMEDIATE, // AM short, attempt immediate
294 STRAT_SHORT_PBUF, // AM short, deferred in pktbuf
295 STRAT_MEDIUM_IMMEDIATE, // AM medium, attempt immediate
296 STRAT_MEDIUM_PBUF, // AM medium, header/data in pktbuf
297 STRAT_MEDIUM_MALLOCSRC, // AM medium, malloc'd temp source (HACK)
298 STRAT_MEDIUM_PREP, // AM medium, using NPAM
305 };
309 unsigned short msgid;
317 };
318
319 // we'll keep separate transmit queues for each src/dst pair
320 // (i.e. src_ep_index, dst_rank, dst_ep_index), hopefully avoiding any
321 // head of line blocking if networking resources are exhausted on a given
322 // path
324 public:
326 gex_rank_t _tgt_rank, gex_ep_index_t _tgt_ep_index);
328
329 // indicates whether any packets are pending, including those that
330 // have not been committed yet - useful for quiescence detection but
331 // also for ensuring new packets don't jump the queue
332 bool has_packets_queued() const;
333
334 // a packet that is sent immediately is counted as if it was
335 // reserved and then sent in rapid succession
337
338 // reserves space in a pbuf for an outbound packet (allocating and
339 // enqueuing a pbuf if needed)
340 bool reserve_pbuf_inline(size_t hdr_bytes, size_t payload_bytes, bool overflow_ok,
341 OutbufMetadata *&pktbuf, int &pktidx, void *&hdr_base,
342 void *&payload_base);
343 bool reserve_pbuf_long_rget(size_t hdr_bytes, bool overflow_ok,
344 OutbufMetadata *&pktbuf, int &pktidx, void *&hdr_base);
345 bool reserve_pbuf_put(bool overflow_ok, OutbufMetadata *&pktbuf, int &pktidx);
346 void commit_pbuf_inline(OutbufMetadata *pktbuf, int pktidx, const void *hdr_base,
347 gex_am_arg_t arg0, size_t act_payload_bytes);
348 void commit_pbuf_long(OutbufMetadata *pktbuf, int pktidx, const void *hdr_base,
349 gex_am_arg_t arg0, const void *payload_base,
350 size_t payload_bytes, uintptr_t dest_addr,
351 OutbufMetadata *databuf);
352 void commit_pbuf_rget(OutbufMetadata *pktbuf, int pktidx, const void *hdr_base,
353 gex_am_arg_t arg0, const void *payload_base,
354 size_t payload_bytes, uintptr_t dest_addr,
356 void commit_pbuf_put(OutbufMetadata *pktbuf, int pktidx, PendingPutHeader *put,
357 const void *payload_base, size_t payload_bytes,
358 uintptr_t dest_addr);
359 void cancel_pbuf(OutbufMetadata *pktbuf, int pktidx);
360
362
364
365 // adds the xpair to the injector ready list or the poller critical
366 // pair list as appropriate, eventually resulting in a call to
367 // push_packets - MUST NOT be called until the push_packets
368 // that resulted from the previous enqueue is done-ish (i.e. not going
369 // to push anything else)
370 void request_push(bool force_critical);
371
372 void push_packets(bool immediate_mode, TimeLimit work_until);
373
374 long long time_since_failure() const;
375
376 // cancels a pending push that was enqueued via request_push but never
377 // processed by push_packets - used during shutdown to balance the
378 // push_mutex_check lock
380
381 // used when cfg_am_limit is nonzero
383 void return_am_credits(int count);
384
390
392 const void *payload_base;
394 uintptr_t dest_addr;
395 union {
396 struct {
398 } l;
399 struct {
400 // rget needs to give both src and target ep index for data
402 } r;
403 };
404 };
405
406 struct PutMetadata {
407 const void *src_addr;
408 uintptr_t dest_addr;
411 };
412
413 protected:
414 friend class GASNetEXInternal;
415
416 bool reserve_pbuf_helper(size_t total_bytes, bool overflow_ok,
417 OutbufMetadata *&pktbuf, int &pktidx, uintptr_t &baseptr);
418 bool commit_pbuf_helper(OutbufMetadata *pktbuf, int pktidx, const void *hdr_base,
419 uintptr_t &baseptr);
420
427 // we don't hold the mutex while pushing packets, but we definitely
428 // don't want multiple threads trying to push for the same src/dst pair
437 // circular queue of pending completion replies
440 atomic<unsigned> comp_reply_count; // read without mutex
442 // TODO: track packets in flight to avoid clogging?
443 atomic<int> am_credits; // available end-to-end AM credits for this peer
444 };
445
446 class XmitSrc {
447 public:
448 XmitSrc(GASNetEXInternal *_internal, gex_ep_index_t _src_ep_index);
450
452
453 protected:
454 friend class GASNetEXInternal;
455
458
459 // we'll allocate XmitSrcDestPair's on demand - atomics allow nonblocking
460 // lookup
462
463 // TODO: track in flight work at src level?
464 };
465
466 struct PendingReverseGet;
467
506
507 // an injector tries to send packets, but is not allowed to stall -
508 // all requests must use GEX_FLAG_IMMEDIATE and any failure results in
509 // handing the xpair off to the poller (which is allowed to block eventually)
511 public:
513
515
516 // drains any xpairs remaining in the ready queue, cancelling their
517 // pending pushes - called during shutdown
519
521
522 virtual bool do_work(TimeLimit work_until);
523
524 protected:
527 atomic<bool> work_active; // set during do_work when xpairs are on stack
529 };
530
532 public:
534
537
539
540 // drains any xpairs remaining in the critical queue, cancelling their
541 // pending pushes - called during shutdown
543
545
547
548 virtual bool do_work(TimeLimit work_until);
549
550 // causes calling thread to wait for a full call to gasnet_AMPoll() to
551 // be performed by the poller
553
554 protected:
555 bool started = false;
558 atomic<bool> shutdown_flag; // set/cleared inside mutex, but tested outside
560 atomic<bool> pollwait_flag; // set/cleared inside mutex, but tested outside
562 atomic<bool> work_active; // set during do_work when items are on stack
565 };
566
568 public:
570
572
574
575 virtual bool do_work(TimeLimit work_until);
576
577 protected:
580 atomic<bool> has_work; // can be read without mutex
582 };
583
585 protected:
586 // should not be directly allocated
587 template <typename T, unsigned CHUNK_SIZE>
588 friend class ChunkedRecycler;
591
592 public:
596 static const size_t MAX_HDR_SIZE = 128;
597 size_t hdr_size;
598 unsigned char hdr_data[MAX_HDR_SIZE];
599 uintptr_t src_ptr, tgt_ptr;
604 };
605
606 // NOTE: ReverseGetter exists for now because we have to use RMAs instead
607 // of long AMs for transfers between segments bound to non-primordial
608 // endpoints (on the source and/or the target) - once GASNet supports
609 // AMs on all endpoints, this code should probably go away
610 class ReverseGetter;
611
613 protected:
614 // should not be directly allocated
615 template <typename T, unsigned CHUNK_SIZE>
616 friend class ChunkedRecycler;
619
620 public:
626 static const size_t MAX_HDR_SIZE = 128;
627 size_t hdr_size;
628 unsigned char hdr_data[MAX_HDR_SIZE];
629 uintptr_t src_ptr, tgt_ptr;
631 };
632
634 public:
636
637 void add_reverse_get(gex_rank_t srcrank, gex_ep_index_t src_ep_index,
638 gex_ep_index_t tgt_ep_index, gex_am_arg_t arg0, const void *hdr,
639 size_t hdr_bytes, uintptr_t src_ptr, uintptr_t tgt_ptr,
640 size_t payload_bytes);
641
643
644 virtual bool do_work(TimeLimit work_until);
645
646 protected:
647 friend class GASNetEXEvent;
649
655 };
656
658 public:
660
662
663 bool init(int *argc, const char ***argv);
664 uintptr_t attach(size_t size);
665
666 bool attempt_binding(void *base, size_t size, NetworkSegmentInfo::MemoryType memtype,
668 gex_ep_index_t *ep_indexp);
670
671 void detach();
672
673 void get_shared_peers(Realm::NodeSet &shared_peers);
674 void barrier();
675 void broadcast(gex_rank_t root, const void *val_in, void *val_out, size_t bytes);
676 void gather(gex_rank_t root, const void *val_in, void *vals_out, size_t bytes);
677 void allgatherv(const char *val_in, size_t bytes, std::vector<char> &vals_out,
678 std::vector<size_t> &lengths);
679
681 bool check_for_quiescence(size_t sampled_receive_count);
682
685
686 size_t recommended_max_payload(gex_rank_t target, gex_ep_index_t target_ep_index,
687 bool with_congestion, size_t header_size,
688 uintptr_t dest_payload_addr);
689 size_t recommended_max_payload(gex_rank_t target, gex_ep_index_t target_ep_index,
690 const void *data, size_t bytes_per_line, size_t lines,
691 size_t line_stride, bool with_congestion,
692 size_t header_size, uintptr_t dest_payload_addr);
693 size_t recommended_max_payload(bool with_congestion, size_t header_size);
694
696 unsigned short msgid, void *&header_base,
697 size_t header_size, void *&payload_base,
698 size_t payload_size, uintptr_t dest_payload_addr);
699 void commit_message(PreparedMessage *msg, PendingCompletion *comp, void *header_base,
700 size_t header_size, void *payload_base, size_t payload_size);
702
703 gex_am_arg_t handle_short(gex_rank_t srcrank, gex_am_arg_t arg0, const void *hdr,
704 size_t hdr_bytes);
705 gex_am_arg_t handle_medium(gex_rank_t srcrank, gex_am_arg_t arg0, const void *hdr,
706 size_t hdr_bytes, const void *data, size_t data_bytes);
707 gex_am_arg_t handle_long(gex_rank_t srcrank, gex_am_arg_t arg0, const void *hdr,
708 size_t hdr_bytes, const void *data, size_t data_bytes);
709 void handle_reverse_get(gex_rank_t srcrank, gex_ep_index_t src_ep_index,
710 gex_ep_index_t tgt_ep_index, gex_am_arg_t arg0,
711 const void *hdr, size_t hdr_bytes, uintptr_t src_ptr,
712 uintptr_t tgt_ptr, size_t payload_bytes);
714 const void *data, size_t data_bytes, gex_am_arg_t *comps);
716 size_t nargs);
717
718 protected:
719 friend class ReverseGetter;
720 friend class XmitSrc;
721 friend class XmitSrcDestPair;
722 friend class GASNetEXEvent;
723 friend class GASNetEXPoller;
724 friend class GASNetEXCompleter;
725
726 // callbacks from IncomingMessageManager
727 static void short_message_complete(NodeID sender, uintptr_t objptr,
728 uintptr_t comp_info);
729 static void medium_message_complete(NodeID sender, uintptr_t objptr,
730 uintptr_t comp_info);
731 static void long_message_complete(NodeID sender, uintptr_t objptr,
732 uintptr_t comp_info);
733
735
736 GASNetEXModule *module;
739 // order in 'eps' should match GASNet's indexing
740 std::vector<gex_ep_opaque_t> eps;
745
753 struct SegmentInfoSorter;
754 // this list is sorted by address to enable quick address lookup
755 std::vector<SegmentInfo> segments_by_addr;
756
757 const SegmentInfo *find_segment(const void *srcptr) const;
758
759 std::vector<XmitSrc *> xmitsrcs;
760
767
768 // TODO: split counter into per-thread values to avoid contention?
770
771 // manage a single open databuf for all endpoints
775
776 // allocator/managers for various objects we want to reuse
780 };
781
782}; // namespace Realm
783
784#endif
Definition bgwork.h:129
Definition gasnetex_internal.h:244
Mutex mutex
Definition gasnetex_internal.h:271
size_t max_alloc
Definition gasnetex_internal.h:276
atomic< size_t > cur_alloc
Definition gasnetex_internal.h:275
Chunk * chunks_head
Definition gasnetex_internal.h:274
uintptr_t free_head
Definition gasnetex_internal.h:272
size_t cur_capacity
Definition gasnetex_internal.h:276
uintptr_t * free_tail
Definition gasnetex_internal.h:273
static const size_t ALIGNMENT
Definition activemsg.h:183
Definition threads.h:428
Definition gasnetex_internal.h:567
GASNetEXCompleter(GASNetEXInternal *_internal)
void add_ready_events(GASNetEXEvent::EventList &newly_ready)
atomic< bool > has_work
Definition gasnetex_internal.h:580
Mutex mutex
Definition gasnetex_internal.h:579
GASNetEXInternal * internal
Definition gasnetex_internal.h:578
GASNetEXEvent::EventList ready_events
Definition gasnetex_internal.h:581
virtual bool do_work(TimeLimit work_until)
Definition gasnetex_internal.h:468
GASNetEXEvent & set_pktbuf(OutbufMetadata *_pktbuf)
GASNetEXEvent * leaf
Definition gasnetex_internal.h:504
~GASNetEXEvent()
Definition gasnetex_internal.h:474
REALM_PMTA_DEFN(GASNetEXEvent, IntrusiveListLink< GASNetEXEvent >, event_list_link)
GASNetEXEvent & set_leaf(GASNetEXEvent *_leaf)
GASNetEXEvent & set_put(PendingPutHeader *_put)
gex_ep_opaque_t event
Definition gasnetex_internal.h:498
PendingCompletion * local_comp
Definition gasnetex_internal.h:499
GASNetEXEvent & set_event(gex_event_opaque_t _event)
GASNetEXEvent & set_rget(PendingReverseGet *_rget)
GASNetEXEvent & set_databuf(OutbufMetadata *_databuf)
PendingPutHeader * put
Definition gasnetex_internal.h:503
IntrusiveListLink< GASNetEXEvent > event_list_link
Definition gasnetex_internal.h:491
OutbufMetadata * pktbuf
Definition gasnetex_internal.h:500
gex_event_opaque_t get_event() const
OutbufMetadata * databuf
Definition gasnetex_internal.h:501
void trigger(GASNetEXInternal *internal)
IntrusiveList< GASNetEXEvent, REALM_PMTA_USE(GASNetEXEvent, event_list_link), DummyLock > EventList
Definition gasnetex_internal.h:495
GASNetEXEvent & set_local_comp(PendingCompletion *_local_comp)
PendingReverseGet * rget
Definition gasnetex_internal.h:502
Definition gasnetex_internal.h:510
void add_ready_xpair(XmitSrcDestPair *xpair)
GASNetEXInjector(GASNetEXInternal *_internal)
GASNetEXInternal * internal
Definition gasnetex_internal.h:525
atomic< bool > work_active
Definition gasnetex_internal.h:527
virtual bool do_work(TimeLimit work_until)
XmitSrcDestPair::XmitPairList ready_xpairs
Definition gasnetex_internal.h:528
Mutex mutex
Definition gasnetex_internal.h:526
Definition gasnetex_internal.h:657
GASNetEXCompleter completer
Definition gasnetex_internal.h:763
std::vector< XmitSrc * > xmitsrcs
Definition gasnetex_internal.h:759
atomic< uint64_t > total_packets_received
Definition gasnetex_internal.h:769
size_t recommended_max_payload(bool with_congestion, size_t header_size)
gex_rank_t prim_rank
Definition gasnetex_internal.h:742
gex_callback_handle_t gex_callback_handle
Definition gasnetex_internal.h:774
PreparedMessage * prepare_message(gex_rank_t target, gex_ep_index_t target_ep_index, unsigned short msgid, void *&header_base, size_t header_size, void *&payload_base, size_t payload_size, uintptr_t dest_payload_addr)
static void long_message_complete(NodeID sender, uintptr_t objptr, uintptr_t comp_info)
OutbufMetadata * databuf_md
Definition gasnetex_internal.h:773
size_t recommended_max_payload(gex_rank_t target, gex_ep_index_t target_ep_index, bool with_congestion, size_t header_size, uintptr_t dest_payload_addr)
GASNetEXPoller poller
Definition gasnetex_internal.h:761
Mutex databuf_mutex
Definition gasnetex_internal.h:772
PendingCompletionManager compmgr
Definition gasnetex_internal.h:765
bool init(int *argc, const char ***argv)
void cancel_message(PreparedMessage *msg)
GASNetEXModule *RuntimeImpl * runtime
Definition gasnetex_internal.h:737
gex_tm_opaque_t prim_tm
Definition gasnetex_internal.h:741
uintptr_t attach(size_t size)
OutbufManager obmgr
Definition gasnetex_internal.h:766
size_t recommended_max_payload(gex_rank_t target, gex_ep_index_t target_ep_index, const void *data, size_t bytes_per_line, size_t lines, size_t line_stride, bool with_congestion, size_t header_size, uintptr_t dest_payload_addr)
const SegmentInfo * find_segment(const void *srcptr) const
size_t handle_batch(gex_rank_t srcrank, gex_am_arg_t arg0, gex_am_arg_t cksum, const void *data, size_t data_bytes, gex_am_arg_t *comps)
static void medium_message_complete(NodeID sender, uintptr_t objptr, uintptr_t comp_info)
PendingCompletion * early_local_completion(PendingCompletion *comp)
GASNetEXInternal(GASNetEXModule *_module, RuntimeImpl *_runtime)
std::vector< gex_ep_opaque_t > eps
Definition gasnetex_internal.h:740
size_t sample_messages_received_count()
PendingCompletion * get_available_comp()
bool check_for_quiescence(size_t sampled_receive_count)
gex_client_opaque_t client
Definition gasnetex_internal.h:738
ChunkedRecycler< PendingPutHeader, 32 > put_alloc
Definition gasnetex_internal.h:779
static void short_message_complete(NodeID sender, uintptr_t objptr, uintptr_t comp_info)
void get_shared_peers(Realm::NodeSet &shared_peers)
ReverseGetter rgetter
Definition gasnetex_internal.h:764
size_t prim_segsize
Definition gasnetex_internal.h:744
gex_am_arg_t handle_short(gex_rank_t srcrank, gex_am_arg_t arg0, const void *hdr, size_t hdr_bytes)
ChunkedRecycler< PreparedMessage, 32 > prep_alloc
Definition gasnetex_internal.h:778
void handle_completion_reply(gex_rank_t srcrank, const gex_am_arg_t *args, size_t nargs)
void commit_message(PreparedMessage *msg, PendingCompletion *comp, void *header_base, size_t header_size, void *payload_base, size_t payload_size)
gex_am_arg_t handle_medium(gex_rank_t srcrank, gex_am_arg_t arg0, const void *hdr, size_t hdr_bytes, const void *data, size_t data_bytes)
gex_am_arg_t handle_long(gex_rank_t srcrank, gex_am_arg_t arg0, const void *hdr, size_t hdr_bytes, const void *data, size_t data_bytes)
void allgatherv(const char *val_in, size_t bytes, std::vector< char > &vals_out, std::vector< size_t > &lengths)
void broadcast(gex_rank_t root, const void *val_in, void *val_out, size_t bytes)
GASNetEXInjector injector
Definition gasnetex_internal.h:762
void gather(gex_rank_t root, const void *val_in, void *vals_out, size_t bytes)
ChunkedRecycler< GASNetEXEvent, 64 > event_alloc
Definition gasnetex_internal.h:777
gex_segment_opaque_t prim_segment
Definition gasnetex_internal.h:743
gex_rank_t prim_size
Definition gasnetex_internal.h:742
PendingCompletion * extract_arg0_local_comp(gex_am_arg_t &arg0)
void handle_reverse_get(gex_rank_t srcrank, gex_ep_index_t src_ep_index, gex_ep_index_t tgt_ep_index, gex_am_arg_t arg0, const void *hdr, size_t hdr_bytes, uintptr_t src_ptr, uintptr_t tgt_ptr, size_t payload_bytes)
std::vector< SegmentInfo > segments_by_addr
Definition gasnetex_internal.h:755
bool attempt_binding(void *base, size_t size, NetworkSegmentInfo::MemoryType memtype, NetworkSegmentInfo::MemoryTypeExtraData memextra, gex_ep_index_t *ep_indexp)
Definition gasnetex_module.h:29
Definition gasnetex_internal.h:531
GASNetEXPoller(GASNetEXInternal *_internal)
GASNetEXEvent::EventList pending_events
Definition gasnetex_internal.h:564
void add_pending_event(GASNetEXEvent *event)
virtual bool do_work(TimeLimit work_until)
Mutex mutex
Definition gasnetex_internal.h:557
Mutex::CondVar pollwait_cond
Definition gasnetex_internal.h:561
XmitSrcDestPair::XmitPairList critical_xpairs
Definition gasnetex_internal.h:563
void add_critical_xpair(XmitSrcDestPair *xpair)
atomic< bool > shutdown_flag
Definition gasnetex_internal.h:558
Mutex::CondVar shutdown_cond
Definition gasnetex_internal.h:559
bool started
Definition gasnetex_internal.h:555
GASNetEXInternal * internal
Definition gasnetex_internal.h:556
atomic< bool > work_active
Definition gasnetex_internal.h:562
atomic< bool > pollwait_flag
Definition gasnetex_internal.h:560
Definition lists.h:66
Definition mutex.h:80
Definition nodeset.h:117
Definition gasnetex_internal.h:134
OutbufMetadata * reserved_head
Definition gasnetex_internal.h:167
void init(size_t _outbuf_count, size_t _outbuf_size, uintptr_t _baseptr)
size_t num_endpoints
Definition gasnetex_internal.h:164
size_t outbuf_size
Definition gasnetex_internal.h:149
OutbufMetadata * overflow_head
Definition gasnetex_internal.h:165
OutbufMetadata * metadatas
Definition gasnetex_internal.h:148
OutbufMetadata * first_available
Definition gasnetex_internal.h:151
size_t num_overflow
Definition gasnetex_internal.h:155
OutbufMetadata ** overflow_tail
Definition gasnetex_internal.h:166
OutbufMetadata * alloc_outbuf(OutbufMetadata::State state, bool overflow_ok, bool new_endpoint)
void free_outbuf(OutbufMetadata *md)
virtual bool do_work(TimeLimit work_until)
size_t num_buffers
Definition gasnetex_internal.h:164
Mutex mutex
Definition gasnetex_internal.h:150
size_t num_reserved
Definition gasnetex_internal.h:155
Definition gasnetex_internal.h:51
int databuf_use_count
Definition gasnetex_internal.h:105
OutbufManager * manager
Definition gasnetex_internal.h:92
bool is_overflow
Definition gasnetex_internal.h:97
size_t pktbuf_rsrv_offset
Definition gasnetex_internal.h:109
static const int MAX_PACKETS
Definition gasnetex_internal.h:78
bool pktbuf_reserve(size_t bytes, int &pktidx, uintptr_t &offset)
atomic< int > remain_count
Definition gasnetex_internal.h:101
State state
Definition gasnetex_internal.h:91
atomic< OutbufMetadata * > realbuf
Definition gasnetex_internal.h:99
size_t databuf_rsrv_offset
Definition gasnetex_internal.h:104
OutbufMetadata * next_overflow
Definition gasnetex_internal.h:98
size_t pktbuf_sent_offset
Definition gasnetex_internal.h:118
atomic< int > pktbuf_total_packets
Definition gasnetex_internal.h:108
int pktbuf_use_count
Definition gasnetex_internal.h:119
PktType
Definition gasnetex_internal.h:67
@ PKTTYPE_LONG
Definition gasnetex_internal.h:71
@ PKTTYPE_RGET
Definition gasnetex_internal.h:72
@ PKTTYPE_INLINE
Definition gasnetex_internal.h:69
@ PKTTYPE_COPY_IN_PROGRESS
Definition gasnetex_internal.h:75
@ PKTTYPE_PUT
Definition gasnetex_internal.h:73
@ PKTTYPE_INVALID
Definition gasnetex_internal.h:68
@ PKTTYPE_INLINE_SHORT
Definition gasnetex_internal.h:70
@ PKTTYPE_CANCELLED
Definition gasnetex_internal.h:74
void set_state(State new_state)
bool pktbuf_commit(int pktidx, PktType pkttype, bool update_realbuf)
size_t size
Definition gasnetex_internal.h:95
atomic< PktType > pktbuf_pkt_types[MAX_PACKETS]
Definition gasnetex_internal.h:113
OutbufMetadata * nextbuf
Definition gasnetex_internal.h:93
uintptr_t pktbuf_get_offset(int pktidx)
int pktbuf_sent_packets
Definition gasnetex_internal.h:117
size_t pktbuf_pkt_ends[MAX_PACKETS]
Definition gasnetex_internal.h:110
State
Definition gasnetex_internal.h:56
@ STATE_PKTBUF
Definition gasnetex_internal.h:59
@ STATE_IDLE
Definition gasnetex_internal.h:57
@ STATE_DATABUF
Definition gasnetex_internal.h:58
atomic< int > pktbuf_ready_packets
Definition gasnetex_internal.h:114
uintptr_t baseptr
Definition gasnetex_internal.h:94
Definition gasnetex_internal.h:122
OutbufMetadata * md
Definition gasnetex_internal.h:131
OutbufUsecountDec(OutbufMetadata *_md)
Definition gasnetex_internal.h:124
void operator()() const
Definition gasnetex_internal.h:128
Definition gasnetex_internal.h:214
void invoke_completions(PendingCompletion *comp, bool do_local, bool do_remote)
PendingCompletion * get_available()
Realm::atomic< PendingCompletionGroup * > groups[1<< LOG2_MAXGROUPS]
Definition gasnetex_internal.h:238
size_t pending_soft_limit
Definition gasnetex_internal.h:240
Realm::atomic< PendingCompletion * > first_free
Definition gasnetex_internal.h:236
PendingCompletion * lookup_completion(int index)
Realm::Mutex mutex
Definition gasnetex_internal.h:235
void recycle_comp(PendingCompletion *comp)
Realm::atomic< int > num_groups
Definition gasnetex_internal.h:237
atomic< size_t > num_pending
Definition gasnetex_internal.h:239
static const size_t LOG2_MAXGROUPS
Definition gasnetex_internal.h:232
bool over_pending_completion_soft_limit() const
Definition gasnetex_internal.h:172
REALM_ALIGNED_TYPE_CONST(Storage_aligned, Storage_unaligned, Realm::CompletionCallbackBase::ALIGNMENT)
void * add_local_completion(size_t bytes, bool late_ok=false)
int index
Definition gasnetex_internal.h:190
Storage_aligned storage
Definition gasnetex_internal.h:205
PendingCompletionManager * manager
Definition gasnetex_internal.h:192
static const size_t TOTAL_CAPACITY
Definition gasnetex_internal.h:201
char Storage_unaligned[TOTAL_CAPACITY]
Definition gasnetex_internal.h:202
static const unsigned LOCAL_PENDING_BIT
Definition gasnetex_internal.h:194
size_t remote_bytes
Definition gasnetex_internal.h:199
void * add_remote_completion(size_t bytes)
static const unsigned REMOTE_PENDING_BIT
Definition gasnetex_internal.h:195
Realm::atomic< unsigned > remote_left
Definition gasnetex_internal.h:198
Realm::atomic< unsigned > local_left
Definition gasnetex_internal.h:198
static const unsigned READY_BIT
Definition gasnetex_internal.h:196
Realm::atomic< unsigned > state
Definition gasnetex_internal.h:197
size_t local_bytes
Definition gasnetex_internal.h:199
PendingCompletion * next_free
Definition gasnetex_internal.h:191
bool mark_ready(unsigned exp_local, unsigned exp_remote)
Definition gasnetex_internal.h:633
void add_reverse_get(gex_rank_t srcrank, gex_ep_index_t src_ep_index, gex_ep_index_t tgt_ep_index, gex_am_arg_t arg0, const void *hdr, size_t hdr_bytes, uintptr_t src_ptr, uintptr_t tgt_ptr, size_t payload_bytes)
GASNetEXInternal * internal
Definition gasnetex_internal.h:650
PendingReverseGet ** tailp
Definition gasnetex_internal.h:653
Mutex mutex
Definition gasnetex_internal.h:651
ChunkedRecycler< PendingReverseGet, 8 > rget_alloc
Definition gasnetex_internal.h:654
PendingReverseGet * head
Definition gasnetex_internal.h:652
virtual bool do_work(TimeLimit work_until)
void reverse_get_complete(PendingReverseGet *rget)
ReverseGetter(GASNetEXInternal *_internal)
Definition runtime_impl.h:265
Definition timers.h:129
Definition mutex.h:325
Definition mutex.h:223
Definition gasnetex_internal.h:323
IntrusiveListLink< XmitSrcDestPair > xpair_list_link
Definition gasnetex_internal.h:385
OutbufMetadata * cur_pbuf
Definition gasnetex_internal.h:431
void commit_pbuf_inline(OutbufMetadata *pktbuf, int pktidx, const void *hdr_base, gex_am_arg_t arg0, size_t act_payload_bytes)
bool commit_pbuf_helper(OutbufMetadata *pktbuf, int pktidx, const void *hdr_base, uintptr_t &baseptr)
void enqueue_put_header(PendingPutHeader *put)
atomic< unsigned > comp_reply_count
Definition gasnetex_internal.h:440
unsigned comp_reply_capacity
Definition gasnetex_internal.h:441
void commit_pbuf_put(OutbufMetadata *pktbuf, int pktidx, PendingPutHeader *put, const void *payload_base, size_t payload_bytes, uintptr_t dest_addr)
atomic< OutbufMetadata * > first_pbuf
Definition gasnetex_internal.h:430
GASNetEXInternal * internal
Definition gasnetex_internal.h:421
void request_push(bool force_critical)
bool reserve_pbuf_put(bool overflow_ok, OutbufMetadata *&pktbuf, int &pktidx)
unsigned comp_reply_wrptr
Definition gasnetex_internal.h:439
IntrusiveList< XmitSrcDestPair, REALM_PMTA_USE(XmitSrcDestPair, xpair_list_link), DummyLock > XmitPairList
Definition gasnetex_internal.h:389
bool has_packets_queued() const
gex_ep_index_t src_ep_index
Definition gasnetex_internal.h:422
atomic< size_t > packets_reserved
Definition gasnetex_internal.h:425
REALM_PMTA_DEFN(XmitSrcDestPair, IntrusiveListLink< XmitSrcDestPair >, xpair_list_link)
Mutex mutex
Definition gasnetex_internal.h:426
long long time_since_failure() const
atomic< int > am_credits
Definition gasnetex_internal.h:443
atomic< unsigned > imm_fail_count
Definition gasnetex_internal.h:432
gex_am_arg_t * comp_reply_data
Definition gasnetex_internal.h:438
long long first_fail_time
Definition gasnetex_internal.h:434
void return_am_credits(int count)
atomic< size_t > packets_sent
Definition gasnetex_internal.h:425
bool reserve_pbuf_long_rget(size_t hdr_bytes, bool overflow_ok, OutbufMetadata *&pktbuf, int &pktidx, void *&hdr_base)
atomic< PendingPutHeader * > put_head
Definition gasnetex_internal.h:435
unsigned comp_reply_rdptr
Definition gasnetex_internal.h:439
void commit_pbuf_rget(OutbufMetadata *pktbuf, int pktidx, const void *hdr_base, gex_am_arg_t arg0, const void *payload_base, size_t payload_bytes, uintptr_t dest_addr, gex_ep_index_t src_ep_index, gex_ep_index_t tgt_ep_index)
void enqueue_completion_reply(gex_am_arg_t comp_info)
MutexChecker push_mutex_check
Definition gasnetex_internal.h:429
void commit_pbuf_long(OutbufMetadata *pktbuf, int pktidx, const void *hdr_base, gex_am_arg_t arg0, const void *payload_base, size_t payload_bytes, uintptr_t dest_addr, OutbufMetadata *databuf)
bool reserve_pbuf_helper(size_t total_bytes, bool overflow_ok, OutbufMetadata *&pktbuf, int &pktidx, uintptr_t &baseptr)
gex_rank_t tgt_rank
Definition gasnetex_internal.h:423
atomic< PendingPutHeader * > * put_tailp
Definition gasnetex_internal.h:436
void cancel_pbuf(OutbufMetadata *pktbuf, int pktidx)
gex_ep_index_t tgt_ep_index
Definition gasnetex_internal.h:424
void push_packets(bool immediate_mode, TimeLimit work_until)
bool has_ready_packets
Definition gasnetex_internal.h:433
XmitSrcDestPair(GASNetEXInternal *_internal, gex_ep_index_t _src_ep_index, gex_rank_t _tgt_rank, gex_ep_index_t _tgt_ep_index)
bool reserve_pbuf_inline(size_t hdr_bytes, size_t payload_bytes, bool overflow_ok, OutbufMetadata *&pktbuf, int &pktidx, void *&hdr_base, void *&payload_base)
Definition gasnetex_internal.h:446
XmitSrc(GASNetEXInternal *_internal, gex_ep_index_t _src_ep_index)
XmitSrcDestPair * lookup_pair(gex_rank_t tgt_rank, gex_ep_index_t tgt_ep_index)
gex_ep_index_t src_ep_index
Definition gasnetex_internal.h:457
atomic< XmitSrcDestPair * > * pairs
Definition gasnetex_internal.h:461
GASNetEXInternal * internal
Definition gasnetex_internal.h:456
Definition atomics.h:31
void * gex_tm_opaque_t
Definition gasnetex_wrapper.h:38
void * gex_event_opaque_t
Definition gasnetex_wrapper.h:42
int32_t gex_am_arg_t
Definition gasnetex_wrapper.h:33
uint16_t gex_ep_index_t
Definition gasnetex_wrapper.h:30
void * gex_am_src_desc_opaque_t
Definition gasnetex_wrapper.h:43
uint32_t gex_rank_t
Definition gasnetex_wrapper.h:29
void * gex_segment_opaque_t
Definition gasnetex_wrapper.h:39
void * gex_ep_opaque_t
Definition gasnetex_wrapper.h:37
void * gex_client_opaque_t
Definition gasnetex_wrapper.h:36
#define REALM_PMTA_USE(structtype, name)
Definition lists.h:42
unsigned MemoryType
Definition network.h:244
uintptr_t MemoryTypeExtraData
Definition network.h:247
Definition activemsg.h:38
int NodeID
Definition nodeset.h:40
gex_wrapper_handle_t gex_wrapper_handle
Definition gasnetex_internal.h:265
Chunk * next_chunk
Definition gasnetex_internal.h:267
atomic< unsigned > remaining_count
Definition gasnetex_internal.h:266
WithPtr elements[CHUNK_SIZE]
Definition gasnetex_internal.h:268
Definition gasnetex_internal.h:256
Storage_aligned raw_storage
Definition gasnetex_internal.h:260
Chunk * backptr
Definition gasnetex_internal.h:263
REALM_ALIGNED_TYPE_SAMEAS(Storage_aligned, Storage_unaligned, T)
char Storage_unaligned[sizeof(T)]
Definition gasnetex_internal.h:257
uintptr_t nextptr
Definition gasnetex_internal.h:261
Definition gasnetex_internal.h:746
uintptr_t base
Definition gasnetex_internal.h:747
NetworkSegmentInfo::MemoryTypeExtraData memextra
Definition gasnetex_internal.h:751
gex_segment_opaque_t segment
Definition gasnetex_internal.h:749
uintptr_t limit
Definition gasnetex_internal.h:747
NetworkSegmentInfo::MemoryType memtype
Definition gasnetex_internal.h:750
gex_ep_index_t ep_index
Definition gasnetex_internal.h:748
Definition gasnetex_internal.h:44
gex_ep_index_t ep_index
Definition gasnetex_internal.h:46
uintptr_t base
Definition gasnetex_internal.h:45
Definition gasnetex_internal.h:208
static const size_t LOG2_GROUPSIZE
Definition gasnetex_internal.h:209
PendingCompletion entries[1<< LOG2_GROUPSIZE]
Definition gasnetex_internal.h:211
Definition gasnetex_internal.h:584
gex_ep_index_t src_ep_index
Definition gasnetex_internal.h:594
PendingPutHeader()
Definition gasnetex_internal.h:589
uintptr_t tgt_ptr
Definition gasnetex_internal.h:599
gex_ep_index_t tgt_ep_index
Definition gasnetex_internal.h:594
gex_am_arg_t arg0
Definition gasnetex_internal.h:595
~PendingPutHeader()
Definition gasnetex_internal.h:590
size_t payload_bytes
Definition gasnetex_internal.h:600
static const size_t MAX_HDR_SIZE
Definition gasnetex_internal.h:596
size_t hdr_size
Definition gasnetex_internal.h:597
atomic< PendingPutHeader * > next_put
Definition gasnetex_internal.h:603
XmitSrcDestPair * xpair
Definition gasnetex_internal.h:602
unsigned char hdr_data[MAX_HDR_SIZE]
Definition gasnetex_internal.h:598
uintptr_t src_ptr
Definition gasnetex_internal.h:599
gex_rank_t target
Definition gasnetex_internal.h:593
PendingCompletion * local_comp
Definition gasnetex_internal.h:601
Definition gasnetex_internal.h:612
ReverseGetter * rgetter
Definition gasnetex_internal.h:621
unsigned char hdr_data[MAX_HDR_SIZE]
Definition gasnetex_internal.h:628
PendingReverseGet()
Definition gasnetex_internal.h:617
static const size_t MAX_HDR_SIZE
Definition gasnetex_internal.h:626
gex_am_arg_t arg0
Definition gasnetex_internal.h:625
~PendingReverseGet()
Definition gasnetex_internal.h:618
gex_ep_index_t tgt_ep_index
Definition gasnetex_internal.h:624
size_t payload_bytes
Definition gasnetex_internal.h:630
size_t hdr_size
Definition gasnetex_internal.h:627
gex_ep_index_t src_ep_index
Definition gasnetex_internal.h:624
uintptr_t tgt_ptr
Definition gasnetex_internal.h:629
gex_rank_t srcrank
Definition gasnetex_internal.h:623
PendingReverseGet * next_rget
Definition gasnetex_internal.h:622
uintptr_t src_ptr
Definition gasnetex_internal.h:629
Definition gasnetex_internal.h:281
gex_ep_index_t source_ep_index
Definition gasnetex_internal.h:308
PreparedMessage()
Definition gasnetex_internal.h:286
PendingPutHeader * put
Definition gasnetex_internal.h:316
OutbufMetadata * databuf
Definition gasnetex_internal.h:312
unsigned short msgid
Definition gasnetex_internal.h:309
void * temp_buffer
Definition gasnetex_internal.h:311
gex_am_src_desc_opaque_t srcdesc
Definition gasnetex_internal.h:315
Strategy
Definition gasnetex_internal.h:291
@ STRAT_MEDIUM_IMMEDIATE
Definition gasnetex_internal.h:295
@ STRAT_RGET_IMMEDIATE
Definition gasnetex_internal.h:301
@ STRAT_SHORT_PBUF
Definition gasnetex_internal.h:294
@ STRAT_PUT_PBUF
Definition gasnetex_internal.h:304
@ STRAT_LONG_IMMEDIATE
Definition gasnetex_internal.h:299
@ STRAT_LONG_PBUF
Definition gasnetex_internal.h:300
@ STRAT_SHORT_IMMEDIATE
Definition gasnetex_internal.h:293
@ STRAT_MEDIUM_PREP
Definition gasnetex_internal.h:298
@ STRAT_RGET_PBUF
Definition gasnetex_internal.h:302
@ STRAT_MEDIUM_MALLOCSRC
Definition gasnetex_internal.h:297
@ STRAT_MEDIUM_PBUF
Definition gasnetex_internal.h:296
@ STRAT_PUT_IMMEDIATE
Definition gasnetex_internal.h:303
@ STRAT_UNKNOWN
Definition gasnetex_internal.h:292
gex_ep_index_t target_ep_index
Definition gasnetex_internal.h:308
OutbufMetadata * pktbuf
Definition gasnetex_internal.h:313
Strategy strategy
Definition gasnetex_internal.h:306
uintptr_t dest_payload_addr
Definition gasnetex_internal.h:310
int pktidx
Definition gasnetex_internal.h:314
~PreparedMessage()
Definition gasnetex_internal.h:287
gex_rank_t target
Definition gasnetex_internal.h:307
Definition gasnetex_internal.h:391
struct Realm::XmitSrcDestPair::LongRgetData::@12::@14 l
struct Realm::XmitSrcDestPair::LongRgetData::@12::@15 r
size_t payload_bytes
Definition gasnetex_internal.h:393
OutbufMetadata * databuf
Definition gasnetex_internal.h:397
const void * payload_base
Definition gasnetex_internal.h:392
gex_ep_index_t src_ep_index
Definition gasnetex_internal.h:401
gex_ep_index_t tgt_ep_index
Definition gasnetex_internal.h:401
uintptr_t dest_addr
Definition gasnetex_internal.h:394
Definition gasnetex_internal.h:406
const void * src_addr
Definition gasnetex_internal.h:407
uintptr_t dest_addr
Definition gasnetex_internal.h:408
size_t payload_bytes
Definition gasnetex_internal.h:409
PendingPutHeader * put
Definition gasnetex_internal.h:410
Definition gasnetex_wrapper.h:251
Handle structure that contains the full API for the wrapper.
Definition gasnetex_wrapper.h:77
unsigned short msgid
Definition ucp_internal.h:2