Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
channel.h
Go to the documentation of this file.
1/*
2 * Copyright 2025 Los Alamos National Laboratory, Stanford University, NVIDIA Corporation
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef LOWLEVEL_CHANNEL
19#define LOWLEVEL_CHANNEL
20
21#include "realm/realm_config.h"
22
23#include <stdio.h>
24#include <stdlib.h>
25#include <stdint.h>
26#ifndef REALM_ON_WINDOWS
27#include <unistd.h>
28#include <pthread.h>
29#endif
30#include <fcntl.h>
31#include <map>
32#include <vector>
33#include <deque>
34#include <queue>
35#include <unordered_set>
36#include <assert.h>
37#include <string.h>
38
39#include "realm/id.h"
40#include "realm/runtime_impl.h"
41#include "realm/mem_impl.h"
42#include "realm/inst_impl.h"
43#include "realm/bgwork.h"
44#include "realm/utils.h"
46
47namespace Realm {
48
49 class XferDes;
50 class XferDesQueue;
51 class Channel;
52 class DmaRequest;
53 class TransferIterator;
54
55 extern Logger log_new_dma;
56
57 typedef unsigned long long XferDesID;
58
59 // clang-format off
60#define REALM_XFERDES_KINDS(__op__) \
61 __op__(XFER_NONE) \
62 __op__(XFER_DISK_READ) \
63 __op__(XFER_DISK_WRITE) \
64 __op__(XFER_SSD_READ) \
65 __op__(XFER_SSD_WRITE) \
66 __op__(XFER_GPU_TO_FB) \
67 __op__(XFER_GPU_FROM_FB) \
68 __op__(XFER_GPU_IN_FB) \
69 __op__(XFER_GPU_PEER_FB) \
70 __op__(XFER_MEM_CPY) \
71 __op__(XFER_GASNET_READ) \
72 __op__(XFER_GASNET_WRITE) \
73 __op__(XFER_REMOTE_WRITE) \
74 __op__(XFER_HDF5_READ) \
75 __op__(XFER_HDF5_WRITE) \
76 __op__(XFER_FILE_READ) \
77 __op__(XFER_FILE_WRITE) \
78 __op__(XFER_ADDR_SPLIT) \
79 __op__(XFER_MEM_FILL) \
80 __op__(XFER_GPU_SC_IN_FB) \
81 __op__(XFER_GPU_SC_PEER_FB)
82 // clang-format on
83
85 {
86#define C_ENUMS(name) name,
88#undef C_ENUMS
89 };
90
91 inline std::ostream &operator<<(std::ostream &os, XferDesKind kind)
92 {
93#define STRING_KIND_CASE(kind) \
94 case XferDesKind::kind: \
95 return os << #kind;
96 switch(kind) {
98 }
99#undef STRING_KIND_CASE
100 return os << "UNKNOWN_KIND";
101 }
102
103 class Request {
104 public:
111 // a pointer to the owning xfer descriptor
112 // this should be set at Request creation
115 // src/dst offset in the src/dst instance
117 // src/dst (line) strides
119 // src/dst plane strides
121 // number of bytes being transferred
123 // a flag indicating whether this request read has been done
125 // a flag indicating whether this request write has been done
127 // whether I am a 1D or 2D transfer
129 // sequence info - used to update read/write counts, handle reordering
132 };
133
135 public:
139
140 // NOT thread-safe - caller must ensure neither *this nor other is being
141 // modified during this call
143
144 // imports data from this assembler into another (this is thread-safe
145 // on the `other` but assumes no changes being made on `this`)
146 void import(SequenceAssembler &other) const;
147
148 bool empty() const;
149
150 // asks if a span exists - return value is the number of bytes, beginning
151 // at 'start', that are already covered
152 size_t span_exists(size_t start, size_t count);
153
154 // returns the amount by which the contiguous range has been increased
155 // (i.e. from [pos, pos+retval) )
156 size_t add_span(size_t pos, size_t count);
157
158 protected:
160
162 contig_amount_x2; // everything from [0, contig_amount) is covered - LSB indicates
163 // potential presence of noncontig spans
164 atomic<size_t> first_noncontig; // nothing in [contig_amount, first_noncontig)
165 atomic<Mutex *> mutex; // created on first use
166 std::map<size_t, size_t> spans; // noncontiguous spans
167 };
168
// Request subtype that carries raw base pointers for a copy between two
// directly addressable buffers. NOTE(review): the name suggests the memcpy
// path (see the MemcpyChannel forward declaration) - confirm which XferDes
// populates these fields.
169 class MemcpyRequest : public Request {
170 public:
171 const void *src_base; // source base pointer; const, so data is only read through it
172 void *dst_base; // destination base pointer (non-const; presumably written by the copy)
173 // size_t nbytes; (disabled - Request documents its own "number of bytes being transferred" member, which presumably replaces this)
174 };
175
// Request subtype for GASNet-based transfers (presumably the XFER_GASNET_READ /
// XFER_GASNET_WRITE kinds - TODO confirm). A single local pointer is paired
// with an offset, and the transfer direction decides which side is the source.
// GASNetXferDes releases its array of these with free(), so no destructor runs;
// avoid adding members with non-trivial destructors here.
176 class GASNetRequest : public Request {
177 public:
178 void *mem_base; // could be source or dest
179 off_t gas_off; // offset on the GASNet side - presumably into the registered segment; verify
180 // off_t src_offset; (disabled; Request's own src/dst offset fields presumably cover this)
181 // size_t nbytes; (disabled; Request documents a byte-count member)
182 };
183
185 public:
186 // NodeID dst_node;
187 const void *src_base;
188 // void *dst_base;
189 // size_t nbytes;
190 };
191
209
215
216 // default constructor == no reduction requested
218 : id(0)
219 , is_fold(false)
220 , in_place(false)
221 , is_exclusive(false)
222 {}
223
224 XferDesRedopInfo(ReductionOpID _id, bool _is_fold, bool _in_place, bool _is_exclusive)
225 : id(_id)
226 , is_fold(_is_fold)
227 , in_place(_in_place)
228 , is_exclusive(_is_exclusive)
229 {}
230 };
231
232 // a control port is used to steer inputs/outputs of transfer descriptors -
233 // the information is encoded into 32b packets which may be read/written
234 // at different times due to flow control, so the encoder and decoder
235 // both need to be stateful
236 namespace ControlPort {
237 // apart from the first control word (which carries the space_shift
238 // amount), the bottom two bits of each control word mean:
239 static const unsigned CTRL_LO_MORE = 0; // 00: count.lo, space_index, not last
240 static const unsigned CTRL_LO_LAST = 1; // 01: count.lo, space_index, last
241 static const unsigned CTRL_MID = 2; // 10: count.mid (32 bits)
242 static const unsigned CTRL_HIGH = 3; // 11: count.hi (2+space_shift bits)
243
244 class Encoder {
245 public:
248
249 void set_port_count(size_t ports);
250
251 // encodes some/all of the { count, port, last } packet into the next
252 // 32b - returns true if encoding is complete or false if it should
253 // be called again with the same arguments for another 32b packet
254 bool encode(unsigned &data, size_t count, int port, bool last);
255
256 protected:
257 unsigned short port_shift;
258
268 unsigned char state;
269 };
270
271 class Decoder {
272 public:
275
276 // decodes the next 32b of packed data, returning true if a complete
277 // { count, port, last } has been received
278 bool decode(unsigned data, size_t &count, int &port, bool &last);
279
280 protected:
282 unsigned short port_shift;
283 };
284 }; // namespace ControlPort
285
286 class XferDes {
287 public:
288 // a pointer to the DmaRequest that contains this XferDes
289 uintptr_t dma_op;
291 // ID of the node that launches this XferDes
293 // uint64_t /*bytes_submit, */bytes_read, bytes_write/*, bytes_total*/;
297 // current input and output port mask
320 std::vector<XferPort> input_ports, output_ports;
325 size_t remaining_count; // units of bytes for normal (elements for serialized data?)
327 };
329 // maximum size for a single request
330 uint64_t max_req_size;
331 // priority of the containing XferDes
333 // current, previous and next XferDes in the chain, XFERDES_NO_GUID
334 // means this XferDes is the first/last one.
335 XferDesID guid; //, pre_xd_guid, next_xd_guid;
336 // XferDesKind of the Xfer Descriptor
338 enum
339 {
341 };
342 // channel this XferDes describes
344
347 // for most fills, we can use an inline buffer with good alignment
348 static const size_t ALIGNED_FILL_STORAGE_SIZE = 32;
353 AlignedStorage inline_fill_storage;
354
355 // xd_lock is designed to provide thread-safety for
356 // SIMULTANEOUS invocation to get_requests,
357 // notify_request_read_done, and notify_request_write_done
359 // default iterators provided to generate requests
360 // Layouts::GenericLayoutIterator<DIM>* li;
361 // SJT:what is this for?
362 // unsigned offset_idx;
363 // used to track by upstream/downstream xds so that we can safely
364 // sleep xds that are stalled
366
368
370
372
373 // intrusive list for queued XDs in a channel
380
381 protected:
382 // this will be removed soon
383 // queue that contains all available free requests
385 std::queue<Request *> available_reqs;
386
387 public:
388 XferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid,
389 const std::vector<XferDesPortInfo> &inputs_info,
390 const std::vector<XferDesPortInfo> &outputs_info, int _priority,
391 const void *_fill_data, size_t fill_size);
392
393 // transfer descriptors are reference counted rather than explicitly
394 // deleted
395 void add_reference(void);
397
399
400 protected:
401 virtual ~XferDes();
402
403 public:
405
406 virtual long get_requests(Request **requests, long nr) = 0;
407
409
411
412 virtual void flush();
413
414 long default_get_requests(Request **requests, long nr, unsigned flags = 0);
417
418 virtual void update_bytes_read(int port_idx, size_t offset, size_t size);
419 virtual void update_bytes_write(int port_idx, size_t offset, size_t size);
420 void update_pre_bytes_write(int port_idx, size_t offset, size_t size);
421 void update_pre_bytes_total(int port_idx, size_t pre_bytes_total);
422 void update_next_bytes_read(int port_idx, size_t offset, size_t size);
423
424 // called once iteration is complete, but we need to track in flight
425 // writes, flush byte counts, etc.
427
429
430 unsigned current_progress(void);
431
432 // checks to see if progress has been made since the last read of the
433 // progress counter - atomically marks the xd for wakeup if not
434 bool check_for_progress(unsigned last_counter);
435
436 // updates the progress counter, waking up the xd if needed
437 void update_progress(void);
438
439 virtual bool request_available()
440 {
442 return !available_reqs.empty();
443 }
444
446 {
447 Request *req;
448 {
450 req = available_reqs.front();
451 available_reqs.pop();
452 }
453 req->is_read_done = false;
454 req->is_write_done = false;
455 // by default, an "active" request holds a reference on the xd
457 return req;
458 }
459
460 virtual void enqueue_request(Request *req)
461 {
462 {
464 available_reqs.push(req);
465 }
466 // update progress counter if iteration isn't completed yet - it might
467 // have been waiting for another request object
471 }
472
474 public:
475 void defer(XferDesQueue *_xferDes_queue, XferDes *_xd, Event wait_on);
476
477 virtual void event_triggered(bool poisoned, TimeLimit work_until);
478 virtual void print(std::ostream &os) const;
479 virtual Event get_finish_event(void) const;
480
481 protected:
483 XferDes *xd; // TODO: eliminate this based on a known offset
484 };
486
487 // helper widget to cache spans so that SequenceAssembler updates are as
488 // large as possible
489 template <void (XferDes::*UPDATE)(int port_idx, size_t offset, size_t size)>
491 public:
492 SequenceCache(XferDes *_xd, size_t _flush_bytes = 0);
493
494 void add_span(int port_idx, size_t offset, size_t size);
495 void flush();
496
497 protected:
498 static const size_t MAX_ENTRIES = 4;
499
505 };
508
510
511 // a helper routine for individual XferDes implementations - tries to get
512 // addresses and check flow control for at least 'min_xfer_size' bytes
513 // worth of transfers from a single input to a single output, and returns
514 // the number of bytes that can be transferred before another call to
515 // this method
516 // returns 0 if the transfer is complete OR if there are fewer than the
517 // minimum requested bytes available and there's reason to believe that
518 // trying again later will result in a larger chunk
519 // as a side effect, the input/output control information is updated - the
520 // actual input/output ports involved in the next transfer are stored there
521 size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache);
522 size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache,
523 const InstanceLayoutPieceBase *&in_nonaffine,
524 const InstanceLayoutPieceBase *&out_nonaffine);
525
526 // after a call to 'get_addresses', this call updates the various data
527 // structures to record that transfers for 'total_{read,write}_bytes' bytes
528 // were at least initiated - return value is whether iteration is complete
529 bool record_address_consumption(size_t total_read_bytes, size_t total_write_bytes);
530
531 // fills can be more efficient if the fill data is replicated into a larger
532 // block
533 void replicate_fill_data(size_t new_size);
534 };
535
536 class MemcpyChannel;
537
538 class MemfillChannel;
539
540 class MemfillXferDes : public XferDes {
541 public:
542 MemfillXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
543 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
544 const std::vector<XferDesPortInfo> &outputs_info, int _priority,
545 const void *_fill_data, size_t _fill_size, size_t _fill_total);
546
547 long get_requests(Request **requests, long nr);
548
549 virtual bool request_available();
551 virtual void enqueue_request(Request *req);
552
554 };
555
556 class MemreduceChannel;
557
558 class MemreduceXferDes : public XferDes {
559 public:
560 MemreduceXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
561 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
562 const std::vector<XferDesPortInfo> &outputs_info, int _priority,
563 XferDesRedopInfo _redop_info);
564
565 long get_requests(Request **requests, long nr);
566
568
569 protected:
572 };
573
574 class GASNetChannel;
575
576 class GASNetXferDes : public XferDes {
577 public:
578 GASNetXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
579 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
580 const std::vector<XferDesPortInfo> &outputs_info, int _priority);
581
582 ~GASNetXferDes() { free(gasnet_reqs); }
583
584 long get_requests(Request **requests, long nr);
587 void flush();
588
590
591 private:
592 GASNetRequest *gasnet_reqs;
593 };
594
595 class RemoteWriteChannel;
596
598 public:
599 RemoteWriteXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
600 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
601 const std::vector<XferDesPortInfo> &outputs_info, int _priority);
602
603 ~RemoteWriteXferDes() { free(requests); }
604
605 long get_requests(Request **requests, long nr);
608 void flush();
609
610 // doesn't do pre_bytes_write updates, since the remote write message
611 // takes care of it with lower latency
612 virtual void update_bytes_write(int port_idx, size_t offset, size_t size);
613
615
616 // writes directly to a contiguous chunk of destination
621
622 static void handle_message(NodeID sender, const Write1DMessage &args,
623 const void *data, size_t datalen);
624 static bool handle_inline(NodeID sender, const Write1DMessage &args,
625 const void *data, size_t datalen, TimeLimit work_until);
626 };
627
628 private:
629 RemoteWriteRequest *requests;
630 // char *dst_buf_base;
631 };
632
634 public:
635 virtual ~XferDesFactory() {}
636 virtual bool needs_release() = 0;
637
638 virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node,
639 XferDesID guid,
640 const std::vector<XferDesPortInfo> &inputs_info,
641 const std::vector<XferDesPortInfo> &outputs_info,
642 int priority, XferDesRedopInfo redop_info,
643 const void *fill_data, size_t fill_size,
644 size_t fill_total) = 0;
645 };
646
648 // RegionInstance inst;
649 uintptr_t dma_op;
652 uintptr_t channel;
653 };
654
656 static void handle_message(NodeID sender, const SimpleXferDesCreateMessage &args,
657 const void *data, size_t datalen);
658 };
659
660 // a simple xfer des factory knows how to create an xfer des with no
661 // extra information on a single channel
663 public:
664 SimpleXferDesFactory(uintptr_t _channel);
665
666 virtual bool needs_release();
667
668 virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node,
669 XferDesID guid,
670 const std::vector<XferDesPortInfo> &inputs_info,
671 const std::vector<XferDesPortInfo> &outputs_info,
672 int priority, XferDesRedopInfo redop_info,
673 const void *fill_data, size_t fill_size,
674 size_t fill_total);
675
676 protected:
677 uintptr_t channel;
678 };
679
680 class RemoteChannelInfo;
681 class RemoteChannel;
682
683 // TODO(apryakhin): Deprecate ChannelCopyInfo
685 ChannelCopyInfo(Memory _src_mem, Memory _dst_mem, Memory _ind_mem = Memory::NO_MEMORY,
686 size_t _num_spaces = 1, bool _is_scatter = false,
687 bool _is_ranges = false, bool _is_direct = true,
688 bool _oor_possible = false, size_t _addr_size = 0)
689 : src_mem(_src_mem)
690 , dst_mem(_dst_mem)
691 , ind_mem(_ind_mem)
692 , num_spaces(_num_spaces)
693 , is_scatter(_is_scatter)
694 , is_ranges(_is_ranges)
695 , is_direct(_is_direct)
696 , oor_possible(_oor_possible)
697 , addr_size(_addr_size)
698 {}
707 size_t addr_size;
708 };
709
710 std::ostream &operator<<(std::ostream &os, const ChannelCopyInfo &info);
711 bool operator==(const ChannelCopyInfo &lhs, const ChannelCopyInfo &rhs);
712
713 class Channel {
714 public:
716 : node(Network::my_node_id)
717 , kind(_kind)
718 {}
719 virtual ~Channel(){};
720
721 // TODO: make pure virtual
722 virtual void shutdown() {}
723
724 // most channels can hand out a factory that makes arbitrary xds on
725 // that channel
727
728 public:
729 // which node manages this channel
731 // the kind of XferDes this channel can accept
733
734 // whether the channel has a redop path
735 bool has_redop_path{false};
736
737 // whether the channel has a non-redop path
739
740 virtual bool supports_redop(ReductionOpID redop_id) const;
741
742 virtual bool support_idindexed_fields(Memory src_mem, Memory dst_mem) const
743 {
744 return false;
745 };
746
747 // attempt to make progress on the specified xferdes
748 virtual long progress_xd(XferDes *xd, long max_nr);
749
750 /*
751 * Submit nr asynchronous requests into the channel instance.
752 * This is supposed to be a non-blocking function call, and
753 * should immediately return the number of requests that are
754 * successfully submitted.
755 */
756 virtual long submit(Request **requests, long nr) = 0;
757
758 /*
759 *
760 */
761 virtual void pull() = 0;
762
763 /*
764 * Return the number of slots that are available for
765 * submitting requests
766 */
767 virtual long available() = 0;
768
780 struct MemBitmask {
782 static const int BITMASK_SIZE = (1 << ID::MEMORY_INDEX_WIDTH) >> 6;
784 };
785 union {
789 };
790 union {
794 };
796 unsigned bandwidth; // units = MB/s = B/us
797 unsigned latency; // units = ns
798 unsigned frag_overhead; // units = ns
799 unsigned char max_src_dim, max_dst_dim;
800 bool redops_allowed; // TODO: list of redops?
801 bool serdez_allowed; // TODO: list of serdez ops?
802
803 // constructor
805 : src_mem(Memory::NO_MEMORY)
806 , dst_mem(Memory::NO_MEMORY)
807 {}
808
809 // mutators to modify less-common fields
810 SupportedPath &set_max_dim(int src_and_dst_dim);
811 SupportedPath &set_max_dim(int src_dim, int dst_dim);
814
815 // only valid when a SupportedPath is modifiable by the above methods
816 // (i.e. only on the creator node and only until another path is added)
818
820 MemBitmask &bitmask);
821 };
822
823 const std::vector<SupportedPath> &get_paths(void) const;
824
826
827 // returns 0 if the path is not supported, or a strictly-positive
828 // estimate of the time required (in nanoseconds) to transfer data
829 // along a supported path
830 virtual uint64_t
831 supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id,
832 CustomSerdezID dst_serdez_id, ReductionOpID redop_id,
833 size_t total_bytes, const std::vector<size_t> *src_frags,
834 const std::vector<size_t> *dst_frags, XferDesKind *kind_ret = 0,
835 unsigned *bw_ret = 0, unsigned *lat_ret = 0);
839 virtual bool supports_indirection_memory(Memory mem) const;
840
843
844 virtual bool needs_wrapping_iterator() const { return false; }
845
847
848 void print(std::ostream &os) const;
849
850 virtual void enqueue_ready_xd(XferDes *xd) = 0;
851 virtual void wakeup_xd(XferDes *xd) = 0;
852
853 protected:
854 // returns the added path for further modification, but reference is
855 // only valid until the next call to 'add_path'
857 unsigned bandwidth, unsigned latency, unsigned frag_overhead,
858 XferDesKind xd_kind);
860 bool dst_global, unsigned bandwidth, unsigned latency,
861 unsigned frag_overhead, XferDesKind xd_kind);
862 SupportedPath &add_path(Memory::Kind src_kind, bool src_global,
863 span<const Memory> dst_mems, unsigned bandwidth,
864 unsigned latency, unsigned frag_overhead,
865 XferDesKind xd_kind);
866 SupportedPath &add_path(Memory::Kind src_kind, bool src_global, Memory::Kind dst_kind,
867 bool dst_global, unsigned bandwidth, unsigned latency,
868 unsigned frag_overhead, XferDesKind xd_kind);
869 // TODO: allow rdma path to limit by kind?
870 SupportedPath &add_path(bool local_loopback, unsigned bandwidth, unsigned latency,
871 unsigned frag_overhead, XferDesKind xd_kind);
872
873 std::vector<SupportedPath> paths;
874 };
875
876 std::ostream &operator<<(std::ostream &os, const Channel::SupportedPath &p);
877
878 class LocalChannel : public Channel {
879 public:
881
882 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
883 const std::vector<XferDesPortInfo> &inputs_info,
884 const std::vector<XferDesPortInfo> &outputs_info,
885 int priority, XferDesRedopInfo redop_info,
886 const void *fill_data, size_t fill_size,
887 size_t fill_total) = 0;
888
890
891 protected:
893 };
894
895 // polymorphic container for info necessary to create a remote channel
897 public:
899
901
902 template <typename S>
903 static RemoteChannelInfo *deserialize_new(S &deserializer);
904 };
905
906 template <typename S>
907 bool serialize(S &serializer, const RemoteChannelInfo &rci);
908
910 public:
911 SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr,
912 const std::vector<Channel::SupportedPath> &_paths,
913 const std::vector<Memory> &indirect_memories);
914 SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr,
915 const std::vector<Channel::SupportedPath> &_paths);
916
918
919 template <typename S>
920 bool serialize(S &serializer) const;
921
922 template <typename S>
923 static RemoteChannelInfo *deserialize_new(S &deserializer);
924
925 protected:
927
931
934 uintptr_t remote_ptr;
935 std::vector<Channel::SupportedPath> paths;
936 std::vector<Memory> indirect_memories;
937 };
938
939 class RemoteChannel : public Channel {
940 protected:
942
943 RemoteChannel(uintptr_t _remote_ptr, const std::vector<Memory> &indirect_memories);
944 RemoteChannel(uintptr_t _remote_ptr);
945
946 void shutdown() override;
947
949
950 public:
951 uintptr_t get_remote_ptr() const;
952
954
955 bool supports_redop(ReductionOpID redop_id) const override;
956
957 /*
958 * Submit nr asynchronous requests into the channel instance.
959 * This is supposed to be a non-blocking function call, and
960 * should immediately return the number of requests that are
961 * successfully submitted.
962 */
963 long submit(Request **requests, long nr) override;
964
965 /*
966 *
967 */
968 void pull() override;
969
970 /*
971 * Return the number of slots that are available for
972 * submitting requests
973 */
974 long available() override;
975
976 uint64_t supports_path(ChannelCopyInfo channel_copy_info,
977 CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id,
978 ReductionOpID redop_id, size_t total_bytes,
979 const std::vector<size_t> *src_frags,
980 const std::vector<size_t> *dst_frags,
981 XferDesKind *kind_ret = 0, unsigned *bw_ret = 0,
982 unsigned *lat_ret = 0) override;
983
987 bool supports_indirection_memory(Memory mem) const override;
988
989 void enqueue_ready_xd(XferDes *xd) override { assert(0); }
990 void wakeup_xd(XferDes *xd) override { assert(0); }
991
992 protected:
993 mutable RWLock mutex;
994 uintptr_t remote_ptr;
995 std::unordered_set<ReductionOpID> supported_redops;
997 const std::set<Memory> indirect_memories;
998 };
999
1000 template <typename CHANNEL, typename XD>
1002 public:
1003 XDQueue(LocalChannel *_channel, const std::string &_name, bool _ordered);
1004
1005 void enqueue_xd(XD *xd, bool at_front = false);
1006
1007 virtual bool do_work(TimeLimit work_until);
1008
1009 protected:
1010 friend CHANNEL;
1011
1016 };
1017
1018 template <typename CHANNEL, typename XD>
1020 public:
1022 const std::string &_name, int _numa_domain = -1);
1023
1024 virtual void shutdown();
1025
1026 virtual void enqueue_ready_xd(XferDes *xd);
1027 virtual void wakeup_xd(XferDes *xd);
1028
1029 // TODO: remove!
1030 void pull() { assert(0); }
1032 {
1033 assert(0);
1034 return 0;
1035 }
1036 virtual long progress_xd(XferDes *xd, long max_nr)
1037 {
1038 assert(0);
1039 return 0;
1040 }
1041
1042 protected:
1044 };
1045
1046 class MemfillChannel : public SingleXDQChannel<MemfillChannel, MemfillXferDes> {
1047 public:
1049
1050 // multiple concurrent memfills ok
1051 static const bool is_ordered = false;
1052
1054
1055 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1056 const std::vector<XferDesPortInfo> &inputs_info,
1057 const std::vector<XferDesPortInfo> &outputs_info,
1058 int priority, XferDesRedopInfo redop_info,
1059 const void *fill_data, size_t fill_size,
1060 size_t fill_total);
1061
1062 virtual long submit(Request **requests, long nr);
1063
1065 };
1066
1067 class MemreduceChannel : public SingleXDQChannel<MemreduceChannel, MemreduceXferDes> {
1068 public:
1070
1071 // multiple concurrent memreduces ok
1072 static const bool is_ordered = false;
1073
1074 virtual bool supports_redop(ReductionOpID redop_id) const;
1075
1076 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1077 const std::vector<XferDesPortInfo> &inputs_info,
1078 const std::vector<XferDesPortInfo> &outputs_info,
1079 int priority, XferDesRedopInfo redop_info,
1080 const void *fill_data, size_t fill_size,
1081 size_t fill_total);
1082
1083 virtual long submit(Request **requests, long nr);
1084
1086 };
1087
1088 class GASNetChannel : public SingleXDQChannel<GASNetChannel, GASNetXferDes> {
1089 public:
1092
1093 // no more than one GASNet xfer of each type at a time
1094 static const bool is_ordered = true;
1095
1096 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1097 const std::vector<XferDesPortInfo> &inputs_info,
1098 const std::vector<XferDesPortInfo> &outputs_info,
1099 int priority, XferDesRedopInfo redop_info,
1100 const void *fill_data, size_t fill_size,
1101 size_t fill_total);
1102
1103 long submit(Request **requests, long nr);
1104 };
1105
1107 : public SingleXDQChannel<RemoteWriteChannel, RemoteWriteXferDes> {
1108 public:
1111
1112 // multiple concurrent RDMAs ok
1113 static const bool is_ordered = false;
1114
1115 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1116 const std::vector<XferDesPortInfo> &inputs_info,
1117 const std::vector<XferDesPortInfo> &outputs_info,
1118 int priority, XferDesRedopInfo redop_info,
1119 const void *fill_data, size_t fill_size,
1120 size_t fill_total);
1121
1122 long submit(Request **requests, long nr);
1123 };
1124
1125 class TransferOperation;
1129
1130 static void handle_message(NodeID sender, const NotifyXferDesCompleteMessage &args,
1131 const void *data, size_t datalen);
1132
1134 };
1135
1136#if 0 // TODO: DELETE
1137 struct XferDesRemoteWriteMessage {
1138 RemoteWriteRequest *req;
1139 XferDesID next_xd_guid;
1140 int next_port_idx;
1141 unsigned span_size; // 32 bits to fit in packet size
1142 size_t span_start, /*span_size,*/ pre_bytes_total;
1143
1144 static void handle_message(NodeID sender,
1145 const XferDesRemoteWriteMessage &args,
1146 const void *data,
1147 size_t datalen);
1148
1149 static void send_request(NodeID target,
1150 const RemoteAddress& dst_buf,
1151 const void *src_buf, size_t nbytes,
1152 RemoteWriteRequest* req,
1153 XferDesID next_xd_guid,
1154 int next_port_idx,
1155 size_t span_start,
1156 size_t span_size,
1157 size_t pre_bytes_total)
1158 {
1160 src_buf, nbytes,
1161 dst_buf);
1162 amsg->req = req;
1163 amsg->next_xd_guid = next_xd_guid;
1164 amsg->next_port_idx = next_port_idx;
1165 amsg->span_start = span_start;
1166 assert(span_size <= UINT_MAX);
1167 amsg->span_size = (unsigned)span_size;
1168 amsg->pre_bytes_total = pre_bytes_total;
1169 amsg.commit();
1170 }
1171
1172 static void send_request(NodeID target,
1173 const RemoteAddress& dst_buf,
1174 const void *src_buf, size_t nbytes, off_t src_str,
1175 size_t nlines, RemoteWriteRequest* req,
1176 XferDesID next_xd_guid,
1177 int next_port_idx,
1178 size_t span_start,
1179 size_t span_size,
1180 size_t pre_bytes_total)
1181 {
1182 ActiveMessage<XferDesRemoteWriteMessage> amsg(target,
1183 src_buf, nbytes,
1184 nlines, src_str,
1185 dst_buf);
1186 amsg->req = req;
1187 amsg->next_xd_guid = next_xd_guid;
1188 amsg->next_port_idx = next_port_idx;
1189 amsg->span_start = span_start;
1190 assert(span_size <= UINT_MAX);
1191 amsg->span_size = (unsigned)span_size;
1192 amsg->pre_bytes_total = pre_bytes_total;
1193 amsg.commit();
1194 }
1195 };
1196
1197 struct XferDesRemoteWriteAckMessage {
1198 RemoteWriteRequest* req;
1199
1200 static void handle_message(NodeID sender,
1201 const XferDesRemoteWriteAckMessage &args,
1202 const void *data,
1203 size_t datalen);
1204 static void send_request(NodeID target, RemoteWriteRequest* req)
1205 {
1206 ActiveMessage<XferDesRemoteWriteAckMessage> amsg(target);
1207 amsg->req = req;
1208 amsg.commit();
1209 }
1210 };
1211#endif
1212
1215 static void handle_message(NodeID sender, const XferDesDestroyMessage &args,
1216 const void *data, size_t datalen);
1217 static void send_request(NodeID target, XferDesID guid)
1218 {
1220 amsg->guid = guid;
1221 amsg.commit();
1222 }
1223 };
1224
1229
1230 static void handle_message(NodeID sender, const UpdateBytesTotalMessage &args,
1231 const void *data, size_t datalen);
1232 };
1233
1238
1239 static void handle_message(NodeID sender, const UpdateBytesWriteMessage &args,
1240 const void *data, size_t datalen);
1241
1242 static void send_request(NodeID target, XferDesID guid, int port_idx,
1243 size_t span_start, size_t span_size)
1244 {
1246 amsg->guid = guid;
1247 amsg->port_idx = port_idx;
1248 amsg->span_start = span_start;
1249 amsg->span_size = span_size;
1250 amsg.commit();
1251 }
1252 };
1253
1258
1259 static void handle_message(NodeID sender, const UpdateBytesReadMessage &args,
1260 const void *data, size_t datalen);
1261
1262 static void send_request(NodeID target, XferDesID guid, int port_idx,
1263 size_t span_start, size_t span_size)
1264 {
1266 amsg->guid = guid;
1267 amsg->port_idx = port_idx;
1268 amsg->span_start = span_start;
1269 amsg->span_size = span_size;
1270 amsg.commit();
1271 }
1272 };
1273
1274 // object used to hold input progress (pre_write and bytes_total) before
1275 // we've actually created the correct xd
1277 public:
1279
1280 protected:
1282
1283 public:
1286
1287 void update_pre_bytes_write(int port_idx, size_t span_start, size_t span_size);
1288 void update_pre_bytes_total(int port_idx, size_t pre_bytes_total);
1289
1291
1293
1295
1296 protected:
1297 static const int INLINE_PORTS = 4;
1304 std::map<int, size_t> extra_bytes_total;
1305 std::map<int, SequenceAssembler> extra_pre_write;
1306 };
1307
1309 public:
1310 enum
1311 {
1313 INDEX_BITS = 32
1316 //: core_rsrv("DMA request queue", crs, CoreReservationParameters())
1317 {
1318 // reserve the first several guid
1320 }
1321
1322 ~XferDesQueue() { assert(guid_to_xd.empty()); }
1323
1325
1326 XferDesID get_guid(NodeID execution_node)
1327 {
1328 // GUID rules:
1329 // First NODE_BITS indicates which node will execute this xd
1330 // Next NODE_BITS indicates on which node this xd is generated
1331 // Last INDEX_BITS means a unique idx, which is used to resolve conflicts
1333 return (((XferDesID)execution_node << (NODE_BITS + INDEX_BITS)) |
1335 }
1336
1337 void update_pre_bytes_write(XferDesID xd_guid, int port_idx, size_t span_start,
1338 size_t span_size);
1339 void update_pre_bytes_total(XferDesID xd_guid, int port_idx, size_t pre_bytes_total);
1340 void update_next_bytes_read(XferDesID xd_guid, int port_idx, size_t span_start,
1341 size_t span_size);
1342
1344
1345 // returns true if xd is ready, false if enqueue has been deferred
1346 bool enqueue_xferDes_local(XferDes *xd, bool add_to_queue = true);
1347
1348 protected:
1349 // guid_to_xd maps a guid to either an XferDes * (as a uintptr_t) or
1350 // an XferDesPlaceholder * (as a uintptr_t with the LSB set)
1352 std::map<XferDesID, uintptr_t> guid_to_xd;
1353
1356 };
1357
1359}; // namespace Realm
1360
1361#include "realm/transfer/channel.inl"
1362
1363#endif
#define REALM_XFERDES_KINDS(__op__)
Definition channel.h:60
Definition activemsg.h:65
Definition address_list.h:100
Definition address_list.h:55
Definition mutex.h:384
Definition bgwork.h:129
Definition bgwork.h:36
Definition channel.h:713
virtual long progress_xd(XferDes *xd, long max_nr)
virtual Memory suggest_ib_memories_for_node(NodeID node) const
SupportedPath & add_path(span< const Memory > src_mems, span< const Memory > dst_mems, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
virtual void shutdown()
Definition channel.h:722
virtual bool support_idindexed_fields(Memory src_mem, Memory dst_mem) const
Definition channel.h:742
NodeID node
Definition channel.h:730
SupportedPath & add_path(bool local_loopback, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
SupportedPath & add_path(Memory::Kind src_kind, bool src_global, Memory::Kind dst_kind, bool dst_global, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
SupportedPath & add_path(span< const Memory > src_mems, Memory::Kind dst_kind, bool dst_global, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
virtual bool needs_wrapping_iterator() const
Definition channel.h:844
bool has_non_redop_path
Definition channel.h:738
virtual void enqueue_ready_xd(XferDes *xd)=0
virtual bool supports_indirection_memory(Memory mem) const
Queries if a given mem can be used as an indirection buffer.
const std::vector< SupportedPath > & get_paths(void) const
void print(std::ostream &os) const
std::vector< SupportedPath > paths
Definition channel.h:873
virtual uint64_t supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id, ReductionOpID redop_id, size_t total_bytes, const std::vector< size_t > *src_frags, const std::vector< size_t > *dst_frags, XferDesKind *kind_ret=0, unsigned *bw_ret=0, unsigned *lat_ret=0)
virtual ~Channel()
Definition channel.h:719
virtual void pull()=0
SupportedPath & add_path(Memory::Kind src_kind, bool src_global, span< const Memory > dst_mems, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
XferDesKind kind
Definition channel.h:732
virtual XferDesFactory * get_factory()=0
virtual long submit(Request **requests, long nr)=0
virtual RemoteChannelInfo * construct_remote_info() const
Channel(XferDesKind _kind)
Definition channel.h:715
virtual Memory suggest_ib_memories() const
bool has_redop_path
Definition channel.h:735
virtual void wakeup_xd(XferDes *xd)=0
virtual bool supports_redop(ReductionOpID redop_id) const
virtual long available()=0
void update_channel_state(void)
Definition channel.h:271
unsigned short port_shift
Definition channel.h:282
size_t temp_count
Definition channel.h:281
bool decode(unsigned data, size_t &count, int &port, bool &last)
Definition channel.h:244
void set_port_count(size_t ports)
State
Definition channel.h:260
@ STATE_IDLE
Definition channel.h:263
@ STATE_SENT_HIGH
Definition channel.h:264
@ STATE_SENT_MID
Definition channel.h:265
@ STATE_INIT
Definition channel.h:261
@ STATE_DONE
Definition channel.h:266
@ STATE_HAVE_PORT_COUNT
Definition channel.h:262
unsigned char state
Definition channel.h:268
bool encode(unsigned &data, size_t count, int port, bool last)
unsigned short port_shift
Definition channel.h:257
Definition custom_serdez.h:150
Definition threads.h:428
Definition event_impl.h:49
Definition event.h:50
Definition channel.h:1088
GASNetChannel(BackgroundWorkManager *bgwork, XferDesKind _kind)
static const bool is_ordered
Definition channel.h:1094
long submit(Request **requests, long nr)
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
Definition channel.h:176
void * mem_base
Definition channel.h:178
off_t gas_off
Definition channel.h:179
Definition channel.h:576
void notify_request_read_done(Request *req)
bool progress_xd(GASNetChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
GASNetXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority)
void notify_request_write_done(Request *req)
~GASNetXferDes()
Definition channel.h:582
static const int MEMORY_INDEX_WIDTH
Definition id.h:58
Definition inst_layout.h:267
Definition lists.h:119
Definition channel.h:878
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)=0
SimpleXferDesFactory factory_singleton
Definition channel.h:892
LocalChannel(XferDesKind _kind)
virtual XferDesFactory * get_factory()
Definition memcpy_channel.h:54
Definition channel.h:169
const void * src_base
Definition channel.h:171
void * dst_base
Definition channel.h:172
Definition channel.h:1046
bool is_stopped
Definition channel.h:1064
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
static const bool is_ordered
Definition channel.h:1051
virtual long submit(Request **requests, long nr)
MemfillChannel(BackgroundWorkManager *bgwork)
Definition channel.h:540
MemfillXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, const void *_fill_data, size_t _fill_size, size_t _fill_total)
virtual Request * dequeue_request()
bool progress_xd(MemfillChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
virtual bool request_available()
virtual void enqueue_request(Request *req)
Definition mem_impl.h:50
Definition memory.h:33
Kind
Definition memory.h:59
static const Memory NO_MEMORY
Definition memory.h:49
Definition channel.h:1067
virtual bool supports_redop(ReductionOpID redop_id) const
static const bool is_ordered
Definition channel.h:1072
MemreduceChannel(BackgroundWorkManager *bgwork)
bool is_stopped
Definition channel.h:1085
virtual long submit(Request **requests, long nr)
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
Definition channel.h:558
MemreduceXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, XferDesRedopInfo _redop_info)
long get_requests(Request **requests, long nr)
const ReductionOpUntyped * redop
Definition channel.h:571
bool progress_xd(MemreduceChannel *channel, TimeLimit work_until)
XferDesRedopInfo redop_info
Definition channel.h:570
Definition mutex.h:398
Definition instance.h:66
Definition channel.h:896
virtual RemoteChannel * create_remote_channel()=0
virtual ~RemoteChannelInfo()
Definition channel.h:898
static RemoteChannelInfo * deserialize_new(S &deserializer)
Definition channel.h:939
XferDesFactory * get_factory() override
long available() override
uint64_t supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id, ReductionOpID redop_id, size_t total_bytes, const std::vector< size_t > *src_frags, const std::vector< size_t > *dst_frags, XferDesKind *kind_ret=0, unsigned *bw_ret=0, unsigned *lat_ret=0) override
uintptr_t get_remote_ptr() const
SimpleXferDesFactory factory_singleton
Definition channel.h:996
const std::set< Memory > indirect_memories
Definition channel.h:997
long submit(Request **requests, long nr) override
std::unordered_set< ReductionOpID > supported_redops
Definition channel.h:995
RemoteChannel(uintptr_t _remote_ptr)
RWLock mutex
Definition channel.h:993
void register_redop(ReductionOpID redop_id)
void enqueue_ready_xd(XferDes *xd) override
Definition channel.h:989
void wakeup_xd(XferDes *xd) override
Definition channel.h:990
void pull() override
bool supports_indirection_memory(Memory mem) const override
Queries if a given mem can be used as an indirection buffer.
bool supports_redop(ReductionOpID redop_id) const override
uintptr_t remote_ptr
Definition channel.h:994
void shutdown() override
RemoteChannel(uintptr_t _remote_ptr, const std::vector< Memory > &indirect_memories)
Definition channel.h:1107
long submit(Request **requests, long nr)
static const bool is_ordered
Definition channel.h:1113
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
RemoteWriteChannel(BackgroundWorkManager *bgwork)
Definition channel.h:184
const void * src_base
Definition channel.h:187
Definition channel.h:597
virtual void update_bytes_write(int port_idx, size_t offset, size_t size)
~RemoteWriteXferDes()
Definition channel.h:603
RemoteWriteXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority)
void notify_request_read_done(Request *req)
bool progress_xd(RemoteWriteChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
void notify_request_write_done(Request *req)
Definition channel.h:103
Dimension dim
Definition channel.h:128
off_t src_str
Definition channel.h:118
off_t dst_pstr
Definition channel.h:120
size_t write_seq_count
Definition channel.h:131
int src_port_idx
Definition channel.h:114
off_t src_off
Definition channel.h:116
off_t dst_off
Definition channel.h:116
off_t src_pstr
Definition channel.h:120
off_t dst_str
Definition channel.h:118
size_t nlines
Definition channel.h:122
Dimension
Definition channel.h:106
@ DIM_3D
Definition channel.h:109
@ DIM_1D
Definition channel.h:107
@ DIM_2D
Definition channel.h:108
bool is_write_done
Definition channel.h:126
size_t write_seq_pos
Definition channel.h:131
size_t nbytes
Definition channel.h:122
XferDes * xd
Definition channel.h:113
size_t read_seq_pos
Definition channel.h:130
int dst_port_idx
Definition channel.h:114
bool is_read_done
Definition channel.h:124
size_t nplanes
Definition channel.h:122
size_t read_seq_count
Definition channel.h:130
Definition channel.h:134
atomic< Mutex * > mutex
Definition channel.h:165
atomic< size_t > contig_amount_x2
Definition channel.h:162
std::map< size_t, size_t > spans
Definition channel.h:166
atomic< size_t > first_noncontig
Definition channel.h:164
size_t add_span(size_t pos, size_t count)
size_t span_exists(size_t start, size_t count)
SequenceAssembler(const SequenceAssembler &copy_from)
void swap(SequenceAssembler &other)
Definition channel.h:909
static Serialization::PolymorphicSerdezSubclass< RemoteChannelInfo, SimpleRemoteChannelInfo > serdez_subclass
Definition channel.h:930
std::vector< Channel::SupportedPath > paths
Definition channel.h:935
SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr, const std::vector< Channel::SupportedPath > &_paths)
static RemoteChannelInfo * deserialize_new(S &deserializer)
uintptr_t remote_ptr
Definition channel.h:934
std::vector< Memory > indirect_memories
Definition channel.h:936
SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr, const std::vector< Channel::SupportedPath > &_paths, const std::vector< Memory > &indirect_memories)
virtual RemoteChannel * create_remote_channel()
NodeID owner
Definition channel.h:932
bool serialize(S &serializer) const
XferDesKind kind
Definition channel.h:933
Definition channel.h:662
virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
SimpleXferDesFactory(uintptr_t _channel)
virtual bool needs_release()
uintptr_t channel
Definition channel.h:677
Definition channel.h:1019
virtual void shutdown()
void pull()
Definition channel.h:1030
SingleXDQChannel(BackgroundWorkManager *bgwork, XferDesKind _kind, const std::string &_name, int _numa_domain=-1)
virtual void wakeup_xd(XferDes *xd)
virtual long progress_xd(XferDes *xd, long max_nr)
Definition channel.h:1036
XDQueue< CHANNEL, XD > xdq
Definition channel.h:1043
long available()
Definition channel.h:1031
virtual void enqueue_ready_xd(XferDes *xd)
Definition timers.h:129
Definition transfer.h:41
Definition transfer.h:658
Definition mutex.h:223
Definition channel.h:1001
XDQueue(LocalChannel *_channel, const std::string &_name, bool _ordered)
Mutex mutex
Definition channel.h:1014
XferDes::XferDesList ready_xds
Definition channel.h:1015
void enqueue_xd(XD *xd, bool at_front=false)
LocalChannel * channel
Definition channel.h:1012
virtual bool do_work(TimeLimit work_until)
friend CHANNEL
Definition channel.h:1010
bool ordered_mode
Definition channel.h:1013
bool in_ordered_worker
Definition channel.h:1013
Definition channel.h:633
virtual ~XferDesFactory()
Definition channel.h:635
virtual bool needs_release()=0
virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)=0
Definition channel.h:1276
XferDes * xd
Definition channel.h:1299
SequenceAssembler inline_pre_write[INLINE_PORTS]
Definition channel.h:1302
std::map< int, size_t > extra_bytes_total
Definition channel.h:1304
void add_update_pre_bytes_total_received(void)
void update_pre_bytes_write(int port_idx, size_t span_start, size_t span_size)
unsigned get_update_pre_bytes_total_received(void)
Mutex extra_mutex
Definition channel.h:1303
atomic< unsigned > refcount
Definition channel.h:1298
static const int INLINE_PORTS
Definition channel.h:1297
std::map< int, SequenceAssembler > extra_pre_write
Definition channel.h:1305
atomic< unsigned > nb_update_pre_bytes_total_calls_received
Definition channel.h:1300
void update_pre_bytes_total(int port_idx, size_t pre_bytes_total)
size_t inline_bytes_total[INLINE_PORTS]
Definition channel.h:1301
void set_real_xd(XferDes *_xd)
Definition channel.h:1308
atomic< XferDesID > next_to_assign_idx
Definition channel.h:1355
void update_next_bytes_read(XferDesID xd_guid, int port_idx, size_t span_start, size_t span_size)
static XferDesQueue * get_singleton()
bool enqueue_xferDes_local(XferDes *xd, bool add_to_queue=true)
Mutex queues_lock
Definition channel.h:1354
void update_pre_bytes_total(XferDesID xd_guid, int port_idx, size_t pre_bytes_total)
void destroy_xferDes(XferDesID guid)
void update_pre_bytes_write(XferDesID xd_guid, int port_idx, size_t span_start, size_t span_size)
Mutex guid_lock
Definition channel.h:1351
~XferDesQueue()
Definition channel.h:1322
@ NODE_BITS
Definition channel.h:1312
@ INDEX_BITS
Definition channel.h:1313
std::map< XferDesID, uintptr_t > guid_to_xd
Definition channel.h:1352
XferDesQueue()
Definition channel.h:1315
XferDesID get_guid(NodeID execution_node)
Definition channel.h:1326
Definition channel.h:473
XferDes * xd
Definition channel.h:483
virtual void event_triggered(bool poisoned, TimeLimit work_until)
XferDesQueue * xferDes_queue
Definition channel.h:482
virtual Event get_finish_event(void) const
virtual void print(std::ostream &os) const
void defer(XferDesQueue *_xferDes_queue, XferDes *_xd, Event wait_on)
Definition channel.h:490
size_t sizes[MAX_ENTRIES]
Definition channel.h:503
void add_span(int port_idx, size_t offset, size_t size)
int ports[MAX_ENTRIES]
Definition channel.h:501
static const size_t MAX_ENTRIES
Definition channel.h:498
size_t offsets[MAX_ENTRIES]
Definition channel.h:502
XferDes * xd
Definition channel.h:500
SequenceCache(XferDes *_xd, size_t _flush_bytes=0)
size_t flush_bytes
Definition channel.h:504
size_t total_bytes
Definition channel.h:504
Definition channel.h:286
REALM_PMTA_DEFN(XferDes, IntrusivePriorityListLink< XferDes >, xd_link)
Mutex update_write_lock
Definition channel.h:358
Channel * channel
Definition channel.h:343
Mutex available_req_mutex
Definition channel.h:384
virtual Request * dequeue_request()
Definition channel.h:445
atomic< int64_t > bytes_write_pending
Definition channel.h:295
REALM_PMTA_DEFN(XferDes, int, priority)
virtual void update_bytes_read(int port_idx, size_t offset, size_t size)
SequenceCache<&XferDes::update_bytes_write > WriteSequenceCache
Definition channel.h:507
void update_progress(void)
std::queue< Request * > available_reqs
Definition channel.h:385
void add_reference(void)
uint64_t current_out_port_remain
Definition channel.h:299
void default_notify_request_read_done(Request *req)
long default_get_requests(Request **requests, long nr, unsigned flags=0)
void add_update_pre_bytes_total_received(void)
size_t update_control_info(ReadSequenceCache *rseqcache)
atomic< unsigned > progress_counter
Definition channel.h:365
IntrusivePriorityListLink< XferDes > xd_link
Definition channel.h:374
atomic< bool > iteration_completed
Definition channel.h:294
AlignedStorage inline_fill_storage
Definition channel.h:353
void * fill_data
Definition channel.h:345
virtual void notify_request_write_done(Request *req)
virtual ~XferDes()
uint64_t max_req_size
Definition channel.h:330
SequenceCache<&XferDes::update_bytes_read > ReadSequenceCache
Definition channel.h:506
size_t fill_size
Definition channel.h:346
std::vector< XferPort > output_ports
Definition channel.h:320
static const size_t ALIGNED_FILL_STORAGE_SIZE
Definition channel.h:348
void update_next_bytes_read(int port_idx, size_t offset, size_t size)
virtual bool request_available()
Definition channel.h:439
ControlPortState output_control
Definition channel.h:328
NodeID launch_node
Definition channel.h:292
bool record_address_consumption(size_t total_read_bytes, size_t total_write_bytes)
virtual Event request_metadata()
void update_pre_bytes_total(int port_idx, size_t pre_bytes_total)
bool check_for_progress(unsigned last_counter)
REALM_ALIGNED_TYPE_CONST(AlignedStorage, UnalignedStorage, 16)
unsigned current_progress(void)
ControlPortState input_control
Definition channel.h:328
Mutex xd_lock
Definition channel.h:358
uint64_t current_in_port_remain
Definition channel.h:299
Mutex update_read_lock
Definition channel.h:358
atomic< bool > transfer_completed
Definition channel.h:296
size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache)
XferDesQueue * xferDes_queue
Definition channel.h:290
int priority
Definition channel.h:332
void begin_completion()
XferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, const void *_fill_data, size_t fill_size)
void replicate_fill_data(size_t new_size)
DeferredXDEnqueue deferred_enqueue
Definition channel.h:485
virtual void update_bytes_write(int port_idx, size_t offset, size_t size)
virtual long get_requests(Request **requests, long nr)=0
atomic< unsigned > reference_count
Definition channel.h:367
uint64_t current_out_port_mask
Definition channel.h:298
size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache, const InstanceLayoutPieceBase *&in_nonaffine, const InstanceLayoutPieceBase *&out_nonaffine)
@ XFERDES_NO_GUID
Definition channel.h:340
virtual void notify_request_read_done(Request *req)
void default_notify_request_write_done(Request *req)
atomic< unsigned > nb_update_pre_bytes_total_calls_received
Definition channel.h:371
virtual void flush()
IntrusivePriorityList< XferDes, int, REALM_PMTA_USE(XferDes, xd_link), REALM_PMTA_USE(XferDes, priority), DummyLock > XferDesList
Definition channel.h:379
void update_pre_bytes_write(int port_idx, size_t offset, size_t size)
uint64_t current_in_port_mask
Definition channel.h:298
void mark_completed()
XferDesKind kind
Definition channel.h:337
unsigned nb_update_pre_bytes_total_calls_expected
Definition channel.h:369
XferDesID guid
Definition channel.h:335
void remove_reference(void)
uintptr_t dma_op
Definition channel.h:289
virtual void enqueue_request(Request *req)
Definition channel.h:460
size_t orig_fill_size
Definition channel.h:346
std::vector< XferPort > input_ports
Definition channel.h:320
Definition atomics.h:31
T load(void) const
void store(T newval)
T fetch_add(T to_add)
Definition utils.h:84
#define REALM_INTERNAL_API_EXTERNAL_LINKAGE
Definition compiler_support.h:218
#define REALM_PMTA_USE(structtype, name)
Definition lists.h:42
#define STRING_KIND_CASE(kind, desc)
#define C_ENUMS(name, desc)
Definition memory.h:60
Definition activemsg.h:38
Logger log_new_dma
int NodeID
Definition nodeset.h:40
XferDesKind
Definition channel.h:85
int CustomSerdezID
Definition custom_serdez.h:148
bool serialize(S &serdez, const ByteArrayRef &a)
void destroy_xfer_des(XferDesID _guid)
REALM_CUDA_HD bool operator==(const Point< N, T > &lhs, const Point< N, T2 > &rhs)
unsigned long long XferDesID
Definition channel.h:57
::realm_reduction_op_id_t ReductionOpID
Definition event.h:38
std::ostream & operator<<(std::ostream &os, const DenseRectangleList< N, T > &drl)
Definition channel.h:684
bool oor_possible
Definition channel.h:706
Memory ind_mem
Definition channel.h:701
ChannelCopyInfo(Memory _src_mem, Memory _dst_mem, Memory _ind_mem=Memory::NO_MEMORY, size_t _num_spaces=1, bool _is_scatter=false, bool _is_ranges=false, bool _is_direct=true, bool _oor_possible=false, size_t _addr_size=0)
Definition channel.h:685
size_t addr_size
Definition channel.h:707
size_t num_spaces
Definition channel.h:702
bool is_scatter
Definition channel.h:703
Memory src_mem
Definition channel.h:699
bool is_ranges
Definition channel.h:704
Memory dst_mem
Definition channel.h:700
bool is_direct
Definition channel.h:705
uint64_t mems[BITMASK_SIZE]
Definition channel.h:783
static const int BITMASK_SIZE
Definition channel.h:782
uint64_t ib_mems[BITMASK_SIZE]
Definition channel.h:783
NodeID node
Definition channel.h:781
Definition channel.h:769
SrcDstType src_type
Definition channel.h:779
MemBitmask src_bitmask
Definition channel.h:788
bool serdez_allowed
Definition channel.h:801
SupportedPath & set_max_dim(int src_dim, int dst_dim)
unsigned latency
Definition channel.h:797
SupportedPath * chain
Definition channel.h:817
MemBitmask dst_bitmask
Definition channel.h:793
SrcDstType
Definition channel.h:771
@ LOCAL_RDMA
Definition channel.h:775
@ LOCAL_KIND
Definition channel.h:773
@ SPECIFIC_MEMORY
Definition channel.h:772
@ MEMORY_BITMASK
Definition channel.h:777
@ GLOBAL_KIND
Definition channel.h:774
@ REMOTE_RDMA
Definition channel.h:776
unsigned frag_overhead
Definition channel.h:798
Memory src_mem
Definition channel.h:786
SrcDstType dst_type
Definition channel.h:779
unsigned char max_dst_dim
Definition channel.h:799
void populate_memory_bitmask(span< const Memory > mems, NodeID node, MemBitmask &bitmask)
Memory::Kind src_kind
Definition channel.h:787
Memory dst_mem
Definition channel.h:791
SupportedPath & set_max_dim(int src_and_dst_dim)
SupportedPath & allow_redops()
unsigned bandwidth
Definition channel.h:796
Memory::Kind dst_kind
Definition channel.h:792
unsigned char max_src_dim
Definition channel.h:799
bool redops_allowed
Definition channel.h:800
SupportedPath & allow_serdez()
XferDesKind xd_kind
Definition channel.h:795
SupportedPath()
Definition channel.h:804
Definition channel.h:1126
TransferOperation * op
Definition channel.h:1127
static void handle_message(NodeID sender, const NotifyXferDesCompleteMessage &args, const void *data, size_t datalen)
XferDesID xd_id
Definition channel.h:1128
static void send_request(NodeID target, TransferOperation *op, XferDesID xd_id)
Definition redop.h:56
Definition network.h:46
static void handle_message(NodeID sender, const Write1DMessage &args, const void *data, size_t datalen)
int next_port_idx
Definition channel.h:619
XferDesID next_xd_guid
Definition channel.h:618
size_t span_start
Definition channel.h:620
static bool handle_inline(NodeID sender, const Write1DMessage &args, const void *data, size_t datalen, TimeLimit work_until)
Definition channel.h:655
static void handle_message(NodeID sender, const SimpleXferDesCreateMessage &args, const void *data, size_t datalen)
Definition channel.h:1254
int port_idx
Definition channel.h:1256
size_t span_size
Definition channel.h:1257
size_t span_start
Definition channel.h:1257
static void handle_message(NodeID sender, const UpdateBytesReadMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1255
static void send_request(NodeID target, XferDesID guid, int port_idx, size_t span_start, size_t span_size)
Definition channel.h:1262
Definition channel.h:1225
size_t pre_bytes_total
Definition channel.h:1228
static void handle_message(NodeID sender, const UpdateBytesTotalMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1226
int port_idx
Definition channel.h:1227
Definition channel.h:1234
size_t span_start
Definition channel.h:1237
int port_idx
Definition channel.h:1236
static void handle_message(NodeID sender, const UpdateBytesWriteMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1235
static void send_request(NodeID target, XferDesID guid, int port_idx, size_t span_start, size_t span_size)
Definition channel.h:1242
size_t span_size
Definition channel.h:1237
Definition channel.h:647
uintptr_t channel
Definition channel.h:652
uintptr_t dma_op
Definition channel.h:649
XferDesID guid
Definition channel.h:650
NodeID launch_node
Definition channel.h:651
Definition channel.h:1213
static void send_request(NodeID target, XferDesID guid)
Definition channel.h:1217
static void handle_message(NodeID sender, const XferDesDestroyMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1214
Definition channel.h:192
Memory mem
Definition channel.h:203
XferDesID peer_guid
Definition channel.h:200
CustomSerdezID serdez_id
Definition channel.h:207
RegionInstance inst
Definition channel.h:204
size_t ib_size
Definition channel.h:205
int port_type
Definition channel.h:199
@ DATA_PORT
Definition channel.h:195
@ SCATTER_CONTROL_PORT
Definition channel.h:197
@ GATHER_CONTROL_PORT
Definition channel.h:196
int indirect_port_idx
Definition channel.h:202
int peer_port_idx
Definition channel.h:201
TransferIterator * iter
Definition channel.h:206
size_t ib_offset
Definition channel.h:205
Definition channel.h:210
XferDesRedopInfo()
Definition channel.h:217
ReductionOpID id
Definition channel.h:211
XferDesRedopInfo(ReductionOpID _id, bool _is_fold, bool _in_place, bool _is_exclusive)
Definition channel.h:224
bool is_exclusive
Definition channel.h:214
bool is_fold
Definition channel.h:212
bool in_place
Definition channel.h:213
Definition channel.h:321
bool eos_received
Definition channel.h:326
size_t remaining_count
Definition channel.h:325
int current_io_port
Definition channel.h:324
ControlPort::Decoder decoder
Definition channel.h:322
int control_port_idx
Definition channel.h:323
Definition channel.h:349
char data[ALIGNED_FILL_STORAGE_SIZE]
Definition channel.h:350
Definition channel.h:300
AddressListCursor addrcursor
Definition channel.h:318
const CustomSerdezUntyped * serdez_op
Definition channel.h:303
size_t local_bytes_total
Definition channel.h:309
int peer_port_idx
Definition channel.h:305
bool is_indirect_port
Definition channel.h:307
atomic< size_t > local_bytes_cons
Definition channel.h:310
size_t ib_offset
Definition channel.h:316
TransferIterator * iter
Definition channel.h:302
atomic< bool > needs_pbt_update
Definition channel.h:308
atomic< size_t > remote_bytes_total
Definition channel.h:310
AddressList addrlist
Definition channel.h:317
size_t ib_size
Definition channel.h:316
MemoryImpl * mem
Definition channel.h:301
int indirect_port_idx
Definition channel.h:306
SequenceAssembler seq_local
Definition channel.h:311
XferDesID peer_guid
Definition channel.h:304
SequenceAssembler seq_remote
Definition channel.h:311
Memory ib_mem
Definition channel.h:315