Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
tasks.h
Go to the documentation of this file.
1/*
2 * Copyright 2026 Stanford University, NVIDIA Corporation, Los Alamos National Laboratory
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18// tasks and task scheduling for Realm
19
20#ifndef REALM_TASKS_H
21#define REALM_TASKS_H
22
23#include "realm/processor.h"
24#include "realm/id.h"
25
26#include "realm/operation.h"
27#include "realm/profiling.h"
28
29#include "realm/threads.h"
30#include "realm/pri_queue.h"
31#include "realm/bytearray.h"
32#include "realm/atomics.h"
33#include "realm/mutex.h"
34#include "realm/bgwork.h"
35
36namespace Realm {
37
38 class ProcessorImpl;
39
40 // information for a single task launch - a Task is a (final) Operation subclass
41 class Task final : public Operation {
42 public:
43 Task(Processor _proc, Processor::TaskFuncID _func_id, const void *_args,
44 size_t _arglen, const ProfilingRequestSet &reqs, Event _before_event,
45 GenEventImpl *_finish_event, EventImpl::gen_t _finish_gen, int _priority);
46
47 protected:
48 // protected destructor: deletion is performed when the reference count goes to zero
49 virtual ~Task(void);
50
51 public:
52 static void *operator new(size_t size); // class-specific allocation for Task objects
53 static void operator delete(void *ptr); // matching class-specific deallocation
54
55 virtual bool mark_ready(void);
56 virtual bool mark_started(void);
57
58 virtual void print(std::ostream &os) const;
59
60 virtual bool attempt_cancellation(int error_code, const void *reason_data,
61 size_t reason_size);
62
63 virtual void set_priority(int new_priority);
64
66
69
70 // "small-vector" optimization for task args
71 char *argdata;
72 size_t arglen;
73 static const size_t SHORT_ARGLEN_MAX = 64; // NOTE(review): presumably the largest arglen kept inline - confirm in tasks.cc
76 // ByteArray args;
77
80
81 // intrusive task list - used for pending, ready, and suspended tasks
88 // NOTE(review): EventWaiter that appears to hold back a task (or group of tasks) until an event triggers - confirm
89 class DeferredSpawn : public EventWaiter {
90 public:
92 void setup(ProcessorImpl *_proc, Task *_task, Event _wait_on);
93 void defer(EventImpl *_wait_impl, EventImpl::gen_t _wait_gen);
94 virtual void event_triggered(bool poisoned, TimeLimit work_until);
95 virtual void print(std::ostream &os) const;
96 virtual Event get_finish_event(void) const;
97
98 // attempts to add another task to this deferred spawn group -
99 // returns true on success, or false if the event has already
100 // triggered, in which case 'poisoned' is set appropriately
101 bool add_task(Task *to_add, bool &poisoned);
102
103 protected:
111 };
113
114 protected:
115 virtual void mark_completed(void);
116
118
120
121 // to spread out the cost of marking a long list of tasks ready, we
122 // keep a 'marked_ready' bit in the head task of the list and the rest
123 // have a pointer to the head task (which uses a uintptr_t so we can
124 // borrow the bottom bit for avoiding races)
126
127 public: // HACK for debug - should be protected
129 };
130
131 class TaskQueue {
132 public:
134
135 // we use most of the signed integer range for priorities - we do borrow a
136 // few of the extreme values to make sure we have "infinity" and
137 // "negative infinity" and that we don't run into problems with
138 // -INT_MIN
139 typedef int priority_t;
140 static const priority_t PRI_MAX_FINITE = INT_MAX - 1;
141 static const priority_t PRI_MIN_FINITE = -(INT_MAX - 1);
144
146 public:
147 virtual void item_available(priority_t item_priority) = 0;
148 };
149
150 // starvation seems to be a problem on shared task queues
155 std::vector<NotificationCallback *> callbacks;
156 std::vector<priority_t> callback_priorities;
158
160 priority_t higher_than = PRI_NEG_INF);
161
163
165
167 // gets highest priority task available from any task queue in list
168 static Task *get_best_task(const std::vector<TaskQueue *> &queues,
169 int &task_priority);
170
171 void enqueue_task(Task *task);
172 void enqueue_tasks(Task::TaskList &tasks, size_t num_tasks);
173 bool empty() const { return task_count.load() == 0; } // checks only the task_count counter
174
175 private:
176 void enqueue_ready_task(Task *task, bool front = false); // NOTE(review): 'front' presumably inserts at the head - confirm
177 };
178
179 // an internal task is an arbitrary blob of work that needs to happen on
180 // a processor's actual thread(s)
181 class REALM_INTERNAL_API_EXTERNAL_LINKAGE InternalTask { // needed by librealm_kokkos.so
182 protected:
183 // destructor is protected, so outside code cannot destroy an InternalTask directly
184 virtual ~InternalTask() {}
185
186 public:
187 virtual void execute_on_processor(Processor p) = 0; // pure virtual: each subclass supplies the work
188
193 };
194
195 // a common extension for processors is to provide some context for
196 // running tasks - this can be done by subclassing and overriding
197 // `execute_task`, but simple cases can be handled with
198 // TaskContextManagers
200 public:
201 // create a context for the specified task - the value returned will
202 // be provided to the call to destroy_context (defaults below create nothing)
203 virtual void *create_context(Task *task) const { return nullptr; } // default: no context
204 virtual void destroy_context(Task *task, void *context) const {} // default: nothing to tear down
205 virtual void *create_context(InternalTask *task) const { return nullptr; } // same default for internal tasks
206 virtual void destroy_context(InternalTask *task, void *context) const {} // same default for internal tasks
207 };
208
209 // a task scheduler in which one or more worker threads execute tasks from one
210 // or more task queues
211 // once given a task, a worker must complete it before taking on new work
212 // if a worker needs to suspend, a new worker may be spun up to start a new task
213 // this parent version tries to be agnostic to whether the threads are
214 // user or kernel threads
216 public:
218
220
221 virtual void add_task_queue(TaskQueue *queue);
222
223 virtual void remove_task_queue(TaskQueue *queue);
224
226 long long max_timeslice, int numa_domain);
227
228 // add a context manager - each new one "wraps" the previous ones,
229 // constructing its context after them and destroying before
231
232 virtual void start(void) = 0;
233 virtual void shutdown(void) = 0;
234
235 // called when thread status changes
236 virtual void thread_blocking(Thread *thread);
237 virtual void thread_ready(Thread *thread);
238
239 virtual void set_thread_priority(Thread *thread, int new_priority);
240
242
243 public:
244 // the main scheduler loop - lock should be held before calling
245 void scheduler_loop(void);
246 // an entry point that takes the scheduler lock explicitly
248
249 protected:
250 // returns true if everything went well, false if running thread
251 // may have been left in a bad state
252 virtual bool execute_task(Task *task) = 0;
253
254 virtual void execute_internal_task(InternalTask *task) = 0;
255
256 virtual Thread *worker_create(bool make_active) = 0;
257 virtual void worker_sleep(Thread *switch_to) = 0;
258 virtual void worker_wake(Thread *to_wake) = 0;
259 virtual void worker_terminate(Thread *switch_to) = 0;
260
261 // gets highest priority task available from any task queue
262 Task *get_best_ready_task(int &task_priority);
263
264 // TODO: switch this to DelegatingMutex - goal is that callers of
265 // things like thread_ready() should not have to block on
266 // contention
268 std::vector<TaskQueue *> task_queues;
269 std::vector<Thread *> idle_workers;
270 std::set<Thread *> blocked_workers;
271 // threads that block while holding a scheduler lock go here instead
272 std::set<Thread *> spinning_workers;
273
274 std::vector<const TaskContextManager *> context_managers;
275
276 // internal task list is NOT guarded by the main mutex
278
281 std::map<Thread *, int> worker_priorities;
283 int active_worker_count; // workers that are awake (i.e. using a core)
284 int unassigned_worker_count; // awake but unassigned workers
285
286 // helper for tracking/sanity-checking worker counts
287 void update_worker_count(int active_delta, int unassigned_delta, bool check = true);
288
289 // workers that are unassigned and cannot find any work would often (but not
290 // always) like to suspend until work is available - this is done via a "work
291 // counter" that monotonically increments whenever any kind of new work is available
292 // and a "suspended on" value that indicates if any threads are suspended on a
293 // particular count and need to be signalled
294 // this model allows the provider of new work to update the counter in a lock-free way
295 // and only do the condition variable broadcast if somebody is probably sleeping
296
298 public:
301
302 void set_interrupt_flag(atomic<bool> *_interrupt_flag);
303
304 // called whenever new work is available
306
307 uint64_t read_counter(void) const;
308
309 // returns true if there is new work since the old_counter value was read
310 // this is non-blocking, and may be called while holding another lock
311 bool check_for_work(uint64_t old_counter);
312
313 // waits until new work arrives - this will possibly go to sleep,
314 // so should not be called while holding another lock
315 void wait_for_work(uint64_t old_counter);
316
317 protected:
318 // 64-bit counter is used to avoid dealing with wrap-around cases
319 // bottom bits count the number of sleepers, but a max of 2^56 operations
320 // is still a lot
321 static const unsigned SLEEPER_BITS = 8;
324
325 // doorbell list popping is protected with a lock-free delegating mutex
328 };
329
331
332 virtual void wait_for_work(uint64_t old_work_counter);
333
334 // most of our work counter updates are going to come from priority
335 // queues, so a little template-fu here: the updater derives from the
336 // queue type's own NotificationCallback so it can be subscribed to it
337 template <typename PQ>
338 class WorkCounterUpdater : public PQ::NotificationCallback {
339 public:
343
344 // TaskQueue-style notification (no return value)
345 virtual void item_available(typename PQ::priority_t)
346 {
348 }
349
350 // PriorityQueue-style notification (returns whether the item was consumed)
351 virtual bool item_available(Thread *, typename PQ::priority_t)
352 {
354 return false; // never consumes the work
355 }
356
357 protected:
359 };
360
363
367
368 public:
369 // various configurable settings
374 };
375
377 {
378 // just return the counter value with the sleeper bits removed
379 return (counter.load_acquire() >> SLEEPER_BITS);
380 }
381
382 // returns true if there is new work since the old_counter value was read
383 // this is non-blocking, and may be called while holding another lock
385 {
386 // test the counter value without synchronization
387 return (read_counter() != old_counter);
388 }
389
390 // an implementation of ThreadedTaskScheduler that uses kernel threads
391 // for workers
393 public:
395
397
398 virtual void add_task_queue(TaskQueue *queue);
399
400 virtual void remove_task_queue(TaskQueue *queue);
401
402 virtual void start(void);
403
404 virtual void shutdown(void);
405
406 virtual void thread_starting(Thread *thread);
407
408 virtual void thread_terminating(Thread *thread);
409
410 protected:
411 virtual bool execute_task(Task *task);
412
414
415 virtual Thread *worker_create(bool make_active);
416 virtual void worker_sleep(Thread *switch_to);
417 virtual void worker_wake(Thread *to_wake);
418 virtual void worker_terminate(Thread *switch_to);
419
420 virtual void wait_for_work(uint64_t old_work_counter);
421
424
425 std::set<Thread *> all_workers;
426 std::set<Thread *> active_workers;
427 std::set<Thread *> terminating_workers;
428 std::map<Thread *, FIFOMutex::CondVar *> sleeping_threads;
430 };
431
432#ifdef REALM_USE_USER_THREADS
433 // an implementation of ThreadedTaskScheduler that uses user threads
434 // for workers (and one or more kernel threads for hosts)
435 class UserThreadTaskScheduler : public ThreadedTaskScheduler {
436 public:
437 UserThreadTaskScheduler(Processor _proc, CoreReservation &_core_rsrv);
438
439 virtual ~UserThreadTaskScheduler(void);
440
441 virtual void add_task_queue(TaskQueue *queue);
442
443 virtual void remove_task_queue(TaskQueue *queue);
444
445 virtual void start(void);
446 virtual void shutdown(void);
447
448 virtual void thread_starting(Thread *thread);
449
450 virtual void thread_terminating(Thread *thread);
451
452 protected:
453 virtual bool execute_task(Task *task);
454
455 virtual void execute_internal_task(InternalTask *task);
456 // NOTE(review): presumably the loop run by each kernel host thread - confirm in tasks.cc
457 void host_thread_loop(void);
458
459 // you can't delete a user thread until you've switched off of it, so
460 // use TLS to mark when that should happen
461 void request_user_thread_cleanup(Thread *thread);
462 void do_user_thread_cleanup(void);
463
464 virtual Thread *worker_create(bool make_active);
465 virtual void worker_sleep(Thread *switch_to);
466 virtual void worker_wake(Thread *to_wake);
467 virtual void worker_terminate(Thread *switch_to);
468
469 virtual void wait_for_work(uint64_t old_work_counter);
470 // presumably initialized from the constructor arguments of the same names - confirm
471 Processor proc;
472 CoreReservation &core_rsrv;
473 // threads tracked by role: hosts (kernel threads) and workers (user threads), per the class comment
474 std::set<Thread *> all_hosts;
475 std::set<Thread *> all_workers;
476
477 int host_startups_remaining;
478 FIFOMutex::CondVar host_startup_condvar;
479
480 public:
481 int cfg_num_host_threads; // public configuration knob - NOTE(review): presumably the number of host threads
482 };
483#endif
484
485}; // namespace Realm
486
487#endif // ifndef REALM_TASKS_H
Definition bgwork.h:36
Definition threads.h:342
Definition mutex.h:299
Definition mutex.h:187
Definition threads.h:428
Definition event_impl.h:85
unsigned gen_t
Definition event_impl.h:87
Definition event_impl.h:49
Definition event.h:50
Definition mutex.h:342
Definition mutex.h:248
Definition event_impl.h:198
Definition tasks.h:181
REALM_PMTA_DEFN(InternalTask, IntrusiveListLink< InternalTask >, tl_link)
IntrusiveList< InternalTask, REALM_PMTA_USE(InternalTask, tl_link), Mutex > TaskList
Definition tasks.h:192
virtual void execute_on_processor(Processor p)=0
virtual ~InternalTask()
Definition tasks.h:184
IntrusiveListLink< InternalTask > tl_link
Definition tasks.h:189
Definition lists.h:66
Definition lists.h:119
Definition tasks.h:392
Processor proc
Definition tasks.h:422
virtual void thread_terminating(Thread *thread)
std::set< Thread * > all_workers
Definition tasks.h:425
std::map< Thread *, FIFOMutex::CondVar * > sleeping_threads
Definition tasks.h:428
std::set< Thread * > terminating_workers
Definition tasks.h:427
virtual void add_task_queue(TaskQueue *queue)
KernelThreadTaskScheduler(Processor _proc, CoreReservation &_core_rsrv)
virtual void thread_starting(Thread *thread)
CoreReservation & core_rsrv
Definition tasks.h:423
std::set< Thread * > active_workers
Definition tasks.h:426
virtual void execute_internal_task(InternalTask *task)
virtual Thread * worker_create(bool make_active)
virtual void wait_for_work(uint64_t old_work_counter)
FIFOMutex::CondVar shutdown_condvar
Definition tasks.h:429
virtual void worker_terminate(Thread *switch_to)
virtual void worker_sleep(Thread *switch_to)
virtual ~KernelThreadTaskScheduler(void)
virtual bool execute_task(Task *task)
virtual void remove_task_queue(TaskQueue *queue)
virtual void worker_wake(Thread *to_wake)
Definition operation.h:32
Definition pri_queue.h:34
Definition proc_impl.h:51
Definition processor.h:37
::realm_task_func_id_t TaskFuncID
Definition processor.h:58
Definition profiling.h:363
Definition tasks.h:199
virtual void destroy_context(InternalTask *task, void *context) const
Definition tasks.h:206
virtual void destroy_context(Task *task, void *context) const
Definition tasks.h:204
virtual void * create_context(InternalTask *task) const
Definition tasks.h:205
virtual void * create_context(Task *task) const
Definition tasks.h:203
virtual void item_available(priority_t item_priority)=0
Definition tasks.h:131
void enqueue_task(Task *task)
Task::TaskList ready_task_list
Definition tasks.h:154
FIFOMutex mutex
Definition tasks.h:153
void set_gauge(ProfilingGauges::AbsoluteRangeGauge< int > *new_gauge)
void enqueue_tasks(Task::TaskList &tasks, size_t num_tasks)
static const priority_t PRI_POS_INF
Definition tasks.h:142
atomic< priority_t > top_priority
Definition tasks.h:151
atomic< size_t > task_count
Definition tasks.h:152
std::vector< priority_t > callback_priorities
Definition tasks.h:156
static const priority_t PRI_NEG_INF
Definition tasks.h:143
ProfilingGauges::AbsoluteRangeGauge< int > * task_count_gauge
Definition tasks.h:157
bool empty() const
Definition tasks.h:173
void add_subscription(NotificationCallback *callback, priority_t higher_than=PRI_NEG_INF)
int priority_t
Definition tasks.h:139
static const priority_t PRI_MIN_FINITE
Definition tasks.h:141
void remove_subscription(NotificationCallback *callback)
static const priority_t PRI_MAX_FINITE
Definition tasks.h:140
std::vector< NotificationCallback * > callbacks
Definition tasks.h:155
static Task * get_best_task(const std::vector< TaskQueue * > &queues, int &task_priority)
Definition tasks.h:89
TaskList pending_list
Definition tasks.h:108
void defer(EventImpl *_wait_impl, EventImpl::gen_t _wait_gen)
bool add_task(Task *to_add, bool &poisoned)
Task * task
Definition tasks.h:105
virtual void event_triggered(bool poisoned, TimeLimit work_until)
void setup(ProcessorImpl *_proc, Task *_task, Event _wait_on)
virtual void print(std::ostream &os) const
ProcessorImpl * proc
Definition tasks.h:104
virtual Event get_finish_event(void) const
size_t list_length
Definition tasks.h:110
bool is_triggered
Definition tasks.h:109
Mutex pending_list_mutex
Definition tasks.h:107
bool is_poisoned
Definition tasks.h:109
Event wait_on
Definition tasks.h:106
Definition tasks.h:41
bool free_argdata
Definition tasks.h:75
REALM_PMTA_DEFN(Task, IntrusivePriorityListLink< Task >, tl_link)
virtual void set_priority(int new_priority)
virtual Status::Result get_state(void)
Event before_event
Definition tasks.h:78
virtual ~Task(void)
virtual bool attempt_cancellation(int error_code, const void *reason_data, size_t reason_size)
virtual bool mark_ready(void)
static const size_t SHORT_ARGLEN_MAX
Definition tasks.h:73
Processor proc
Definition tasks.h:67
virtual bool mark_started(void)
char * argdata
Definition tasks.h:71
REALM_PMTA_DEFN(Task, int, priority)
Processor::TaskFuncID func_id
Definition tasks.h:68
DeferredSpawn deferred_spawn
Definition tasks.h:112
atomic< bool > marked_ready
Definition tasks.h:125
virtual void print(std::ostream &os) const
virtual void mark_completed(void)
Thread * executing_thread
Definition tasks.h:119
char short_argdata[SHORT_ARGLEN_MAX]
Definition tasks.h:74
Task(Processor _proc, Processor::TaskFuncID _func_id, const void *_args, size_t _arglen, const ProfilingRequestSet &reqs, Event _before_event, GenEventImpl *_finish_event, EventImpl::gen_t _finish_gen, int _priority)
IntrusivePriorityListLink< Task > tl_link
Definition tasks.h:82
IntrusivePriorityList< Task, int, REALM_PMTA_USE(Task, tl_link), REALM_PMTA_USE(Task, priority), DummyLock > TaskList
Definition tasks.h:87
int priority
Definition tasks.h:79
atomic< uintptr_t > pending_head
Definition tasks.h:128
size_t arglen
Definition tasks.h:72
void execute_on_processor(Processor p)
Definition threads.h:241
Definition threads.h:89
WorkCounter * work_counter
Definition tasks.h:358
virtual bool item_available(Thread *, typename PQ::priority_t)
Definition tasks.h:351
virtual void item_available(typename PQ::priority_t)
Definition tasks.h:345
WorkCounterUpdater(ThreadedTaskScheduler *sched)
Definition tasks.h:340
atomic< bool > * interrupt_flag
Definition tasks.h:323
bool check_for_work(uint64_t old_counter)
Definition tasks.h:384
DelegatingMutex db_mutex
Definition tasks.h:326
atomic< uint64_t > counter
Definition tasks.h:322
void wait_for_work(uint64_t old_counter)
static const unsigned SLEEPER_BITS
Definition tasks.h:321
uint64_t read_counter(void) const
Definition tasks.h:376
DoorbellList db_list
Definition tasks.h:327
void set_interrupt_flag(atomic< bool > *_interrupt_flag)
Definition tasks.h:215
BackgroundWorkManager::Worker bgworker
Definition tasks.h:364
bool cfg_reuse_workers
Definition tasks.h:370
std::vector< Thread * > idle_workers
Definition tasks.h:269
WorkCounter work_counter
Definition tasks.h:330
virtual void start(void)=0
void add_task_context(const TaskContextManager *_manager)
std::vector< TaskQueue * > task_queues
Definition tasks.h:268
virtual void worker_terminate(Thread *switch_to)=0
FIFOMutex lock
Definition tasks.h:267
virtual Thread * worker_create(bool make_active)=0
virtual void thread_blocking(Thread *thread)
virtual void set_thread_priority(Thread *thread, int new_priority)
std::map< Thread *, int > worker_priorities
Definition tasks.h:281
virtual ~ThreadedTaskScheduler(void)
int unassigned_worker_count
Definition tasks.h:284
std::set< Thread * > spinning_workers
Definition tasks.h:272
virtual void add_task_queue(TaskQueue *queue)
Task * get_best_ready_task(int &task_priority)
PriorityQueue< Thread *, DummyLock > ResumableQueue
Definition tasks.h:279
virtual void worker_wake(Thread *to_wake)=0
atomic< bool > shutdown_flag
Definition tasks.h:282
WorkCounterUpdater< TaskQueue > wcu_task_queues
Definition tasks.h:361
InternalTask::TaskList internal_tasks
Definition tasks.h:277
int active_worker_count
Definition tasks.h:283
atomic< bool > bgworker_interrupt
Definition tasks.h:365
virtual void configure_bgworker(BackgroundWorkManager *manager, long long max_timeslice, int numa_domain)
std::set< Thread * > blocked_workers
Definition tasks.h:270
int cfg_max_active_workers
Definition tasks.h:373
virtual void remove_task_queue(TaskQueue *queue)
virtual bool execute_task(Task *task)=0
virtual void wait_for_work(uint64_t old_work_counter)
WorkCounterUpdater< ResumableQueue > wcu_resume_queue
Definition tasks.h:362
virtual void worker_sleep(Thread *switch_to)=0
int cfg_min_active_workers
Definition tasks.h:372
virtual void thread_ready(Thread *thread)
std::vector< const TaskContextManager * > context_managers
Definition tasks.h:274
virtual void execute_internal_task(InternalTask *task)=0
int cfg_max_idle_workers
Definition tasks.h:371
long long max_bgwork_timeslice
Definition tasks.h:366
void update_worker_count(int active_delta, int unassigned_delta, bool check=true)
virtual void shutdown(void)=0
ResumableQueue resumable_workers
Definition tasks.h:280
void add_internal_task(InternalTask *itask)
Definition timers.h:129
Definition mutex.h:223
Definition atomics.h:31
T load(void) const
T load_acquire(void) const
#define REALM_INTERNAL_API_EXTERNAL_LINKAGE
Definition compiler_support.h:218
#define REALM_PMTA_USE(structtype, name)
Definition lists.h:42
Definition activemsg.h:38