Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
barrier_impl.h
Go to the documentation of this file.
1/*
2 * Copyright 2025 Stanford University, NVIDIA Corporation
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18// Barrier implementations for Realm
19
20#ifndef REALM_BARRIER_IMPL_H
21#define REALM_BARRIER_IMPL_H
22
23#include "realm/event.h"
24#include "realm/event_impl.h"
25#include "realm/id.h"
26#include "realm/nodeset.h"
27#include "realm/redop.h"
28
29#include <vector>
30#include <map>
31#include <memory>
32
33namespace Realm {
34
44
45 struct RemoteNotification;
46
48 std::vector<RemoteNotification> remotes;
49 std::vector<char> reduction;
50 };
51
55
57 public:
// Polymorphic base: the destructor is virtual so a derived communicator can be
// deleted through a BarrierCommunicator pointer (BarrierImpl owns one via
// std::unique_ptr<BarrierCommunicator> barrier_comm).
58 virtual ~BarrierCommunicator() = default;
59
// Every member below is declared virtual but not pure, so a subclass may
// override any subset of them.
// NOTE(review): presumably this is the seam for substituting the transport
// (e.g. in tests) -- confirm against the subclasses.
//
// NOTE(review): the parameters mirror BarrierImpl::adjust_arrival (delta,
// wait_on, sender, forwarded, payload bytes); presumably this delivers such an
// adjustment to node `target`, with generation/timestamp carried inside
// `barrier` -- confirm in the implementation.
60 virtual void adjust(NodeID target, Barrier barrier, int delta, Event wait_on,
61 NodeID sender, bool forwarded, const void *data, size_t datalen);
62
// NOTE(review): presumably sends the trigger payload (`datalen` bytes at
// `data`) for `barrier_id` to node `target`. What the default
// max_payload_size of 0 means is not visible here -- confirm.
63 virtual void trigger(NodeID target, ID::IDType barrier_id, const void *data,
64 size_t datalen, size_t max_payload_size = 0);
65
// NOTE(review): presumably asks node `target` to notify `subscriber` once
// generation `subscribe_gen` of `barrier_id` triggers; compare
// BarrierImpl::handle_remote_subscription, which takes the same
// subscriber / subscribe_gen / forwarded arguments.
66 virtual void subscribe(NodeID target, ID::IDType barrier_id,
67 EventImpl::gen_t subscribe_gen, NodeID subscriber,
68 bool forwarded);
69
// NOTE(review): presumably returns a recommended maximum payload size for a
// message of `size` bytes sent to `node`, optionally accounting for
// congestion (enabled by default) -- confirm the exact contract.
70 virtual size_t recommend_max_payload(NodeID node, size_t size,
71 bool with_congestion = true);
72 };
73
78
79#ifndef BARRIER_ENABLE_BROADCAST
98#endif
99
100 class BarrierImpl : public EventImpl {
101 public:
103
104 static const int BARRIER_TIMESTAMP_NODEID_SHIFT = 48;
106
108 BarrierImpl(BarrierCommunicator *_barrier_comm, int _broadcast_radix = 4);
110
111 void init(ID _me, unsigned _init_owner);
112
// Builds the ID of barrier slot `index` created on node `owner`, always at the
// initial generation (ID::make_barrier's third parameter is the generation).
// `dummy` is never read; NOTE(review): presumably it exists only so callers
// can select this overload by argument type -- confirm against callers.
// NOTE(review): `owner` (int) and `index` (ID::IDType) are implicitly
// converted to make_barrier's `unsigned` parameters.
113 static ID make_id(const BarrierImpl &dummy, int owner, ID::IDType index) {
114 const unsigned initial_generation = 0;
115 return ID::make_barrier(owner, index, initial_generation);
116 }
117
118 // get the Barrier (id+generation) for the current (i.e. untriggered) generation
120
121 // helper to create the Barrier for an arbitrary generation
123
124 static BarrierImpl *create_barrier(unsigned expected_arrivals, ReductionOpID redopid,
125 const void *initial_value = 0,
126 size_t initial_value_size = 0);
127
128 // test whether an event has triggered without waiting
129 virtual bool has_triggered(gen_t needed_gen, bool &poisoned);
130
131 virtual void subscribe(gen_t subscribe_gen);
132
133 virtual void external_wait(gen_t needed_gen, bool &poisoned);
134 virtual bool external_timedwait(gen_t needed_gen, bool &poisoned, long long max_ns);
135
136 virtual bool add_waiter(gen_t needed_gen,
137 EventWaiter *waiter /*, bool pre_subscribed = false*/);
138
139 // use this sparingly - it has to hunt through waiter lists while
140 // holding locks
141 virtual bool remove_waiter(gen_t needed_gen, EventWaiter *waiter);
142
143 // used to adjust a barrier's arrival count either up or down
144 // if delta > 0, timestamp is current time (on requesting node)
145 // if delta < 0, timestamp says which positive adjustment this arrival must wait for
146 void adjust_arrival(gen_t barrier_gen, int delta, Barrier::timestamp_t timestamp,
147 Event wait_on, NodeID sender, bool forwarded,
148 const void *reduce_value, size_t reduce_value_size,
149 TimeLimit work_until);
150
151 void handle_remote_subscription(NodeID subscriber, EventImpl::gen_t subscribe_gen,
152 bool forwarded, const void *data, size_t datalen);
153
154 void handle_remote_trigger(NodeID sender, ID::IDType barrier_id,
155 EventImpl::gen_t trigger_gen,
156 EventImpl::gen_t previous_gen, EventImpl::gen_t first_gen,
157 ReductionOpID redop_id, NodeID migration_target,
158 int broadcast_index, unsigned base_count, const void *data,
159 size_t datalen, TimeLimit work_until);
160
161 bool get_result(gen_t result_gen, void *value, size_t value_size);
162
163 protected:
164 void broadcast_trigger(const std::vector<RemoteNotification> &ordered_notifications,
165 const std::vector<NodeID> &broadcast_targets,
166 EventImpl::gen_t oldest_previous,
167 EventImpl::gen_t broadcast_previous,
168 EventImpl::gen_t first_generation, NodeID migration_target,
170 const void *data, size_t datalen,
171 bool include_notifications = true);
172
173 public:
178
179 std::unique_ptr<BarrierCommunicator> barrier_comm;
180
181 Mutex mutex; // controls which local thread has access to internal data (not
182 // runtime-visible event)
183
184 // class to track per-generation status
186 public:
189 std::map<Barrier::timestamp_t, int> pending;
190 };
191
194 std::map<int, PerNodeUpdates *> pernode;
195
198
200 };
201
202 std::map<gen_t, Generation *> generations;
203
204 // external waiters on this node are notified via a condition variable
206 // use kernel mutex for timedwait functionality
209
210 // remote_subscribe_gens: for each remote subscriber node, the latest generation it is interested in
211 // remote_trigger_gens: for each node that has ever subscribed, the latest generation it has been told
212 // about
213 std::map<unsigned, gen_t> remote_subscribe_gens, remote_trigger_gens;
214 std::map<gen_t, gen_t> held_triggers;
215
216 unsigned base_arrival_count = 0;
218 const ReductionOpUntyped *redop = nullptr;
219 std::unique_ptr<char[]> initial_value{};
220 unsigned value_capacity = 0;
221 std::vector<char> final_values;
223 std::vector<std::pair<int, std::vector<RemoteNotification>>> ordered_buffer;
225 };
226}; // namespace Realm
227
228#include "realm/barrier_impl.inl"
229
230#endif // ifndef REALM_BARRIER_IMPL_H
Definition barrier_impl.h:56
virtual void trigger(NodeID target, ID::IDType barrier_id, const void *data, size_t datalen, size_t max_payload_size=0)
virtual size_t recommend_max_payload(NodeID node, size_t size, bool with_congestion=true)
virtual void subscribe(NodeID target, ID::IDType barrier_id, EventImpl::gen_t subscribe_gen, NodeID subscriber, bool forwarded)
virtual ~BarrierCommunicator()=default
virtual void adjust(NodeID target, Barrier barrier, int delta, Event wait_on, NodeID sender, bool forwarded, const void *data, size_t datalen)
Definition barrier_impl.h:185
std::map< int, PerNodeUpdates * > pernode
Definition barrier_impl.h:194
int unguarded_delta
Definition barrier_impl.h:192
EventWaiter::EventWaiterList local_waiters
Definition barrier_impl.h:193
void handle_adjustment(Barrier::timestamp_t ts, int delta)
Definition barrier_impl.h:100
void adjust_arrival(gen_t barrier_gen, int delta, Barrier::timestamp_t timestamp, Event wait_on, NodeID sender, bool forwarded, const void *reduce_value, size_t reduce_value_size, TimeLimit work_until)
bool has_external_waiters
Definition barrier_impl.h:205
bool needs_ordering
Definition barrier_impl.h:222
static const int BARRIER_TIMESTAMP_NODEID_SHIFT
Definition barrier_impl.h:104
Barrier current_barrier(Barrier::timestamp_t timestamp=0) const
virtual bool remove_waiter(gen_t needed_gen, EventWaiter *waiter)
static const ID::ID_Types ID_TYPE
Definition barrier_impl.h:102
const ReductionOpUntyped * redop
Definition barrier_impl.h:218
std::unique_ptr< char[]> initial_value
Definition barrier_impl.h:219
virtual bool has_triggered(gen_t needed_gen, bool &poisoned)
unsigned base_arrival_count
Definition barrier_impl.h:216
Mutex mutex
Definition barrier_impl.h:181
virtual void subscribe(gen_t subscribe_gen)
static ID make_id(const BarrierImpl &dummy, int owner, ID::IDType index)
Definition barrier_impl.h:113
virtual bool external_timedwait(gen_t needed_gen, bool &poisoned, long long max_ns)
void init(ID _me, unsigned _init_owner)
std::vector< std::pair< int, std::vector< RemoteNotification > > > ordered_buffer
Definition barrier_impl.h:223
std::vector< char > final_values
Definition barrier_impl.h:221
Barrier make_barrier(gen_t gen, Barrier::timestamp_t timestamp=0) const
BarrierImpl * next_free
Definition barrier_impl.h:177
KernelMutex external_waiter_mutex
Definition barrier_impl.h:207
std::map< gen_t, Generation * > generations
Definition barrier_impl.h:202
static BarrierImpl * create_barrier(unsigned expected_arrivals, ReductionOpID redopid, const void *initial_value=0, size_t initial_value_size=0)
std::unique_ptr< BarrierCommunicator > barrier_comm
Definition barrier_impl.h:179
atomic< gen_t > gen_subscribed
Definition barrier_impl.h:175
void broadcast_trigger(const std::vector< RemoteNotification > &ordered_notifications, const std::vector< NodeID > &broadcast_targets, EventImpl::gen_t oldest_previous, EventImpl::gen_t broadcast_previous, EventImpl::gen_t first_generation, NodeID migration_target, unsigned base_arrival_count, ReductionOpID redop_id, const void *data, size_t datalen, bool include_notifications=true)
bool get_result(gen_t result_gen, void *value, size_t value_size)
ReductionOpID redop_id
Definition barrier_impl.h:217
BarrierImpl(BarrierCommunicator *_barrier_comm, int _broadcast_radix=4)
virtual bool add_waiter(gen_t needed_gen, EventWaiter *waiter)
void handle_remote_trigger(NodeID sender, ID::IDType barrier_id, EventImpl::gen_t trigger_gen, EventImpl::gen_t previous_gen, EventImpl::gen_t first_gen, ReductionOpID redop_id, NodeID migration_target, int broadcast_index, unsigned base_count, const void *data, size_t datalen, TimeLimit work_until)
unsigned value_capacity
Definition barrier_impl.h:220
int broadcast_radix
Definition barrier_impl.h:224
static atomic< Barrier::timestamp_t > barrier_adjustment_timestamp
Definition barrier_impl.h:105
std::map< unsigned, gen_t > remote_trigger_gens
Definition barrier_impl.h:213
void handle_remote_subscription(NodeID subscriber, EventImpl::gen_t subscribe_gen, bool forwarded, const void *data, size_t datalen)
gen_t first_generation
Definition barrier_impl.h:176
std::map< gen_t, gen_t > held_triggers
Definition barrier_impl.h:214
atomic< gen_t > generation
Definition barrier_impl.h:174
virtual void external_wait(gen_t needed_gen, bool &poisoned)
std::map< unsigned, gen_t > remote_subscribe_gens
Definition barrier_impl.h:213
KernelMutex::CondVar external_waiter_condvar
Definition barrier_impl.h:208
Definition event.h:242
::realm_barrier_timestamp_t timestamp_t
Definition event.h:245
Definition event_impl.h:85
NodeID owner
Definition event_impl.h:122
unsigned gen_t
Definition event_impl.h:87
Definition event_impl.h:49
Definition event.h:50
Definition id.h:30
ID_Types
Definition id.h:250
@ ID_BARRIER
Definition id.h:254
static ID make_barrier(unsigned creator_node, unsigned barrier_idx, unsigned generation)
::realm_id_t IDType
Definition id.h:32
Definition mutex.h:359
Definition mutex.h:273
Definition timers.h:129
Definition mutex.h:223
Definition atomics.h:31
Definition activemsg.h:38
int NodeID
Definition nodeset.h:40
::realm_reduction_op_id_t ReductionOpID
Definition event.h:38
Barrier::timestamp_t last_ts
Definition barrier_impl.h:188
std::map< Barrier::timestamp_t, int > pending
Definition barrier_impl.h:189
Definition barrier_impl.h:35
unsigned base_arrival_count
Definition barrier_impl.h:41
NodeID migration_target
Definition barrier_impl.h:40
EventImpl::gen_t previous_gen
Definition barrier_impl.h:37
EventImpl::gen_t trigger_gen
Definition barrier_impl.h:36
ReductionOpID redop_id
Definition barrier_impl.h:39
int broadcast_index
Definition barrier_impl.h:42
EventImpl::gen_t first_generation
Definition barrier_impl.h:38
Definition barrier_impl.h:52
BarrierTriggerMessageArgsInternal internal
Definition barrier_impl.h:53
Definition barrier_impl.h:80
EventImpl::gen_t trigger_gen
Definition barrier_impl.h:82
ReductionOpID redop_id
Definition barrier_impl.h:85
NodeID migration_target
Definition barrier_impl.h:86
ID::IDType barrier_id
Definition barrier_impl.h:81
static void send_request(NodeID target, ID::IDType barrier_id, EventImpl::gen_t trigger_gen, EventImpl::gen_t previous_gen, EventImpl::gen_t first_generation, ReductionOpID redop_id, NodeID migration_target, unsigned base_arrival_count, const void *data, size_t datalen)
EventImpl::gen_t previous_gen
Definition barrier_impl.h:83
unsigned base_arrival_count
Definition barrier_impl.h:87
static void handle_message(NodeID sender, const BarrierTriggerMessage &msg, const void *data, size_t datalen, TimeLimit work_until)
EventImpl::gen_t first_generation
Definition barrier_impl.h:84
Definition barrier_impl.h:47
std::vector< RemoteNotification > remotes
Definition barrier_impl.h:48
std::vector< char > reduction
Definition barrier_impl.h:49
Definition redop.h:56
Definition barrier_impl.h:74
NodeID node
Definition barrier_impl.h:75
EventImpl::gen_t previous_gen
Definition barrier_impl.h:76
EventImpl::gen_t trigger_gen
Definition barrier_impl.h:76