Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
channel.h
Go to the documentation of this file.
1/*
2 * Copyright 2025 Los Alamos National Laboratory, Stanford University, NVIDIA Corporation
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef LOWLEVEL_CHANNEL
19#define LOWLEVEL_CHANNEL
20
21#include "realm/realm_config.h"
22
23#include <stdio.h>
24#include <stdlib.h>
25#include <stdint.h>
26#ifndef REALM_ON_WINDOWS
27#include <unistd.h>
28#include <pthread.h>
29#endif
30#include <fcntl.h>
31#include <map>
32#include <vector>
33#include <deque>
34#include <queue>
35#include <unordered_set>
36#include <assert.h>
37#include <string.h>
38
39#include "realm/id.h"
40#include "realm/runtime_impl.h"
41#include "realm/mem_impl.h"
42#include "realm/inst_impl.h"
43#include "realm/bgwork.h"
44#include "realm/utils.h"
46
47namespace Realm {
48
49 class XferDes;
50 class XferDesQueue;
51 class Channel;
52 class DmaRequest;
53 class TransferIterator;
54
55 extern Logger log_new_dma;
56
57 typedef unsigned long long XferDesID;
58
59// clang-format off
// X-macro holding the list of transfer-descriptor kinds: expands to one
// `__op__(NAME)` invocation per kind, in the order written here, so the same
// list can be reused for every per-kind expansion.
// NOTE(review): the C_ENUMS and STRING_KIND_CASE helper macros defined below
// each take a single kind name and look like the intended `__op__` arguments -
// confirm against the full header.
// New kinds are added by appending another `__op__(...)` line; every line
// except the last must end with a line-continuation backslash.
60#define REALM_XFERDES_KINDS(__op__) \
61 __op__(XFER_NONE) \
62 __op__(XFER_DISK_READ) \
63 __op__(XFER_DISK_WRITE) \
64 __op__(XFER_SSD_READ) \
65 __op__(XFER_SSD_WRITE) \
66 __op__(XFER_GPU_TO_FB) \
67 __op__(XFER_GPU_FROM_FB) \
68 __op__(XFER_GPU_IN_FB) \
69 __op__(XFER_GPU_PEER_FB) \
70 __op__(XFER_MEM_CPY) \
71 __op__(XFER_GASNET_READ) \
72 __op__(XFER_GASNET_WRITE) \
73 __op__(XFER_REMOTE_WRITE) \
74 __op__(XFER_HDF5_READ) \
75 __op__(XFER_HDF5_WRITE) \
76 __op__(XFER_FILE_READ) \
77 __op__(XFER_FILE_WRITE) \
78 __op__(XFER_ADDR_SPLIT) \
79 __op__(XFER_MEM_FILL) \
80 __op__(XFER_GPU_SC_IN_FB) \
81 __op__(XFER_GPU_SC_PEER_FB)
82 // clang-format on
83
85 {
86#define C_ENUMS(name) name,
88#undef C_ENUMS
89 };
90
91 inline std::ostream &operator<<(std::ostream &os, XferDesKind kind)
92 {
93#define STRING_KIND_CASE(kind) \
94 case XferDesKind::kind: \
95 return os << #kind;
96 switch(kind) {
98 }
99#undef STRING_KIND_CASE
100 return os << "UNKNOWN_KIND";
101 }
102
103 class Request {
104 public:
111 // a pointer to the owning xfer descriptor
112 // this should be set at Request creation
115 // src/dst offset in the src/dst instance
117 // src/dst (line) strides
119 // src/dst plane strides
121 // number of bytes being transferred
123 // a flag indicating whether this request read has been done
125 // a flag indicating whether this request write has been done
127 // whether I am a 1D or 2D transfer
129 // sequence info - used to update read/write counts, handle reordering
132 };
133
135 public:
139
140 // NOT thread-safe - caller must ensure neither *this nor other is being
141 // modified during this call
143
144 // imports data from this assembler into another (this is thread-safe
145 // on the `other` but assumes no changes being made on `this`)
146 void import(SequenceAssembler &other) const;
147
148 bool empty() const;
149
150 // asks if a span exists - return value is number of bytes from the
151 // start that do
152 size_t span_exists(size_t start, size_t count);
153
154 // returns the amount by which the contiguous range has been increased
155 // (i.e. from [pos, pos+retval) )
156 size_t add_span(size_t pos, size_t count);
157
158 protected:
160
162 contig_amount_x2; // everything from [0, contig_amount) is covered - LSB indicates
163 // potential presence of noncontig spans
164 atomic<size_t> first_noncontig; // nothing in [contig_amount, first_noncontig)
165 atomic<Mutex *> mutex; // created on first use
166 std::map<size_t, size_t> spans; // noncontiguous spans
167 };
168
// Request subclass that adds a pair of raw base pointers: a read-only source
// and a writable destination.
// NOTE(review): presumably these are the addresses used by the memcpy channel
// for a memory-to-memory copy - confirm against the channel implementation.
169 class MemcpyRequest : public Request {
170 public:
// base address data is read from (never written through this pointer)
171 const void *src_base;
// base address data is written to
172 void *dst_base;
// disabled member - presumably superseded by the byte count kept in Request
// (TODO confirm)
173 // size_t nbytes;
174 };
175
// Request subclass that pairs a local memory pointer with an offset
// (`gas_off`).
// NOTE(review): `gas_off` is presumably an offset into GASNet-managed global
// memory, and the single `mem_base` field presumably serves both the
// XFER_GASNET_READ and XFER_GASNET_WRITE directions - confirm against the
// GASNet channel implementation.
176 class GASNetRequest : public Request {
177 public:
178 void *mem_base; // could be source or dest
// offset on the GASNet side of the transfer (see NOTE above)
179 off_t gas_off;
// disabled members, kept for reference only
180 // off_t src_offset;
181 // size_t nbytes;
182 };
183
185 public:
186 // NodeID dst_node;
187 const void *src_base;
188 // void *dst_base;
189 // size_t nbytes;
190 };
191
209
215
216 // default constructor == no reduction requested
218 : id(0)
219 , is_fold(false)
220 , in_place(false)
221 , is_exclusive(false)
222 {}
223
224 XferDesRedopInfo(ReductionOpID _id, bool _is_fold, bool _in_place, bool _is_exclusive)
225 : id(_id)
226 , is_fold(_is_fold)
227 , in_place(_in_place)
228 , is_exclusive(_is_exclusive)
229 {}
230 };
231
232 // a control port is used to steer inputs/outputs of transfer descriptors -
233 // the information is encoded into 32b packets which may be read/written
234 // at different times due to flow control, so the encoder and decoder
235 // both need to be stateful
236 namespace ControlPort {
237 // apart from the first control word (which carries the space_shift
238 // amount), the bottom two bits of each control word mean:
239 static const unsigned CTRL_LO_MORE = 0; // 00: count.lo, space_index, not last
240 static const unsigned CTRL_LO_LAST = 1; // 01: count.lo, space_index, last
241 static const unsigned CTRL_MID = 2; // 10: count.mid (32 bits)
242 static const unsigned CTRL_HIGH = 3; // 11: count.hi (2+space_shift bits)
243
244 class Encoder {
245 public:
248
249 void set_port_count(size_t ports);
250
251 // encodes some/all of the { count, port, last } packet into the next
252 // 32b - returns true if encoding is complete or false if it should
253 // be called again with the same arguments for another 32b packet
254 bool encode(unsigned &data, size_t count, int port, bool last);
255
256 protected:
257 unsigned short port_shift;
258
268 unsigned char state;
269 };
270
271 class Decoder {
272 public:
275
276 // decodes the next 32b of packed data, returning true if a complete
277 // { count, port, last } has been received
278 bool decode(unsigned data, size_t &count, int &port, bool &last);
279
280 protected:
282 unsigned short port_shift;
283 };
284 }; // namespace ControlPort
285
286 class XferDes {
287 public:
288 // a pointer to the DmaRequest that contains this XferDes
289 uintptr_t dma_op;
291 // ID of the node that launches this XferDes
293 // uint64_t /*bytes_submit, */bytes_read, bytes_write/*, bytes_total*/;
297 // current input and output port mask
320 std::vector<XferPort> input_ports, output_ports;
325 size_t remaining_count; // units of bytes for normal (elements for serialized data?)
327 };
329 // maximum size for a single request
330 uint64_t max_req_size;
331 // priority of the containing XferDes
333 // current, previous and next XferDes in the chain, XFERDES_NO_GUID
334 // means this XferDes is the first/last one.
335 XferDesID guid; //, pre_xd_guid, next_xd_guid;
336 // XferDesKind of the Xfer Descriptor
338 enum
339 {
341 };
342 // channel this XferDes describes
344
347 // for most fills, we can use an inline buffer with good alignment
348 static const size_t ALIGNED_FILL_STORAGE_SIZE = 32;
353 AlignedStorage inline_fill_storage;
354
355 // xd_lock is designed to provide thread-safety for
356 // SIMULTANEOUS invocations of get_requests,
357 // notify_request_read_done, and notify_request_write_done
359 // default iterators provided to generate requests
360 // Layouts::GenericLayoutIterator<DIM>* li;
361 // SJT:what is this for?
362 // unsigned offset_idx;
363 // used to track by upstream/downstream xds so that we can safely
364 // sleep xds that are stalled
366
368
370
372
373 // intrusive list for queued XDs in a channel
380
381 protected:
382 // this will be removed soon
383 // queue that contains all available free requests
385 std::queue<Request *> available_reqs;
386
387 public:
388 XferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid,
389 const std::vector<XferDesPortInfo> &inputs_info,
390 const std::vector<XferDesPortInfo> &outputs_info, int _priority,
391 const void *_fill_data, size_t fill_size);
392
393 // transfer descriptors are reference counted rather than explicitly
394 // deleted
395 void add_reference(void);
397
399
400 protected:
401 virtual ~XferDes();
402
403 public:
405
406 virtual long get_requests(Request **requests, long nr) = 0;
407
409
411
412 virtual void flush();
413
414 long default_get_requests(Request **requests, long nr, unsigned flags = 0);
417
418 virtual void update_bytes_read(int port_idx, size_t offset, size_t size);
419 virtual void update_bytes_write(int port_idx, size_t offset, size_t size);
420 void update_pre_bytes_write(int port_idx, size_t offset, size_t size);
421 void update_pre_bytes_total(int port_idx, size_t pre_bytes_total);
422 void update_next_bytes_read(int port_idx, size_t offset, size_t size);
423
424 // called once iteration is complete, but we need to track in flight
425 // writes, flush byte counts, etc.
427
429
430 unsigned current_progress(void);
431
432 // checks to see if progress has been made since the last read of the
433 // progress counter - atomically marks the xd for wakeup if not
434 bool check_for_progress(unsigned last_counter);
435
436 // updates the progress counter, waking up the xd if needed
437 void update_progress(void);
438
439 virtual bool request_available()
440 {
442 return !available_reqs.empty();
443 }
444
446 {
447 Request *req;
448 {
450 req = available_reqs.front();
451 available_reqs.pop();
452 }
453 req->is_read_done = false;
454 req->is_write_done = false;
455 // by default, an "active" request holds a reference on the xd
457 return req;
458 }
459
460 virtual void enqueue_request(Request *req)
461 {
462 {
464 available_reqs.push(req);
465 }
466 // update progress counter if iteration isn't completed yet - it might
467 // have been waiting for another request object
471 }
472
474 public:
475 void defer(XferDesQueue *_xferDes_queue, XferDes *_xd, Event wait_on);
476
477 virtual void event_triggered(bool poisoned, TimeLimit work_until);
478 virtual void print(std::ostream &os) const;
479 virtual Event get_finish_event(void) const;
480
481 protected:
483 XferDes *xd; // TODO: eliminate this based on a known offset
484 };
486
487 // helper widget to cache spans so that SequenceAssembler updates are as
488 // large as possible
489 template <void (XferDes::*UPDATE)(int port_idx, size_t offset, size_t size)>
491 public:
492 SequenceCache(XferDes *_xd, size_t _flush_bytes = 0);
493
494 void add_span(int port_idx, size_t offset, size_t size);
495 void flush();
496
497 protected:
498 static const size_t MAX_ENTRIES = 4;
499
505 };
508
510
511 // a helper routine for individual XferDes implementations - tries to get
512 // addresses and check flow control for at least 'min_xfer_size' bytes
513 // worth of transfers from a single input to a single output, and returns
514 // the number of bytes that can be transferred before another call to
515 // this method
516 // returns 0 if the transfer is complete OR if there are fewer than the
517 // minimum requested bytes available and there's reason to believe that
518 // trying again later will result in a larger chunk
519 // as a side effect, the input/output control information is updated - the
520 // actual input/output ports involved in the next transfer are stored there
521 size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache);
522 size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache,
523 const InstanceLayoutPieceBase *&in_nonaffine,
524 const InstanceLayoutPieceBase *&out_nonaffine);
525
526 // after a call to 'get_addresses', this call updates the various data
527 // structures to record that transfers for 'total_{read,write}_bytes' bytes
528 // were at least initiated - return value is whether iteration is complete
529 bool record_address_consumption(size_t total_read_bytes, size_t total_write_bytes);
530
531 // fills can be more efficient if the fill data is replicated into a larger
532 // block
533 void replicate_fill_data(size_t new_size);
534 };
535
536 class MemcpyChannel;
537
538 class MemfillChannel;
539
540 class MemfillXferDes : public XferDes {
541 public:
542 MemfillXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
543 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
544 const std::vector<XferDesPortInfo> &outputs_info, int _priority,
545 const void *_fill_data, size_t _fill_size, size_t _fill_total);
546
547 long get_requests(Request **requests, long nr);
548
549 virtual bool request_available();
551 virtual void enqueue_request(Request *req);
552
554 };
555
556 class MemreduceChannel;
557
558 class MemreduceXferDes : public XferDes {
559 public:
560 MemreduceXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
561 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
562 const std::vector<XferDesPortInfo> &outputs_info, int _priority,
563 XferDesRedopInfo _redop_info);
564
565 long get_requests(Request **requests, long nr);
566
568
569 protected:
572 };
573
574 class GASNetChannel;
575
576 class GASNetXferDes : public XferDes {
577 public:
578 GASNetXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
579 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
580 const std::vector<XferDesPortInfo> &outputs_info, int _priority);
581
582 ~GASNetXferDes() { free(gasnet_reqs); }
583
584 long get_requests(Request **requests, long nr);
587 void flush();
588
590
591 private:
592 GASNetRequest *gasnet_reqs;
593 };
594
595 class RemoteWriteChannel;
596
598 public:
599 RemoteWriteXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node,
600 XferDesID _guid, const std::vector<XferDesPortInfo> &inputs_info,
601 const std::vector<XferDesPortInfo> &outputs_info, int _priority);
602
603 ~RemoteWriteXferDes() { free(requests); }
604
605 long get_requests(Request **requests, long nr);
608 void flush();
609
610 // doesn't do pre_bytes_write updates, since the remote write message
611 // takes care of it with lower latency
612 virtual void update_bytes_write(int port_idx, size_t offset, size_t size);
613
615
616 // writes directly to a contiguous chunk of destination
621
622 static void handle_message(NodeID sender, const Write1DMessage &args,
623 const void *data, size_t datalen);
624 static bool handle_inline(NodeID sender, const Write1DMessage &args,
625 const void *data, size_t datalen, TimeLimit work_until);
626 };
627
628 private:
629 RemoteWriteRequest *requests;
630 // char *dst_buf_base;
631 };
632
634 public:
635 virtual ~XferDesFactory() {}
636 virtual bool needs_release() = 0;
637
638 virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node,
639 XferDesID guid,
640 const std::vector<XferDesPortInfo> &inputs_info,
641 const std::vector<XferDesPortInfo> &outputs_info,
642 int priority, XferDesRedopInfo redop_info,
643 const void *fill_data, size_t fill_size,
644 size_t fill_total) = 0;
645 };
646
648 // RegionInstance inst;
649 uintptr_t dma_op;
652 uintptr_t channel;
653 };
654
656 static void handle_message(NodeID sender, const SimpleXferDesCreateMessage &args,
657 const void *data, size_t datalen);
658 };
659
660 // a simple xfer des factory knows how to create an xfer des with no
661 // extra information on a single channel
663 public:
664 SimpleXferDesFactory(uintptr_t _channel);
665
666 virtual bool needs_release();
667
668 virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node,
669 XferDesID guid,
670 const std::vector<XferDesPortInfo> &inputs_info,
671 const std::vector<XferDesPortInfo> &outputs_info,
672 int priority, XferDesRedopInfo redop_info,
673 const void *fill_data, size_t fill_size,
674 size_t fill_total);
675
676 protected:
677 uintptr_t channel;
678 };
679
680 class RemoteChannelInfo;
681 class RemoteChannel;
682
683 // TODO(apryakhin): Deprecate ChannelCopyInfo
685 ChannelCopyInfo(Memory _src_mem, Memory _dst_mem, Memory _ind_mem = Memory::NO_MEMORY,
686 size_t _num_spaces = 1, bool _is_scatter = false,
687 bool _is_ranges = false, bool _is_direct = true,
688 bool _oor_possible = false, size_t _addr_size = 0)
689 : src_mem(_src_mem)
690 , dst_mem(_dst_mem)
691 , ind_mem(_ind_mem)
692 , num_spaces(_num_spaces)
693 , is_scatter(_is_scatter)
694 , is_ranges(_is_ranges)
695 , is_direct(_is_direct)
696 , oor_possible(_oor_possible)
697 , addr_size(_addr_size)
698 {}
707 size_t addr_size;
708 };
709
// stream-insertion operator for ChannelCopyInfo (declaration only - the
// definition is not part of this listing)
710 std::ostream &operator<<(std::ostream &os, const ChannelCopyInfo &info);
// equality comparison between two ChannelCopyInfo values (declaration only);
// NOTE(review): which fields participate in the comparison is not visible
// here - check the definition before relying on it
711 bool operator==(const ChannelCopyInfo &lhs, const ChannelCopyInfo &rhs);
712
713 class Channel {
714 public:
716 : node(Network::my_node_id)
717 , kind(_kind)
718 {}
719 virtual ~Channel(){};
720
721 // TODO: make pure virtual
722 virtual void shutdown() {}
723
724 // most channels can hand out a factory that makes arbitrary xds on
725 // that channel
727
728 public:
729 // which node manages this channel
731 // the kind of XferDes this channel can accept
733
734 // whether the channel has a redop path
735 bool has_redop_path{false};
736
737 // whether the channel has a non-redop path
739
740 virtual bool supports_redop(ReductionOpID redop_id) const;
741
742 // attempt to make progress on the specified xferdes
743 virtual long progress_xd(XferDes *xd, long max_nr);
744
745 /*
746 * Submit nr asynchronous requests into the channel instance.
747 * This is supposed to be a non-blocking function call, and
748 * should immediately return the number of requests that are
749 * successfully submitted.
750 */
751 virtual long submit(Request **requests, long nr) = 0;
752
753 /*
754 *
755 */
756 virtual void pull() = 0;
757
758 /*
759 * Return the number of slots that are available for
760 * submitting requests
761 */
762 virtual long available() = 0;
763
775 struct MemBitmask {
777 static const int BITMASK_SIZE = (1 << ID::MEMORY_INDEX_WIDTH) >> 6;
779 };
780 union {
784 };
785 union {
789 };
791 unsigned bandwidth; // units = MB/s = B/us
792 unsigned latency; // units = ns
793 unsigned frag_overhead; // units = ns
794 unsigned char max_src_dim, max_dst_dim;
795 bool redops_allowed; // TODO: list of redops?
796 bool serdez_allowed; // TODO: list of serdez ops?
797
798 // constructor
800 : src_mem(Memory::NO_MEMORY)
801 , dst_mem(Memory::NO_MEMORY)
802 {}
803
804 // mutators to modify less-common fields
805 SupportedPath &set_max_dim(int src_and_dst_dim);
806 SupportedPath &set_max_dim(int src_dim, int dst_dim);
809
810 // only valid when a SupportedPath is modifiable by the above methods
811 // (i.e. only on the creator node and only until another path is added)
813
815 MemBitmask &bitmask);
816 };
817
818 const std::vector<SupportedPath> &get_paths(void) const;
819
821
822 // returns 0 if the path is not supported, or a strictly-positive
823 // estimate of the time required (in nanoseconds) to transfer data
824 // along a supported path
825 virtual uint64_t
826 supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id,
827 CustomSerdezID dst_serdez_id, ReductionOpID redop_id,
828 size_t total_bytes, const std::vector<size_t> *src_frags,
829 const std::vector<size_t> *dst_frags, XferDesKind *kind_ret = 0,
830 unsigned *bw_ret = 0, unsigned *lat_ret = 0);
834 virtual bool supports_indirection_memory(Memory mem) const;
835
838
839 virtual bool needs_wrapping_iterator() const { return false; }
840
842
843 void print(std::ostream &os) const;
844
845 virtual void enqueue_ready_xd(XferDes *xd) = 0;
846 virtual void wakeup_xd(XferDes *xd) = 0;
847
848 protected:
849 // returns the added path for further modification, but reference is
850 // only valid until the next call to 'add_path'
852 unsigned bandwidth, unsigned latency, unsigned frag_overhead,
853 XferDesKind xd_kind);
855 bool dst_global, unsigned bandwidth, unsigned latency,
856 unsigned frag_overhead, XferDesKind xd_kind);
857 SupportedPath &add_path(Memory::Kind src_kind, bool src_global,
858 span<const Memory> dst_mems, unsigned bandwidth,
859 unsigned latency, unsigned frag_overhead,
860 XferDesKind xd_kind);
861 SupportedPath &add_path(Memory::Kind src_kind, bool src_global, Memory::Kind dst_kind,
862 bool dst_global, unsigned bandwidth, unsigned latency,
863 unsigned frag_overhead, XferDesKind xd_kind);
864 // TODO: allow rdma path to limit by kind?
865 SupportedPath &add_path(bool local_loopback, unsigned bandwidth, unsigned latency,
866 unsigned frag_overhead, XferDesKind xd_kind);
867
868 std::vector<SupportedPath> paths;
869 };
870
871 std::ostream &operator<<(std::ostream &os, const Channel::SupportedPath &p);
872
873 class LocalChannel : public Channel {
874 public:
876
877 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
878 const std::vector<XferDesPortInfo> &inputs_info,
879 const std::vector<XferDesPortInfo> &outputs_info,
880 int priority, XferDesRedopInfo redop_info,
881 const void *fill_data, size_t fill_size,
882 size_t fill_total) = 0;
883
885
886 protected:
888 };
889
890 // polymorphic container for info necessary to create a remote channel
892 public:
894
896
897 template <typename S>
898 static RemoteChannelInfo *deserialize_new(S &deserializer);
899 };
900
901 template <typename S>
902 bool serialize(S &serializer, const RemoteChannelInfo &rci);
903
905 public:
906 SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr,
907 const std::vector<Channel::SupportedPath> &_paths,
908 const std::vector<Memory> &indirect_memories);
909 SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr,
910 const std::vector<Channel::SupportedPath> &_paths);
911
913
914 template <typename S>
915 bool serialize(S &serializer) const;
916
917 template <typename S>
918 static RemoteChannelInfo *deserialize_new(S &deserializer);
919
920 protected:
922
926
929 uintptr_t remote_ptr;
930 std::vector<Channel::SupportedPath> paths;
931 std::vector<Memory> indirect_memories;
932 };
933
934 class RemoteChannel : public Channel {
935 protected:
937
938 RemoteChannel(uintptr_t _remote_ptr, const std::vector<Memory> &indirect_memories);
939 RemoteChannel(uintptr_t _remote_ptr);
940
941 void shutdown() override;
942
944
945 public:
946 uintptr_t get_remote_ptr() const;
947
949
950 bool supports_redop(ReductionOpID redop_id) const override;
951
952 /*
953 * Submit nr asynchronous requests into the channel instance.
954 * This is supposed to be a non-blocking function call, and
955 * should immediately return the number of requests that are
956 * successfully submitted.
957 */
958 long submit(Request **requests, long nr) override;
959
960 /*
961 *
962 */
963 void pull() override;
964
965 /*
966 * Return the number of slots that are available for
967 * submitting requests
968 */
969 long available() override;
970
971 uint64_t supports_path(ChannelCopyInfo channel_copy_info,
972 CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id,
973 ReductionOpID redop_id, size_t total_bytes,
974 const std::vector<size_t> *src_frags,
975 const std::vector<size_t> *dst_frags,
976 XferDesKind *kind_ret = 0, unsigned *bw_ret = 0,
977 unsigned *lat_ret = 0) override;
978
982 bool supports_indirection_memory(Memory mem) const override;
983
984 void enqueue_ready_xd(XferDes *xd) override { assert(0); }
985 void wakeup_xd(XferDes *xd) override { assert(0); }
986
987 protected:
988 mutable RWLock mutex;
989 uintptr_t remote_ptr;
990 std::unordered_set<ReductionOpID> supported_redops;
992 const std::set<Memory> indirect_memories;
993 };
994
995 template <typename CHANNEL, typename XD>
997 public:
998 XDQueue(LocalChannel *_channel, const std::string &_name, bool _ordered);
999
1000 void enqueue_xd(XD *xd, bool at_front = false);
1001
1002 virtual bool do_work(TimeLimit work_until);
1003
1004 protected:
1005 friend CHANNEL;
1006
1011 };
1012
1013 template <typename CHANNEL, typename XD>
1015 public:
1017 const std::string &_name, int _numa_domain = -1);
1018
1019 virtual void shutdown();
1020
1021 virtual void enqueue_ready_xd(XferDes *xd);
1022 virtual void wakeup_xd(XferDes *xd);
1023
1024 // TODO: remove!
1025 void pull() { assert(0); }
1027 {
1028 assert(0);
1029 return 0;
1030 }
1031 virtual long progress_xd(XferDes *xd, long max_nr)
1032 {
1033 assert(0);
1034 return 0;
1035 }
1036
1037 protected:
1039 };
1040
1041 class MemfillChannel : public SingleXDQChannel<MemfillChannel, MemfillXferDes> {
1042 public:
1044
1045 // multiple concurrent memfills ok
1046 static const bool is_ordered = false;
1047
1049
1050 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1051 const std::vector<XferDesPortInfo> &inputs_info,
1052 const std::vector<XferDesPortInfo> &outputs_info,
1053 int priority, XferDesRedopInfo redop_info,
1054 const void *fill_data, size_t fill_size,
1055 size_t fill_total);
1056
1057 virtual long submit(Request **requests, long nr);
1058
1060 };
1061
1062 class MemreduceChannel : public SingleXDQChannel<MemreduceChannel, MemreduceXferDes> {
1063 public:
1065
1066 // multiple concurrent memreduces ok
1067 static const bool is_ordered = false;
1068
1069 virtual bool supports_redop(ReductionOpID redop_id) const;
1070
1071 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1072 const std::vector<XferDesPortInfo> &inputs_info,
1073 const std::vector<XferDesPortInfo> &outputs_info,
1074 int priority, XferDesRedopInfo redop_info,
1075 const void *fill_data, size_t fill_size,
1076 size_t fill_total);
1077
1078 virtual long submit(Request **requests, long nr);
1079
1081 };
1082
1083 class GASNetChannel : public SingleXDQChannel<GASNetChannel, GASNetXferDes> {
1084 public:
1087
1088 // no more than one GASNet xfer of each type at a time
1089 static const bool is_ordered = true;
1090
1091 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1092 const std::vector<XferDesPortInfo> &inputs_info,
1093 const std::vector<XferDesPortInfo> &outputs_info,
1094 int priority, XferDesRedopInfo redop_info,
1095 const void *fill_data, size_t fill_size,
1096 size_t fill_total);
1097
1098 long submit(Request **requests, long nr);
1099 };
1100
1102 : public SingleXDQChannel<RemoteWriteChannel, RemoteWriteXferDes> {
1103 public:
1106
1107 // multiple concurrent RDMAs ok
1108 static const bool is_ordered = false;
1109
1110 virtual XferDes *create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid,
1111 const std::vector<XferDesPortInfo> &inputs_info,
1112 const std::vector<XferDesPortInfo> &outputs_info,
1113 int priority, XferDesRedopInfo redop_info,
1114 const void *fill_data, size_t fill_size,
1115 size_t fill_total);
1116
1117 long submit(Request **requests, long nr);
1118 };
1119
1120 class TransferOperation;
1124
1125 static void handle_message(NodeID sender, const NotifyXferDesCompleteMessage &args,
1126 const void *data, size_t datalen);
1127
1129 };
1130
1131#if 0 // TODO: DELETE
1132 struct XferDesRemoteWriteMessage {
1133 RemoteWriteRequest *req;
1134 XferDesID next_xd_guid;
1135 int next_port_idx;
1136 unsigned span_size; // 32 bits to fit in packet size
1137 size_t span_start, /*span_size,*/ pre_bytes_total;
1138
1139 static void handle_message(NodeID sender,
1140 const XferDesRemoteWriteMessage &args,
1141 const void *data,
1142 size_t datalen);
1143
1144 static void send_request(NodeID target,
1145 const RemoteAddress& dst_buf,
1146 const void *src_buf, size_t nbytes,
1147 RemoteWriteRequest* req,
1148 XferDesID next_xd_guid,
1149 int next_port_idx,
1150 size_t span_start,
1151 size_t span_size,
1152 size_t pre_bytes_total)
1153 {
1155 src_buf, nbytes,
1156 dst_buf);
1157 amsg->req = req;
1158 amsg->next_xd_guid = next_xd_guid;
1159 amsg->next_port_idx = next_port_idx;
1160 amsg->span_start = span_start;
1161 assert(span_size <= UINT_MAX);
1162 amsg->span_size = (unsigned)span_size;
1163 amsg->pre_bytes_total = pre_bytes_total;
1164 amsg.commit();
1165 }
1166
1167 static void send_request(NodeID target,
1168 const RemoteAddress& dst_buf,
1169 const void *src_buf, size_t nbytes, off_t src_str,
1170 size_t nlines, RemoteWriteRequest* req,
1171 XferDesID next_xd_guid,
1172 int next_port_idx,
1173 size_t span_start,
1174 size_t span_size,
1175 size_t pre_bytes_total)
1176 {
1177 ActiveMessage<XferDesRemoteWriteMessage> amsg(target,
1178 src_buf, nbytes,
1179 nlines, src_str,
1180 dst_buf);
1181 amsg->req = req;
1182 amsg->next_xd_guid = next_xd_guid;
1183 amsg->next_port_idx = next_port_idx;
1184 amsg->span_start = span_start;
1185 assert(span_size <= UINT_MAX);
1186 amsg->span_size = (unsigned)span_size;
1187 amsg->pre_bytes_total = pre_bytes_total;
1188 amsg.commit();
1189 }
1190 };
1191
1192 struct XferDesRemoteWriteAckMessage {
1193 RemoteWriteRequest* req;
1194
1195 static void handle_message(NodeID sender,
1196 const XferDesRemoteWriteAckMessage &args,
1197 const void *data,
1198 size_t datalen);
1199 static void send_request(NodeID target, RemoteWriteRequest* req)
1200 {
1201 ActiveMessage<XferDesRemoteWriteAckMessage> amsg(target);
1202 amsg->req = req;
1203 amsg.commit();
1204 }
1205 };
1206#endif
1207
1210 static void handle_message(NodeID sender, const XferDesDestroyMessage &args,
1211 const void *data, size_t datalen);
1212 static void send_request(NodeID target, XferDesID guid)
1213 {
1215 amsg->guid = guid;
1216 amsg.commit();
1217 }
1218 };
1219
1224
1225 static void handle_message(NodeID sender, const UpdateBytesTotalMessage &args,
1226 const void *data, size_t datalen);
1227 };
1228
1233
1234 static void handle_message(NodeID sender, const UpdateBytesWriteMessage &args,
1235 const void *data, size_t datalen);
1236
1237 static void send_request(NodeID target, XferDesID guid, int port_idx,
1238 size_t span_start, size_t span_size)
1239 {
1241 amsg->guid = guid;
1242 amsg->port_idx = port_idx;
1243 amsg->span_start = span_start;
1244 amsg->span_size = span_size;
1245 amsg.commit();
1246 }
1247 };
1248
1253
1254 static void handle_message(NodeID sender, const UpdateBytesReadMessage &args,
1255 const void *data, size_t datalen);
1256
1257 static void send_request(NodeID target, XferDesID guid, int port_idx,
1258 size_t span_start, size_t span_size)
1259 {
1261 amsg->guid = guid;
1262 amsg->port_idx = port_idx;
1263 amsg->span_start = span_start;
1264 amsg->span_size = span_size;
1265 amsg.commit();
1266 }
1267 };
1268
1269 // object used to hold input progress (pre_write and bytes_total) before
1270 // we've actually created the correct xd
1272 public:
1274
1275 protected:
1277
1278 public:
1281
1282 void update_pre_bytes_write(int port_idx, size_t span_start, size_t span_size);
1283 void update_pre_bytes_total(int port_idx, size_t pre_bytes_total);
1284
1286
1288
1290
1291 protected:
1292 static const int INLINE_PORTS = 4;
1299 std::map<int, size_t> extra_bytes_total;
1300 std::map<int, SequenceAssembler> extra_pre_write;
1301 };
1302
1304 public:
1305 enum
1306 {
1308 INDEX_BITS = 32
1311 //: core_rsrv("DMA request queue", crs, CoreReservationParameters())
1312 {
1313 // reserve the first several guids
1315 }
1316
1317 ~XferDesQueue() { assert(guid_to_xd.empty()); }
1318
1320
1321 XferDesID get_guid(NodeID execution_node)
1322 {
1323 // GUID rules:
1324 // The first NODE_BITS bits indicate which node will execute this xd
1325 // The next NODE_BITS bits indicate the node on which this xd is generated
1326 // The last INDEX_BITS bits hold a unique index, which is used to resolve conflicts
1328 return (((XferDesID)execution_node << (NODE_BITS + INDEX_BITS)) |
1330 }
1331
1332 void update_pre_bytes_write(XferDesID xd_guid, int port_idx, size_t span_start,
1333 size_t span_size);
1334 void update_pre_bytes_total(XferDesID xd_guid, int port_idx, size_t pre_bytes_total);
1335 void update_next_bytes_read(XferDesID xd_guid, int port_idx, size_t span_start,
1336 size_t span_size);
1337
1339
1340 // returns true if xd is ready, false if enqueue has been deferred
1341 bool enqueue_xferDes_local(XferDes *xd, bool add_to_queue = true);
1342
1343 protected:
1344 // guid_to_xd maps a guid to either an XferDes * (as a uintptr_t) or
1345 // a XferDesPlaceholder * (as a uintptr_t with the LSB set)
1347 std::map<XferDesID, uintptr_t> guid_to_xd;
1348
1351 };
1352
1354}; // namespace Realm
1355
1356#include "realm/transfer/channel.inl"
1357
1358#endif
#define REALM_XFERDES_KINDS(__op__)
Definition channel.h:60
Definition activemsg.h:65
Definition address_list.h:100
Definition address_list.h:55
Definition mutex.h:384
Definition bgwork.h:129
Definition bgwork.h:36
Definition channel.h:713
virtual long progress_xd(XferDes *xd, long max_nr)
virtual Memory suggest_ib_memories_for_node(NodeID node) const
SupportedPath & add_path(span< const Memory > src_mems, span< const Memory > dst_mems, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
virtual void shutdown()
Definition channel.h:722
NodeID node
Definition channel.h:730
SupportedPath & add_path(bool local_loopback, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
SupportedPath & add_path(Memory::Kind src_kind, bool src_global, Memory::Kind dst_kind, bool dst_global, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
SupportedPath & add_path(span< const Memory > src_mems, Memory::Kind dst_kind, bool dst_global, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
virtual bool needs_wrapping_iterator() const
Definition channel.h:839
bool has_non_redop_path
Definition channel.h:738
virtual void enqueue_ready_xd(XferDes *xd)=0
virtual bool supports_indirection_memory(Memory mem) const
Queries if a given mem can be used as an indirection buffer.
const std::vector< SupportedPath > & get_paths(void) const
void print(std::ostream &os) const
std::vector< SupportedPath > paths
Definition channel.h:868
virtual uint64_t supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id, ReductionOpID redop_id, size_t total_bytes, const std::vector< size_t > *src_frags, const std::vector< size_t > *dst_frags, XferDesKind *kind_ret=0, unsigned *bw_ret=0, unsigned *lat_ret=0)
virtual ~Channel()
Definition channel.h:719
virtual void pull()=0
SupportedPath & add_path(Memory::Kind src_kind, bool src_global, span< const Memory > dst_mems, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
XferDesKind kind
Definition channel.h:732
virtual XferDesFactory * get_factory()=0
virtual long submit(Request **requests, long nr)=0
virtual RemoteChannelInfo * construct_remote_info() const
Channel(XferDesKind _kind)
Definition channel.h:715
virtual Memory suggest_ib_memories() const
bool has_redop_path
Definition channel.h:735
virtual void wakeup_xd(XferDes *xd)=0
virtual bool supports_redop(ReductionOpID redop_id) const
virtual long available()=0
void update_channel_state(void)
Definition channel.h:271
unsigned short port_shift
Definition channel.h:282
size_t temp_count
Definition channel.h:281
bool decode(unsigned data, size_t &count, int &port, bool &last)
Definition channel.h:244
void set_port_count(size_t ports)
State
Definition channel.h:260
@ STATE_IDLE
Definition channel.h:263
@ STATE_SENT_HIGH
Definition channel.h:264
@ STATE_SENT_MID
Definition channel.h:265
@ STATE_INIT
Definition channel.h:261
@ STATE_DONE
Definition channel.h:266
@ STATE_HAVE_PORT_COUNT
Definition channel.h:262
unsigned char state
Definition channel.h:268
bool encode(unsigned &data, size_t count, int port, bool last)
unsigned short port_shift
Definition channel.h:257
Definition custom_serdez.h:150
Definition threads.h:428
Definition event_impl.h:49
Definition event.h:50
Definition channel.h:1083
GASNetChannel(BackgroundWorkManager *bgwork, XferDesKind _kind)
static const bool is_ordered
Definition channel.h:1089
long submit(Request **requests, long nr)
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
Definition channel.h:176
void * mem_base
Definition channel.h:178
off_t gas_off
Definition channel.h:179
Definition channel.h:576
void notify_request_read_done(Request *req)
bool progress_xd(GASNetChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
GASNetXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority)
void notify_request_write_done(Request *req)
~GASNetXferDes()
Definition channel.h:582
static const int MEMORY_INDEX_WIDTH
Definition id.h:58
Definition inst_layout.h:266
Definition lists.h:119
Definition channel.h:873
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)=0
SimpleXferDesFactory factory_singleton
Definition channel.h:887
LocalChannel(XferDesKind _kind)
virtual XferDesFactory * get_factory()
Definition memcpy_channel.h:54
Definition channel.h:169
const void * src_base
Definition channel.h:171
void * dst_base
Definition channel.h:172
Definition channel.h:1041
bool is_stopped
Definition channel.h:1059
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
static const bool is_ordered
Definition channel.h:1046
virtual long submit(Request **requests, long nr)
MemfillChannel(BackgroundWorkManager *bgwork)
Definition channel.h:540
MemfillXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, const void *_fill_data, size_t _fill_size, size_t _fill_total)
virtual Request * dequeue_request()
bool progress_xd(MemfillChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
virtual bool request_available()
virtual void enqueue_request(Request *req)
Definition mem_impl.h:50
Definition memory.h:33
Kind
Definition memory.h:59
static const Memory NO_MEMORY
Definition memory.h:49
Definition channel.h:1062
virtual bool supports_redop(ReductionOpID redop_id) const
static const bool is_ordered
Definition channel.h:1067
MemreduceChannel(BackgroundWorkManager *bgwork)
bool is_stopped
Definition channel.h:1080
virtual long submit(Request **requests, long nr)
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
Definition channel.h:558
MemreduceXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, XferDesRedopInfo _redop_info)
long get_requests(Request **requests, long nr)
const ReductionOpUntyped * redop
Definition channel.h:571
bool progress_xd(MemreduceChannel *channel, TimeLimit work_until)
XferDesRedopInfo redop_info
Definition channel.h:570
Definition mutex.h:398
Definition instance.h:66
Definition channel.h:891
virtual RemoteChannel * create_remote_channel()=0
virtual ~RemoteChannelInfo()
Definition channel.h:893
static RemoteChannelInfo * deserialize_new(S &deserializer)
Definition channel.h:934
XferDesFactory * get_factory() override
long available() override
uint64_t supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id, ReductionOpID redop_id, size_t total_bytes, const std::vector< size_t > *src_frags, const std::vector< size_t > *dst_frags, XferDesKind *kind_ret=0, unsigned *bw_ret=0, unsigned *lat_ret=0) override
uintptr_t get_remote_ptr() const
SimpleXferDesFactory factory_singleton
Definition channel.h:991
const std::set< Memory > indirect_memories
Definition channel.h:992
long submit(Request **requests, long nr) override
std::unordered_set< ReductionOpID > supported_redops
Definition channel.h:990
RemoteChannel(uintptr_t _remote_ptr)
RWLock mutex
Definition channel.h:988
void register_redop(ReductionOpID redop_id)
void enqueue_ready_xd(XferDes *xd) override
Definition channel.h:984
void wakeup_xd(XferDes *xd) override
Definition channel.h:985
void pull() override
bool supports_indirection_memory(Memory mem) const override
Queries if a given mem can be used as an indirection buffer.
bool supports_redop(ReductionOpID redop_id) const override
uintptr_t remote_ptr
Definition channel.h:989
void shutdown() override
RemoteChannel(uintptr_t _remote_ptr, const std::vector< Memory > &indirect_memories)
Definition channel.h:1102
long submit(Request **requests, long nr)
static const bool is_ordered
Definition channel.h:1108
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
RemoteWriteChannel(BackgroundWorkManager *bgwork)
Definition channel.h:184
const void * src_base
Definition channel.h:187
Definition channel.h:597
virtual void update_bytes_write(int port_idx, size_t offset, size_t size)
~RemoteWriteXferDes()
Definition channel.h:603
RemoteWriteXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority)
void notify_request_read_done(Request *req)
bool progress_xd(RemoteWriteChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
void notify_request_write_done(Request *req)
Definition channel.h:103
Dimension dim
Definition channel.h:128
off_t src_str
Definition channel.h:118
off_t dst_pstr
Definition channel.h:120
size_t write_seq_count
Definition channel.h:131
int src_port_idx
Definition channel.h:114
off_t src_off
Definition channel.h:116
off_t dst_off
Definition channel.h:116
off_t src_pstr
Definition channel.h:120
off_t dst_str
Definition channel.h:118
size_t nlines
Definition channel.h:122
Dimension
Definition channel.h:106
@ DIM_3D
Definition channel.h:109
@ DIM_1D
Definition channel.h:107
@ DIM_2D
Definition channel.h:108
bool is_write_done
Definition channel.h:126
size_t write_seq_pos
Definition channel.h:131
size_t nbytes
Definition channel.h:122
XferDes * xd
Definition channel.h:113
size_t read_seq_pos
Definition channel.h:130
int dst_port_idx
Definition channel.h:114
bool is_read_done
Definition channel.h:124
size_t nplanes
Definition channel.h:122
size_t read_seq_count
Definition channel.h:130
Definition channel.h:134
atomic< Mutex * > mutex
Definition channel.h:165
atomic< size_t > contig_amount_x2
Definition channel.h:162
std::map< size_t, size_t > spans
Definition channel.h:166
atomic< size_t > first_noncontig
Definition channel.h:164
size_t add_span(size_t pos, size_t count)
size_t span_exists(size_t start, size_t count)
SequenceAssembler(const SequenceAssembler &copy_from)
void swap(SequenceAssembler &other)
Definition channel.h:904
static Serialization::PolymorphicSerdezSubclass< RemoteChannelInfo, SimpleRemoteChannelInfo > serdez_subclass
Definition channel.h:925
std::vector< Channel::SupportedPath > paths
Definition channel.h:930
SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr, const std::vector< Channel::SupportedPath > &_paths)
static RemoteChannelInfo * deserialize_new(S &deserializer)
uintptr_t remote_ptr
Definition channel.h:929
std::vector< Memory > indirect_memories
Definition channel.h:931
SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr, const std::vector< Channel::SupportedPath > &_paths, const std::vector< Memory > &indirect_memories)
virtual RemoteChannel * create_remote_channel()
NodeID owner
Definition channel.h:927
bool serialize(S &serializer) const
XferDesKind kind
Definition channel.h:928
Definition channel.h:662
virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
SimpleXferDesFactory(uintptr_t _channel)
virtual bool needs_release()
uintptr_t channel
Definition channel.h:677
Definition channel.h:1014
virtual void shutdown()
void pull()
Definition channel.h:1025
SingleXDQChannel(BackgroundWorkManager *bgwork, XferDesKind _kind, const std::string &_name, int _numa_domain=-1)
virtual void wakeup_xd(XferDes *xd)
virtual long progress_xd(XferDes *xd, long max_nr)
Definition channel.h:1031
XDQueue< CHANNEL, XD > xdq
Definition channel.h:1038
long available()
Definition channel.h:1026
virtual void enqueue_ready_xd(XferDes *xd)
Definition timers.h:129
Definition transfer.h:41
Definition transfer.h:594
Definition mutex.h:223
Definition channel.h:996
XDQueue(LocalChannel *_channel, const std::string &_name, bool _ordered)
Mutex mutex
Definition channel.h:1009
XferDes::XferDesList ready_xds
Definition channel.h:1010
void enqueue_xd(XD *xd, bool at_front=false)
LocalChannel * channel
Definition channel.h:1007
virtual bool do_work(TimeLimit work_until)
friend CHANNEL
Definition channel.h:1005
bool ordered_mode
Definition channel.h:1008
bool in_ordered_worker
Definition channel.h:1008
Definition channel.h:633
virtual ~XferDesFactory()
Definition channel.h:635
virtual bool needs_release()=0
virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)=0
Definition channel.h:1271
XferDes * xd
Definition channel.h:1294
SequenceAssembler inline_pre_write[INLINE_PORTS]
Definition channel.h:1297
std::map< int, size_t > extra_bytes_total
Definition channel.h:1299
void add_update_pre_bytes_total_received(void)
void update_pre_bytes_write(int port_idx, size_t span_start, size_t span_size)
unsigned get_update_pre_bytes_total_received(void)
Mutex extra_mutex
Definition channel.h:1298
atomic< unsigned > refcount
Definition channel.h:1293
static const int INLINE_PORTS
Definition channel.h:1292
std::map< int, SequenceAssembler > extra_pre_write
Definition channel.h:1300
atomic< unsigned > nb_update_pre_bytes_total_calls_received
Definition channel.h:1295
void update_pre_bytes_total(int port_idx, size_t pre_bytes_total)
size_t inline_bytes_total[INLINE_PORTS]
Definition channel.h:1296
void set_real_xd(XferDes *_xd)
Definition channel.h:1303
atomic< XferDesID > next_to_assign_idx
Definition channel.h:1350
void update_next_bytes_read(XferDesID xd_guid, int port_idx, size_t span_start, size_t span_size)
static XferDesQueue * get_singleton()
bool enqueue_xferDes_local(XferDes *xd, bool add_to_queue=true)
Mutex queues_lock
Definition channel.h:1349
void update_pre_bytes_total(XferDesID xd_guid, int port_idx, size_t pre_bytes_total)
void destroy_xferDes(XferDesID guid)
void update_pre_bytes_write(XferDesID xd_guid, int port_idx, size_t span_start, size_t span_size)
Mutex guid_lock
Definition channel.h:1346
~XferDesQueue()
Definition channel.h:1317
@ NODE_BITS
Definition channel.h:1307
@ INDEX_BITS
Definition channel.h:1308
std::map< XferDesID, uintptr_t > guid_to_xd
Definition channel.h:1347
XferDesQueue()
Definition channel.h:1310
XferDesID get_guid(NodeID execution_node)
Definition channel.h:1321
Definition channel.h:473
XferDes * xd
Definition channel.h:483
virtual void event_triggered(bool poisoned, TimeLimit work_until)
XferDesQueue * xferDes_queue
Definition channel.h:482
virtual Event get_finish_event(void) const
virtual void print(std::ostream &os) const
void defer(XferDesQueue *_xferDes_queue, XferDes *_xd, Event wait_on)
Definition channel.h:490
size_t sizes[MAX_ENTRIES]
Definition channel.h:503
void add_span(int port_idx, size_t offset, size_t size)
int ports[MAX_ENTRIES]
Definition channel.h:501
static const size_t MAX_ENTRIES
Definition channel.h:498
size_t offsets[MAX_ENTRIES]
Definition channel.h:502
XferDes * xd
Definition channel.h:500
SequenceCache(XferDes *_xd, size_t _flush_bytes=0)
size_t flush_bytes
Definition channel.h:504
size_t total_bytes
Definition channel.h:504
Definition channel.h:286
REALM_PMTA_DEFN(XferDes, IntrusivePriorityListLink< XferDes >, xd_link)
Mutex update_write_lock
Definition channel.h:358
Channel * channel
Definition channel.h:343
Mutex available_req_mutex
Definition channel.h:384
virtual Request * dequeue_request()
Definition channel.h:445
atomic< int64_t > bytes_write_pending
Definition channel.h:295
REALM_PMTA_DEFN(XferDes, int, priority)
virtual void update_bytes_read(int port_idx, size_t offset, size_t size)
SequenceCache<&XferDes::update_bytes_write > WriteSequenceCache
Definition channel.h:507
void update_progress(void)
std::queue< Request * > available_reqs
Definition channel.h:385
void add_reference(void)
uint64_t current_out_port_remain
Definition channel.h:299
void default_notify_request_read_done(Request *req)
long default_get_requests(Request **requests, long nr, unsigned flags=0)
void add_update_pre_bytes_total_received(void)
size_t update_control_info(ReadSequenceCache *rseqcache)
atomic< unsigned > progress_counter
Definition channel.h:365
IntrusivePriorityListLink< XferDes > xd_link
Definition channel.h:374
atomic< bool > iteration_completed
Definition channel.h:294
AlignedStorage inline_fill_storage
Definition channel.h:353
void * fill_data
Definition channel.h:345
virtual void notify_request_write_done(Request *req)
virtual ~XferDes()
uint64_t max_req_size
Definition channel.h:330
SequenceCache<&XferDes::update_bytes_read > ReadSequenceCache
Definition channel.h:506
size_t fill_size
Definition channel.h:346
std::vector< XferPort > output_ports
Definition channel.h:320
static const size_t ALIGNED_FILL_STORAGE_SIZE
Definition channel.h:348
void update_next_bytes_read(int port_idx, size_t offset, size_t size)
virtual bool request_available()
Definition channel.h:439
ControlPortState output_control
Definition channel.h:328
NodeID launch_node
Definition channel.h:292
bool record_address_consumption(size_t total_read_bytes, size_t total_write_bytes)
virtual Event request_metadata()
void update_pre_bytes_total(int port_idx, size_t pre_bytes_total)
bool check_for_progress(unsigned last_counter)
REALM_ALIGNED_TYPE_CONST(AlignedStorage, UnalignedStorage, 16)
unsigned current_progress(void)
ControlPortState input_control
Definition channel.h:328
Mutex xd_lock
Definition channel.h:358
uint64_t current_in_port_remain
Definition channel.h:299
Mutex update_read_lock
Definition channel.h:358
atomic< bool > transfer_completed
Definition channel.h:296
size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache)
XferDesQueue * xferDes_queue
Definition channel.h:290
int priority
Definition channel.h:332
void begin_completion()
XferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, const void *_fill_data, size_t fill_size)
void replicate_fill_data(size_t new_size)
DeferredXDEnqueue deferred_enqueue
Definition channel.h:485
virtual void update_bytes_write(int port_idx, size_t offset, size_t size)
virtual long get_requests(Request **requests, long nr)=0
atomic< unsigned > reference_count
Definition channel.h:367
uint64_t current_out_port_mask
Definition channel.h:298
size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache, const InstanceLayoutPieceBase *&in_nonaffine, const InstanceLayoutPieceBase *&out_nonaffine)
@ XFERDES_NO_GUID
Definition channel.h:340
virtual void notify_request_read_done(Request *req)
void default_notify_request_write_done(Request *req)
atomic< unsigned > nb_update_pre_bytes_total_calls_received
Definition channel.h:371
virtual void flush()
IntrusivePriorityList< XferDes, int, REALM_PMTA_USE(XferDes, xd_link), REALM_PMTA_USE(XferDes, priority), DummyLock > XferDesList
Definition channel.h:379
void update_pre_bytes_write(int port_idx, size_t offset, size_t size)
uint64_t current_in_port_mask
Definition channel.h:298
void mark_completed()
XferDesKind kind
Definition channel.h:337
unsigned nb_update_pre_bytes_total_calls_expected
Definition channel.h:369
XferDesID guid
Definition channel.h:335
void remove_reference(void)
uintptr_t dma_op
Definition channel.h:289
virtual void enqueue_request(Request *req)
Definition channel.h:460
size_t orig_fill_size
Definition channel.h:346
std::vector< XferPort > input_ports
Definition channel.h:320
Definition atomics.h:31
T load(void) const
void store(T newval)
T fetch_add(T to_add)
Definition utils.h:84
#define REALM_INTERNAL_API_EXTERNAL_LINKAGE
Definition compiler_support.h:218
#define REALM_PMTA_USE(structtype, name)
Definition lists.h:42
#define STRING_KIND_CASE(kind, desc)
#define C_ENUMS(name, desc)
Definition memory.h:60
Definition activemsg.h:38
Logger log_new_dma
int NodeID
Definition nodeset.h:40
XferDesKind
Definition channel.h:85
int CustomSerdezID
Definition custom_serdez.h:148
bool serialize(S &serdez, const ByteArrayRef &a)
void destroy_xfer_des(XferDesID _guid)
REALM_CUDA_HD bool operator==(const Point< N, T > &lhs, const Point< N, T2 > &rhs)
unsigned long long XferDesID
Definition channel.h:57
::realm_reduction_op_id_t ReductionOpID
Definition event.h:38
std::ostream & operator<<(std::ostream &os, const DenseRectangleList< N, T > &drl)
Definition channel.h:684
bool oor_possible
Definition channel.h:706
Memory ind_mem
Definition channel.h:701
ChannelCopyInfo(Memory _src_mem, Memory _dst_mem, Memory _ind_mem=Memory::NO_MEMORY, size_t _num_spaces=1, bool _is_scatter=false, bool _is_ranges=false, bool _is_direct=true, bool _oor_possible=false, size_t _addr_size=0)
Definition channel.h:685
size_t addr_size
Definition channel.h:707
size_t num_spaces
Definition channel.h:702
bool is_scatter
Definition channel.h:703
Memory src_mem
Definition channel.h:699
bool is_ranges
Definition channel.h:704
Memory dst_mem
Definition channel.h:700
bool is_direct
Definition channel.h:705
uint64_t mems[BITMASK_SIZE]
Definition channel.h:778
static const int BITMASK_SIZE
Definition channel.h:777
uint64_t ib_mems[BITMASK_SIZE]
Definition channel.h:778
NodeID node
Definition channel.h:776
Definition channel.h:764
SrcDstType src_type
Definition channel.h:774
MemBitmask src_bitmask
Definition channel.h:783
bool serdez_allowed
Definition channel.h:796
SupportedPath & set_max_dim(int src_dim, int dst_dim)
unsigned latency
Definition channel.h:792
SupportedPath * chain
Definition channel.h:812
MemBitmask dst_bitmask
Definition channel.h:788
SrcDstType
Definition channel.h:766
@ LOCAL_RDMA
Definition channel.h:770
@ LOCAL_KIND
Definition channel.h:768
@ SPECIFIC_MEMORY
Definition channel.h:767
@ MEMORY_BITMASK
Definition channel.h:772
@ GLOBAL_KIND
Definition channel.h:769
@ REMOTE_RDMA
Definition channel.h:771
unsigned frag_overhead
Definition channel.h:793
Memory src_mem
Definition channel.h:781
SrcDstType dst_type
Definition channel.h:774
unsigned char max_dst_dim
Definition channel.h:794
void populate_memory_bitmask(span< const Memory > mems, NodeID node, MemBitmask &bitmask)
Memory::Kind src_kind
Definition channel.h:782
Memory dst_mem
Definition channel.h:786
SupportedPath & set_max_dim(int src_and_dst_dim)
SupportedPath & allow_redops()
unsigned bandwidth
Definition channel.h:791
Memory::Kind dst_kind
Definition channel.h:787
unsigned char max_src_dim
Definition channel.h:794
bool redops_allowed
Definition channel.h:795
SupportedPath & allow_serdez()
XferDesKind xd_kind
Definition channel.h:790
SupportedPath()
Definition channel.h:799
Definition channel.h:1121
TransferOperation * op
Definition channel.h:1122
static void handle_message(NodeID sender, const NotifyXferDesCompleteMessage &args, const void *data, size_t datalen)
XferDesID xd_id
Definition channel.h:1123
static void send_request(NodeID target, TransferOperation *op, XferDesID xd_id)
Definition redop.h:56
Definition network.h:46
static void handle_message(NodeID sender, const Write1DMessage &args, const void *data, size_t datalen)
int next_port_idx
Definition channel.h:619
XferDesID next_xd_guid
Definition channel.h:618
size_t span_start
Definition channel.h:620
static bool handle_inline(NodeID sender, const Write1DMessage &args, const void *data, size_t datalen, TimeLimit work_until)
Definition channel.h:655
static void handle_message(NodeID sender, const SimpleXferDesCreateMessage &args, const void *data, size_t datalen)
Definition channel.h:1249
int port_idx
Definition channel.h:1251
size_t span_size
Definition channel.h:1252
size_t span_start
Definition channel.h:1252
static void handle_message(NodeID sender, const UpdateBytesReadMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1250
static void send_request(NodeID target, XferDesID guid, int port_idx, size_t span_start, size_t span_size)
Definition channel.h:1257
Definition channel.h:1220
size_t pre_bytes_total
Definition channel.h:1223
static void handle_message(NodeID sender, const UpdateBytesTotalMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1221
int port_idx
Definition channel.h:1222
Definition channel.h:1229
size_t span_start
Definition channel.h:1232
int port_idx
Definition channel.h:1231
static void handle_message(NodeID sender, const UpdateBytesWriteMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1230
static void send_request(NodeID target, XferDesID guid, int port_idx, size_t span_start, size_t span_size)
Definition channel.h:1237
size_t span_size
Definition channel.h:1232
Definition channel.h:647
uintptr_t channel
Definition channel.h:652
uintptr_t dma_op
Definition channel.h:649
XferDesID guid
Definition channel.h:650
NodeID launch_node
Definition channel.h:651
Definition channel.h:1208
static void send_request(NodeID target, XferDesID guid)
Definition channel.h:1212
static void handle_message(NodeID sender, const XferDesDestroyMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1209
Definition channel.h:192
Memory mem
Definition channel.h:203
XferDesID peer_guid
Definition channel.h:200
CustomSerdezID serdez_id
Definition channel.h:207
RegionInstance inst
Definition channel.h:204
size_t ib_size
Definition channel.h:205
int port_type
Definition channel.h:199
int indirect_port_idx
Definition channel.h:202
int peer_port_idx
Definition channel.h:201
TransferIterator * iter
Definition channel.h:206
size_t ib_offset
Definition channel.h:205
@ DATA_PORT
Definition channel.h:195
@ SCATTER_CONTROL_PORT
Definition channel.h:197
@ GATHER_CONTROL_PORT
Definition channel.h:196
Definition channel.h:210
XferDesRedopInfo()
Definition channel.h:217
ReductionOpID id
Definition channel.h:211
XferDesRedopInfo(ReductionOpID _id, bool _is_fold, bool _in_place, bool _is_exclusive)
Definition channel.h:224
bool is_exclusive
Definition channel.h:214
bool is_fold
Definition channel.h:212
bool in_place
Definition channel.h:213
Definition channel.h:321
bool eos_received
Definition channel.h:326
size_t remaining_count
Definition channel.h:325
int current_io_port
Definition channel.h:324
ControlPort::Decoder decoder
Definition channel.h:322
int control_port_idx
Definition channel.h:323
Definition channel.h:349
char data[ALIGNED_FILL_STORAGE_SIZE]
Definition channel.h:350
Definition channel.h:300
AddressListCursor addrcursor
Definition channel.h:318
const CustomSerdezUntyped * serdez_op
Definition channel.h:303
size_t local_bytes_total
Definition channel.h:309
int peer_port_idx
Definition channel.h:305
bool is_indirect_port
Definition channel.h:307
atomic< size_t > local_bytes_cons
Definition channel.h:310
size_t ib_offset
Definition channel.h:316
TransferIterator * iter
Definition channel.h:302
atomic< bool > needs_pbt_update
Definition channel.h:308
atomic< size_t > remote_bytes_total
Definition channel.h:310
AddressList addrlist
Definition channel.h:317
size_t ib_size
Definition channel.h:316
MemoryImpl * mem
Definition channel.h:301
int indirect_port_idx
Definition channel.h:306
SequenceAssembler seq_local
Definition channel.h:311
XferDesID peer_guid
Definition channel.h:304
SequenceAssembler seq_remote
Definition channel.h:311
Memory ib_mem
Definition channel.h:315