62 class ActiveMessageImpl;
64 template <
typename T,
size_t INLINE_STORAGE = 256>
87 size_t _lines,
size_t _line_stride);
89 size_t _bytes_per_line,
size_t _lines,
size_t _line_stride,
92 size_t _bytes_per_line,
size_t _lines,
size_t _line_stride);
101 void init(
NodeID _target,
const void *_data,
size_t _datalen);
105 void init(
NodeID _target,
const void *_data,
size_t _bytes_per_line,
size_t _lines,
106 size_t _line_stride);
108 size_t _bytes_per_line,
size_t _lines,
size_t _line_stride,
111 size_t _lines,
size_t _line_stride);
124 bool with_congestion);
126 size_t bytes_per_line,
size_t lines,
127 size_t line_stride,
bool with_congestion);
129 size_t bytes_per_line,
size_t lines,
130 size_t line_stride,
bool with_congestion);
133 size_t bytes_per_line,
size_t lines,
size_t line_stride,
134 const RemoteAddress &dest_payload_addr,
bool with_congestion);
142 template <
typename T2>
146 void add_payload(
const void *data,
size_t bytes_per_line,
size_t lines,
159 template <
typename CALLABLE>
161 template <
typename CALLABLE>
180 virtual size_t size()
const = 0;
183 static const size_t ALIGNMENT = 8;
191 template <
typename CALLABLE>
214 virtual void commit(
size_t act_payload_size) = 0;
228 void record(
long long t_start,
long long t_end);
245 size_t payload_size,
TimeLimit work_until);
247 const void *payload,
size_t payload_size);
249 const void *payload,
size_t payload_size,
252 template <
typename T>
302 template <
typename T,
typename T2 = T>
339 namespace ThreadLocal {
359 const void *hdr,
size_t hdr_size,
int hdr_mode,
360 const void *payload,
size_t payload_size,
int payload_mode,
361 CallbackFnptr callback_fnptr,
CallbackData callback_data1,
436 std::size_t
operator()(
const std::pair<NodeID, uint64_t> &p)
const
438 return std::hash<NodeID>()(p.first) ^ (std::hash<uint64_t>()(p.second) << 1);
442 std::unordered_map<std::pair<NodeID, uint64_t>, std::unique_ptr<FragmentedMessage>,
447 template <
typename UserHdr>
458 template <
typename Hdr>
502 template <
typename UserHdr,
516 uint64_t next_message_id(
NodeID node_id);
519 size_t max_payload_size_;
520 std::vector<char> payload_;
521 UserHdr user_header_{};
524 template <
typename UserHdr>
531#include "realm/activemsg.inl"
Definition activemsg.h:504
ActiveMessageAuto(NodeID target, size_t max_payload_size)
void add_payload(const void *data, size_t size)
Definition activemsg.h:286
const char * name
Definition activemsg.h:296
virtual ActiveMessageHandlerTable::MessageHandler get_handler(void) const =0
virtual ~ActiveMessageHandlerRegBase(void)
Definition activemsg.h:288
ActiveMessageHandlerRegBase * next_handler
Definition activemsg.h:298
virtual ActiveMessageHandlerTable::MessageHandlerNoTimeout get_handler_notimeout(void) const =0
std::optional< const FragmentInfo &(*)(const void *)> extract_frag_info
Definition activemsg.h:299
bool must_free
Definition activemsg.h:297
ActiveMessageHandlerTable::TypeHash hash
Definition activemsg.h:295
virtual ActiveMessageHandlerTable::MessageHandlerInline get_handler_inline(void) const =0
Definition activemsg.h:303
void force_instantiation(void)
Definition activemsg.h:336
~ActiveMessageHandlerReg(void)
virtual ActiveMessageHandlerTable::MessageHandlerInline get_handler_inline(void) const
virtual ActiveMessageHandlerTable::MessageHandler get_handler(void) const
virtual ActiveMessageHandlerTable::MessageHandlerNoTimeout get_handler_notimeout(void) const
ActiveMessageHandlerReg(void)
Definition activemsg.h:238
const char * lookup_message_name(MessageID id)
bool(* MessageHandlerInline)(NodeID sender, const void *header, const void *payload, size_t payload_size, TimeLimit work_until)
Definition activemsg.h:248
unsigned short MessageID
Definition activemsg.h:243
MessageID lookup_message_id(void) const
void report_message_handler_stats()
void(* MessageHandlerNoTimeout)(NodeID sender, const void *header, const void *payload, size_t payload_size)
Definition activemsg.h:246
ActiveMessageHandlerTable(void)
~ActiveMessageHandlerTable(void)
void construct_handler_table(void)
void record_message_handler_call(MessageID id, long long t_start, long long t_end)
unsigned TypeHash
Definition activemsg.h:263
void(* MessageHandler)(NodeID sender, const void *header, const void *payload, size_t payload_size, TimeLimit work_until)
Definition activemsg.h:244
static void append_handler_reg(ActiveMessageHandlerRegBase *new_reg)
HandlerEntry * lookup_message_handler(MessageID id)
static ActiveMessageHandlerRegBase * pending_handlers
Definition activemsg.h:279
std::vector< HandlerEntry > handlers
Definition activemsg.h:281
Definition activemsg.h:205
virtual void commit(size_t act_payload_size)=0
virtual void * add_local_completion(size_t size)=0
void * header_base
Definition activemsg.h:217
virtual void * add_remote_completion(size_t size)=0
size_t payload_size
Definition activemsg.h:219
void * payload_base
Definition activemsg.h:218
virtual ~ActiveMessageImpl()
Definition activemsg.h:207
Definition activemsg.h:65
Realm::Serialization::FixedBufferSerializer fbs
Definition activemsg.h:171
void init(NodeID _target, const LocalAddress &_src_payload_addr, size_t _bytes_per_line, size_t _lines, size_t _line_stride, const RemoteAddress &_dest_payload_addr)
static size_t recommended_max_payload(NodeID target, const void *data, size_t bytes_per_line, size_t lines, size_t line_stride, bool with_congestion)
ActiveMessage(const Realm::NodeSet &_targets, const void *_data, size_t _datalen)
ActiveMessageImpl * impl
Definition activemsg.h:169
ActiveMessage(NodeID _target, size_t _max_payload_size=0)
void init(NodeID _target, const LocalAddress &_src_payload_addr, size_t _datalen, const RemoteAddress &_dest_payload_addr)
uint64_t inline_capacity[INLINE_STORAGE/sizeof(uint64_t)]
Definition activemsg.h:172
void * payload_ptr(size_t datalen)
void init(NodeID _target, const void *_data, size_t _datalen)
void add_payload(const void *data, size_t bytes_per_line, size_t lines, size_t line_stride, int payload_mode=PAYLOAD_COPY)
void init(NodeID _target, size_t _max_payload_size=0)
ActiveMessage(const Realm::NodeSet &_targets, size_t _max_payload_size=0)
T * header
Definition activemsg.h:170
ActiveMessage(NodeID _target, const void *_data, size_t _datalen)
void init(const Realm::NodeSet &_targets, const void *_data, size_t _datalen)
bool operator<<(const T2 &to_append)
void add_payload(const void *data, size_t datalen, int payload_mode=PAYLOAD_COPY)
static size_t recommended_max_payload(NodeID target, bool with_congestion)
static size_t recommended_max_payload(NodeID target, const RemoteAddress &dest_payload_addr, bool with_congestion)
ActiveMessage(NodeID _target, const void *_data, size_t _bytes_per_line, size_t _lines, size_t _line_stride)
void init(const Realm::NodeSet &_targets, const void *_data, size_t _bytes_per_line, size_t _lines, size_t _line_stride)
static size_t recommended_max_payload(const NodeSet &targets, const void *data, size_t bytes_per_line, size_t lines, size_t line_stride, bool with_congestion)
ActiveMessage(NodeID _target, const LocalAddress &_src_payload_addr, size_t _bytes_per_line, size_t _lines, size_t _line_stride, const RemoteAddress &_dest_payload_addr)
static size_t recommended_max_payload(const NodeSet &targets, bool with_congestion)
void init(NodeID _target, const void *_data, size_t _bytes_per_line, size_t _lines, size_t _line_stride)
void add_local_completion(const CALLABLE &callable)
ActiveMessage(const Realm::NodeSet &_targets, const void *_data, size_t _bytes_per_line, size_t _lines, size_t _line_stride)
void add_remote_completion(const CALLABLE &callable)
static size_t recommended_max_payload(NodeID target, const LocalAddress &src_payload_addr, size_t bytes_per_line, size_t lines, size_t line_stride, const RemoteAddress &dest_payload_addr, bool with_congestion)
void init(const Realm::NodeSet &_targets, size_t _max_payload_size=0)
void init(NodeID _target, size_t _max_payload_size, const RemoteAddress &_dest_payload_addr)
ActiveMessage(NodeID _target, size_t _max_payload_size, const RemoteAddress &_dest_payload_addr)
ActiveMessage(NodeID _target, const LocalAddress &_src_payload_addr, size_t _datalen, const RemoteAddress &_dest_payload_addr)
Definition activemsg.h:176
virtual ~CompletionCallbackBase()
static void destroy_all(void *start, size_t bytes)
virtual size_t size() const =0
static void clone_all(void *dst, const void *src, size_t bytes)
static void invoke_all(void *start, size_t bytes)
virtual CompletionCallbackBase * clone_at(void *p) const =0
Definition activemsg.h:192
virtual CompletionCallbackBase * clone_at(void *p) const
CALLABLE callable
Definition activemsg.h:200
CompletionCallback(const CALLABLE &_callable)
virtual size_t size() const
Definition activemsg.h:345
size_t total_messages_handled
Definition activemsg.h:425
void drain_incoming_messages(size_t min_messages_handled)
int todo_newest
Definition activemsg.h:421
size_t drain_min_count
Definition activemsg.h:424
Message ** heads
Definition activemsg.h:417
bool return_messages(int sender, size_t num_handled, Message *head, Message **tail)
MessageBlock * current_block
Definition activemsg.h:430
int handlers_active
Definition activemsg.h:422
uintptr_t CallbackData
Definition activemsg.h:351
atomic< bool > bgwork_requested
Definition activemsg.h:415
std::vector< Thread * > handler_threads
Definition activemsg.h:429
int dedicated_threads
Definition activemsg.h:414
std::unordered_map< std::pair< NodeID, uint64_t >, std::unique_ptr< FragmentedMessage >, PairHash > frag_message
Definition activemsg.h:444
Mutex::CondVar condvar
Definition activemsg.h:427
Message *** tails
Definition activemsg.h:418
CoreReservation * core_rsrv
Definition activemsg.h:428
int get_messages(Message *&head, Message **&tail, bool wait)
bool add_incoming_message(NodeID sender, ActiveMessageHandlerTable::MessageID msgid, const void *hdr, size_t hdr_size, int hdr_mode, const void *payload, size_t payload_size, int payload_mode, CallbackFnptr callback_fnptr, CallbackData callback_data1, CallbackData callback_data2, TimeLimit work_until)
void start_handler_threads(size_t stack_size)
bool * in_handler
Definition activemsg.h:419
~IncomingMessageManager(void)
IncomingMessageManager(int _nodes, int _dedicated_threads, Realm::CoreReservationSet &crs)
size_t num_available_blocks
Definition activemsg.h:432
Mutex mutex
Definition activemsg.h:426
int shutdown_flag
Definition activemsg.h:416
int * todo_list
Definition activemsg.h:420
MessageBlock * available_blocks
Definition activemsg.h:431
bool drain_pending
Definition activemsg.h:423
virtual bool do_work(TimeLimit work_until)
size_t cfg_max_available_blocks
Definition activemsg.h:433
void handler_thread_loop(void)
Definition serialize.h:118
#define REALM_INTERNAL_API_EXTERNAL_LINKAGE
Definition compiler_support.h:218
Helper utility for reconstructing large ActiveMessages that were split into multiple network packets.
bool profile_activemsg_handlers
long long max_inline_message_time
thread_local bool in_message_handler
Definition activemsg.h:38
ActiveMessageHandlerTable activemsg_handler_table
int NodeID
Definition nodeset.h:40
@ PAYLOAD_EMPTY
Definition activemsg.h:59
@ PAYLOAD_KEEPREG
Definition activemsg.h:58
@ PAYLOAD_PENDING
Definition activemsg.h:57
@ PAYLOAD_SRCPTR
Definition activemsg.h:56
@ PAYLOAD_NONE
Definition activemsg.h:52
@ PAYLOAD_COPY
Definition activemsg.h:55
@ PAYLOAD_FREE
Definition activemsg.h:54
@ PAYLOAD_KEEP
Definition activemsg.h:53
Definition activemsg.h:224
void record(long long t_start, long long t_end)
atomic< size_t > count
Definition activemsg.h:225
atomic< size_t > minval
Definition activemsg.h:225
ActiveMessageHandlerStats(void)
atomic< size_t > maxval
Definition activemsg.h:225
atomic< size_t > sum
Definition activemsg.h:225
atomic< size_t > sum2
Definition activemsg.h:225
Definition activemsg.h:265
MessageHandlerNoTimeout handler_notimeout
Definition activemsg.h:269
std::optional< const FragmentInfo &(*)(const void *)> extract_frag_info
Definition activemsg.h:273
const char * name
Definition activemsg.h:267
ActiveMessageHandlerStats stats
Definition activemsg.h:271
TypeHash hash
Definition activemsg.h:266
MessageHandlerInline handler_inline
Definition activemsg.h:270
MessageHandler handler
Definition activemsg.h:268
Definition activemsg.h:525
ActiveMessageHandlerReg< WrappedWithFragInfo< UserHdr >, UserHdr > reg
Definition activemsg.h:526
Definition activemsg.h:231
uint32_t total_chunks
Definition activemsg.h:233
uint64_t msg_id
Definition activemsg.h:234
uint32_t chunk_id
Definition activemsg.h:232
Definition activemsg.h:394
static MessageBlock * new_block(size_t _total_size)
static void free_block(MessageBlock *block)
void recycle_message(Message *msg, IncomingMessageManager *manager)
Message * append_message(size_t hdr_bytes_needed, size_t payload_bytes_needed)
atomic< unsigned > use_count
Definition activemsg.h:407
size_t size_used
Definition activemsg.h:406
MessageBlock * next_free
Definition activemsg.h:408
Definition activemsg.h:379
CallbackFnptr callback_fnptr
Definition activemsg.h:390
CallbackData callback_data1
Definition activemsg.h:391
void * payload
Definition activemsg.h:387
bool hdr_needs_free
Definition activemsg.h:386
void * hdr
Definition activemsg.h:384
MessageBlock * block
Definition activemsg.h:380
size_t payload_size
Definition activemsg.h:388
Message * next_msg
Definition activemsg.h:381
ActiveMessageHandlerTable::HandlerEntry * handler
Definition activemsg.h:383
size_t hdr_size
Definition activemsg.h:385
NodeID sender
Definition activemsg.h:382
bool payload_needs_free
Definition activemsg.h:389
Definition activemsg.h:435
std::size_t operator()(const std::pair< NodeID, uint64_t > &p) const
Definition activemsg.h:436
Definition activemsg.h:448
UserHdr * operator->()
Definition activemsg.h:452
UserHdr & operator*()
Definition activemsg.h:454
const UserHdr * operator->() const
Definition activemsg.h:453
FragmentInfo frag_info
Definition activemsg.h:449
UserHdr user
Definition activemsg.h:450
const UserHdr & operator*() const
Definition activemsg.h:455
NodeID src
Definition ucp_internal.h:1
unsigned short msgid
Definition ucp_internal.h:2