Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
activemsg.h
Go to the documentation of this file.
1/*
2 * Copyright 2025 Stanford University, NVIDIA Corporation
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18// hopefully a more user-friendly C++ template wrapper for GASNet active
19// messages...
20
21#ifndef ACTIVEMSG_H
22#define ACTIVEMSG_H
23
24#include "realm/realm_config.h"
26#include "realm/mutex.h"
27#include "realm/serialize.h"
28#include "realm/nodeset.h"
29#include "realm/network.h"
30#include "realm/atomics.h"
31#include "realm/threads.h"
32#include "realm/bgwork.h"
33#include <type_traits>
34#include <mutex>
35
36#include <optional>
37
38namespace Realm {
39
40 namespace Config {
41 // if true, the number and min/max/avg/stddev duration of handler per
42 // message type is recorded and printed
44
45 // the maximum time we're willing to spend on inline message
46 // handlers
47 extern long long max_inline_message_time;
48 }; // namespace Config
49
50 enum
51 {
52 PAYLOAD_NONE, // no payload in packet
53 PAYLOAD_KEEP, // use payload pointer, guaranteed to be stable
54 PAYLOAD_FREE, // take ownership of payload, free when done
55 PAYLOAD_COPY, // make a copy of the payload
56 PAYLOAD_SRCPTR, // payload has been copied to the src data pool
57 PAYLOAD_PENDING, // payload needs to be copied, but hasn't yet
58 PAYLOAD_KEEPREG, // use payload pointer, AND it's registered!
59 PAYLOAD_EMPTY, // message can have payload, but this one is 0 bytes
60 };
61
62 class ActiveMessageImpl;
63
64 template <typename T, size_t INLINE_STORAGE = 256>
66 public:
67 // constructs an INACTIVE message object - call init(...) as needed
69
70 // construct a new active message for either a single recipient or a mask
71 // of recipients
72 // in addition to the header struct (T), a message can include a variable
73 // payload which can be delivered to a particular destination address
74 ActiveMessage(NodeID _target, size_t _max_payload_size = 0);
75 ActiveMessage(NodeID _target, size_t _max_payload_size,
76 const RemoteAddress &_dest_payload_addr);
77 ActiveMessage(const Realm::NodeSet &_targets, size_t _max_payload_size = 0);
78
79 // providing the payload (as a 1D or 2D reference, which must be PAYLOAD_KEEP)
80 // up front can avoid a copy if the source location is directly accessible
81 // by the networking hardware
82 ActiveMessage(NodeID _target, const void *_data, size_t _datalen);
83 ActiveMessage(NodeID _target, const LocalAddress &_src_payload_addr, size_t _datalen,
84 const RemoteAddress &_dest_payload_addr);
85 ActiveMessage(const Realm::NodeSet &_targets, const void *_data, size_t _datalen);
86 ActiveMessage(NodeID _target, const void *_data, size_t _bytes_per_line,
87 size_t _lines, size_t _line_stride);
88 ActiveMessage(NodeID _target, const LocalAddress &_src_payload_addr,
89 size_t _bytes_per_line, size_t _lines, size_t _line_stride,
90 const RemoteAddress &_dest_payload_addr);
91 ActiveMessage(const Realm::NodeSet &_targets, const void *_data,
92 size_t _bytes_per_line, size_t _lines, size_t _line_stride);
93
95
96 // a version of `init` for each constructor above
97 void init(NodeID _target, size_t _max_payload_size = 0);
98 void init(NodeID _target, size_t _max_payload_size,
99 const RemoteAddress &_dest_payload_addr);
100 void init(const Realm::NodeSet &_targets, size_t _max_payload_size = 0);
101 void init(NodeID _target, const void *_data, size_t _datalen);
102 void init(NodeID _target, const LocalAddress &_src_payload_addr, size_t _datalen,
103 const RemoteAddress &_dest_payload_addr);
104 void init(const Realm::NodeSet &_targets, const void *_data, size_t _datalen);
105 void init(NodeID _target, const void *_data, size_t _bytes_per_line, size_t _lines,
106 size_t _line_stride);
107 void init(NodeID _target, const LocalAddress &_src_payload_addr,
108 size_t _bytes_per_line, size_t _lines, size_t _line_stride,
109 const RemoteAddress &_dest_payload_addr);
110 void init(const Realm::NodeSet &_targets, const void *_data, size_t _bytes_per_line,
111 size_t _lines, size_t _line_stride);
112
113 // large messages may need to be fragmented, so use cases that can
114 // handle the fragmentation at a higher level may want to know the
115 // largest size that is fragmentation-free - the answer can depend
116 // on whether the data is to be delivered to a known RemoteAddress
117 // and/or whether the source data location is known
118 // a call that sets `with_congestion` may get a smaller value (maybe
119 // even 0) if the path to the named target(s) is getting full
120 static size_t recommended_max_payload(NodeID target, bool with_congestion);
121 static size_t recommended_max_payload(const NodeSet &targets, bool with_congestion);
122 static size_t recommended_max_payload(NodeID target,
123 const RemoteAddress &dest_payload_addr,
124 bool with_congestion);
125 static size_t recommended_max_payload(NodeID target, const void *data,
126 size_t bytes_per_line, size_t lines,
127 size_t line_stride, bool with_congestion);
128 static size_t recommended_max_payload(const NodeSet &targets, const void *data,
129 size_t bytes_per_line, size_t lines,
130 size_t line_stride, bool with_congestion);
131 static size_t
132 recommended_max_payload(NodeID target, const LocalAddress &src_payload_addr,
133 size_t bytes_per_line, size_t lines, size_t line_stride,
134 const RemoteAddress &dest_payload_addr, bool with_congestion);
135
136 // operator-> gives access to the header structure
137 T *operator->(void);
138 T &operator*(void);
139
140 // variable payload can be written to in three ways:
141 // (a) Realm-style serialization (currently eager)
142 template <typename T2>
143 bool operator<<(const T2 &to_append);
144 // (b) old memcpy-like behavior (using the various payload modes)
145 void add_payload(const void *data, size_t datalen, int payload_mode = PAYLOAD_COPY);
146 void add_payload(const void *data, size_t bytes_per_line, size_t lines,
147 size_t line_stride, int payload_mode = PAYLOAD_COPY);
148 // (c) request for a pointer to write into (writes must be completed before
149 // call to commit or cancel)
150 void *payload_ptr(size_t datalen);
151
152 // register callbacks to be called upon:
153 // a) local completion - i.e. all source data has been read and can now
154 // be safely overwritten
155 // b) remote completion - message has been received AND HANDLED by target
156 //
157 // callbacks need to be "lightweight" - for heavier work, the message
158 // handler on the target can send an explicit response message
159 template <typename CALLABLE>
160 void add_local_completion(const CALLABLE &callable);
161 template <typename CALLABLE>
162 void add_remote_completion(const CALLABLE &callable);
163
164 // every active message must eventually be commit()'ed or cancel()'ed
165 void commit(void);
166 void cancel(void);
167
168 protected:
172 uint64_t inline_capacity[INLINE_STORAGE / sizeof(uint64_t)];
173 };
174
175 // type-erased wrappers for completion callbacks
177 public:
179 virtual void invoke() = 0;
180 virtual size_t size() const = 0;
181 virtual CompletionCallbackBase *clone_at(void *p) const = 0;
182
183 static const size_t ALIGNMENT = 8;
184
185 // helper functions for invoking/cloning/destroying a collection of callbacks
186 static void invoke_all(void *start, size_t bytes);
187 static void clone_all(void *dst, const void *src, size_t bytes);
188 static void destroy_all(void *start, size_t bytes);
189 };
190
191 template <typename CALLABLE>
193 public:
194 CompletionCallback(const CALLABLE &_callable);
195 virtual void invoke();
196 virtual size_t size() const;
197 virtual CompletionCallbackBase *clone_at(void *p) const;
198
199 protected:
200 CALLABLE callable;
201 };
202
203 // per-network active message implementations are mostly opaque, but a few
204 // fields are exposed to avoid virtual function calls
206 public:
208
209 // reserves space for a local/remote completion - caller will
210 // placement-new the completion at the provided address
211 virtual void *add_local_completion(size_t size) = 0;
212 virtual void *add_remote_completion(size_t size) = 0;
213
214 virtual void commit(size_t act_payload_size) = 0;
215 virtual void cancel() = 0;
216
220 };
221
223
226
228 void record(long long t_start, long long t_end);
229 };
230
232 uint32_t chunk_id{0};
233 uint32_t total_chunks{0};
234 uint64_t msg_id{0};
235 };
236
237 // singleton class that can convert message type->ID and ID->handler
239 public:
242
243 typedef unsigned short MessageID;
244 typedef void (*MessageHandler)(NodeID sender, const void *header, const void *payload,
245 size_t payload_size, TimeLimit work_until);
246 typedef void (*MessageHandlerNoTimeout)(NodeID sender, const void *header,
247 const void *payload, size_t payload_size);
248 typedef bool (*MessageHandlerInline)(NodeID sender, const void *header,
249 const void *payload, size_t payload_size,
250 TimeLimit work_until);
251
252 template <typename T>
254
256 void record_message_handler_call(MessageID id, long long t_start, long long t_end);
258
260
262
263 typedef unsigned TypeHash;
264
275
277
278 protected:
280
281 std::vector<HandlerEntry> handlers;
282 };
283
285
301
302 template <typename T, typename T2 = T>
304 public:
307
308 // when registering an active message handler, the following three methods
309 // are looked for in class T2
310 // (a) void handle_message(NodeID, const T&, const void *, size_t, TimeLimit)
311 // (b) void handle_message(NodeID, const T&, const void *, size_t)
312 // (c) bool handle_inline(NodeID, const T&, const void *, size_t, TimeLimit)
313 //
314 // at least one of (a) or (b) must be present, with (a) being preferred
315 //
316 // if (c) is present, it will be used to attempt inline handling of
317 // active messages as they arrive, with the following constraints:
318 // (i) the handler must not block on any mutexes (trylocks are ok)
319 // (ii) the handler must not perform dynamic memory allocation/frees
320 // (iii) the handler must try very hard to stay within the specified
321 // time limit
322 // if the inline handler is unable to satisfy these requirements, it should
323 // not attempt to handle the message, returning 'false' and letting it be
324 // queued as normal
325
326 // returns either the requested kind of handler or a null pointer if
327 // it doesn't exist
333
334 // this method does nothing, but can be called to force the instantiation
335 // of a handler registration object (needed when things are inside templates)
337 };
338
339 namespace ThreadLocal {
340 // this flag will be true when we are running a message handler
341 extern thread_local bool in_message_handler;
342 }; // namespace ThreadLocal
343
345 : public BackgroundWorkItem {
346 public:
347 IncomingMessageManager(int _nodes, int _dedicated_threads,
350
351 typedef uintptr_t CallbackData;
352 typedef void (*CallbackFnptr)(NodeID, CallbackData, CallbackData);
353
354 // adds an incoming message to the queue
355 // returns true if the call was handled immediately (in which case the
356 // callback, if present, will NOT be called), or false if the message
357 // will be processed later
359 const void *hdr, size_t hdr_size, int hdr_mode,
360 const void *payload, size_t payload_size, int payload_mode,
361 CallbackFnptr callback_fnptr, CallbackData callback_data1,
362 CallbackData callback_data2, TimeLimit work_until);
363
364 void start_handler_threads(size_t stack_size);
365
366 // stalls caller until all incoming messages have been handled (and at
367 // least 'min_messages_handled' in total)
368 void drain_incoming_messages(size_t min_messages_handled);
369
370 void shutdown(void);
371
372 virtual bool do_work(TimeLimit work_until);
373
375
376 protected:
377 struct MessageBlock;
378
393
395 static MessageBlock *new_block(size_t _total_size);
396 static void free_block(MessageBlock *block);
397
398 void reset();
399
400 // called with message manager lock held
401 Message *append_message(size_t hdr_bytes_needed, size_t payload_bytes_needed);
402
403 // called _without_ message manager lock held
405
406 size_t total_size, size_used;
409 };
410
411 int get_messages(Message *&head, Message **&tail, bool wait);
412 bool return_messages(int sender, size_t num_handled, Message *head, Message **tail);
413
414 int nodes, dedicated_threads, sleeper_count;
420 int *todo_list; // list of nodes with non-empty message lists
421 int todo_oldest, todo_newest;
427 Mutex::CondVar condvar, drain_condvar;
429 std::vector<Thread *> handler_threads;
433 size_t cfg_max_available_blocks, cfg_message_block_size;
434
435 struct PairHash {
436 std::size_t operator()(const std::pair<NodeID, uint64_t> &p) const
437 {
438 return std::hash<NodeID>()(p.first) ^ (std::hash<uint64_t>()(p.second) << 1);
439 }
440 };
441
442 std::unordered_map<std::pair<NodeID, uint64_t>, std::unique_ptr<FragmentedMessage>,
443 PairHash>
445 };
446
447 template <typename UserHdr>
450 UserHdr user;
451
452 UserHdr *operator->() { return &user; }
453 const UserHdr *operator->() const { return &user; }
454 UserHdr &operator*() { return user; }
455 const UserHdr &operator*() const { return user; }
456 };
457
458 template <typename Hdr>
460
462 //
463 // class ActiveMessageAuto
464 //
465 // A thin convenience wrapper that hides the decision of whether a payload
466 // must be transmitted as a *single* ActiveMessage or split across multiple
467 // FragmentedActiveMessages – hence the name "Auto". The public interface
468 // mirrors that of `ActiveMessage` so that existing call-sites can switch to
469 // the automatic behaviour with only a type alias change.
470 //
471 // Behaviour
472 // ---------
473 // 1. When `commit()` is called and the aggregated payload size is **less
474 // than or equal to** the `max_payload_size` provided to the
475 // constructor, the object internally constructs a single
476 // `ActiveMessage<WrappedWithFragInfo<UserHdr>>` (via the `Builder`
477 // template parameter) and sends it.
478 // 2. Otherwise the payload is automatically split into the minimum number
479 // of fragments. Each fragment is sent with a compact `FragmentInfo`
480 // header and the remote side stitches them back together transparently
481 // via `FragmentedMessage`.
482 //
483 // Template Parameters
484 // -------------------
485 // * `UserHdr` – the user-defined struct that forms the logical AM header.
486 // * `Builder` – meta-function that, given a *wire* header type, produces
487 // the concrete ActiveMessage builder to use. The default
488 // simply wraps `UserHdr` with `FragmentInfo` so that the
489 // fragmentation metadata travels on the wire.
490 //
491 // Example
492 // -------
493 // struct MyHdr { int value; };
494 // ActiveMessageAuto<MyHdr> msg(target_node,
495 // ActiveMessage<MyHdr>::recommended_max_payload(
496 // target_node, /*with_congestion=*/false));
497 // msg->value = 123;
498 // msg.add_payload(data_ptr, data_bytes);
499 // msg.commit();
500 //
501
502 template <typename UserHdr,
503 template <typename> class Builder = DefaultActiveMessageBuilder>
505 public:
506 ActiveMessageAuto(NodeID target, size_t max_payload_size);
507
508 UserHdr *operator->();
509 UserHdr &operator*();
510
511 void add_payload(const void *data, size_t size);
512 void commit();
513
514 private:
516 uint64_t next_message_id(NodeID node_id);
517
518 NodeID target_;
519 size_t max_payload_size_;
520 std::vector<char> payload_;
521 UserHdr user_header_{};
522 };
523
524 template <typename UserHdr>
528
529} // namespace Realm
530
531#include "realm/activemsg.inl"
532
533#endif
Definition activemsg.h:504
ActiveMessageAuto(NodeID target, size_t max_payload_size)
void add_payload(const void *data, size_t size)
Definition activemsg.h:286
const char * name
Definition activemsg.h:296
virtual ActiveMessageHandlerTable::MessageHandler get_handler(void) const =0
virtual ~ActiveMessageHandlerRegBase(void)
Definition activemsg.h:288
ActiveMessageHandlerRegBase * next_handler
Definition activemsg.h:298
virtual ActiveMessageHandlerTable::MessageHandlerNoTimeout get_handler_notimeout(void) const =0
std::optional< const FragmentInfo &(*)(const void *)> extract_frag_info
Definition activemsg.h:299
bool must_free
Definition activemsg.h:297
ActiveMessageHandlerTable::TypeHash hash
Definition activemsg.h:295
virtual ActiveMessageHandlerTable::MessageHandlerInline get_handler_inline(void) const =0
Definition activemsg.h:303
void force_instantiation(void)
Definition activemsg.h:336
virtual ActiveMessageHandlerTable::MessageHandlerInline get_handler_inline(void) const
virtual ActiveMessageHandlerTable::MessageHandler get_handler(void) const
virtual ActiveMessageHandlerTable::MessageHandlerNoTimeout get_handler_notimeout(void) const
Definition activemsg.h:238
const char * lookup_message_name(MessageID id)
bool(* MessageHandlerInline)(NodeID sender, const void *header, const void *payload, size_t payload_size, TimeLimit work_until)
Definition activemsg.h:248
unsigned short MessageID
Definition activemsg.h:243
MessageID lookup_message_id(void) const
void(* MessageHandlerNoTimeout)(NodeID sender, const void *header, const void *payload, size_t payload_size)
Definition activemsg.h:246
void record_message_handler_call(MessageID id, long long t_start, long long t_end)
unsigned TypeHash
Definition activemsg.h:263
void(* MessageHandler)(NodeID sender, const void *header, const void *payload, size_t payload_size, TimeLimit work_until)
Definition activemsg.h:244
static void append_handler_reg(ActiveMessageHandlerRegBase *new_reg)
HandlerEntry * lookup_message_handler(MessageID id)
static ActiveMessageHandlerRegBase * pending_handlers
Definition activemsg.h:279
std::vector< HandlerEntry > handlers
Definition activemsg.h:281
Definition activemsg.h:205
virtual void commit(size_t act_payload_size)=0
virtual void * add_local_completion(size_t size)=0
void * header_base
Definition activemsg.h:217
virtual void * add_remote_completion(size_t size)=0
size_t payload_size
Definition activemsg.h:219
void * payload_base
Definition activemsg.h:218
virtual void cancel()=0
virtual ~ActiveMessageImpl()
Definition activemsg.h:207
Definition activemsg.h:65
Realm::Serialization::FixedBufferSerializer fbs
Definition activemsg.h:171
void init(NodeID _target, const LocalAddress &_src_payload_addr, size_t _bytes_per_line, size_t _lines, size_t _line_stride, const RemoteAddress &_dest_payload_addr)
static size_t recommended_max_payload(NodeID target, const void *data, size_t bytes_per_line, size_t lines, size_t line_stride, bool with_congestion)
ActiveMessage(const Realm::NodeSet &_targets, const void *_data, size_t _datalen)
ActiveMessageImpl * impl
Definition activemsg.h:169
ActiveMessage(NodeID _target, size_t _max_payload_size=0)
void init(NodeID _target, const LocalAddress &_src_payload_addr, size_t _datalen, const RemoteAddress &_dest_payload_addr)
uint64_t inline_capacity[INLINE_STORAGE/sizeof(uint64_t)]
Definition activemsg.h:172
void * payload_ptr(size_t datalen)
void init(NodeID _target, const void *_data, size_t _datalen)
void add_payload(const void *data, size_t bytes_per_line, size_t lines, size_t line_stride, int payload_mode=PAYLOAD_COPY)
void init(NodeID _target, size_t _max_payload_size=0)
ActiveMessage(const Realm::NodeSet &_targets, size_t _max_payload_size=0)
T * header
Definition activemsg.h:170
ActiveMessage(NodeID _target, const void *_data, size_t _datalen)
void init(const Realm::NodeSet &_targets, const void *_data, size_t _datalen)
bool operator<<(const T2 &to_append)
void add_payload(const void *data, size_t datalen, int payload_mode=PAYLOAD_COPY)
static size_t recommended_max_payload(NodeID target, bool with_congestion)
static size_t recommended_max_payload(NodeID target, const RemoteAddress &dest_payload_addr, bool with_congestion)
ActiveMessage(NodeID _target, const void *_data, size_t _bytes_per_line, size_t _lines, size_t _line_stride)
void init(const Realm::NodeSet &_targets, const void *_data, size_t _bytes_per_line, size_t _lines, size_t _line_stride)
static size_t recommended_max_payload(const NodeSet &targets, const void *data, size_t bytes_per_line, size_t lines, size_t line_stride, bool with_congestion)
ActiveMessage(NodeID _target, const LocalAddress &_src_payload_addr, size_t _bytes_per_line, size_t _lines, size_t _line_stride, const RemoteAddress &_dest_payload_addr)
static size_t recommended_max_payload(const NodeSet &targets, bool with_congestion)
void init(NodeID _target, const void *_data, size_t _bytes_per_line, size_t _lines, size_t _line_stride)
void add_local_completion(const CALLABLE &callable)
ActiveMessage(const Realm::NodeSet &_targets, const void *_data, size_t _bytes_per_line, size_t _lines, size_t _line_stride)
void add_remote_completion(const CALLABLE &callable)
static size_t recommended_max_payload(NodeID target, const LocalAddress &src_payload_addr, size_t bytes_per_line, size_t lines, size_t line_stride, const RemoteAddress &dest_payload_addr, bool with_congestion)
void init(const Realm::NodeSet &_targets, size_t _max_payload_size=0)
void init(NodeID _target, size_t _max_payload_size, const RemoteAddress &_dest_payload_addr)
ActiveMessage(NodeID _target, size_t _max_payload_size, const RemoteAddress &_dest_payload_addr)
ActiveMessage(NodeID _target, const LocalAddress &_src_payload_addr, size_t _datalen, const RemoteAddress &_dest_payload_addr)
Definition bgwork.h:129
Definition activemsg.h:176
static void destroy_all(void *start, size_t bytes)
virtual size_t size() const =0
static void clone_all(void *dst, const void *src, size_t bytes)
static void invoke_all(void *start, size_t bytes)
virtual CompletionCallbackBase * clone_at(void *p) const =0
Definition activemsg.h:192
virtual CompletionCallbackBase * clone_at(void *p) const
CALLABLE callable
Definition activemsg.h:200
CompletionCallback(const CALLABLE &_callable)
virtual size_t size() const
Definition threads.h:382
Definition threads.h:342
Definition activemsg.h:345
size_t total_messages_handled
Definition activemsg.h:425
void drain_incoming_messages(size_t min_messages_handled)
int todo_newest
Definition activemsg.h:421
size_t drain_min_count
Definition activemsg.h:424
Message ** heads
Definition activemsg.h:417
bool return_messages(int sender, size_t num_handled, Message *head, Message **tail)
MessageBlock * current_block
Definition activemsg.h:430
int handlers_active
Definition activemsg.h:422
uintptr_t CallbackData
Definition activemsg.h:351
atomic< bool > bgwork_requested
Definition activemsg.h:415
std::vector< Thread * > handler_threads
Definition activemsg.h:429
int dedicated_threads
Definition activemsg.h:414
std::unordered_map< std::pair< NodeID, uint64_t >, std::unique_ptr< FragmentedMessage >, PairHash > frag_message
Definition activemsg.h:444
Mutex::CondVar condvar
Definition activemsg.h:427
Message *** tails
Definition activemsg.h:418
CoreReservation * core_rsrv
Definition activemsg.h:428
int get_messages(Message *&head, Message **&tail, bool wait)
bool add_incoming_message(NodeID sender, ActiveMessageHandlerTable::MessageID msgid, const void *hdr, size_t hdr_size, int hdr_mode, const void *payload, size_t payload_size, int payload_mode, CallbackFnptr callback_fnptr, CallbackData callback_data1, CallbackData callback_data2, TimeLimit work_until)
void start_handler_threads(size_t stack_size)
bool * in_handler
Definition activemsg.h:419
IncomingMessageManager(int _nodes, int _dedicated_threads, Realm::CoreReservationSet &crs)
size_t num_available_blocks
Definition activemsg.h:432
Mutex mutex
Definition activemsg.h:426
int shutdown_flag
Definition activemsg.h:416
int * todo_list
Definition activemsg.h:420
MessageBlock * available_blocks
Definition activemsg.h:431
bool drain_pending
Definition activemsg.h:423
virtual bool do_work(TimeLimit work_until)
size_t cfg_max_available_blocks
Definition activemsg.h:433
Definition nodeset.h:117
Definition timers.h:129
Definition mutex.h:325
Definition mutex.h:223
Definition atomics.h:31
#define REALM_INTERNAL_API_EXTERNAL_LINKAGE
Definition compiler_support.h:218
Helper utility for reconstructing large ActiveMessages that were split into multiple network packets.
bool profile_activemsg_handlers
long long max_inline_message_time
thread_local bool in_message_handler
Definition activemsg.h:38
ActiveMessageHandlerTable activemsg_handler_table
int NodeID
Definition nodeset.h:40
@ PAYLOAD_EMPTY
Definition activemsg.h:59
@ PAYLOAD_KEEPREG
Definition activemsg.h:58
@ PAYLOAD_PENDING
Definition activemsg.h:57
@ PAYLOAD_SRCPTR
Definition activemsg.h:56
@ PAYLOAD_NONE
Definition activemsg.h:52
@ PAYLOAD_COPY
Definition activemsg.h:55
@ PAYLOAD_FREE
Definition activemsg.h:54
@ PAYLOAD_KEEP
Definition activemsg.h:53
Definition activemsg.h:224
void record(long long t_start, long long t_end)
atomic< size_t > count
Definition activemsg.h:225
atomic< size_t > minval
Definition activemsg.h:225
atomic< size_t > maxval
Definition activemsg.h:225
atomic< size_t > sum
Definition activemsg.h:225
atomic< size_t > sum2
Definition activemsg.h:225
MessageHandlerNoTimeout handler_notimeout
Definition activemsg.h:269
std::optional< const FragmentInfo &(*)(const void *)> extract_frag_info
Definition activemsg.h:273
const char * name
Definition activemsg.h:267
ActiveMessageHandlerStats stats
Definition activemsg.h:271
TypeHash hash
Definition activemsg.h:266
MessageHandlerInline handler_inline
Definition activemsg.h:270
MessageHandler handler
Definition activemsg.h:268
Definition activemsg.h:525
ActiveMessageHandlerReg< WrappedWithFragInfo< UserHdr >, UserHdr > reg
Definition activemsg.h:526
Definition activemsg.h:231
uint32_t total_chunks
Definition activemsg.h:233
uint64_t msg_id
Definition activemsg.h:234
uint32_t chunk_id
Definition activemsg.h:232
static MessageBlock * new_block(size_t _total_size)
static void free_block(MessageBlock *block)
void recycle_message(Message *msg, IncomingMessageManager *manager)
Message * append_message(size_t hdr_bytes_needed, size_t payload_bytes_needed)
atomic< unsigned > use_count
Definition activemsg.h:407
size_t size_used
Definition activemsg.h:406
MessageBlock * next_free
Definition activemsg.h:408
Definition activemsg.h:379
CallbackFnptr callback_fnptr
Definition activemsg.h:390
CallbackData callback_data1
Definition activemsg.h:391
void * payload
Definition activemsg.h:387
bool hdr_needs_free
Definition activemsg.h:386
void * hdr
Definition activemsg.h:384
MessageBlock * block
Definition activemsg.h:380
size_t payload_size
Definition activemsg.h:388
Message * next_msg
Definition activemsg.h:381
ActiveMessageHandlerTable::HandlerEntry * handler
Definition activemsg.h:383
size_t hdr_size
Definition activemsg.h:385
NodeID sender
Definition activemsg.h:382
bool payload_needs_free
Definition activemsg.h:389
Definition activemsg.h:435
std::size_t operator()(const std::pair< NodeID, uint64_t > &p) const
Definition activemsg.h:436
Definition network.h:58
Definition network.h:46
Definition activemsg.h:448
UserHdr * operator->()
Definition activemsg.h:452
UserHdr & operator*()
Definition activemsg.h:454
const UserHdr * operator->() const
Definition activemsg.h:453
FragmentInfo frag_info
Definition activemsg.h:449
UserHdr user
Definition activemsg.h:450
const UserHdr & operator*() const
Definition activemsg.h:455
NodeID src
Definition ucp_internal.h:1
unsigned short msgid
Definition ucp_internal.h:2