18#ifndef LOWLEVEL_CHANNEL
19#define LOWLEVEL_CHANNEL
26#ifndef REALM_ON_WINDOWS
35#include <unordered_set>
53 class TransferIterator;
60#define REALM_XFERDES_KINDS(__op__) \
62 __op__(XFER_DISK_READ) \
63 __op__(XFER_DISK_WRITE) \
64 __op__(XFER_SSD_READ) \
65 __op__(XFER_SSD_WRITE) \
66 __op__(XFER_GPU_TO_FB) \
67 __op__(XFER_GPU_FROM_FB) \
68 __op__(XFER_GPU_IN_FB) \
69 __op__(XFER_GPU_PEER_FB) \
70 __op__(XFER_MEM_CPY) \
71 __op__(XFER_GASNET_READ) \
72 __op__(XFER_GASNET_WRITE) \
73 __op__(XFER_REMOTE_WRITE) \
74 __op__(XFER_HDF5_READ) \
75 __op__(XFER_HDF5_WRITE) \
76 __op__(XFER_FILE_READ) \
77 __op__(XFER_FILE_WRITE) \
78 __op__(XFER_ADDR_SPLIT) \
79 __op__(XFER_MEM_FILL) \
80 __op__(XFER_GPU_SC_IN_FB) \
81 __op__(XFER_GPU_SC_PEER_FB)
86#define C_ENUMS(name) name,
93#define STRING_KIND_CASE(kind) \
94 case XferDesKind::kind: \
99#undef STRING_KIND_CASE
100 return os <<
"UNKNOWN_KIND";
236 namespace ControlPort {
239 static const unsigned CTRL_LO_MORE = 0;
240 static const unsigned CTRL_LO_LAST = 1;
241 static const unsigned CTRL_MID = 2;
242 static const unsigned CTRL_HIGH = 3;
254 bool encode(
unsigned &data,
size_t count,
int port,
bool last);
278 bool decode(
unsigned data,
size_t &count,
int &port,
bool &last);
389 const std::vector<XferDesPortInfo> &inputs_info,
390 const std::vector<XferDesPortInfo> &outputs_info,
int _priority,
391 const void *_fill_data,
size_t fill_size);
478 virtual void print(std::ostream &os)
const;
489 template <
void (XferDes::*UPDATE)(
int port_
idx,
size_t offset,
size_t size)>
494 void add_span(
int port_idx,
size_t offset,
size_t size);
543 XferDesID _guid,
const std::vector<XferDesPortInfo> &inputs_info,
544 const std::vector<XferDesPortInfo> &outputs_info,
int _priority,
545 const void *_fill_data,
size_t _fill_size,
size_t _fill_total);
561 XferDesID _guid,
const std::vector<XferDesPortInfo> &inputs_info,
562 const std::vector<XferDesPortInfo> &outputs_info,
int _priority,
579 XferDesID _guid,
const std::vector<XferDesPortInfo> &inputs_info,
580 const std::vector<XferDesPortInfo> &outputs_info,
int _priority);
600 XferDesID _guid,
const std::vector<XferDesPortInfo> &inputs_info,
601 const std::vector<XferDesPortInfo> &outputs_info,
int _priority);
623 const void *data,
size_t datalen);
625 const void *data,
size_t datalen,
TimeLimit work_until);
640 const std::vector<XferDesPortInfo> &inputs_info,
641 const std::vector<XferDesPortInfo> &outputs_info,
643 const void *fill_data,
size_t fill_size,
644 size_t fill_total) = 0;
657 const void *data,
size_t datalen);
670 const std::vector<XferDesPortInfo> &inputs_info,
671 const std::vector<XferDesPortInfo> &outputs_info,
673 const void *fill_data,
size_t fill_size,
686 size_t _num_spaces = 1,
bool _is_scatter =
false,
687 bool _is_ranges =
false,
bool _is_direct =
true,
688 bool _oor_possible =
false,
size_t _addr_size = 0)
716 :
node(Network::my_node_id)
818 const std::vector<SupportedPath> &
get_paths(
void)
const;
828 size_t total_bytes,
const std::vector<size_t> *src_frags,
829 const std::vector<size_t> *dst_frags,
XferDesKind *kind_ret = 0,
830 unsigned *bw_ret = 0,
unsigned *lat_ret = 0);
852 unsigned bandwidth,
unsigned latency,
unsigned frag_overhead,
855 bool dst_global,
unsigned bandwidth,
unsigned latency,
859 unsigned latency,
unsigned frag_overhead,
862 bool dst_global,
unsigned bandwidth,
unsigned latency,
878 const std::vector<XferDesPortInfo> &inputs_info,
879 const std::vector<XferDesPortInfo> &outputs_info,
881 const void *fill_data,
size_t fill_size,
882 size_t fill_total) = 0;
897 template <
typename S>
901 template <
typename S>
907 const std::vector<Channel::SupportedPath> &_paths,
910 const std::vector<Channel::SupportedPath> &_paths);
914 template <
typename S>
917 template <
typename S>
930 std::vector<Channel::SupportedPath>
paths;
974 const std::vector<size_t> *src_frags,
975 const std::vector<size_t> *dst_frags,
977 unsigned *lat_ret = 0)
override;
995 template <
typename CHANNEL,
typename XD>
1013 template <
typename CHANNEL,
typename XD>
1017 const std::string &_name,
int _numa_domain = -1);
1051 const std::vector<XferDesPortInfo> &inputs_info,
1052 const std::vector<XferDesPortInfo> &outputs_info,
1054 const void *fill_data,
size_t fill_size,
1072 const std::vector<XferDesPortInfo> &inputs_info,
1073 const std::vector<XferDesPortInfo> &outputs_info,
1075 const void *fill_data,
size_t fill_size,
1092 const std::vector<XferDesPortInfo> &inputs_info,
1093 const std::vector<XferDesPortInfo> &outputs_info,
1095 const void *fill_data,
size_t fill_size,
1111 const std::vector<XferDesPortInfo> &inputs_info,
1112 const std::vector<XferDesPortInfo> &outputs_info,
1114 const void *fill_data,
size_t fill_size,
1126 const void *data,
size_t datalen);
1132 struct XferDesRemoteWriteMessage {
1137 size_t span_start, pre_bytes_total;
1139 static void handle_message(
NodeID sender,
1140 const XferDesRemoteWriteMessage &args,
1144 static void send_request(
NodeID target,
1146 const void *src_buf,
size_t nbytes,
1152 size_t pre_bytes_total)
1158 amsg->next_xd_guid = next_xd_guid;
1159 amsg->next_port_idx = next_port_idx;
1160 amsg->span_start = span_start;
1161 assert(span_size <= UINT_MAX);
1162 amsg->span_size = (unsigned)span_size;
1163 amsg->pre_bytes_total = pre_bytes_total;
1167 static void send_request(NodeID target,
1168 const RemoteAddress& dst_buf,
1169 const void *src_buf,
size_t nbytes, off_t src_str,
1170 size_t nlines, RemoteWriteRequest* req,
1171 XferDesID next_xd_guid,
1175 size_t pre_bytes_total)
1177 ActiveMessage<XferDesRemoteWriteMessage> amsg(target,
1182 amsg->next_xd_guid = next_xd_guid;
1183 amsg->next_port_idx = next_port_idx;
1184 amsg->span_start = span_start;
1185 assert(span_size <= UINT_MAX);
1186 amsg->span_size = (unsigned)span_size;
1187 amsg->pre_bytes_total = pre_bytes_total;
1192 struct XferDesRemoteWriteAckMessage {
1193 RemoteWriteRequest* req;
1195 static void handle_message(NodeID sender,
1196 const XferDesRemoteWriteAckMessage &args,
1199 static void send_request(NodeID target, RemoteWriteRequest* req)
1201 ActiveMessage<XferDesRemoteWriteAckMessage> amsg(target);
1211 const void *data,
size_t datalen);
1226 const void *data,
size_t datalen);
1235 const void *data,
size_t datalen);
1255 const void *data,
size_t datalen);
1356#include "realm/transfer/channel.inl"
#define REALM_XFERDES_KINDS(__op__)
Definition channel.h:60
Definition activemsg.h:65
Definition address_list.h:100
Definition address_list.h:55
virtual long progress_xd(XferDes *xd, long max_nr)
virtual Memory suggest_ib_memories_for_node(NodeID node) const
SupportedPath & add_path(span< const Memory > src_mems, span< const Memory > dst_mems, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
virtual void shutdown()
Definition channel.h:722
NodeID node
Definition channel.h:730
SupportedPath & add_path(bool local_loopback, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
SupportedPath & add_path(Memory::Kind src_kind, bool src_global, Memory::Kind dst_kind, bool dst_global, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
SupportedPath & add_path(span< const Memory > src_mems, Memory::Kind dst_kind, bool dst_global, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
virtual bool needs_wrapping_iterator() const
Definition channel.h:839
bool has_non_redop_path
Definition channel.h:738
virtual void enqueue_ready_xd(XferDes *xd)=0
virtual bool supports_indirection_memory(Memory mem) const
Queries if a given mem can be used as an indirection buffer.
const std::vector< SupportedPath > & get_paths(void) const
void print(std::ostream &os) const
std::vector< SupportedPath > paths
Definition channel.h:868
virtual uint64_t supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id, ReductionOpID redop_id, size_t total_bytes, const std::vector< size_t > *src_frags, const std::vector< size_t > *dst_frags, XferDesKind *kind_ret=0, unsigned *bw_ret=0, unsigned *lat_ret=0)
virtual ~Channel()
Definition channel.h:719
SupportedPath & add_path(Memory::Kind src_kind, bool src_global, span< const Memory > dst_mems, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
XferDesKind kind
Definition channel.h:732
virtual XferDesFactory * get_factory()=0
virtual long submit(Request **requests, long nr)=0
virtual RemoteChannelInfo * construct_remote_info() const
Channel(XferDesKind _kind)
Definition channel.h:715
virtual Memory suggest_ib_memories() const
bool has_redop_path
Definition channel.h:735
virtual void wakeup_xd(XferDes *xd)=0
virtual bool supports_redop(ReductionOpID redop_id) const
virtual long available()=0
void update_channel_state(void)
unsigned short port_shift
Definition channel.h:282
size_t temp_count
Definition channel.h:281
bool decode(unsigned data, size_t &count, int &port, bool &last)
void set_port_count(size_t ports)
State
Definition channel.h:260
@ STATE_IDLE
Definition channel.h:263
@ STATE_SENT_HIGH
Definition channel.h:264
@ STATE_SENT_MID
Definition channel.h:265
@ STATE_INIT
Definition channel.h:261
@ STATE_DONE
Definition channel.h:266
@ STATE_HAVE_PORT_COUNT
Definition channel.h:262
unsigned char state
Definition channel.h:268
bool encode(unsigned &data, size_t count, int port, bool last)
unsigned short port_shift
Definition channel.h:257
Definition custom_serdez.h:150
Definition event_impl.h:49
Definition channel.h:1083
GASNetChannel(BackgroundWorkManager *bgwork, XferDesKind _kind)
static const bool is_ordered
Definition channel.h:1089
long submit(Request **requests, long nr)
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
void * mem_base
Definition channel.h:178
off_t gas_off
Definition channel.h:179
void notify_request_read_done(Request *req)
bool progress_xd(GASNetChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
GASNetXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority)
void notify_request_write_done(Request *req)
~GASNetXferDes()
Definition channel.h:582
static const int MEMORY_INDEX_WIDTH
Definition id.h:58
Definition inst_layout.h:266
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)=0
SimpleXferDesFactory factory_singleton
Definition channel.h:887
LocalChannel(XferDesKind _kind)
virtual XferDesFactory * get_factory()
Definition memcpy_channel.h:54
const void * src_base
Definition channel.h:171
void * dst_base
Definition channel.h:172
Definition channel.h:1041
bool is_stopped
Definition channel.h:1059
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
static const bool is_ordered
Definition channel.h:1046
virtual long submit(Request **requests, long nr)
MemfillChannel(BackgroundWorkManager *bgwork)
MemfillXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, const void *_fill_data, size_t _fill_size, size_t _fill_total)
virtual Request * dequeue_request()
bool progress_xd(MemfillChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
virtual bool request_available()
virtual void enqueue_request(Request *req)
Kind
Definition memory.h:59
static const Memory NO_MEMORY
Definition memory.h:49
Definition channel.h:1062
virtual bool supports_redop(ReductionOpID redop_id) const
static const bool is_ordered
Definition channel.h:1067
MemreduceChannel(BackgroundWorkManager *bgwork)
bool is_stopped
Definition channel.h:1080
virtual long submit(Request **requests, long nr)
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
MemreduceXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, XferDesRedopInfo _redop_info)
long get_requests(Request **requests, long nr)
const ReductionOpUntyped * redop
Definition channel.h:571
bool progress_xd(MemreduceChannel *channel, TimeLimit work_until)
XferDesRedopInfo redop_info
Definition channel.h:570
virtual RemoteChannel * create_remote_channel()=0
virtual ~RemoteChannelInfo()
Definition channel.h:893
static RemoteChannelInfo * deserialize_new(S &deserializer)
XferDesFactory * get_factory() override
long available() override
uint64_t supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id, ReductionOpID redop_id, size_t total_bytes, const std::vector< size_t > *src_frags, const std::vector< size_t > *dst_frags, XferDesKind *kind_ret=0, unsigned *bw_ret=0, unsigned *lat_ret=0) override
uintptr_t get_remote_ptr() const
SimpleXferDesFactory factory_singleton
Definition channel.h:991
const std::set< Memory > indirect_memories
Definition channel.h:992
long submit(Request **requests, long nr) override
std::unordered_set< ReductionOpID > supported_redops
Definition channel.h:990
RemoteChannel(uintptr_t _remote_ptr)
RWLock mutex
Definition channel.h:988
void register_redop(ReductionOpID redop_id)
void enqueue_ready_xd(XferDes *xd) override
Definition channel.h:984
void wakeup_xd(XferDes *xd) override
Definition channel.h:985
bool supports_indirection_memory(Memory mem) const override
Queries if a given mem can be used as an indirection buffer.
bool supports_redop(ReductionOpID redop_id) const override
uintptr_t remote_ptr
Definition channel.h:989
RemoteChannel(uintptr_t _remote_ptr, const std::vector< Memory > &indirect_memories)
Definition channel.h:1102
long submit(Request **requests, long nr)
static const bool is_ordered
Definition channel.h:1108
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
RemoteWriteChannel(BackgroundWorkManager *bgwork)
const void * src_base
Definition channel.h:187
virtual void update_bytes_write(int port_idx, size_t offset, size_t size)
~RemoteWriteXferDes()
Definition channel.h:603
RemoteWriteXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority)
void notify_request_read_done(Request *req)
bool progress_xd(RemoteWriteChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
void notify_request_write_done(Request *req)
Dimension dim
Definition channel.h:128
off_t src_str
Definition channel.h:118
off_t dst_pstr
Definition channel.h:120
size_t write_seq_count
Definition channel.h:131
int src_port_idx
Definition channel.h:114
off_t src_off
Definition channel.h:116
off_t dst_off
Definition channel.h:116
off_t src_pstr
Definition channel.h:120
off_t dst_str
Definition channel.h:118
size_t nlines
Definition channel.h:122
Dimension
Definition channel.h:106
@ DIM_3D
Definition channel.h:109
@ DIM_1D
Definition channel.h:107
@ DIM_2D
Definition channel.h:108
bool is_write_done
Definition channel.h:126
size_t write_seq_pos
Definition channel.h:131
size_t nbytes
Definition channel.h:122
XferDes * xd
Definition channel.h:113
size_t read_seq_pos
Definition channel.h:130
int dst_port_idx
Definition channel.h:114
bool is_read_done
Definition channel.h:124
size_t nplanes
Definition channel.h:122
size_t read_seq_count
Definition channel.h:130
atomic< Mutex * > mutex
Definition channel.h:165
atomic< size_t > contig_amount_x2
Definition channel.h:162
std::map< size_t, size_t > spans
Definition channel.h:166
atomic< size_t > first_noncontig
Definition channel.h:164
size_t add_span(size_t pos, size_t count)
size_t span_exists(size_t start, size_t count)
SequenceAssembler(const SequenceAssembler ©_from)
void swap(SequenceAssembler &other)
Definition serialize.h:363
SimpleRemoteChannelInfo()
static Serialization::PolymorphicSerdezSubclass< RemoteChannelInfo, SimpleRemoteChannelInfo > serdez_subclass
Definition channel.h:925
std::vector< Channel::SupportedPath > paths
Definition channel.h:930
SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr, const std::vector< Channel::SupportedPath > &_paths)
static RemoteChannelInfo * deserialize_new(S &deserializer)
uintptr_t remote_ptr
Definition channel.h:929
std::vector< Memory > indirect_memories
Definition channel.h:931
SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr, const std::vector< Channel::SupportedPath > &_paths, const std::vector< Memory > &indirect_memories)
virtual RemoteChannel * create_remote_channel()
NodeID owner
Definition channel.h:927
bool serialize(S &serializer) const
XferDesKind kind
Definition channel.h:928
virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
SimpleXferDesFactory(uintptr_t _channel)
virtual bool needs_release()
uintptr_t channel
Definition channel.h:677
Definition channel.h:1014
void pull()
Definition channel.h:1025
SingleXDQChannel(BackgroundWorkManager *bgwork, XferDesKind _kind, const std::string &_name, int _numa_domain=-1)
virtual void wakeup_xd(XferDes *xd)
virtual long progress_xd(XferDes *xd, long max_nr)
Definition channel.h:1031
XDQueue< CHANNEL, XD > xdq
Definition channel.h:1038
long available()
Definition channel.h:1026
virtual void enqueue_ready_xd(XferDes *xd)
Definition transfer.h:594
XDQueue(LocalChannel *_channel, const std::string &_name, bool _ordered)
Mutex mutex
Definition channel.h:1009
XferDes::XferDesList ready_xds
Definition channel.h:1010
void enqueue_xd(XD *xd, bool at_front=false)
LocalChannel * channel
Definition channel.h:1007
virtual bool do_work(TimeLimit work_until)
friend CHANNEL
Definition channel.h:1005
bool ordered_mode
Definition channel.h:1008
bool in_ordered_worker
Definition channel.h:1008
virtual ~XferDesFactory()
Definition channel.h:635
virtual bool needs_release()=0
virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)=0
Definition channel.h:1271
XferDes * xd
Definition channel.h:1294
SequenceAssembler inline_pre_write[INLINE_PORTS]
Definition channel.h:1297
std::map< int, size_t > extra_bytes_total
Definition channel.h:1299
void add_update_pre_bytes_total_received(void)
void update_pre_bytes_write(int port_idx, size_t span_start, size_t span_size)
unsigned get_update_pre_bytes_total_received(void)
Mutex extra_mutex
Definition channel.h:1298
atomic< unsigned > refcount
Definition channel.h:1293
static const int INLINE_PORTS
Definition channel.h:1292
std::map< int, SequenceAssembler > extra_pre_write
Definition channel.h:1300
atomic< unsigned > nb_update_pre_bytes_total_calls_received
Definition channel.h:1295
void update_pre_bytes_total(int port_idx, size_t pre_bytes_total)
size_t inline_bytes_total[INLINE_PORTS]
Definition channel.h:1296
void set_real_xd(XferDes *_xd)
Definition channel.h:1303
atomic< XferDesID > next_to_assign_idx
Definition channel.h:1350
void update_next_bytes_read(XferDesID xd_guid, int port_idx, size_t span_start, size_t span_size)
static XferDesQueue * get_singleton()
bool enqueue_xferDes_local(XferDes *xd, bool add_to_queue=true)
Mutex queues_lock
Definition channel.h:1349
void update_pre_bytes_total(XferDesID xd_guid, int port_idx, size_t pre_bytes_total)
void destroy_xferDes(XferDesID guid)
void update_pre_bytes_write(XferDesID xd_guid, int port_idx, size_t span_start, size_t span_size)
Mutex guid_lock
Definition channel.h:1346
~XferDesQueue()
Definition channel.h:1317
@ NODE_BITS
Definition channel.h:1307
@ INDEX_BITS
Definition channel.h:1308
std::map< XferDesID, uintptr_t > guid_to_xd
Definition channel.h:1347
XferDesQueue()
Definition channel.h:1310
XferDesID get_guid(NodeID execution_node)
Definition channel.h:1321
XferDes * xd
Definition channel.h:483
virtual void event_triggered(bool poisoned, TimeLimit work_until)
XferDesQueue * xferDes_queue
Definition channel.h:482
virtual Event get_finish_event(void) const
virtual void print(std::ostream &os) const
void defer(XferDesQueue *_xferDes_queue, XferDes *_xd, Event wait_on)
size_t sizes[MAX_ENTRIES]
Definition channel.h:503
void add_span(int port_idx, size_t offset, size_t size)
int ports[MAX_ENTRIES]
Definition channel.h:501
static const size_t MAX_ENTRIES
Definition channel.h:498
size_t offsets[MAX_ENTRIES]
Definition channel.h:502
XferDes * xd
Definition channel.h:500
SequenceCache(XferDes *_xd, size_t _flush_bytes=0)
size_t flush_bytes
Definition channel.h:504
size_t total_bytes
Definition channel.h:504
REALM_PMTA_DEFN(XferDes, IntrusivePriorityListLink< XferDes >, xd_link)
Mutex update_write_lock
Definition channel.h:358
Channel * channel
Definition channel.h:343
Mutex available_req_mutex
Definition channel.h:384
virtual Request * dequeue_request()
Definition channel.h:445
atomic< int64_t > bytes_write_pending
Definition channel.h:295
REALM_PMTA_DEFN(XferDes, int, priority)
virtual void update_bytes_read(int port_idx, size_t offset, size_t size)
SequenceCache<&XferDes::update_bytes_write > WriteSequenceCache
Definition channel.h:507
void update_progress(void)
std::queue< Request * > available_reqs
Definition channel.h:385
uint64_t current_out_port_remain
Definition channel.h:299
void default_notify_request_read_done(Request *req)
long default_get_requests(Request **requests, long nr, unsigned flags=0)
void add_update_pre_bytes_total_received(void)
size_t update_control_info(ReadSequenceCache *rseqcache)
atomic< unsigned > progress_counter
Definition channel.h:365
IntrusivePriorityListLink< XferDes > xd_link
Definition channel.h:374
atomic< bool > iteration_completed
Definition channel.h:294
AlignedStorage inline_fill_storage
Definition channel.h:353
void * fill_data
Definition channel.h:345
virtual void notify_request_write_done(Request *req)
uint64_t max_req_size
Definition channel.h:330
SequenceCache<&XferDes::update_bytes_read > ReadSequenceCache
Definition channel.h:506
size_t fill_size
Definition channel.h:346
std::vector< XferPort > output_ports
Definition channel.h:320
static const size_t ALIGNED_FILL_STORAGE_SIZE
Definition channel.h:348
void update_next_bytes_read(int port_idx, size_t offset, size_t size)
virtual bool request_available()
Definition channel.h:439
ControlPortState output_control
Definition channel.h:328
NodeID launch_node
Definition channel.h:292
bool record_address_consumption(size_t total_read_bytes, size_t total_write_bytes)
virtual Event request_metadata()
void update_pre_bytes_total(int port_idx, size_t pre_bytes_total)
bool check_for_progress(unsigned last_counter)
REALM_ALIGNED_TYPE_CONST(AlignedStorage, UnalignedStorage, 16)
unsigned current_progress(void)
ControlPortState input_control
Definition channel.h:328
Mutex xd_lock
Definition channel.h:358
uint64_t current_in_port_remain
Definition channel.h:299
Mutex update_read_lock
Definition channel.h:358
atomic< bool > transfer_completed
Definition channel.h:296
size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache)
XferDesQueue * xferDes_queue
Definition channel.h:290
int priority
Definition channel.h:332
XferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, const void *_fill_data, size_t fill_size)
void replicate_fill_data(size_t new_size)
DeferredXDEnqueue deferred_enqueue
Definition channel.h:485
virtual void update_bytes_write(int port_idx, size_t offset, size_t size)
virtual long get_requests(Request **requests, long nr)=0
atomic< unsigned > reference_count
Definition channel.h:367
uint64_t current_out_port_mask
Definition channel.h:298
size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache, const InstanceLayoutPieceBase *&in_nonaffine, const InstanceLayoutPieceBase *&out_nonaffine)
@ XFERDES_NO_GUID
Definition channel.h:340
virtual void notify_request_read_done(Request *req)
void default_notify_request_write_done(Request *req)
atomic< unsigned > nb_update_pre_bytes_total_calls_received
Definition channel.h:371
IntrusivePriorityList< XferDes, int, REALM_PMTA_USE(XferDes, xd_link), REALM_PMTA_USE(XferDes, priority), DummyLock > XferDesList
Definition channel.h:379
void update_pre_bytes_write(int port_idx, size_t offset, size_t size)
uint64_t current_in_port_mask
Definition channel.h:298
XferDesKind kind
Definition channel.h:337
unsigned nb_update_pre_bytes_total_calls_expected
Definition channel.h:369
XferDesID guid
Definition channel.h:335
void remove_reference(void)
uintptr_t dma_op
Definition channel.h:289
virtual void enqueue_request(Request *req)
Definition channel.h:460
size_t orig_fill_size
Definition channel.h:346
std::vector< XferPort > input_ports
Definition channel.h:320
#define REALM_INTERNAL_API_EXTERNAL_LINKAGE
Definition compiler_support.h:218
#define REALM_PMTA_USE(structtype, name)
Definition lists.h:42
#define STRING_KIND_CASE(kind, desc)
#define C_ENUMS(name, desc)
Definition memory.h:60
Definition activemsg.h:38
int NodeID
Definition nodeset.h:40
XferDesKind
Definition channel.h:85
int CustomSerdezID
Definition custom_serdez.h:148
bool serialize(S &serdez, const ByteArrayRef &a)
void destroy_xfer_des(XferDesID _guid)
REALM_CUDA_HD bool operator==(const Point< N, T > &lhs, const Point< N, T2 > &rhs)
unsigned long long XferDesID
Definition channel.h:57
::realm_reduction_op_id_t ReductionOpID
Definition event.h:38
std::ostream & operator<<(std::ostream &os, const DenseRectangleList< N, T > &drl)
bool oor_possible
Definition channel.h:706
Memory ind_mem
Definition channel.h:701
ChannelCopyInfo(Memory _src_mem, Memory _dst_mem, Memory _ind_mem=Memory::NO_MEMORY, size_t _num_spaces=1, bool _is_scatter=false, bool _is_ranges=false, bool _is_direct=true, bool _oor_possible=false, size_t _addr_size=0)
Definition channel.h:685
size_t addr_size
Definition channel.h:707
size_t num_spaces
Definition channel.h:702
bool is_scatter
Definition channel.h:703
Memory src_mem
Definition channel.h:699
bool is_ranges
Definition channel.h:704
Memory dst_mem
Definition channel.h:700
bool is_direct
Definition channel.h:705
uint64_t mems[BITMASK_SIZE]
Definition channel.h:778
static const int BITMASK_SIZE
Definition channel.h:777
uint64_t ib_mems[BITMASK_SIZE]
Definition channel.h:778
NodeID node
Definition channel.h:776
SrcDstType src_type
Definition channel.h:774
MemBitmask src_bitmask
Definition channel.h:783
bool serdez_allowed
Definition channel.h:796
SupportedPath & set_max_dim(int src_dim, int dst_dim)
unsigned latency
Definition channel.h:792
SupportedPath * chain
Definition channel.h:812
MemBitmask dst_bitmask
Definition channel.h:788
SrcDstType
Definition channel.h:766
@ LOCAL_RDMA
Definition channel.h:770
@ LOCAL_KIND
Definition channel.h:768
@ SPECIFIC_MEMORY
Definition channel.h:767
@ MEMORY_BITMASK
Definition channel.h:772
@ GLOBAL_KIND
Definition channel.h:769
@ REMOTE_RDMA
Definition channel.h:771
unsigned frag_overhead
Definition channel.h:793
Memory src_mem
Definition channel.h:781
SrcDstType dst_type
Definition channel.h:774
unsigned char max_dst_dim
Definition channel.h:794
void populate_memory_bitmask(span< const Memory > mems, NodeID node, MemBitmask &bitmask)
Memory::Kind src_kind
Definition channel.h:782
Memory dst_mem
Definition channel.h:786
SupportedPath & set_max_dim(int src_and_dst_dim)
SupportedPath & allow_redops()
unsigned bandwidth
Definition channel.h:791
Memory::Kind dst_kind
Definition channel.h:787
unsigned char max_src_dim
Definition channel.h:794
bool redops_allowed
Definition channel.h:795
SupportedPath & allow_serdez()
XferDesKind xd_kind
Definition channel.h:790
SupportedPath()
Definition channel.h:799
Definition channel.h:1121
TransferOperation * op
Definition channel.h:1122
static void handle_message(NodeID sender, const NotifyXferDesCompleteMessage &args, const void *data, size_t datalen)
XferDesID xd_id
Definition channel.h:1123
static void send_request(NodeID target, TransferOperation *op, XferDesID xd_id)
static void handle_message(NodeID sender, const Write1DMessage &args, const void *data, size_t datalen)
int next_port_idx
Definition channel.h:619
XferDesID next_xd_guid
Definition channel.h:618
size_t span_start
Definition channel.h:620
static bool handle_inline(NodeID sender, const Write1DMessage &args, const void *data, size_t datalen, TimeLimit work_until)
static void handle_message(NodeID sender, const SimpleXferDesCreateMessage &args, const void *data, size_t datalen)
Definition channel.h:1249
int port_idx
Definition channel.h:1251
size_t span_size
Definition channel.h:1252
size_t span_start
Definition channel.h:1252
static void handle_message(NodeID sender, const UpdateBytesReadMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1250
static void send_request(NodeID target, XferDesID guid, int port_idx, size_t span_start, size_t span_size)
Definition channel.h:1257
Definition channel.h:1220
size_t pre_bytes_total
Definition channel.h:1223
static void handle_message(NodeID sender, const UpdateBytesTotalMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1221
int port_idx
Definition channel.h:1222
Definition channel.h:1229
size_t span_start
Definition channel.h:1232
int port_idx
Definition channel.h:1231
static void handle_message(NodeID sender, const UpdateBytesWriteMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1230
static void send_request(NodeID target, XferDesID guid, int port_idx, size_t span_start, size_t span_size)
Definition channel.h:1237
size_t span_size
Definition channel.h:1232
uintptr_t channel
Definition channel.h:652
uintptr_t dma_op
Definition channel.h:649
XferDesID guid
Definition channel.h:650
NodeID launch_node
Definition channel.h:651
Definition channel.h:1208
static void send_request(NodeID target, XferDesID guid)
Definition channel.h:1212
static void handle_message(NodeID sender, const XferDesDestroyMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1209
Memory mem
Definition channel.h:203
XferDesID peer_guid
Definition channel.h:200
CustomSerdezID serdez_id
Definition channel.h:207
RegionInstance inst
Definition channel.h:204
size_t ib_size
Definition channel.h:205
int port_type
Definition channel.h:199
int indirect_port_idx
Definition channel.h:202
int peer_port_idx
Definition channel.h:201
TransferIterator * iter
Definition channel.h:206
size_t ib_offset
Definition channel.h:205
@ DATA_PORT
Definition channel.h:195
@ SCATTER_CONTROL_PORT
Definition channel.h:197
@ GATHER_CONTROL_PORT
Definition channel.h:196
XferDesRedopInfo()
Definition channel.h:217
ReductionOpID id
Definition channel.h:211
XferDesRedopInfo(ReductionOpID _id, bool _is_fold, bool _in_place, bool _is_exclusive)
Definition channel.h:224
bool is_exclusive
Definition channel.h:214
bool is_fold
Definition channel.h:212
bool in_place
Definition channel.h:213
bool eos_received
Definition channel.h:326
size_t remaining_count
Definition channel.h:325
int current_io_port
Definition channel.h:324
ControlPort::Decoder decoder
Definition channel.h:322
int control_port_idx
Definition channel.h:323
char data[ALIGNED_FILL_STORAGE_SIZE]
Definition channel.h:350
AddressListCursor addrcursor
Definition channel.h:318
const CustomSerdezUntyped * serdez_op
Definition channel.h:303
size_t local_bytes_total
Definition channel.h:309
int peer_port_idx
Definition channel.h:305
bool is_indirect_port
Definition channel.h:307
atomic< size_t > local_bytes_cons
Definition channel.h:310
size_t ib_offset
Definition channel.h:316
TransferIterator * iter
Definition channel.h:302
atomic< bool > needs_pbt_update
Definition channel.h:308
atomic< size_t > remote_bytes_total
Definition channel.h:310
AddressList addrlist
Definition channel.h:317
size_t ib_size
Definition channel.h:316
MemoryImpl * mem
Definition channel.h:301
int indirect_port_idx
Definition channel.h:306
SequenceAssembler seq_local
Definition channel.h:311
XferDesID peer_guid
Definition channel.h:304
SequenceAssembler seq_remote
Definition channel.h:311
Memory ib_mem
Definition channel.h:315