Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
proc_impl.h
Go to the documentation of this file.
1/*
2 * Copyright 2026 Stanford University, NVIDIA Corporation, Los Alamos National Laboratory
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18// Processor/ProcessorGroup implementations for Realm
19
20#ifndef REALM_PROC_IMPL_H
21#define REALM_PROC_IMPL_H
22
23#include "realm/processor.h"
24#include "realm/id.h"
25
26#include "realm/atomics.h"
27#include "realm/network.h"
28#include "realm/operation.h"
29#include "realm/profiling.h"
30#include "realm/sampling.h"
31
32#include "realm/runtime_impl.h"
33#include "realm/event_impl.h"
34#include "realm/rsrv_impl.h"
35
36#include "realm/tasks.h"
37#include "realm/threads.h"
38#include "realm/codedesc.h"
39
40namespace Realm {
41
42 class ProcessorGroupImpl;
43
44 namespace ThreadLocal {
45 // if nonzero, prevents application thread from yielding execution
46 // resources on an Event wait
47 extern thread_local int scheduler_lock;
48 }; // namespace ThreadLocal
49
51 ProcessorImpl { // needed by librealm_kokkos.so
52 public:
54 int _num_cores = 1);
55
56 virtual ~ProcessorImpl(void);
57
58 virtual void enqueue_task(Task *task) = 0;
59 virtual void enqueue_tasks(Task::TaskList &tasks, size_t num_tasks) = 0;
60
61 virtual void spawn_task(Processor::TaskFuncID func_id, const void *args,
62 size_t arglen, const ProfilingRequestSet &reqs,
63 Event start_event, GenEventImpl *finish_event,
64 EventImpl::gen_t finish_gen, int priority) = 0;
65
66 // starts worker threads and performs any per-processor initialization
67 virtual void start_threads(void);
68
69 // blocks until things are cleaned up
70 virtual void shutdown(void);
71
72 virtual void add_to_group(ProcessorGroupImpl *group) = 0;
73
74 virtual void remove_from_group(ProcessorGroupImpl *group) = 0;
75
76 virtual bool register_task(Processor::TaskFuncID func_id, CodeDescriptor &codedesc,
77 const ByteArrayRef &user_data);
78
79 // runs an internal Realm operation on this processor
80 virtual void add_internal_task(InternalTask *task);
81
84
85 // A helper function to get the kind of a processor when we only have the processor
86 // ID
88 Processor processor);
89
90 protected:
91 friend class Task;
92
93 // Event free list cache variables
95
96 virtual void execute_task(Processor::TaskFuncID func_id,
97 const ByteArrayRef &task_args);
98
100 static const size_t MAX_ENTRIES = 4;
102 EventImpl *events[MAX_ENTRIES];
103 EventImpl::gen_t generations[MAX_ENTRIES];
104 Task *tasks[MAX_ENTRIES];
105 size_t counts[MAX_ENTRIES];
106 size_t ages[MAX_ENTRIES];
108
109 void clear()
110 {
111 current_age = 0;
112 memset(events, 0, sizeof(events));
113 memset(tasks, 0, sizeof(tasks));
114 memset(counts, 0, sizeof(counts));
115 memset(ages, 0, sizeof(ages));
116 memset(generations, 0, sizeof(generations));
117 }
118
119 void flush()
120 {
121 for(size_t i = 0; i < MAX_ENTRIES; i++) {
122 if(tasks[i] != nullptr) {
123 tasks[i]->remove_reference();
124 }
125 }
126 clear();
127 }
128 };
129
130 // helper function for spawn implementations
131 void enqueue_or_defer_task(Task *task, Event start_event, DeferredSpawnCache *cache);
132
133 public:
137 };
138
139 // generic local task processor - subclasses must create and configure a task
140 // scheduler and pass in with the set_scheduler() method
142 public:
144 int num_cores = 1);
145 virtual ~LocalTaskProcessor(void);
146
147 virtual void enqueue_task(Task *task);
148 virtual void enqueue_tasks(Task::TaskList &tasks, size_t num_tasks);
149
150 virtual void spawn_task(Processor::TaskFuncID func_id, const void *args,
151 size_t arglen, const ProfilingRequestSet &reqs,
152 Event start_event, GenEventImpl *finish_event,
153 EventImpl::gen_t finish_gen, int priority);
154
155 virtual bool register_task(Processor::TaskFuncID func_id, CodeDescriptor &codedesc,
156 const ByteArrayRef &user_data);
157
158 // starts worker threads and performs any per-processor initialization
159 virtual void start_threads(void);
160
161 // blocks until things are cleaned up
162 virtual void shutdown(void);
163
164 virtual void add_to_group(ProcessorGroupImpl *group);
165
167
168 // runs an internal Realm operation on this processor
169 virtual void add_internal_task(InternalTask *task);
170
171 protected:
173
175 TaskQueue task_queue; // ready tasks
178
183
185 std::map<Processor::TaskFuncID, TaskTableEntry> task_table;
186
188 const ByteArrayRef &task_args);
189 };
190
191 // three simple subclasses for:
192 // a) "CPU" processors, which request a dedicated core and use user threads
193 // when possible
194 // b) "utility" processors, which also use user threads but share cores with
195 // other runtime threads
196 // c) "IO" processors, which use kernel threads so that blocking IO calls
197 // are permitted
198 //
199 // each of these is implemented just by supplying the right kind of scheduler to
200 // LocalTaskProcessor in the constructor
201
203 public:
205 size_t _stack_size, bool _force_kthreads,
206 BackgroundWorkManager *bgwork, long long bgwork_timeslice);
207 virtual ~LocalCPUProcessor(void);
208
209 protected:
211 };
212
214 public:
216 CoreReservationSet &crs, size_t _stack_size,
217 bool _force_kthreads, bool _pin_util_proc,
218 BackgroundWorkManager *bgwork, long long bgwork_timeslice);
220
221 protected:
223 };
224
226 public:
228 size_t _stack_size, int _concurrent_io_threads);
229 virtual ~LocalIOProcessor(void);
230
231 protected:
233 };
234
236 public:
238 int _num_cores = 1);
239 virtual ~RemoteProcessor(void);
240
241 virtual void enqueue_task(Task *task);
242 virtual void enqueue_tasks(Task::TaskList &tasks, size_t num_tasks);
243
244 virtual void add_to_group(ProcessorGroupImpl *group);
245
247
248 virtual void spawn_task(Processor::TaskFuncID func_id, const void *args,
249 size_t arglen, const ProfilingRequestSet &reqs,
250 Event start_event, GenEventImpl *finish_event,
251 EventImpl::gen_t finish_gen, int priority);
252 };
253
255 public:
256 // TODO: pass in runtime_impl. Doing so currently fails to compile with
257 // dynamic_table.inl:129:40: error: constructor for
258 // 'Realm::DynamicTableNode<Realm::ProcessorGroupImpl, 16, Realm::UnfairMutex,
259 // unsigned long long>' must explicitly initialize the member 'elems' which does not
260 // have a default constructor
261 // | DynamicTableNode<ET, _SIZE, LT, IT>::DynamicTableNode(int _level, IT
262 // _first_index, IT _last_index)
263 // We need to update the DynamicTableAllocator to take in a runtime_impl.
265
266 virtual ~ProcessorGroupImpl(void);
267
269
270 void init(ID _me, int _owner);
271
272 static ID make_id(const ProcessorGroupImpl &dummy, int owner, ID::IDType index)
273 {
274 return ID::make_procgroup(owner, 0, index);
275 }
276
278
279 void destroy(void);
280
281 void get_group_members(std::vector<Processor> &member_list);
282
283 virtual void enqueue_task(Task *task);
284 virtual void enqueue_tasks(Task::TaskList &tasks, size_t num_tasks);
285
286 virtual void add_to_group(ProcessorGroupImpl *group);
287
289
290 virtual void spawn_task(Processor::TaskFuncID func_id, const void *args,
291 size_t arglen, const ProfilingRequestSet &reqs,
292 Event start_event, GenEventImpl *finish_event,
293 EventImpl::gen_t finish_gen, int priority);
294
295 public: // protected:
298 std::vector<ProcessorImpl *> members;
301
303
304 TaskQueue task_queue; // ready tasks
307
309 public:
310 void defer(ProcessorGroupImpl *_pg, Event wait_on);
311 virtual void event_triggered(bool poisoned, TimeLimit work_until);
312 virtual void print(std::ostream &os) const;
313 virtual Event get_finish_event(void) const;
314
315 protected:
317 };
319 };
320
321 // a task registration can take a while if remote processors and/or JITs are
322 // involved
324 public:
325 TaskRegistration(const CodeDescriptor &_codedesc, const ByteArrayRef &_userdata,
326 GenEventImpl *_finish_event, EventImpl::gen_t _finish_gen,
327 const ProfilingRequestSet &_requests);
328
329 protected:
330 // deletion performed when reference count goes to zero
331 virtual ~TaskRegistration(void);
332
333 public:
334 virtual void print(std::ostream &os) const;
335
338 };
339
341 public:
342 RemoteTaskRegistration(TaskRegistration *reg_op, int _target_node);
343
344 virtual void request_cancellation(void);
345
346 virtual void print(std::ostream &os) const;
347
348 protected:
350 };
351
352 // active messages
362
366
367 static void handle_message(NodeID sender, const RegisterTaskCompleteMessage &msg,
368 const void *data, size_t datalen);
369 };
370
376
377 static void handle_message(NodeID sender, const SpawnTaskMessage &msg,
378 const void *data, size_t datalen);
379 };
380
384
385 static void handle_message(NodeID sender, const ProcGroupCreateMessage &msg,
386 const void *data, size_t datalen);
387 };
388
392
393 static void handle_message(NodeID sender, const ProcGroupDestroyMessage &msg,
394 const void *data, size_t datalen);
395 };
396
399
400 static void handle_message(NodeID sender, const ProcGroupDestroyAckMessage &msg,
401 const void *data, size_t datalen);
402 };
403
404 namespace ThreadLocal {
405 // Assume zero initialized
406 extern thread_local Processor current_processor;
407 } // namespace ThreadLocal
408
409}; // namespace Realm
410
411#endif // ifndef REALM_PROC_IMPL_H
Definition bgwork.h:36
Definition bytearray.h:30
Definition bytearray.h:53
Definition codedesc.h:249
Definition threads.h:382
Definition threads.h:342
Definition dynamic_table.h:105
Definition event_impl.h:85
unsigned gen_t
Definition event_impl.h:87
Definition event_impl.h:49
Definition event.h:50
Definition event_impl.h:198
Definition id.h:30
ID_Types
Definition id.h:250
@ ID_PROCGROUP
Definition id.h:260
static ID make_procgroup(unsigned owner_node, unsigned creator_node, unsigned pgroup_idx)
::realm_id_t IDType
Definition id.h:32
Definition tasks.h:181
Definition proc_impl.h:202
CoreReservation * core_rsrv
Definition proc_impl.h:210
LocalCPUProcessor(RuntimeImpl *runtime_impl, Processor _me, CoreReservationSet &crs, size_t _stack_size, bool _force_kthreads, BackgroundWorkManager *bgwork, long long bgwork_timeslice)
virtual ~LocalCPUProcessor(void)
Definition proc_impl.h:225
CoreReservation * core_rsrv
Definition proc_impl.h:232
virtual ~LocalIOProcessor(void)
LocalIOProcessor(RuntimeImpl *runtime_impl, Processor _me, CoreReservationSet &crs, size_t _stack_size, int _concurrent_io_threads)
Definition proc_impl.h:141
virtual void enqueue_tasks(Task::TaskList &tasks, size_t num_tasks)
void set_scheduler(ThreadedTaskScheduler *_sched)
virtual bool register_task(Processor::TaskFuncID func_id, CodeDescriptor &codedesc, const ByteArrayRef &user_data)
TaskQueue task_queue
Definition proc_impl.h:175
virtual void add_internal_task(InternalTask *task)
ProfilingGauges::AbsoluteRangeGauge< int > ready_task_count
Definition proc_impl.h:176
virtual void spawn_task(Processor::TaskFuncID func_id, const void *args, size_t arglen, const ProfilingRequestSet &reqs, Event start_event, GenEventImpl *finish_event, EventImpl::gen_t finish_gen, int priority)
virtual void execute_task(Processor::TaskFuncID func_id, const ByteArrayRef &task_args)
virtual void start_threads(void)
ThreadedTaskScheduler * sched
Definition proc_impl.h:174
virtual void add_to_group(ProcessorGroupImpl *group)
virtual ~LocalTaskProcessor(void)
virtual void remove_from_group(ProcessorGroupImpl *group)
RWLock task_table_mutex
Definition proc_impl.h:184
virtual void enqueue_task(Task *task)
DeferredSpawnCache deferred_spawn_cache
Definition proc_impl.h:177
virtual void shutdown(void)
std::map< Processor::TaskFuncID, TaskTableEntry > task_table
Definition proc_impl.h:185
LocalTaskProcessor(RuntimeImpl *runtime_impl, Processor _me, Processor::Kind _kind, int num_cores=1)
Definition proc_impl.h:213
CoreReservation * core_rsrv
Definition proc_impl.h:222
virtual ~LocalUtilityProcessor(void)
LocalUtilityProcessor(RuntimeImpl *runtime_impl, Processor _me, CoreReservationSet &crs, size_t _stack_size, bool _force_kthreads, bool _pin_util_proc, BackgroundWorkManager *bgwork, long long bgwork_timeslice)
Definition operation.h:75
Definition operation.h:32
void remove_reference(void)
virtual void print(std::ostream &os) const
virtual Event get_finish_event(void) const
void defer(ProcessorGroupImpl *_pg, Event wait_on)
virtual void event_triggered(bool poisoned, TimeLimit work_until)
ProcessorGroupImpl * pg
Definition proc_impl.h:316
Definition proc_impl.h:254
static const ID::ID_Types ID_TYPE
Definition proc_impl.h:268
static ID make_id(const ProcessorGroupImpl &dummy, int owner, ID::IDType index)
Definition proc_impl.h:272
DeferredSpawnCache deferred_spawn_cache
Definition proc_impl.h:306
virtual void spawn_task(Processor::TaskFuncID func_id, const void *args, size_t arglen, const ProfilingRequestSet &reqs, Event start_event, GenEventImpl *finish_event, EventImpl::gen_t finish_gen, int priority)
void init(ID _me, int _owner)
ProfilingGauges::AbsoluteRangeGauge< int > * ready_task_count
Definition proc_impl.h:305
ProcessorGroupImpl * next_free
Definition proc_impl.h:300
std::vector< ProcessorImpl * > members
Definition proc_impl.h:298
void set_group_members(span< const Processor > member_list)
bool members_requested
Definition proc_impl.h:297
bool members_valid
Definition proc_impl.h:296
void request_group_members(void)
virtual void enqueue_tasks(Task::TaskList &tasks, size_t num_tasks)
void get_group_members(std::vector< Processor > &member_list)
DeferredDestroy deferred_destroy
Definition proc_impl.h:318
ReservationImpl lock
Definition proc_impl.h:299
TaskQueue task_queue
Definition proc_impl.h:304
virtual void add_to_group(ProcessorGroupImpl *group)
virtual void enqueue_task(Task *task)
virtual ~ProcessorGroupImpl(void)
virtual void remove_from_group(ProcessorGroupImpl *group)
Definition processor.h:199
Definition proc_impl.h:51
int num_cores
Definition proc_impl.h:136
virtual void remove_from_group(ProcessorGroupImpl *group)=0
virtual void enqueue_task(Task *task)=0
void enqueue_or_defer_task(Task *task, Event start_event, DeferredSpawnCache *cache)
ProcessorImpl(RuntimeImpl *runtime_impl, Processor _me, Processor::Kind _kind, int _num_cores=1)
GenEventImpl * create_genevent()
virtual void start_threads(void)
virtual ~ProcessorImpl(void)
virtual void add_internal_task(InternalTask *task)
Processor::Kind kind
Definition proc_impl.h:135
virtual bool register_task(Processor::TaskFuncID func_id, CodeDescriptor &codedesc, const ByteArrayRef &user_data)
static Processor::Kind get_processor_kind(RuntimeImpl *runtime_impl, Processor processor)
virtual void shutdown(void)
virtual void spawn_task(Processor::TaskFuncID func_id, const void *args, size_t arglen, const ProfilingRequestSet &reqs, Event start_event, GenEventImpl *finish_event, EventImpl::gen_t finish_gen, int priority)=0
virtual void execute_task(Processor::TaskFuncID func_id, const ByteArrayRef &task_args)
virtual void enqueue_tasks(Task::TaskList &tasks, size_t num_tasks)=0
void free_genevent(GenEventImpl *)
virtual void add_to_group(ProcessorGroupImpl *group)=0
Processor me
Definition proc_impl.h:134
LocalEventTableAllocator::FreeList free_local_events
Definition proc_impl.h:94
Definition processor.h:37
Kind
Definition processor.h:65
::realm_task_func_id_t TaskFuncID
Definition processor.h:58
void(* TaskFuncPtr)(const void *args, size_t arglen, const void *user_data, size_t user_data_len, Processor proc)
Definition processor.h:59
Definition profiling.h:363
Definition mutex.h:398
Definition proc_impl.h:235
virtual void spawn_task(Processor::TaskFuncID func_id, const void *args, size_t arglen, const ProfilingRequestSet &reqs, Event start_event, GenEventImpl *finish_event, EventImpl::gen_t finish_gen, int priority)
virtual void remove_from_group(ProcessorGroupImpl *group)
virtual void enqueue_tasks(Task::TaskList &tasks, size_t num_tasks)
virtual ~RemoteProcessor(void)
virtual void add_to_group(ProcessorGroupImpl *group)
virtual void enqueue_task(Task *task)
RemoteProcessor(RuntimeImpl *runtime_impl, Processor _me, Processor::Kind _kind, int _num_cores=1)
Definition proc_impl.h:340
virtual void request_cancellation(void)
RemoteTaskRegistration(TaskRegistration *reg_op, int _target_node)
int target_node
Definition proc_impl.h:349
virtual void print(std::ostream &os) const
Definition rsrv_impl.h:57
Definition runtime_impl.h:264
Definition tasks.h:131
Definition proc_impl.h:323
ByteArray userdata
Definition proc_impl.h:337
TaskRegistration(const CodeDescriptor &_codedesc, const ByteArrayRef &_userdata, GenEventImpl *_finish_event, EventImpl::gen_t _finish_gen, const ProfilingRequestSet &_requests)
virtual void print(std::ostream &os) const
virtual ~TaskRegistration(void)
CodeDescriptor codedesc
Definition proc_impl.h:336
Definition tasks.h:41
Definition tasks.h:215
Definition timers.h:129
Definition mutex.h:223
Definition utils.h:84
#define REALM_INTERNAL_API_EXTERNAL_LINKAGE
Definition compiler_support.h:218
thread_local int scheduler_lock
thread_local Processor current_processor
Definition activemsg.h:38
int NodeID
Definition nodeset.h:40
Processor::TaskFuncPtr fnptr
Definition proc_impl.h:180
ByteArray user_data
Definition proc_impl.h:181
Definition proc_impl.h:381
size_t num_members
Definition proc_impl.h:383
static void handle_message(NodeID sender, const ProcGroupCreateMessage &msg, const void *data, size_t datalen)
ProcessorGroup pgrp
Definition proc_impl.h:382
Definition proc_impl.h:397
static void handle_message(NodeID sender, const ProcGroupDestroyAckMessage &msg, const void *data, size_t datalen)
ProcessorGroup pgrp
Definition proc_impl.h:398
Definition proc_impl.h:389
static void handle_message(NodeID sender, const ProcGroupDestroyMessage &msg, const void *data, size_t datalen)
Event wait_on
Definition proc_impl.h:391
ProcessorGroup pgrp
Definition proc_impl.h:390
size_t current_age
Definition proc_impl.h:107
Mutex mutex
Definition proc_impl.h:101
void clear()
Definition proc_impl.h:109
void flush()
Definition proc_impl.h:119
Definition proc_impl.h:363
static void handle_message(NodeID sender, const RegisterTaskCompleteMessage &msg, const void *data, size_t datalen)
RemoteTaskRegistration * reg_op
Definition proc_impl.h:364
bool successful
Definition proc_impl.h:365
Definition proc_impl.h:353
Processor::TaskFuncID func_id
Definition proc_impl.h:355
RemoteTaskRegistration * reg_op
Definition proc_impl.h:357
NodeID sender
Definition proc_impl.h:354
static void handle_message(NodeID sender, const RegisterTaskMessage &msg, const void *data, size_t datalen)
Processor::Kind kind
Definition proc_impl.h:356
Definition proc_impl.h:371
size_t offset
Definition proc_impl.h:375
size_t total_bytes
Definition proc_impl.h:375
Event finish_event
Definition proc_impl.h:373
Processor::TaskFuncID func_id
Definition proc_impl.h:374
static void handle_message(NodeID sender, const SpawnTaskMessage &msg, const void *data, size_t datalen)
Processor proc
Definition proc_impl.h:372