Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
channel.h
Go to the documentation of this file.
1/*
2 * Copyright 2025 Los Alamos National Laboratory, Stanford University, NVIDIA Corporation
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef LOWLEVEL_CHANNEL
19#define LOWLEVEL_CHANNEL
20
21#include "realm/realm_config.h"
22
23#include <stdio.h>
24#include <stdlib.h>
25#include <stdint.h>
26#ifndef REALM_ON_WINDOWS
27#include <unistd.h>
28#include <pthread.h>
29#endif
30#include <fcntl.h>
31#include <map>
32#include <vector>
33#include <deque>
34#include <queue>
35#include <unordered_set>
36#include <assert.h>
37#include <string.h>
38
39#include "realm/id.h"
40#include "realm/mem_impl.h"
41#include "realm/inst_impl.h"
42#include "realm/bgwork.h"
43#include "realm/utils.h"
45
46namespace Realm {
47
48 class XferDes;
49 class XferDesQueue;
50 class Channel;
51 class DmaRequest;
52 class TransferIterator;
53
54 extern Logger log_new_dma;
55
56 typedef unsigned long long XferDesID;
57
58 // clang-format off
// X-macro listing every transfer descriptor kind: expanding
// REALM_XFERDES_KINDS(M) invokes M(name) once per kind name, in the order
// written below (XFER_NONE first, XFER_GPU_SC_PEER_FB last)
59#define REALM_XFERDES_KINDS(__op__) \
60 __op__(XFER_NONE) \
61 __op__(XFER_DISK_READ) \
62 __op__(XFER_DISK_WRITE) \
63 __op__(XFER_SSD_READ) \
64 __op__(XFER_SSD_WRITE) \
65 __op__(XFER_GPU_TO_FB) \
66 __op__(XFER_GPU_FROM_FB) \
67 __op__(XFER_GPU_IN_FB) \
68 __op__(XFER_GPU_PEER_FB) \
69 __op__(XFER_MEM_CPY) \
70 __op__(XFER_GASNET_READ) \
71 __op__(XFER_GASNET_WRITE) \
72 __op__(XFER_REMOTE_WRITE) \
73 __op__(XFER_HDF5_READ) \
74 __op__(XFER_HDF5_WRITE) \
75 __op__(XFER_FILE_READ) \
76 __op__(XFER_FILE_WRITE) \
77 __op__(XFER_ADDR_SPLIT) \
78 __op__(XFER_MEM_FILL) \
79 __op__(XFER_GPU_SC_IN_FB) \
80 __op__(XFER_GPU_SC_PEER_FB)
81 // clang-format on
82
84 {
85#define C_ENUMS(name) name,
87#undef C_ENUMS
88 };
89
90 inline std::ostream &operator<<(std::ostream &os, XferDesKind kind)
91 {
92#define STRING_KIND_CASE(kind) \
93 case XferDesKind::kind: \
94 return os << #kind;
95 switch(kind) {
97 }
98#undef STRING_KIND_CASE
99 return os << "UNKNOWN_KIND";
100 }
101
102 class Request {
103 public:
110 // a pointer to the owning xfer descriptor
111 // this should be set at Request creation
114 // src/dst offset in the src/dst instance
116 // src/dst (line) strides
118 // src/dst plane strides
120 // number of bytes being transferred
122 // a flag indicating whether this request's read has been done
124 // a flag indicating whether this request's write has been done
126 // whether I am a 1D or 2D transfer
128 // sequence info - used to update read/write counts, handle reordering
131 };
132
134 public:
138
139 // NOT thread-safe - caller must ensure neither *this nor other is being
140 // modified during this call
142
143 // imports data from this assembler into another (this is thread-safe
144 // on the `other` but assumes no changes being made on `this`)
145 void import(SequenceAssembler &other) const;
146
147 bool empty() const;
148
149 // asks if a span exists - return value is number of bytes from the
150 // start that do
151 size_t span_exists(size_t start, size_t count);
152
153 // returns the amount by which the contiguous range has been increased
154 // (i.e. from [pos, pos+retval) )
155 size_t add_span(size_t pos, size_t count);
156
157 protected:
159
161 contig_amount_x2; // everything from [0, contig_amount) is covered - LSB indicates
162 // potential presence of noncontig spans
163 atomic<size_t> first_noncontig; // nothing in [contig_amount, first_noncontig)
164 atomic<Mutex *> mutex; // created on first use
165 std::map<size_t, size_t> spans; // noncontiguous spans
166 };
167
// Request subclass that adds two raw base pointers: a const one for the
// source (src_base) and a mutable one for the destination (dst_base)
// NOTE(review): presumably consumed by the memcpy channel - confirm
168 class MemcpyRequest : public Request {
169 public:
170 const void *src_base;
171 void *dst_base;
172 // size_t nbytes;
173 };
174
// Request subclass that pairs a single local base pointer (mem_base) with
// an offset (gas_off)
// NOTE(review): gas_off is presumably an offset into GASNet-managed
// memory - confirm against the GASNet channel implementation
175 class GASNetRequest : public Request {
176 public:
177 void *mem_base; // could be source or dest
178 off_t gas_off;
179 // off_t src_offset;
180 // size_t nbytes;
181 };
182
184 public:
185 // NodeID dst_node;
186 const void *src_base;
187 // void *dst_base;
188 // size_t nbytes;
189 };
190
208
214
215 // default constructor == no reduction requested
217 : id(0)
218 , is_fold(false)
219 , in_place(false)
220 , is_exclusive(false)
221 {}
222
// fully-specified constructor: each argument is copied into the member of
// the same name (argument name minus the leading underscore)
223 XferDesRedopInfo(ReductionOpID _id, bool _is_fold, bool _in_place, bool _is_exclusive)
224 : id(_id)
225 , is_fold(_is_fold)
226 , in_place(_in_place)
227 , is_exclusive(_is_exclusive)
228 {}
229 };
230
231 // a control port is used to steer inputs/outputs of transfer descriptors -
232 // the information is encoded into 32b packets which may be read/written
233 // at different times due to flow control, so the encoder and decoder
234 // both need to be stateful
235 namespace ControlPort {
236 // apart from the first control word (which carries the space_shift
237 // amount), the bottom two bits of each control word mean:
238 static const unsigned CTRL_LO_MORE = 0; // 00: count.lo, space_index, not last
239 static const unsigned CTRL_LO_LAST = 1; // 01: count.lo, space_index, last
240 static const unsigned CTRL_MID = 2; // 10: count.mid (32 bits)
241 static const unsigned CTRL_HIGH = 3; // 11: count.hi (2+space_shift bits)
242
243 class Encoder {
244 public:
247
248 void set_port_count(size_t ports);
249
250 // encodes some/all of the { count, port, last } packet into the next
251 // 32b - returns true if encoding is complete or false if it should
252 // be called again with the same arguments for another 32b packet
253 bool encode(unsigned &data, size_t count, int port, bool last);
254
255 protected:
256 unsigned short port_shift;
257
267 unsigned char state;
268 };
269
270 class Decoder {
271 public:
274
275 // decodes the next 32b of packed data, returning true if a complete
276 // { count, port, last } has been received
277 bool decode(unsigned data, size_t &count, int &port, bool &last);
278
279 protected:
281 unsigned short port_shift;
282 };
283 }; // namespace ControlPort
284
285 class XferDes {
286 public:
287 // a pointer to the DmaRequest that contains this XferDes
288 uintptr_t dma_op;
290 // ID of the node that launches this XferDes
292 // uint64_t /*bytes_submit, */bytes_read, bytes_write/*, bytes_total*/;
296 // current input and output port mask
319 std::vector<XferPort> input_ports, output_ports;
324 size_t remaining_count; // units of bytes for normal (elements for serialized data?)
326 };
328 // maximum size for a single request
329 uint64_t max_req_size;
330 // priority of the containing XferDes
332 // current, previous and next XferDes in the chain, XFERDES_NO_GUID
333 // means this XferDes is the first/last one.
334 XferDesID guid; //, pre_xd_guid, next_xd_guid;
335 // XferDesKind of the Xfer Descriptor
337 enum
338 {
340 };
341 // channel this XferDes describes
343
346 // for most fills, we can use an inline buffer with good alignment
347 static const size_t ALIGNED_FILL_STORAGE_SIZE = 32;
352 AlignedStorage inline_fill_storage;
353
354 // xd_lock is designed to provide thread-safety for
355 // SIMULTANEOUS invocation to get_requests,
356 // notify_request_read_done, and notify_request_write_done
358 // default iterators provided to generate requests
359 // Layouts::GenericLayoutIterator<DIM>* li;
360 // SJT:what is this for?
361 // unsigned offset_idx;
362 // used to track by upstream/downstream xds so that we can safely
363 // sleep xds that are stalled
365
367
369
371
372 // intrusive list for queued XDs in a channel
379
380 protected:
381 // this will be removed soon
382 // queue that contains all available free requests
384 std::queue<Request *> available_reqs;
385
386 public:
387 XferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid,
388 const std::vector<XferDesPortInfo> &inputs_info,
389 const std::vector<XferDesPortInfo> &outputs_info, int _priority,
390 const void *_fill_data, size_t fill_size);
391
392 // transfer descriptors are reference counted rather than explicitly
393 // deleted
394 void add_reference(void);
396
398
399 protected:
400 virtual ~XferDes();
401
402 public:
404
405 virtual long get_requests(Request **requests, long nr) = 0;
406
408
410
411 virtual void flush();
412
413 long default_get_requests(Request **requests, long nr, unsigned flags = 0);
416
417 virtual void update_bytes_read(int port_idx, size_t offset, size_t size);
418 virtual void update_bytes_write(int port_idx, size_t offset, size_t size);
419 void update_pre_bytes_write(int port_idx, size_t offset, size_t size);
420 void update_pre_bytes_total(int port_idx, size_t pre_bytes_total);
421 void update_next_bytes_read(int port_idx, size_t offset, size_t size);
422
423 // called once iteration is complete, but we need to track in flight
424 // writes, flush byte counts, etc.
426
427 void mark_completed(TimeLimit work_until);
428
429 unsigned current_progress(void);
430
431 // checks to see if progress has been made since the last read of the
432 // progress counter - atomically marks the xd for wakeup if not
433 bool check_for_progress(unsigned last_counter);
434
435 // updates the progress counter, waking up the xd if needed
436 void update_progress(void);
437
438 virtual bool request_available()
439 {
441 return !available_reqs.empty();
442 }
443
445 {
446 Request *req;
447 {
449 req = available_reqs.front();
450 available_reqs.pop();
451 }
452 req->is_read_done = false;
453 req->is_write_done = false;
454 // by default, an "active" request holds a reference on the xd
456 return req;
457 }
458
459 virtual void enqueue_request(Request *req)
460 {
461 {
463 available_reqs.push(req);
464 }
465 // update progress counter if iteration isn't completed yet - it might
466 // have been waiting for another request object
470 }
471
473 public:
474 void defer(XferDesQueue *_xferDes_queue, XferDes *_xd, Event wait_on);
475
476 virtual void event_triggered(bool poisoned, TimeLimit work_until);
477 virtual void print(std::ostream &os) const;
478 virtual Event get_finish_event(void) const;
479
480 protected:
482 XferDes *xd; // TODO: eliminate this based on a known offset
483 };
485
486 // helper widget to cache spans so that SequenceAssembler updates are as
487 // large as possible
488 template <void (XferDes::*UPDATE)(int port_idx, size_t offset, size_t size)>
490 public:
491 SequenceCache(XferDes *_xd, size_t _flush_bytes = 0);
492
493 void add_span(int port_idx, size_t offset, size_t size);
494 void flush();
495
496 protected:
497 static const size_t MAX_ENTRIES = 4;
498
504 };
507
509
510 // a helper routine for individual XferDes implementations - tries to get
511 // addresses and check flow control for at least 'min_xfer_size' bytes
512 // worth of transfers from a single input to a single output, and returns
513 // the number of bytes that can be transferred before another call to
514 // this method
515 // returns 0 if the transfer is complete OR if there are fewer than the
516 // minimum requested bytes available and there's reason to believe that
517 // trying again later will result in a larger chunk
518 // as a side effect, the input/output control information is updated - the
519 // actual input/output ports involved in the next transfer are stored there
520 size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache);
521 size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache,
522 const InstanceLayoutPieceBase *&in_nonaffine,
523 const InstanceLayoutPieceBase *&out_nonaffine);
524
525 // after a call to 'get_addresses', this call updates the various data
526 // structures to record that transfers for 'total_{read,write}_bytes' bytes
527 // were at least initiated - return value is whether iteration is complete
528 bool record_address_consumption(size_t total_read_bytes, size_t total_write_bytes);
529
530 // fills can be more efficient if the fill data is replicated into a larger
531 // block
532 void replicate_fill_data(size_t new_size);
533 };
534
535 class MemcpyChannel;
536
537 class MemfillChannel;
538
539 class MemfillXferDes : public XferDes {
540 public:
541 MemfillXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
542 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
543 const std::vector<XferDesPortInfo> &outputs_info, int _priority,
544 const void *_fill_data, size_t _fill_size, size_t _fill_total);
545
546 long get_requests(Request **requests, long nr);
547
548 virtual bool request_available();
550 virtual void enqueue_request(Request *req);
551
553 };
554
555 class MemreduceChannel;
556
557 class MemreduceXferDes : public XferDes {
558 public:
559 MemreduceXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
560 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
561 const std::vector<XferDesPortInfo> &outputs_info, int _priority,
562 XferDesRedopInfo _redop_info);
563
564 long get_requests(Request **requests, long nr);
565
567
568 protected:
571 };
572
573 class GASNetChannel;
574
575 class GASNetXferDes : public XferDes {
576 public:
577 GASNetXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
578 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
579 const std::vector<XferDesPortInfo> &outputs_info, int _priority);
580
581 ~GASNetXferDes() { free(gasnet_reqs); }
582
583 long get_requests(Request **requests, long nr);
586 void flush();
587
589
590 private:
591 GASNetRequest *gasnet_reqs;
592 };
593
594 class RemoteWriteChannel;
595
597 public:
598 RemoteWriteXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
599 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
600 const std::vector<XferDesPortInfo> &outputs_info, int _priority);
601
602 ~RemoteWriteXferDes() { free(requests); }
603
604 long get_requests(Request **requests, long nr);
607 void flush();
608
609 // doesn't do pre_bytes_write updates, since the remote write message
610 // takes care of it with lower latency
611 virtual void update_bytes_write(int port_idx, size_t offset, size_t size);
612
614
615 // writes directly to a contiguous chunk of destination
620
621 static void handle_message(NodeID sender, const Write1DMessage &args,
622 const void *data, size_t datalen);
623 static bool handle_inline(NodeID sender, const Write1DMessage &args,
624 const void *data, size_t datalen, TimeLimit work_until);
625 };
626
627 private:
628 RemoteWriteRequest *requests;
629 // char *dst_buf_base;
630 };
631
633 public:
634 virtual ~XferDesFactory() {}
635 virtual bool needs_release() = 0;
636
637 virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node,
638 XferDesID guid,
639 const std::vector<XferDesPortInfo> &inputs_info,
640 const std::vector<XferDesPortInfo> &outputs_info,
641 int priority, XferDesRedopInfo redop_info,
642 const void *fill_data, size_t fill_size,
643 size_t fill_total) = 0;
644 };
645
647 // RegionInstance inst;
648 uintptr_t dma_op;
651 uintptr_t channel;
652 };
653
655 static void handle_message(NodeID sender, const SimpleXferDesCreateMessage &args,
656 const void *data, size_t datalen);
657 };
658
659 // a simple xfer des factory knows how to create an xfer des with no
660 // extra information on a single channel
662 public:
663 SimpleXferDesFactory(uintptr_t _channel);
664
665 virtual bool needs_release();
666
667 virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node,
668 XferDesID guid,
669 const std::vector<XferDesPortInfo> &inputs_info,
670 const std::vector<XferDesPortInfo> &outputs_info,
671 int priority, XferDesRedopInfo redop_info,
672 const void *fill_data, size_t fill_size,
673 size_t fill_total);
674
675 protected:
676 uintptr_t channel;
677 };
678
679 class RemoteChannelInfo;
680 class RemoteChannel;
681
682 // TODO(apryakhin): Deprecate ChannelCopyInfo
// only _src_mem and _dst_mem are required; the other arguments default to
// _ind_mem = Memory::NO_MEMORY, _num_spaces = 1, _is_scatter = false,
// _is_ranges = false, _is_direct = true, _oor_possible = false and
// _addr_size = 0 - every argument is stored in the same-named member
684 ChannelCopyInfo(Memory _src_mem, Memory _dst_mem, Memory _ind_mem = Memory::NO_MEMORY,
685 size_t _num_spaces = 1, bool _is_scatter = false,
686 bool _is_ranges = false, bool _is_direct = true,
687 bool _oor_possible = false, size_t _addr_size = 0)
688 : src_mem(_src_mem)
689 , dst_mem(_dst_mem)
690 , ind_mem(_ind_mem)
691 , num_spaces(_num_spaces)
692 , is_scatter(_is_scatter)
693 , is_ranges(_is_ranges)
694 , is_direct(_is_direct)
695 , oor_possible(_oor_possible)
696 , addr_size(_addr_size)
697 {}
706 size_t addr_size;
707 };
708
709 std::ostream &operator<<(std::ostream &os, const ChannelCopyInfo &info);
710 bool operator==(const ChannelCopyInfo &lhs, const ChannelCopyInfo &rhs);
711
712 class Channel {
713 public:
715 : node(Network::my_node_id)
716 , kind(_kind)
717 {}
718 virtual ~Channel(){};
719
720 // TODO: make pure virtual
721 virtual void shutdown() {}
722
723 // most channels can hand out a factory that makes arbitrary xds on
724 // that channel
726
727 public:
728 // which node manages this channel
730 // the kind of XferDes this channel can accept
732
733 // whether the channel has a redop path
734 bool has_redop_path{false};
735
736 // whether the channel has a non-redop path
738
739 virtual bool supports_redop(ReductionOpID redop_id) const;
740
// base-class default: ignores both memories and always returns false;
// declared virtual so that derived channels can override it
741 virtual bool support_idindexed_fields(Memory src_mem, Memory dst_mem) const
742 {
743 return false;
744 };
745
746 // attempt to make progress on the specified xferdes
747 virtual long progress_xd(XferDes *xd, long max_nr);
748
749 /*
750 * Submit nr asynchronous requests into the channel instance.
751 * This is supposed to be a non-blocking function call, and
752 * should immediately return the number of requests that are
753 * successfully submitted.
754 */
755 virtual long submit(Request **requests, long nr) = 0;
756
757 /*
758 *
759 */
760 virtual void pull() = 0;
761
762 /*
763 * Return the number of slots that are available for
764 * submitting requests
765 */
766 virtual long available() = 0;
767
779 struct MemBitmask {
781 static const int BITMASK_SIZE = (1 << ID::MEMORY_INDEX_WIDTH) >> 6;
783 };
784 union {
788 };
789 union {
793 };
795 unsigned bandwidth; // units = MB/s = B/us
796 unsigned latency; // units = ns
797 unsigned frag_overhead; // units = ns
798 unsigned char max_src_dim, max_dst_dim;
799 bool redops_allowed; // TODO: list of redops?
800 bool serdez_allowed; // TODO: list of serdez ops?
801
802 // constructor
804 : src_mem(Memory::NO_MEMORY)
805 , dst_mem(Memory::NO_MEMORY)
806 {}
807
808 // mutators to modify less-common fields
809 SupportedPath &set_max_dim(int src_and_dst_dim);
810 SupportedPath &set_max_dim(int src_dim, int dst_dim);
813
814 // only valid when a SupportedPath is modifiable by the above methods
815 // (i.e. only on the creator node and only until another path is added)
817
819 MemBitmask &bitmask);
820 };
821
822 const std::vector<SupportedPath> &get_paths(void) const;
823
825
826 // returns 0 if the path is not supported, or a strictly-positive
827 // estimate of the time required (in nanoseconds) to transfer data
828 // along a supported path
829 virtual uint64_t
830 supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id,
831 CustomSerdezID dst_serdez_id, ReductionOpID redop_id,
832 size_t total_bytes, const std::vector<size_t> *src_frags,
833 const std::vector<size_t> *dst_frags, XferDesKind *kind_ret = 0,
834 unsigned *bw_ret = 0, unsigned *lat_ret = 0);
838 virtual bool supports_indirection_memory(Memory mem) const;
839
842
843 virtual bool needs_wrapping_iterator() const { return false; }
844
846
847 void print(std::ostream &os) const;
848
849 virtual void enqueue_ready_xd(XferDes *xd) = 0;
850 virtual void wakeup_xd(XferDes *xd) = 0;
851
852 protected:
853 // returns the added path for further modification, but reference is
854 // only valid until the next call to 'add_path'
856 unsigned bandwidth, unsigned latency, unsigned frag_overhead,
857 XferDesKind xd_kind);
859 bool dst_global, unsigned bandwidth, unsigned latency,
860 unsigned frag_overhead, XferDesKind xd_kind);
861 SupportedPath &add_path(Memory::Kind src_kind, bool src_global,
862 span<const Memory> dst_mems, unsigned bandwidth,
863 unsigned latency, unsigned frag_overhead,
864 XferDesKind xd_kind);
865 SupportedPath &add_path(Memory::Kind src_kind, bool src_global, Memory::Kind dst_kind,
866 bool dst_global, unsigned bandwidth, unsigned latency,
867 unsigned frag_overhead, XferDesKind xd_kind);
868 // TODO: allow rdma path to limit by kind?
869 SupportedPath &add_path(bool local_loopback, unsigned bandwidth, unsigned latency,
870 unsigned frag_overhead, XferDesKind xd_kind);
871
872 std::vector<SupportedPath> paths;
873 };
874
875 std::ostream &operator<<(std::ostream &os, const Channel::SupportedPath &p);
876
877 class LocalChannel : public Channel {
878 public:
880
881 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
882 const std::vector<XferDesPortInfo> &inputs_info,
883 const std::vector<XferDesPortInfo> &outputs_info,
884 int priority, XferDesRedopInfo redop_info,
885 const void *fill_data, size_t fill_size,
886 size_t fill_total) = 0;
887
889
890 protected:
892 };
893
894 // polymorphic container for info necessary to create a remote channel
896 public:
898
900
901 template <typename S>
902 static RemoteChannelInfo *deserialize_new(S &deserializer);
903 };
904
905 template <typename S>
906 bool serialize(S &serializer, const RemoteChannelInfo &rci);
907
909 public:
910 SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr,
911 const std::vector<Channel::SupportedPath> &_paths,
912 const std::vector<Memory> &indirect_memories);
913 SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr,
914 const std::vector<Channel::SupportedPath> &_paths);
915
917
918 template <typename S>
919 bool serialize(S &serializer) const;
920
921 template <typename S>
922 static RemoteChannelInfo *deserialize_new(S &deserializer);
923
924 protected:
926
930
933 uintptr_t remote_ptr;
934 std::vector<Channel::SupportedPath> paths;
935 std::vector<Memory> indirect_memories;
936 };
937
938 class RemoteChannel : public Channel {
939 protected:
941
942 RemoteChannel(uintptr_t _remote_ptr, const std::vector<Memory> &indirect_memories);
943 RemoteChannel(uintptr_t _remote_ptr);
944
945 void shutdown() override;
946
948
949 public:
950 uintptr_t get_remote_ptr() const;
951
953
954 bool supports_redop(ReductionOpID redop_id) const override;
955
956 /*
957 * Submit nr asynchronous requests into the channel instance.
958 * This is supposed to be a non-blocking function call, and
959 * should immediately return the number of requests that are
960 * successfully submitted.
961 */
962 long submit(Request **requests, long nr) override;
963
964 /*
965 *
966 */
967 void pull() override;
968
969 /*
970 * Return the number of slots that are available for
971 * submitting requests
972 */
973 long available() override;
974
975 uint64_t supports_path(ChannelCopyInfo channel_copy_info,
976 CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id,
977 ReductionOpID redop_id, size_t total_bytes,
978 const std::vector<size_t> *src_frags,
979 const std::vector<size_t> *dst_frags,
980 XferDesKind *kind_ret = 0, unsigned *bw_ret = 0,
981 unsigned *lat_ret = 0) override;
982
986 bool supports_indirection_memory(Memory mem) const override;
987
// neither hook is implemented for a RemoteChannel: both bodies consist
// solely of assert(0), so calling either one trips the assertion
988 void enqueue_ready_xd(XferDes *xd) override { assert(0); }
989 void wakeup_xd(XferDes *xd) override { assert(0); }
990
991 protected:
992 mutable RWLock mutex;
993 uintptr_t remote_ptr;
994 std::unordered_set<ReductionOpID> supported_redops;
996 const std::set<Memory> indirect_memories;
997 };
998
999 template <typename CHANNEL, typename XD>
1001 public:
1002 XDQueue(LocalChannel *_channel, const std::string &_name, bool _ordered);
1003
1004 void enqueue_xd(XD *xd, bool at_front = false);
1005
1006 virtual bool do_work(TimeLimit work_until);
1007
1008 protected:
1009 friend CHANNEL;
1010
1015 };
1016
1017 template <typename CHANNEL, typename XD>
1019 public:
1021 const std::string &_name, int _numa_domain = -1);
1022
1023 virtual void shutdown();
1024
1025 virtual void enqueue_ready_xd(XferDes *xd);
1026 virtual void wakeup_xd(XferDes *xd);
1027
1028 // TODO: remove!
1029 void pull() { assert(0); }
1031 {
1032 assert(0);
1033 return 0;
1034 }
// not expected to be called on this channel type: the body asserts
// unconditionally, and the 'return 0' is only reached in builds where
// assert() is compiled out
1035 virtual long progress_xd(XferDes *xd, long max_nr)
1036 {
1037 assert(0);
1038 return 0;
1039 }
1040
1041 protected:
1043 };
1044
1045 class MemfillChannel : public SingleXDQChannel<MemfillChannel, MemfillXferDes> {
1046 public:
1048
1049 // multiple concurrent memfills ok
1050 static const bool is_ordered = false;
1051
1053
1054 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1055 const std::vector<XferDesPortInfo> &inputs_info,
1056 const std::vector<XferDesPortInfo> &outputs_info,
1057 int priority, XferDesRedopInfo redop_info,
1058 const void *fill_data, size_t fill_size,
1059 size_t fill_total);
1060
1061 virtual long submit(Request **requests, long nr);
1062
1064 };
1065
1066 class MemreduceChannel : public SingleXDQChannel<MemreduceChannel, MemreduceXferDes> {
1067 public:
1069
1070 // multiple concurrent memreduces ok
1071 static const bool is_ordered = false;
1072
1073 virtual bool supports_redop(ReductionOpID redop_id) const;
1074
1075 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1076 const std::vector<XferDesPortInfo> &inputs_info,
1077 const std::vector<XferDesPortInfo> &outputs_info,
1078 int priority, XferDesRedopInfo redop_info,
1079 const void *fill_data, size_t fill_size,
1080 size_t fill_total);
1081
1082 virtual long submit(Request **requests, long nr);
1083
1085 };
1086
1087 class GASNetChannel : public SingleXDQChannel<GASNetChannel, GASNetXferDes> {
1088 public:
1091
1092 // no more than one GASNet xfer of each type at a time
1093 static const bool is_ordered = true;
1094
1095 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1096 const std::vector<XferDesPortInfo> &inputs_info,
1097 const std::vector<XferDesPortInfo> &outputs_info,
1098 int priority, XferDesRedopInfo redop_info,
1099 const void *fill_data, size_t fill_size,
1100 size_t fill_total);
1101
1102 long submit(Request **requests, long nr);
1103 };
1104
1106 : public SingleXDQChannel<RemoteWriteChannel, RemoteWriteXferDes> {
1107 public:
1110
1111 // multiple concurrent RDMAs ok
1112 static const bool is_ordered = false;
1113
1114 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1115 const std::vector<XferDesPortInfo> &inputs_info,
1116 const std::vector<XferDesPortInfo> &outputs_info,
1117 int priority, XferDesRedopInfo redop_info,
1118 const void *fill_data, size_t fill_size,
1119 size_t fill_total);
1120
1121 long submit(Request **requests, long nr);
1122 };
1123
1124 class TransferOperation;
1128
1129 static void handle_message(NodeID sender, const NotifyXferDesCompleteMessage &args,
1130 const void *data, size_t datalen);
1131
1133 };
1134
1135#if 0 // TODO: DELETE
1136 struct XferDesRemoteWriteMessage {
1137 RemoteWriteRequest *req;
1138 XferDesID next_xd_guid;
1139 int next_port_idx;
1140 unsigned span_size; // 32 bits to fit in packet size
1141 size_t span_start, /*span_size,*/ pre_bytes_total;
1142
1143 static void handle_message(NodeID sender,
1144 const XferDesRemoteWriteMessage &args,
1145 const void *data,
1146 size_t datalen);
1147
1148 static void send_request(NodeID target,
1149 const RemoteAddress& dst_buf,
1150 const void *src_buf, size_t nbytes,
1151 RemoteWriteRequest* req,
1152 XferDesID next_xd_guid,
1153 int next_port_idx,
1154 size_t span_start,
1155 size_t span_size,
1156 size_t pre_bytes_total)
1157 {
1159 src_buf, nbytes,
1160 dst_buf);
1161 amsg->req = req;
1162 amsg->next_xd_guid = next_xd_guid;
1163 amsg->next_port_idx = next_port_idx;
1164 amsg->span_start = span_start;
1165 assert(span_size <= UINT_MAX);
1166 amsg->span_size = (unsigned)span_size;
1167 amsg->pre_bytes_total = pre_bytes_total;
1168 amsg.commit();
1169 }
1170
1171 static void send_request(NodeID target,
1172 const RemoteAddress& dst_buf,
1173 const void *src_buf, size_t nbytes, off_t src_str,
1174 size_t nlines, RemoteWriteRequest* req,
1175 XferDesID next_xd_guid,
1176 int next_port_idx,
1177 size_t span_start,
1178 size_t span_size,
1179 size_t pre_bytes_total)
1180 {
1181 ActiveMessage<XferDesRemoteWriteMessage> amsg(target,
1182 src_buf, nbytes,
1183 nlines, src_str,
1184 dst_buf);
1185 amsg->req = req;
1186 amsg->next_xd_guid = next_xd_guid;
1187 amsg->next_port_idx = next_port_idx;
1188 amsg->span_start = span_start;
1189 assert(span_size <= UINT_MAX);
1190 amsg->span_size = (unsigned)span_size;
1191 amsg->pre_bytes_total = pre_bytes_total;
1192 amsg.commit();
1193 }
1194 };
1195
1196 struct XferDesRemoteWriteAckMessage {
1197 RemoteWriteRequest* req;
1198
1199 static void handle_message(NodeID sender,
1200 const XferDesRemoteWriteAckMessage &args,
1201 const void *data,
1202 size_t datalen);
1203 static void send_request(NodeID target, RemoteWriteRequest* req)
1204 {
1205 ActiveMessage<XferDesRemoteWriteAckMessage> amsg(target);
1206 amsg->req = req;
1207 amsg.commit();
1208 }
1209 };
1210#endif
1211
1214 static void handle_message(NodeID sender, const XferDesDestroyMessage &args,
1215 const void *data, size_t datalen);
1216 static void send_request(NodeID target, XferDesID guid)
1217 {
1219 amsg->guid = guid;
1220 amsg.commit();
1221 }
1222 };
1223
1228
1229 static void handle_message(NodeID sender, const UpdateBytesTotalMessage &args,
1230 const void *data, size_t datalen);
1231 };
1232
1237
1238 static void handle_message(NodeID sender, const UpdateBytesWriteMessage &args,
1239 const void *data, size_t datalen);
1240
1241 static void send_request(NodeID target, XferDesID guid, int port_idx,
1242 size_t span_start, size_t span_size)
1243 {
1245 amsg->guid = guid;
1246 amsg->port_idx = port_idx;
1247 amsg->span_start = span_start;
1248 amsg->span_size = span_size;
1249 amsg.commit();
1250 }
1251 };
1252
1257
1258 static void handle_message(NodeID sender, const UpdateBytesReadMessage &args,
1259 const void *data, size_t datalen);
1260
1261 static void send_request(NodeID target, XferDesID guid, int port_idx,
1262 size_t span_start, size_t span_size)
1263 {
1265 amsg->guid = guid;
1266 amsg->port_idx = port_idx;
1267 amsg->span_start = span_start;
1268 amsg->span_size = span_size;
1269 amsg.commit();
1270 }
1271 };
1272
1273 // object used to hold input progress (pre_write and bytes_total) before
1274 // we've actually created the correct xd
1276 public:
1278
1279 protected:
1281
1282 public:
1285
1286 void update_pre_bytes_write(int port_idx, size_t span_start, size_t span_size);
1287 void update_pre_bytes_total(int port_idx, size_t pre_bytes_total);
1288
1290
1292
1294
1295 protected:
1296 static const int INLINE_PORTS = 4;
1303 std::map<int, size_t> extra_bytes_total;
1304 std::map<int, SequenceAssembler> extra_pre_write;
1305 };
1306
1308 public:
1309 enum
1310 {
1312 INDEX_BITS = 32
1315 //: core_rsrv("DMA request queue", crs, CoreReservationParameters())
1316 {
1317 // reserve the first several guid
1319 }
1320
1321 ~XferDesQueue() { assert(guid_to_xd.empty()); }
1322
1324
1325 XferDesID get_guid(NodeID execution_node)
1326 {
1327 // GUID rules:
1328 // First NODE_BITS indicates which node will execute this xd
1329 // Next NODE_BITS indicates on which node this xd is generated
1330 // Last INDEX_BITS means a unique idx, which is used to resolve conflicts
1332 return (((XferDesID)execution_node << (NODE_BITS + INDEX_BITS)) |
1334 }
1335
1336 void update_pre_bytes_write(XferDesID xd_guid, int port_idx, size_t span_start,
1337 size_t span_size);
1338 void update_pre_bytes_total(XferDesID xd_guid, int port_idx, size_t pre_bytes_total);
1339 void update_next_bytes_read(XferDesID xd_guid, int port_idx, size_t span_start,
1340 size_t span_size);
1341
1343
1344 // returns true if xd is ready, false if enqueue has been deferred
1345 bool enqueue_xferDes_local(XferDes *xd, bool add_to_queue = true);
1346
1347 protected:
1348 // guid_to_xd maps a guid to either an XferDes * (as a uintptr_t) or
1349 // a XferDesPlaceholder * (as a uintptr_t with the LSB set)
1351 std::map<XferDesID, uintptr_t> guid_to_xd;
1352
1355 };
1356
1358}; // namespace Realm
1359
1360#include "realm/transfer/channel.inl"
1361
1362#endif
#define REALM_XFERDES_KINDS(__op__)
Definition channel.h:59
Definition activemsg.h:65
Definition address_list.h:101
Definition address_list.h:56
Definition mutex.h:384
Definition bgwork.h:129
Definition bgwork.h:36
Definition channel.h:712
virtual long progress_xd(XferDes *xd, long max_nr)
virtual Memory suggest_ib_memories_for_node(NodeID node) const
SupportedPath & add_path(span< const Memory > src_mems, span< const Memory > dst_mems, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
virtual void shutdown()
Definition channel.h:721
virtual bool support_idindexed_fields(Memory src_mem, Memory dst_mem) const
Definition channel.h:741
NodeID node
Definition channel.h:729
SupportedPath & add_path(bool local_loopback, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
SupportedPath & add_path(Memory::Kind src_kind, bool src_global, Memory::Kind dst_kind, bool dst_global, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
SupportedPath & add_path(span< const Memory > src_mems, Memory::Kind dst_kind, bool dst_global, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
virtual bool needs_wrapping_iterator() const
Definition channel.h:843
bool has_non_redop_path
Definition channel.h:737
virtual void enqueue_ready_xd(XferDes *xd)=0
virtual bool supports_indirection_memory(Memory mem) const
Queries if a given mem can be used as an indirection buffer.
const std::vector< SupportedPath > & get_paths(void) const
void print(std::ostream &os) const
std::vector< SupportedPath > paths
Definition channel.h:872
virtual uint64_t supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id, ReductionOpID redop_id, size_t total_bytes, const std::vector< size_t > *src_frags, const std::vector< size_t > *dst_frags, XferDesKind *kind_ret=0, unsigned *bw_ret=0, unsigned *lat_ret=0)
virtual ~Channel()
Definition channel.h:718
virtual void pull()=0
SupportedPath & add_path(Memory::Kind src_kind, bool src_global, span< const Memory > dst_mems, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
XferDesKind kind
Definition channel.h:731
virtual XferDesFactory * get_factory()=0
virtual long submit(Request **requests, long nr)=0
virtual RemoteChannelInfo * construct_remote_info() const
Channel(XferDesKind _kind)
Definition channel.h:714
virtual Memory suggest_ib_memories() const
bool has_redop_path
Definition channel.h:734
virtual void wakeup_xd(XferDes *xd)=0
virtual bool supports_redop(ReductionOpID redop_id) const
virtual long available()=0
void update_channel_state(void)
Definition channel.h:270
unsigned short port_shift
Definition channel.h:281
size_t temp_count
Definition channel.h:280
bool decode(unsigned data, size_t &count, int &port, bool &last)
Definition channel.h:243
void set_port_count(size_t ports)
State
Definition channel.h:259
@ STATE_IDLE
Definition channel.h:262
@ STATE_SENT_HIGH
Definition channel.h:263
@ STATE_SENT_MID
Definition channel.h:264
@ STATE_INIT
Definition channel.h:260
@ STATE_DONE
Definition channel.h:265
@ STATE_HAVE_PORT_COUNT
Definition channel.h:261
unsigned char state
Definition channel.h:267
bool encode(unsigned &data, size_t count, int port, bool last)
unsigned short port_shift
Definition channel.h:256
Definition custom_serdez.h:150
Definition threads.h:428
Definition event_impl.h:49
Definition event.h:50
Definition channel.h:1087
GASNetChannel(BackgroundWorkManager *bgwork, XferDesKind _kind)
static const bool is_ordered
Definition channel.h:1093
long submit(Request **requests, long nr)
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
Definition channel.h:175
void * mem_base
Definition channel.h:177
off_t gas_off
Definition channel.h:178
Definition channel.h:575
void notify_request_read_done(Request *req)
bool progress_xd(GASNetChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
GASNetXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority)
void notify_request_write_done(Request *req)
~GASNetXferDes()
Definition channel.h:581
static const int MEMORY_INDEX_WIDTH
Definition id.h:58
Definition inst_layout.h:267
Definition lists.h:119
Definition channel.h:877
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)=0
SimpleXferDesFactory factory_singleton
Definition channel.h:891
LocalChannel(XferDesKind _kind)
virtual XferDesFactory * get_factory()
Definition memcpy_channel.h:55
Definition channel.h:168
const void * src_base
Definition channel.h:170
void * dst_base
Definition channel.h:171
Definition channel.h:1045
bool is_stopped
Definition channel.h:1063
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
static const bool is_ordered
Definition channel.h:1050
virtual long submit(Request **requests, long nr)
MemfillChannel(BackgroundWorkManager *bgwork)
Definition channel.h:539
MemfillXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, const void *_fill_data, size_t _fill_size, size_t _fill_total)
virtual Request * dequeue_request()
bool progress_xd(MemfillChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
virtual bool request_available()
virtual void enqueue_request(Request *req)
Definition mem_impl.h:50
Definition memory.h:33
Kind
Definition memory.h:59
static const Memory NO_MEMORY
Definition memory.h:49
Definition channel.h:1066
virtual bool supports_redop(ReductionOpID redop_id) const
static const bool is_ordered
Definition channel.h:1071
MemreduceChannel(BackgroundWorkManager *bgwork)
bool is_stopped
Definition channel.h:1084
virtual long submit(Request **requests, long nr)
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
Definition channel.h:557
MemreduceXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, XferDesRedopInfo _redop_info)
long get_requests(Request **requests, long nr)
const ReductionOpUntyped * redop
Definition channel.h:570
bool progress_xd(MemreduceChannel *channel, TimeLimit work_until)
XferDesRedopInfo redop_info
Definition channel.h:569
Definition mutex.h:398
Definition instance.h:66
Definition channel.h:895
virtual RemoteChannel * create_remote_channel()=0
virtual ~RemoteChannelInfo()
Definition channel.h:897
static RemoteChannelInfo * deserialize_new(S &deserializer)
Definition channel.h:938
XferDesFactory * get_factory() override
long available() override
uint64_t supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id, ReductionOpID redop_id, size_t total_bytes, const std::vector< size_t > *src_frags, const std::vector< size_t > *dst_frags, XferDesKind *kind_ret=0, unsigned *bw_ret=0, unsigned *lat_ret=0) override
uintptr_t get_remote_ptr() const
SimpleXferDesFactory factory_singleton
Definition channel.h:995
const std::set< Memory > indirect_memories
Definition channel.h:996
long submit(Request **requests, long nr) override
std::unordered_set< ReductionOpID > supported_redops
Definition channel.h:994
RemoteChannel(uintptr_t _remote_ptr)
RWLock mutex
Definition channel.h:992
void register_redop(ReductionOpID redop_id)
void enqueue_ready_xd(XferDes *xd) override
Definition channel.h:988
void wakeup_xd(XferDes *xd) override
Definition channel.h:989
void pull() override
bool supports_indirection_memory(Memory mem) const override
Queries if a given mem can be used as an indirection buffer.
bool supports_redop(ReductionOpID redop_id) const override
uintptr_t remote_ptr
Definition channel.h:993
void shutdown() override
RemoteChannel(uintptr_t _remote_ptr, const std::vector< Memory > &indirect_memories)
Definition channel.h:1106
long submit(Request **requests, long nr)
static const bool is_ordered
Definition channel.h:1112
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
RemoteWriteChannel(BackgroundWorkManager *bgwork)
Definition channel.h:183
const void * src_base
Definition channel.h:186
Definition channel.h:596
virtual void update_bytes_write(int port_idx, size_t offset, size_t size)
~RemoteWriteXferDes()
Definition channel.h:602
RemoteWriteXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority)
void notify_request_read_done(Request *req)
bool progress_xd(RemoteWriteChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
void notify_request_write_done(Request *req)
Definition channel.h:102
Dimension dim
Definition channel.h:127
off_t src_str
Definition channel.h:117
off_t dst_pstr
Definition channel.h:119
size_t write_seq_count
Definition channel.h:130
int src_port_idx
Definition channel.h:113
off_t src_off
Definition channel.h:115
off_t dst_off
Definition channel.h:115
off_t src_pstr
Definition channel.h:119
off_t dst_str
Definition channel.h:117
size_t nlines
Definition channel.h:121
Dimension
Definition channel.h:105
@ DIM_3D
Definition channel.h:108
@ DIM_1D
Definition channel.h:106
@ DIM_2D
Definition channel.h:107
bool is_write_done
Definition channel.h:125
size_t write_seq_pos
Definition channel.h:130
size_t nbytes
Definition channel.h:121
XferDes * xd
Definition channel.h:112
size_t read_seq_pos
Definition channel.h:129
int dst_port_idx
Definition channel.h:113
bool is_read_done
Definition channel.h:123
size_t nplanes
Definition channel.h:121
size_t read_seq_count
Definition channel.h:129
Definition channel.h:133
atomic< Mutex * > mutex
Definition channel.h:164
atomic< size_t > contig_amount_x2
Definition channel.h:161
std::map< size_t, size_t > spans
Definition channel.h:165
atomic< size_t > first_noncontig
Definition channel.h:163
size_t add_span(size_t pos, size_t count)
size_t span_exists(size_t start, size_t count)
SequenceAssembler(const SequenceAssembler &copy_from)
void swap(SequenceAssembler &other)
Definition channel.h:908
static Serialization::PolymorphicSerdezSubclass< RemoteChannelInfo, SimpleRemoteChannelInfo > serdez_subclass
Definition channel.h:929
std::vector< Channel::SupportedPath > paths
Definition channel.h:934
SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr, const std::vector< Channel::SupportedPath > &_paths)
static RemoteChannelInfo * deserialize_new(S &deserializer)
uintptr_t remote_ptr
Definition channel.h:933
std::vector< Memory > indirect_memories
Definition channel.h:935
SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr, const std::vector< Channel::SupportedPath > &_paths, const std::vector< Memory > &indirect_memories)
virtual RemoteChannel * create_remote_channel()
NodeID owner
Definition channel.h:931
bool serialize(S &serializer) const
XferDesKind kind
Definition channel.h:932
Definition channel.h:661
virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
SimpleXferDesFactory(uintptr_t _channel)
virtual bool needs_release()
uintptr_t channel
Definition channel.h:676
Definition channel.h:1018
virtual void shutdown()
void pull()
Definition channel.h:1029
SingleXDQChannel(BackgroundWorkManager *bgwork, XferDesKind _kind, const std::string &_name, int _numa_domain=-1)
virtual void wakeup_xd(XferDes *xd)
virtual long progress_xd(XferDes *xd, long max_nr)
Definition channel.h:1035
XDQueue< CHANNEL, XD > xdq
Definition channel.h:1042
long available()
Definition channel.h:1030
virtual void enqueue_ready_xd(XferDes *xd)
Definition timers.h:129
Definition transfer.h:45
Definition transfer.h:663
Definition mutex.h:223
Definition channel.h:1000
XDQueue(LocalChannel *_channel, const std::string &_name, bool _ordered)
Mutex mutex
Definition channel.h:1013
XferDes::XferDesList ready_xds
Definition channel.h:1014
void enqueue_xd(XD *xd, bool at_front=false)
LocalChannel * channel
Definition channel.h:1011
virtual bool do_work(TimeLimit work_until)
friend CHANNEL
Definition channel.h:1009
bool ordered_mode
Definition channel.h:1012
bool in_ordered_worker
Definition channel.h:1012
Definition channel.h:632
virtual ~XferDesFactory()
Definition channel.h:634
virtual bool needs_release()=0
virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)=0
Definition channel.h:1275
XferDes * xd
Definition channel.h:1298
SequenceAssembler inline_pre_write[INLINE_PORTS]
Definition channel.h:1301
std::map< int, size_t > extra_bytes_total
Definition channel.h:1303
void add_update_pre_bytes_total_received(void)
void update_pre_bytes_write(int port_idx, size_t span_start, size_t span_size)
unsigned get_update_pre_bytes_total_received(void)
Mutex extra_mutex
Definition channel.h:1302
atomic< unsigned > refcount
Definition channel.h:1297
static const int INLINE_PORTS
Definition channel.h:1296
std::map< int, SequenceAssembler > extra_pre_write
Definition channel.h:1304
atomic< unsigned > nb_update_pre_bytes_total_calls_received
Definition channel.h:1299
void update_pre_bytes_total(int port_idx, size_t pre_bytes_total)
size_t inline_bytes_total[INLINE_PORTS]
Definition channel.h:1300
void set_real_xd(XferDes *_xd)
Definition channel.h:1307
atomic< XferDesID > next_to_assign_idx
Definition channel.h:1354
void update_next_bytes_read(XferDesID xd_guid, int port_idx, size_t span_start, size_t span_size)
static XferDesQueue * get_singleton()
bool enqueue_xferDes_local(XferDes *xd, bool add_to_queue=true)
Mutex queues_lock
Definition channel.h:1353
void update_pre_bytes_total(XferDesID xd_guid, int port_idx, size_t pre_bytes_total)
void destroy_xferDes(XferDesID guid)
void update_pre_bytes_write(XferDesID xd_guid, int port_idx, size_t span_start, size_t span_size)
Mutex guid_lock
Definition channel.h:1350
~XferDesQueue()
Definition channel.h:1321
@ NODE_BITS
Definition channel.h:1311
@ INDEX_BITS
Definition channel.h:1312
std::map< XferDesID, uintptr_t > guid_to_xd
Definition channel.h:1351
XferDesQueue()
Definition channel.h:1314
XferDesID get_guid(NodeID execution_node)
Definition channel.h:1325
Definition channel.h:472
XferDes * xd
Definition channel.h:482
virtual void event_triggered(bool poisoned, TimeLimit work_until)
XferDesQueue * xferDes_queue
Definition channel.h:481
virtual Event get_finish_event(void) const
virtual void print(std::ostream &os) const
void defer(XferDesQueue *_xferDes_queue, XferDes *_xd, Event wait_on)
Definition channel.h:489
size_t sizes[MAX_ENTRIES]
Definition channel.h:502
void add_span(int port_idx, size_t offset, size_t size)
int ports[MAX_ENTRIES]
Definition channel.h:500
static const size_t MAX_ENTRIES
Definition channel.h:497
size_t offsets[MAX_ENTRIES]
Definition channel.h:501
XferDes * xd
Definition channel.h:499
SequenceCache(XferDes *_xd, size_t _flush_bytes=0)
size_t flush_bytes
Definition channel.h:503
size_t total_bytes
Definition channel.h:503
Definition channel.h:285
REALM_PMTA_DEFN(XferDes, IntrusivePriorityListLink< XferDes >, xd_link)
Mutex update_write_lock
Definition channel.h:357
Channel * channel
Definition channel.h:342
void mark_completed(TimeLimit work_until)
Mutex available_req_mutex
Definition channel.h:383
virtual Request * dequeue_request()
Definition channel.h:444
atomic< int64_t > bytes_write_pending
Definition channel.h:294
REALM_PMTA_DEFN(XferDes, int, priority)
virtual void update_bytes_read(int port_idx, size_t offset, size_t size)
SequenceCache<&XferDes::update_bytes_write > WriteSequenceCache
Definition channel.h:506
void update_progress(void)
std::queue< Request * > available_reqs
Definition channel.h:384
void add_reference(void)
uint64_t current_out_port_remain
Definition channel.h:298
void default_notify_request_read_done(Request *req)
long default_get_requests(Request **requests, long nr, unsigned flags=0)
void add_update_pre_bytes_total_received(void)
size_t update_control_info(ReadSequenceCache *rseqcache)
atomic< unsigned > progress_counter
Definition channel.h:364
IntrusivePriorityListLink< XferDes > xd_link
Definition channel.h:373
atomic< bool > iteration_completed
Definition channel.h:293
AlignedStorage inline_fill_storage
Definition channel.h:352
void * fill_data
Definition channel.h:344
virtual void notify_request_write_done(Request *req)
virtual ~XferDes()
uint64_t max_req_size
Definition channel.h:329
SequenceCache<&XferDes::update_bytes_read > ReadSequenceCache
Definition channel.h:505
size_t fill_size
Definition channel.h:345
std::vector< XferPort > output_ports
Definition channel.h:319
static const size_t ALIGNED_FILL_STORAGE_SIZE
Definition channel.h:347
void update_next_bytes_read(int port_idx, size_t offset, size_t size)
virtual bool request_available()
Definition channel.h:438
ControlPortState output_control
Definition channel.h:327
NodeID launch_node
Definition channel.h:291
bool record_address_consumption(size_t total_read_bytes, size_t total_write_bytes)
virtual Event request_metadata()
void update_pre_bytes_total(int port_idx, size_t pre_bytes_total)
bool check_for_progress(unsigned last_counter)
REALM_ALIGNED_TYPE_CONST(AlignedStorage, UnalignedStorage, 16)
unsigned current_progress(void)
ControlPortState input_control
Definition channel.h:327
Mutex xd_lock
Definition channel.h:357
uint64_t current_in_port_remain
Definition channel.h:298
Mutex update_read_lock
Definition channel.h:357
atomic< bool > transfer_completed
Definition channel.h:295
size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache)
XferDesQueue * xferDes_queue
Definition channel.h:289
int priority
Definition channel.h:331
void begin_completion()
XferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, const void *_fill_data, size_t fill_size)
void replicate_fill_data(size_t new_size)
DeferredXDEnqueue deferred_enqueue
Definition channel.h:484
virtual void update_bytes_write(int port_idx, size_t offset, size_t size)
virtual long get_requests(Request **requests, long nr)=0
atomic< unsigned > reference_count
Definition channel.h:366
uint64_t current_out_port_mask
Definition channel.h:297
size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache, const InstanceLayoutPieceBase *&in_nonaffine, const InstanceLayoutPieceBase *&out_nonaffine)
@ XFERDES_NO_GUID
Definition channel.h:339
virtual void notify_request_read_done(Request *req)
void default_notify_request_write_done(Request *req)
atomic< unsigned > nb_update_pre_bytes_total_calls_received
Definition channel.h:370
virtual void flush()
IntrusivePriorityList< XferDes, int, REALM_PMTA_USE(XferDes, xd_link), REALM_PMTA_USE(XferDes, priority), DummyLock > XferDesList
Definition channel.h:378
void update_pre_bytes_write(int port_idx, size_t offset, size_t size)
uint64_t current_in_port_mask
Definition channel.h:297
XferDesKind kind
Definition channel.h:336
unsigned nb_update_pre_bytes_total_calls_expected
Definition channel.h:368
XferDesID guid
Definition channel.h:334
void remove_reference(void)
uintptr_t dma_op
Definition channel.h:288
virtual void enqueue_request(Request *req)
Definition channel.h:459
size_t orig_fill_size
Definition channel.h:345
std::vector< XferPort > input_ports
Definition channel.h:319
Definition atomics.h:31
T load(void) const
void store(T newval)
T fetch_add(T to_add)
Definition utils.h:84
#define REALM_INTERNAL_API_EXTERNAL_LINKAGE
Definition compiler_support.h:218
#define REALM_PMTA_USE(structtype, name)
Definition lists.h:42
#define STRING_KIND_CASE(kind, desc)
#define C_ENUMS(name, desc)
Definition memory.h:60
Definition activemsg.h:38
Logger log_new_dma
int NodeID
Definition nodeset.h:40
XferDesKind
Definition channel.h:84
int CustomSerdezID
Definition custom_serdez.h:148
bool serialize(S &serdez, const ByteArrayRef &a)
void destroy_xfer_des(XferDesID _guid)
REALM_CUDA_HD bool operator==(const Point< N, T > &lhs, const Point< N, T2 > &rhs)
unsigned long long XferDesID
Definition channel.h:56
::realm_reduction_op_id_t ReductionOpID
Definition event.h:38
std::ostream & operator<<(std::ostream &os, const DenseRectangleList< N, T > &drl)
Definition channel.h:683
bool oor_possible
Definition channel.h:705
Memory ind_mem
Definition channel.h:700
ChannelCopyInfo(Memory _src_mem, Memory _dst_mem, Memory _ind_mem=Memory::NO_MEMORY, size_t _num_spaces=1, bool _is_scatter=false, bool _is_ranges=false, bool _is_direct=true, bool _oor_possible=false, size_t _addr_size=0)
Definition channel.h:684
size_t addr_size
Definition channel.h:706
size_t num_spaces
Definition channel.h:701
bool is_scatter
Definition channel.h:702
Memory src_mem
Definition channel.h:698
bool is_ranges
Definition channel.h:703
Memory dst_mem
Definition channel.h:699
bool is_direct
Definition channel.h:704
uint64_t mems[BITMASK_SIZE]
Definition channel.h:782
static const int BITMASK_SIZE
Definition channel.h:781
uint64_t ib_mems[BITMASK_SIZE]
Definition channel.h:782
NodeID node
Definition channel.h:780
Definition channel.h:768
SrcDstType src_type
Definition channel.h:778
MemBitmask src_bitmask
Definition channel.h:787
bool serdez_allowed
Definition channel.h:800
SupportedPath & set_max_dim(int src_dim, int dst_dim)
unsigned latency
Definition channel.h:796
SupportedPath * chain
Definition channel.h:816
MemBitmask dst_bitmask
Definition channel.h:792
SrcDstType
Definition channel.h:770
@ LOCAL_RDMA
Definition channel.h:774
@ LOCAL_KIND
Definition channel.h:772
@ SPECIFIC_MEMORY
Definition channel.h:771
@ MEMORY_BITMASK
Definition channel.h:776
@ GLOBAL_KIND
Definition channel.h:773
@ REMOTE_RDMA
Definition channel.h:775
unsigned frag_overhead
Definition channel.h:797
Memory src_mem
Definition channel.h:785
SrcDstType dst_type
Definition channel.h:778
unsigned char max_dst_dim
Definition channel.h:798
void populate_memory_bitmask(span< const Memory > mems, NodeID node, MemBitmask &bitmask)
Memory::Kind src_kind
Definition channel.h:786
Memory dst_mem
Definition channel.h:790
SupportedPath & set_max_dim(int src_and_dst_dim)
SupportedPath & allow_redops()
unsigned bandwidth
Definition channel.h:795
Memory::Kind dst_kind
Definition channel.h:791
unsigned char max_src_dim
Definition channel.h:798
bool redops_allowed
Definition channel.h:799
SupportedPath & allow_serdez()
XferDesKind xd_kind
Definition channel.h:794
SupportedPath()
Definition channel.h:803
Definition channel.h:1125
TransferOperation * op
Definition channel.h:1126
static void handle_message(NodeID sender, const NotifyXferDesCompleteMessage &args, const void *data, size_t datalen)
XferDesID xd_id
Definition channel.h:1127
static void send_request(NodeID target, TransferOperation *op, XferDesID xd_id)
Definition redop.h:56
Definition network.h:46
static void handle_message(NodeID sender, const Write1DMessage &args, const void *data, size_t datalen)
int next_port_idx
Definition channel.h:618
XferDesID next_xd_guid
Definition channel.h:617
size_t span_start
Definition channel.h:619
static bool handle_inline(NodeID sender, const Write1DMessage &args, const void *data, size_t datalen, TimeLimit work_until)
Definition channel.h:654
static void handle_message(NodeID sender, const SimpleXferDesCreateMessage &args, const void *data, size_t datalen)
Definition channel.h:1253
int port_idx
Definition channel.h:1255
size_t span_size
Definition channel.h:1256
size_t span_start
Definition channel.h:1256
static void handle_message(NodeID sender, const UpdateBytesReadMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1254
static void send_request(NodeID target, XferDesID guid, int port_idx, size_t span_start, size_t span_size)
Definition channel.h:1261
Definition channel.h:1224
size_t pre_bytes_total
Definition channel.h:1227
static void handle_message(NodeID sender, const UpdateBytesTotalMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1225
int port_idx
Definition channel.h:1226
Definition channel.h:1233
size_t span_start
Definition channel.h:1236
int port_idx
Definition channel.h:1235
static void handle_message(NodeID sender, const UpdateBytesWriteMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1234
static void send_request(NodeID target, XferDesID guid, int port_idx, size_t span_start, size_t span_size)
Definition channel.h:1241
size_t span_size
Definition channel.h:1236
Definition channel.h:646
uintptr_t channel
Definition channel.h:651
uintptr_t dma_op
Definition channel.h:648
XferDesID guid
Definition channel.h:649
NodeID launch_node
Definition channel.h:650
Definition channel.h:1212
static void send_request(NodeID target, XferDesID guid)
Definition channel.h:1216
static void handle_message(NodeID sender, const XferDesDestroyMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1213
Definition channel.h:191
Memory mem
Definition channel.h:202
XferDesID peer_guid
Definition channel.h:199
CustomSerdezID serdez_id
Definition channel.h:206
RegionInstance inst
Definition channel.h:203
size_t ib_size
Definition channel.h:204
int port_type
Definition channel.h:198
@ DATA_PORT
Definition channel.h:194
@ SCATTER_CONTROL_PORT
Definition channel.h:196
@ GATHER_CONTROL_PORT
Definition channel.h:195
int indirect_port_idx
Definition channel.h:201
int peer_port_idx
Definition channel.h:200
TransferIterator * iter
Definition channel.h:205
size_t ib_offset
Definition channel.h:204
Definition channel.h:209
XferDesRedopInfo()
Definition channel.h:216
ReductionOpID id
Definition channel.h:210
XferDesRedopInfo(ReductionOpID _id, bool _is_fold, bool _in_place, bool _is_exclusive)
Definition channel.h:223
bool is_exclusive
Definition channel.h:213
bool is_fold
Definition channel.h:211
bool in_place
Definition channel.h:212
Definition channel.h:320
bool eos_received
Definition channel.h:325
size_t remaining_count
Definition channel.h:324
int current_io_port
Definition channel.h:323
ControlPort::Decoder decoder
Definition channel.h:321
int control_port_idx
Definition channel.h:322
Definition channel.h:348
char data[ALIGNED_FILL_STORAGE_SIZE]
Definition channel.h:349
Definition channel.h:299
AddressListCursor addrcursor
Definition channel.h:317
const CustomSerdezUntyped * serdez_op
Definition channel.h:302
size_t local_bytes_total
Definition channel.h:308
int peer_port_idx
Definition channel.h:304
bool is_indirect_port
Definition channel.h:306
atomic< size_t > local_bytes_cons
Definition channel.h:309
size_t ib_offset
Definition channel.h:315
TransferIterator * iter
Definition channel.h:301
atomic< bool > needs_pbt_update
Definition channel.h:307
atomic< size_t > remote_bytes_total
Definition channel.h:309
AddressList addrlist
Definition channel.h:316
size_t ib_size
Definition channel.h:315
MemoryImpl * mem
Definition channel.h:300
int indirect_port_idx
Definition channel.h:305
SequenceAssembler seq_local
Definition channel.h:310
XferDesID peer_guid
Definition channel.h:303
SequenceAssembler seq_remote
Definition channel.h:310
Memory ib_mem
Definition channel.h:314