18#ifndef LOWLEVEL_CHANNEL
19#define LOWLEVEL_CHANNEL
26#ifndef REALM_ON_WINDOWS
35#include <unordered_set>
52 class TransferIterator;
59#define REALM_XFERDES_KINDS(__op__) \
61 __op__(XFER_DISK_READ) \
62 __op__(XFER_DISK_WRITE) \
63 __op__(XFER_SSD_READ) \
64 __op__(XFER_SSD_WRITE) \
65 __op__(XFER_GPU_TO_FB) \
66 __op__(XFER_GPU_FROM_FB) \
67 __op__(XFER_GPU_IN_FB) \
68 __op__(XFER_GPU_PEER_FB) \
69 __op__(XFER_MEM_CPY) \
70 __op__(XFER_GASNET_READ) \
71 __op__(XFER_GASNET_WRITE) \
72 __op__(XFER_REMOTE_WRITE) \
73 __op__(XFER_HDF5_READ) \
74 __op__(XFER_HDF5_WRITE) \
75 __op__(XFER_FILE_READ) \
76 __op__(XFER_FILE_WRITE) \
77 __op__(XFER_ADDR_SPLIT) \
78 __op__(XFER_MEM_FILL) \
79 __op__(XFER_GPU_SC_IN_FB) \
80 __op__(XFER_GPU_SC_PEER_FB)
85#define C_ENUMS(name) name,
92#define STRING_KIND_CASE(kind) \
93 case XferDesKind::kind: \
98#undef STRING_KIND_CASE
99 return os <<
"UNKNOWN_KIND";
235 namespace ControlPort {
238 static const unsigned CTRL_LO_MORE = 0;
239 static const unsigned CTRL_LO_LAST = 1;
240 static const unsigned CTRL_MID = 2;
241 static const unsigned CTRL_HIGH = 3;
253 bool encode(
unsigned &data,
size_t count,
int port,
bool last);
277 bool decode(
unsigned data,
size_t &count,
int &port,
bool &last);
388 const std::vector<XferDesPortInfo> &inputs_info,
389 const std::vector<XferDesPortInfo> &outputs_info,
int _priority,
390 const void *_fill_data,
size_t fill_size);
477 virtual void print(std::ostream &os)
const;
488 template <
void (XferDes::*UPDATE)(
int port_idx,
size_t offset,
size_t size)>
493 void add_span(
int port_idx,
size_t offset,
size_t size);
542 XferDesID _guid,
const std::vector<XferDesPortInfo> &inputs_info,
543 const std::vector<XferDesPortInfo> &outputs_info,
int _priority,
544 const void *_fill_data,
size_t _fill_size,
size_t _fill_total);
560 XferDesID _guid,
const std::vector<XferDesPortInfo> &inputs_info,
561 const std::vector<XferDesPortInfo> &outputs_info,
int _priority,
578 XferDesID _guid,
const std::vector<XferDesPortInfo> &inputs_info,
579 const std::vector<XferDesPortInfo> &outputs_info,
int _priority);
599 XferDesID _guid,
const std::vector<XferDesPortInfo> &inputs_info,
600 const std::vector<XferDesPortInfo> &outputs_info,
int _priority);
622 const void *data,
size_t datalen);
624 const void *data,
size_t datalen,
TimeLimit work_until);
639 const std::vector<XferDesPortInfo> &inputs_info,
640 const std::vector<XferDesPortInfo> &outputs_info,
642 const void *fill_data,
size_t fill_size,
643 size_t fill_total) = 0;
656 const void *data,
size_t datalen);
669 const std::vector<XferDesPortInfo> &inputs_info,
670 const std::vector<XferDesPortInfo> &outputs_info,
672 const void *fill_data,
size_t fill_size,
685 size_t _num_spaces = 1,
bool _is_scatter =
false,
686 bool _is_ranges =
false,
bool _is_direct =
true,
687 bool _oor_possible =
false,
size_t _addr_size = 0)
715 :
node(Network::my_node_id)
822 const std::vector<SupportedPath> &
get_paths(
void)
const;
832 size_t total_bytes,
const std::vector<size_t> *src_frags,
833 const std::vector<size_t> *dst_frags,
XferDesKind *kind_ret = 0,
834 unsigned *bw_ret = 0,
unsigned *lat_ret = 0);
856 unsigned bandwidth,
unsigned latency,
unsigned frag_overhead,
859 bool dst_global,
unsigned bandwidth,
unsigned latency,
863 unsigned latency,
unsigned frag_overhead,
866 bool dst_global,
unsigned bandwidth,
unsigned latency,
882 const std::vector<XferDesPortInfo> &inputs_info,
883 const std::vector<XferDesPortInfo> &outputs_info,
885 const void *fill_data,
size_t fill_size,
886 size_t fill_total) = 0;
901 template <
typename S>
905 template <
typename S>
911 const std::vector<Channel::SupportedPath> &_paths,
914 const std::vector<Channel::SupportedPath> &_paths);
918 template <
typename S>
921 template <
typename S>
934 std::vector<Channel::SupportedPath>
paths;
978 const std::vector<size_t> *src_frags,
979 const std::vector<size_t> *dst_frags,
981 unsigned *lat_ret = 0)
override;
999 template <
typename CHANNEL,
typename XD>
1017 template <
typename CHANNEL,
typename XD>
1021 const std::string &_name,
int _numa_domain = -1);
1055 const std::vector<XferDesPortInfo> &inputs_info,
1056 const std::vector<XferDesPortInfo> &outputs_info,
1058 const void *fill_data,
size_t fill_size,
1076 const std::vector<XferDesPortInfo> &inputs_info,
1077 const std::vector<XferDesPortInfo> &outputs_info,
1079 const void *fill_data,
size_t fill_size,
1096 const std::vector<XferDesPortInfo> &inputs_info,
1097 const std::vector<XferDesPortInfo> &outputs_info,
1099 const void *fill_data,
size_t fill_size,
1115 const std::vector<XferDesPortInfo> &inputs_info,
1116 const std::vector<XferDesPortInfo> &outputs_info,
1118 const void *fill_data,
size_t fill_size,
1130 const void *data,
size_t datalen);
1136 struct XferDesRemoteWriteMessage {
1141 size_t span_start, pre_bytes_total;
1143 static void handle_message(
NodeID sender,
1144 const XferDesRemoteWriteMessage &args,
1148 static void send_request(
NodeID target,
1150 const void *src_buf,
size_t nbytes,
1156 size_t pre_bytes_total)
1162 amsg->next_xd_guid = next_xd_guid;
1163 amsg->next_port_idx = next_port_idx;
1164 amsg->span_start = span_start;
1165 assert(span_size <= UINT_MAX);
1166 amsg->span_size = (unsigned)span_size;
1167 amsg->pre_bytes_total = pre_bytes_total;
1171 static void send_request(NodeID target,
1172 const RemoteAddress& dst_buf,
1173 const void *src_buf,
size_t nbytes, off_t src_str,
1174 size_t nlines, RemoteWriteRequest* req,
1175 XferDesID next_xd_guid,
1179 size_t pre_bytes_total)
1181 ActiveMessage<XferDesRemoteWriteMessage> amsg(target,
1186 amsg->next_xd_guid = next_xd_guid;
1187 amsg->next_port_idx = next_port_idx;
1188 amsg->span_start = span_start;
1189 assert(span_size <= UINT_MAX);
1190 amsg->span_size = (unsigned)span_size;
1191 amsg->pre_bytes_total = pre_bytes_total;
1196 struct XferDesRemoteWriteAckMessage {
1197 RemoteWriteRequest* req;
1199 static void handle_message(NodeID sender,
1200 const XferDesRemoteWriteAckMessage &args,
1203 static void send_request(NodeID target, RemoteWriteRequest* req)
1205 ActiveMessage<XferDesRemoteWriteAckMessage> amsg(target);
1215 const void *data,
size_t datalen);
1230 const void *data,
size_t datalen);
1239 const void *data,
size_t datalen);
1259 const void *data,
size_t datalen);
1360#include "realm/transfer/channel.inl"
#define REALM_XFERDES_KINDS(__op__)
Definition channel.h:59
Definition activemsg.h:65
Definition address_list.h:101
Definition address_list.h:56
virtual long progress_xd(XferDes *xd, long max_nr)
virtual Memory suggest_ib_memories_for_node(NodeID node) const
SupportedPath & add_path(span< const Memory > src_mems, span< const Memory > dst_mems, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
virtual void shutdown()
Definition channel.h:721
virtual bool support_idindexed_fields(Memory src_mem, Memory dst_mem) const
Definition channel.h:741
NodeID node
Definition channel.h:729
SupportedPath & add_path(bool local_loopback, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
SupportedPath & add_path(Memory::Kind src_kind, bool src_global, Memory::Kind dst_kind, bool dst_global, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
SupportedPath & add_path(span< const Memory > src_mems, Memory::Kind dst_kind, bool dst_global, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
virtual bool needs_wrapping_iterator() const
Definition channel.h:843
bool has_non_redop_path
Definition channel.h:737
virtual void enqueue_ready_xd(XferDes *xd)=0
virtual bool supports_indirection_memory(Memory mem) const
Queries if a given mem can be used as an indirection buffer.
const std::vector< SupportedPath > & get_paths(void) const
void print(std::ostream &os) const
std::vector< SupportedPath > paths
Definition channel.h:872
virtual uint64_t supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id, ReductionOpID redop_id, size_t total_bytes, const std::vector< size_t > *src_frags, const std::vector< size_t > *dst_frags, XferDesKind *kind_ret=0, unsigned *bw_ret=0, unsigned *lat_ret=0)
virtual ~Channel()
Definition channel.h:718
SupportedPath & add_path(Memory::Kind src_kind, bool src_global, span< const Memory > dst_mems, unsigned bandwidth, unsigned latency, unsigned frag_overhead, XferDesKind xd_kind)
XferDesKind kind
Definition channel.h:731
virtual XferDesFactory * get_factory()=0
virtual long submit(Request **requests, long nr)=0
virtual RemoteChannelInfo * construct_remote_info() const
Channel(XferDesKind _kind)
Definition channel.h:714
virtual Memory suggest_ib_memories() const
bool has_redop_path
Definition channel.h:734
virtual void wakeup_xd(XferDes *xd)=0
virtual bool supports_redop(ReductionOpID redop_id) const
virtual long available()=0
void update_channel_state(void)
unsigned short port_shift
Definition channel.h:281
size_t temp_count
Definition channel.h:280
bool decode(unsigned data, size_t &count, int &port, bool &last)
void set_port_count(size_t ports)
State
Definition channel.h:259
@ STATE_IDLE
Definition channel.h:262
@ STATE_SENT_HIGH
Definition channel.h:263
@ STATE_SENT_MID
Definition channel.h:264
@ STATE_INIT
Definition channel.h:260
@ STATE_DONE
Definition channel.h:265
@ STATE_HAVE_PORT_COUNT
Definition channel.h:261
unsigned char state
Definition channel.h:267
bool encode(unsigned &data, size_t count, int port, bool last)
unsigned short port_shift
Definition channel.h:256
Definition custom_serdez.h:150
Definition event_impl.h:49
Definition channel.h:1087
GASNetChannel(BackgroundWorkManager *bgwork, XferDesKind _kind)
static const bool is_ordered
Definition channel.h:1093
long submit(Request **requests, long nr)
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
void * mem_base
Definition channel.h:177
off_t gas_off
Definition channel.h:178
void notify_request_read_done(Request *req)
bool progress_xd(GASNetChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
GASNetXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority)
void notify_request_write_done(Request *req)
~GASNetXferDes()
Definition channel.h:581
static const int MEMORY_INDEX_WIDTH
Definition id.h:58
Definition inst_layout.h:267
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)=0
SimpleXferDesFactory factory_singleton
Definition channel.h:891
LocalChannel(XferDesKind _kind)
virtual XferDesFactory * get_factory()
Definition memcpy_channel.h:55
const void * src_base
Definition channel.h:170
void * dst_base
Definition channel.h:171
Definition channel.h:1045
bool is_stopped
Definition channel.h:1063
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
static const bool is_ordered
Definition channel.h:1050
virtual long submit(Request **requests, long nr)
MemfillChannel(BackgroundWorkManager *bgwork)
MemfillXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, const void *_fill_data, size_t _fill_size, size_t _fill_total)
virtual Request * dequeue_request()
bool progress_xd(MemfillChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
virtual bool request_available()
virtual void enqueue_request(Request *req)
Kind
Definition memory.h:59
static const Memory NO_MEMORY
Definition memory.h:49
Definition channel.h:1066
virtual bool supports_redop(ReductionOpID redop_id) const
static const bool is_ordered
Definition channel.h:1071
MemreduceChannel(BackgroundWorkManager *bgwork)
bool is_stopped
Definition channel.h:1084
virtual long submit(Request **requests, long nr)
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
MemreduceXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, XferDesRedopInfo _redop_info)
long get_requests(Request **requests, long nr)
const ReductionOpUntyped * redop
Definition channel.h:570
bool progress_xd(MemreduceChannel *channel, TimeLimit work_until)
XferDesRedopInfo redop_info
Definition channel.h:569
virtual RemoteChannel * create_remote_channel()=0
virtual ~RemoteChannelInfo()
Definition channel.h:897
static RemoteChannelInfo * deserialize_new(S &deserializer)
XferDesFactory * get_factory() override
long available() override
uint64_t supports_path(ChannelCopyInfo channel_copy_info, CustomSerdezID src_serdez_id, CustomSerdezID dst_serdez_id, ReductionOpID redop_id, size_t total_bytes, const std::vector< size_t > *src_frags, const std::vector< size_t > *dst_frags, XferDesKind *kind_ret=0, unsigned *bw_ret=0, unsigned *lat_ret=0) override
uintptr_t get_remote_ptr() const
SimpleXferDesFactory factory_singleton
Definition channel.h:995
const std::set< Memory > indirect_memories
Definition channel.h:996
long submit(Request **requests, long nr) override
std::unordered_set< ReductionOpID > supported_redops
Definition channel.h:994
RemoteChannel(uintptr_t _remote_ptr)
RWLock mutex
Definition channel.h:992
void register_redop(ReductionOpID redop_id)
void enqueue_ready_xd(XferDes *xd) override
Definition channel.h:988
void wakeup_xd(XferDes *xd) override
Definition channel.h:989
bool supports_indirection_memory(Memory mem) const override
Queries if a given mem can be used as an indirection buffer.
bool supports_redop(ReductionOpID redop_id) const override
uintptr_t remote_ptr
Definition channel.h:993
RemoteChannel(uintptr_t _remote_ptr, const std::vector< Memory > &indirect_memories)
Definition channel.h:1106
long submit(Request **requests, long nr)
static const bool is_ordered
Definition channel.h:1112
virtual XferDes * create_xfer_des(uintptr_t dma_op, NodeID launch_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
RemoteWriteChannel(BackgroundWorkManager *bgwork)
const void * src_base
Definition channel.h:186
virtual void update_bytes_write(int port_idx, size_t offset, size_t size)
~RemoteWriteXferDes()
Definition channel.h:602
RemoteWriteXferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority)
void notify_request_read_done(Request *req)
bool progress_xd(RemoteWriteChannel *channel, TimeLimit work_until)
long get_requests(Request **requests, long nr)
void notify_request_write_done(Request *req)
Dimension dim
Definition channel.h:127
off_t src_str
Definition channel.h:117
off_t dst_pstr
Definition channel.h:119
size_t write_seq_count
Definition channel.h:130
int src_port_idx
Definition channel.h:113
off_t src_off
Definition channel.h:115
off_t dst_off
Definition channel.h:115
off_t src_pstr
Definition channel.h:119
off_t dst_str
Definition channel.h:117
size_t nlines
Definition channel.h:121
Dimension
Definition channel.h:105
@ DIM_3D
Definition channel.h:108
@ DIM_1D
Definition channel.h:106
@ DIM_2D
Definition channel.h:107
bool is_write_done
Definition channel.h:125
size_t write_seq_pos
Definition channel.h:130
size_t nbytes
Definition channel.h:121
XferDes * xd
Definition channel.h:112
size_t read_seq_pos
Definition channel.h:129
int dst_port_idx
Definition channel.h:113
bool is_read_done
Definition channel.h:123
size_t nplanes
Definition channel.h:121
size_t read_seq_count
Definition channel.h:129
atomic< Mutex * > mutex
Definition channel.h:164
atomic< size_t > contig_amount_x2
Definition channel.h:161
std::map< size_t, size_t > spans
Definition channel.h:165
atomic< size_t > first_noncontig
Definition channel.h:163
size_t add_span(size_t pos, size_t count)
size_t span_exists(size_t start, size_t count)
SequenceAssembler(const SequenceAssembler &copy_from)
void swap(SequenceAssembler &other)
Definition serialize.h:363
SimpleRemoteChannelInfo()
static Serialization::PolymorphicSerdezSubclass< RemoteChannelInfo, SimpleRemoteChannelInfo > serdez_subclass
Definition channel.h:929
std::vector< Channel::SupportedPath > paths
Definition channel.h:934
SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr, const std::vector< Channel::SupportedPath > &_paths)
static RemoteChannelInfo * deserialize_new(S &deserializer)
uintptr_t remote_ptr
Definition channel.h:933
std::vector< Memory > indirect_memories
Definition channel.h:935
SimpleRemoteChannelInfo(NodeID _owner, XferDesKind _kind, uintptr_t _remote_ptr, const std::vector< Channel::SupportedPath > &_paths, const std::vector< Memory > &indirect_memories)
virtual RemoteChannel * create_remote_channel()
NodeID owner
Definition channel.h:931
bool serialize(S &serializer) const
XferDesKind kind
Definition channel.h:932
virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)
SimpleXferDesFactory(uintptr_t _channel)
virtual bool needs_release()
uintptr_t channel
Definition channel.h:676
Definition channel.h:1018
void pull()
Definition channel.h:1029
SingleXDQChannel(BackgroundWorkManager *bgwork, XferDesKind _kind, const std::string &_name, int _numa_domain=-1)
virtual void wakeup_xd(XferDes *xd)
virtual long progress_xd(XferDes *xd, long max_nr)
Definition channel.h:1035
XDQueue< CHANNEL, XD > xdq
Definition channel.h:1042
long available()
Definition channel.h:1030
virtual void enqueue_ready_xd(XferDes *xd)
Definition transfer.h:663
Definition channel.h:1000
XDQueue(LocalChannel *_channel, const std::string &_name, bool _ordered)
Mutex mutex
Definition channel.h:1013
XferDes::XferDesList ready_xds
Definition channel.h:1014
void enqueue_xd(XD *xd, bool at_front=false)
LocalChannel * channel
Definition channel.h:1011
virtual bool do_work(TimeLimit work_until)
friend CHANNEL
Definition channel.h:1009
bool ordered_mode
Definition channel.h:1012
bool in_ordered_worker
Definition channel.h:1012
virtual ~XferDesFactory()
Definition channel.h:634
virtual bool needs_release()=0
virtual void create_xfer_des(uintptr_t dma_op, NodeID launch_node, NodeID target_node, XferDesID guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int priority, XferDesRedopInfo redop_info, const void *fill_data, size_t fill_size, size_t fill_total)=0
Definition channel.h:1275
XferDes * xd
Definition channel.h:1298
SequenceAssembler inline_pre_write[INLINE_PORTS]
Definition channel.h:1301
std::map< int, size_t > extra_bytes_total
Definition channel.h:1303
void add_update_pre_bytes_total_received(void)
void update_pre_bytes_write(int port_idx, size_t span_start, size_t span_size)
unsigned get_update_pre_bytes_total_received(void)
Mutex extra_mutex
Definition channel.h:1302
atomic< unsigned > refcount
Definition channel.h:1297
static const int INLINE_PORTS
Definition channel.h:1296
std::map< int, SequenceAssembler > extra_pre_write
Definition channel.h:1304
atomic< unsigned > nb_update_pre_bytes_total_calls_received
Definition channel.h:1299
void update_pre_bytes_total(int port_idx, size_t pre_bytes_total)
size_t inline_bytes_total[INLINE_PORTS]
Definition channel.h:1300
void set_real_xd(XferDes *_xd)
Definition channel.h:1307
atomic< XferDesID > next_to_assign_idx
Definition channel.h:1354
void update_next_bytes_read(XferDesID xd_guid, int port_idx, size_t span_start, size_t span_size)
static XferDesQueue * get_singleton()
bool enqueue_xferDes_local(XferDes *xd, bool add_to_queue=true)
Mutex queues_lock
Definition channel.h:1353
void update_pre_bytes_total(XferDesID xd_guid, int port_idx, size_t pre_bytes_total)
void destroy_xferDes(XferDesID guid)
void update_pre_bytes_write(XferDesID xd_guid, int port_idx, size_t span_start, size_t span_size)
Mutex guid_lock
Definition channel.h:1350
~XferDesQueue()
Definition channel.h:1321
@ NODE_BITS
Definition channel.h:1311
@ INDEX_BITS
Definition channel.h:1312
std::map< XferDesID, uintptr_t > guid_to_xd
Definition channel.h:1351
XferDesQueue()
Definition channel.h:1314
XferDesID get_guid(NodeID execution_node)
Definition channel.h:1325
XferDes * xd
Definition channel.h:482
virtual void event_triggered(bool poisoned, TimeLimit work_until)
XferDesQueue * xferDes_queue
Definition channel.h:481
virtual Event get_finish_event(void) const
virtual void print(std::ostream &os) const
void defer(XferDesQueue *_xferDes_queue, XferDes *_xd, Event wait_on)
size_t sizes[MAX_ENTRIES]
Definition channel.h:502
void add_span(int port_idx, size_t offset, size_t size)
int ports[MAX_ENTRIES]
Definition channel.h:500
static const size_t MAX_ENTRIES
Definition channel.h:497
size_t offsets[MAX_ENTRIES]
Definition channel.h:501
XferDes * xd
Definition channel.h:499
SequenceCache(XferDes *_xd, size_t _flush_bytes=0)
size_t flush_bytes
Definition channel.h:503
size_t total_bytes
Definition channel.h:503
REALM_PMTA_DEFN(XferDes, IntrusivePriorityListLink< XferDes >, xd_link)
Mutex update_write_lock
Definition channel.h:357
Channel * channel
Definition channel.h:342
void mark_completed(TimeLimit work_until)
Mutex available_req_mutex
Definition channel.h:383
virtual Request * dequeue_request()
Definition channel.h:444
atomic< int64_t > bytes_write_pending
Definition channel.h:294
REALM_PMTA_DEFN(XferDes, int, priority)
virtual void update_bytes_read(int port_idx, size_t offset, size_t size)
SequenceCache<&XferDes::update_bytes_write > WriteSequenceCache
Definition channel.h:506
void update_progress(void)
std::queue< Request * > available_reqs
Definition channel.h:384
uint64_t current_out_port_remain
Definition channel.h:298
void default_notify_request_read_done(Request *req)
long default_get_requests(Request **requests, long nr, unsigned flags=0)
void add_update_pre_bytes_total_received(void)
size_t update_control_info(ReadSequenceCache *rseqcache)
atomic< unsigned > progress_counter
Definition channel.h:364
IntrusivePriorityListLink< XferDes > xd_link
Definition channel.h:373
atomic< bool > iteration_completed
Definition channel.h:293
AlignedStorage inline_fill_storage
Definition channel.h:352
void * fill_data
Definition channel.h:344
virtual void notify_request_write_done(Request *req)
uint64_t max_req_size
Definition channel.h:329
SequenceCache<&XferDes::update_bytes_read > ReadSequenceCache
Definition channel.h:505
size_t fill_size
Definition channel.h:345
std::vector< XferPort > output_ports
Definition channel.h:319
static const size_t ALIGNED_FILL_STORAGE_SIZE
Definition channel.h:347
void update_next_bytes_read(int port_idx, size_t offset, size_t size)
virtual bool request_available()
Definition channel.h:438
ControlPortState output_control
Definition channel.h:327
NodeID launch_node
Definition channel.h:291
bool record_address_consumption(size_t total_read_bytes, size_t total_write_bytes)
virtual Event request_metadata()
void update_pre_bytes_total(int port_idx, size_t pre_bytes_total)
bool check_for_progress(unsigned last_counter)
REALM_ALIGNED_TYPE_CONST(AlignedStorage, UnalignedStorage, 16)
unsigned current_progress(void)
ControlPortState input_control
Definition channel.h:327
Mutex xd_lock
Definition channel.h:357
uint64_t current_in_port_remain
Definition channel.h:298
Mutex update_read_lock
Definition channel.h:357
atomic< bool > transfer_completed
Definition channel.h:295
size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache)
XferDesQueue * xferDes_queue
Definition channel.h:289
int priority
Definition channel.h:331
XferDes(uintptr_t _dma_op, Channel *_channel, NodeID _launch_node, XferDesID _guid, const std::vector< XferDesPortInfo > &inputs_info, const std::vector< XferDesPortInfo > &outputs_info, int _priority, const void *_fill_data, size_t fill_size)
void replicate_fill_data(size_t new_size)
DeferredXDEnqueue deferred_enqueue
Definition channel.h:484
virtual void update_bytes_write(int port_idx, size_t offset, size_t size)
virtual long get_requests(Request **requests, long nr)=0
atomic< unsigned > reference_count
Definition channel.h:366
uint64_t current_out_port_mask
Definition channel.h:297
size_t get_addresses(size_t min_xfer_size, ReadSequenceCache *rseqcache, const InstanceLayoutPieceBase *&in_nonaffine, const InstanceLayoutPieceBase *&out_nonaffine)
@ XFERDES_NO_GUID
Definition channel.h:339
virtual void notify_request_read_done(Request *req)
void default_notify_request_write_done(Request *req)
atomic< unsigned > nb_update_pre_bytes_total_calls_received
Definition channel.h:370
IntrusivePriorityList< XferDes, int, REALM_PMTA_USE(XferDes, xd_link), REALM_PMTA_USE(XferDes, priority), DummyLock > XferDesList
Definition channel.h:378
void update_pre_bytes_write(int port_idx, size_t offset, size_t size)
uint64_t current_in_port_mask
Definition channel.h:297
XferDesKind kind
Definition channel.h:336
unsigned nb_update_pre_bytes_total_calls_expected
Definition channel.h:368
XferDesID guid
Definition channel.h:334
void remove_reference(void)
uintptr_t dma_op
Definition channel.h:288
virtual void enqueue_request(Request *req)
Definition channel.h:459
size_t orig_fill_size
Definition channel.h:345
std::vector< XferPort > input_ports
Definition channel.h:319
#define REALM_INTERNAL_API_EXTERNAL_LINKAGE
Definition compiler_support.h:218
#define REALM_PMTA_USE(structtype, name)
Definition lists.h:42
#define STRING_KIND_CASE(kind, desc)
#define C_ENUMS(name, desc)
Definition memory.h:60
Definition activemsg.h:38
int NodeID
Definition nodeset.h:40
XferDesKind
Definition channel.h:84
int CustomSerdezID
Definition custom_serdez.h:148
bool serialize(S &serdez, const ByteArrayRef &a)
void destroy_xfer_des(XferDesID _guid)
REALM_CUDA_HD bool operator==(const Point< N, T > &lhs, const Point< N, T2 > &rhs)
unsigned long long XferDesID
Definition channel.h:56
::realm_reduction_op_id_t ReductionOpID
Definition event.h:38
std::ostream & operator<<(std::ostream &os, const DenseRectangleList< N, T > &drl)
bool oor_possible
Definition channel.h:705
Memory ind_mem
Definition channel.h:700
ChannelCopyInfo(Memory _src_mem, Memory _dst_mem, Memory _ind_mem=Memory::NO_MEMORY, size_t _num_spaces=1, bool _is_scatter=false, bool _is_ranges=false, bool _is_direct=true, bool _oor_possible=false, size_t _addr_size=0)
Definition channel.h:684
size_t addr_size
Definition channel.h:706
size_t num_spaces
Definition channel.h:701
bool is_scatter
Definition channel.h:702
Memory src_mem
Definition channel.h:698
bool is_ranges
Definition channel.h:703
Memory dst_mem
Definition channel.h:699
bool is_direct
Definition channel.h:704
uint64_t mems[BITMASK_SIZE]
Definition channel.h:782
static const int BITMASK_SIZE
Definition channel.h:781
uint64_t ib_mems[BITMASK_SIZE]
Definition channel.h:782
NodeID node
Definition channel.h:780
SrcDstType src_type
Definition channel.h:778
MemBitmask src_bitmask
Definition channel.h:787
bool serdez_allowed
Definition channel.h:800
SupportedPath & set_max_dim(int src_dim, int dst_dim)
unsigned latency
Definition channel.h:796
SupportedPath * chain
Definition channel.h:816
MemBitmask dst_bitmask
Definition channel.h:792
SrcDstType
Definition channel.h:770
@ LOCAL_RDMA
Definition channel.h:774
@ LOCAL_KIND
Definition channel.h:772
@ SPECIFIC_MEMORY
Definition channel.h:771
@ MEMORY_BITMASK
Definition channel.h:776
@ GLOBAL_KIND
Definition channel.h:773
@ REMOTE_RDMA
Definition channel.h:775
unsigned frag_overhead
Definition channel.h:797
Memory src_mem
Definition channel.h:785
SrcDstType dst_type
Definition channel.h:778
unsigned char max_dst_dim
Definition channel.h:798
void populate_memory_bitmask(span< const Memory > mems, NodeID node, MemBitmask &bitmask)
Memory::Kind src_kind
Definition channel.h:786
Memory dst_mem
Definition channel.h:790
SupportedPath & set_max_dim(int src_and_dst_dim)
SupportedPath & allow_redops()
unsigned bandwidth
Definition channel.h:795
Memory::Kind dst_kind
Definition channel.h:791
unsigned char max_src_dim
Definition channel.h:798
bool redops_allowed
Definition channel.h:799
SupportedPath & allow_serdez()
XferDesKind xd_kind
Definition channel.h:794
SupportedPath()
Definition channel.h:803
Definition channel.h:1125
TransferOperation * op
Definition channel.h:1126
static void handle_message(NodeID sender, const NotifyXferDesCompleteMessage &args, const void *data, size_t datalen)
XferDesID xd_id
Definition channel.h:1127
static void send_request(NodeID target, TransferOperation *op, XferDesID xd_id)
static void handle_message(NodeID sender, const Write1DMessage &args, const void *data, size_t datalen)
int next_port_idx
Definition channel.h:618
XferDesID next_xd_guid
Definition channel.h:617
size_t span_start
Definition channel.h:619
static bool handle_inline(NodeID sender, const Write1DMessage &args, const void *data, size_t datalen, TimeLimit work_until)
static void handle_message(NodeID sender, const SimpleXferDesCreateMessage &args, const void *data, size_t datalen)
Definition channel.h:1253
int port_idx
Definition channel.h:1255
size_t span_size
Definition channel.h:1256
size_t span_start
Definition channel.h:1256
static void handle_message(NodeID sender, const UpdateBytesReadMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1254
static void send_request(NodeID target, XferDesID guid, int port_idx, size_t span_start, size_t span_size)
Definition channel.h:1261
Definition channel.h:1224
size_t pre_bytes_total
Definition channel.h:1227
static void handle_message(NodeID sender, const UpdateBytesTotalMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1225
int port_idx
Definition channel.h:1226
Definition channel.h:1233
size_t span_start
Definition channel.h:1236
int port_idx
Definition channel.h:1235
static void handle_message(NodeID sender, const UpdateBytesWriteMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1234
static void send_request(NodeID target, XferDesID guid, int port_idx, size_t span_start, size_t span_size)
Definition channel.h:1241
size_t span_size
Definition channel.h:1236
uintptr_t channel
Definition channel.h:651
uintptr_t dma_op
Definition channel.h:648
XferDesID guid
Definition channel.h:649
NodeID launch_node
Definition channel.h:650
Definition channel.h:1212
static void send_request(NodeID target, XferDesID guid)
Definition channel.h:1216
static void handle_message(NodeID sender, const XferDesDestroyMessage &args, const void *data, size_t datalen)
XferDesID guid
Definition channel.h:1213
Memory mem
Definition channel.h:202
XferDesID peer_guid
Definition channel.h:199
CustomSerdezID serdez_id
Definition channel.h:206
RegionInstance inst
Definition channel.h:203
size_t ib_size
Definition channel.h:204
int port_type
Definition channel.h:198
@ DATA_PORT
Definition channel.h:194
@ SCATTER_CONTROL_PORT
Definition channel.h:196
@ GATHER_CONTROL_PORT
Definition channel.h:195
int indirect_port_idx
Definition channel.h:201
int peer_port_idx
Definition channel.h:200
TransferIterator * iter
Definition channel.h:205
size_t ib_offset
Definition channel.h:204
XferDesRedopInfo()
Definition channel.h:216
ReductionOpID id
Definition channel.h:210
XferDesRedopInfo(ReductionOpID _id, bool _is_fold, bool _in_place, bool _is_exclusive)
Definition channel.h:223
bool is_exclusive
Definition channel.h:213
bool is_fold
Definition channel.h:211
bool in_place
Definition channel.h:212
bool eos_received
Definition channel.h:325
size_t remaining_count
Definition channel.h:324
int current_io_port
Definition channel.h:323
ControlPort::Decoder decoder
Definition channel.h:321
int control_port_idx
Definition channel.h:322
char data[ALIGNED_FILL_STORAGE_SIZE]
Definition channel.h:349
AddressListCursor addrcursor
Definition channel.h:317
const CustomSerdezUntyped * serdez_op
Definition channel.h:302
size_t local_bytes_total
Definition channel.h:308
int peer_port_idx
Definition channel.h:304
bool is_indirect_port
Definition channel.h:306
atomic< size_t > local_bytes_cons
Definition channel.h:309
size_t ib_offset
Definition channel.h:315
TransferIterator * iter
Definition channel.h:301
atomic< bool > needs_pbt_update
Definition channel.h:307
atomic< size_t > remote_bytes_total
Definition channel.h:309
AddressList addrlist
Definition channel.h:316
size_t ib_size
Definition channel.h:315
MemoryImpl * mem
Definition channel.h:300
int indirect_port_idx
Definition channel.h:305
SequenceAssembler seq_local
Definition channel.h:310
XferDesID peer_guid
Definition channel.h:303
SequenceAssembler seq_remote
Definition channel.h:310
Memory ib_mem
Definition channel.h:314