Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
threads.h
Go to the documentation of this file.
1/*
2 * Copyright 2025 Stanford University, NVIDIA Corporation
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18// generic Realm interface to threading libraries (e.g. pthreads)
19
20#ifndef REALM_THREADS_H
21#define REALM_THREADS_H
22
23#include "realm/realm_config.h"
24#include "realm/mutex.h"
25#include "realm/atomics.h"
26#include "realm/utils.h"
28
29#ifdef REALM_USE_USER_THREADS
30#ifdef REALM_ON_MACOS
31#define _XOPEN_SOURCE
32#endif
33#endif
34
35#include <stddef.h>
36
37#include <string>
38#include <list>
39#include <set>
40#include <map>
41#include <deque>
42#include <iostream>
43
44#ifdef REALM_USE_PAPI
45// string.h is not used here, but if this is included by somebody else after
46// we include papi.h, mismatches occur because ffsll() is declared with __THROW!?
47#include <string.h>
48#include <papi.h>
49#endif
50
51namespace Realm {
52
53 namespace Threading {
54 // calls to initialize and clean up any global state for the threading subsystem
55 bool initialize(void); // NOTE(review): bool result presumably reports success - confirm in the .cc
56 bool cleanup(void); // NOTE(review): bool result presumably reports success - confirm in the .cc
57 }; // namespace Threading
58
59 class Operation;
60
61 // ALL work inside Realm (i.e. both tasks and internal Realm work) should be done
62 // inside a Thread, which comes in (at least) two flavors:
63 // a) KernelThread - a kernel-managed thread, supporting preemptive multitasking
64 // and possibly-blocking system calls
65 // b) UserThread - a userspace-managed thread, supporting only cooperative multitasking
66 // but with (hopefully) much lower switching overhead
67 //
68 // Cooperative multitasking is handled with the help of a "scheduler" that the thread
69 // calls into when it wishes to sleep
70 //
71 // Threads return a void * on completion, available to any object that "joins" on it.
72 // A thread may also be sent a "signal", which can be delivered either synchronously
73 // (i.e. upon interaction with the scheduler) or asynchronously (e.g. via POSIX
74 // signals).
75
77 class ThreadScheduler;
78 class CoreReservation;
79
80 // from profiling.h
82
83#ifdef REALM_USE_PAPI
84 class PAPICounters;
85#endif
86
87 // template <class CONDTYPE> class ThreadWaker;
88
89 class Thread {
90 protected:
91 // thread objects are not constructed directly
92 Thread(ThreadScheduler *_scheduler);
93
94 template <typename T, void (T::*START_MTHD)(void)>
95 static void thread_entry_wrapper(void *obj);
96
98 void (*entry_wrapper)(void *),
99 const ThreadLaunchParameters &params,
100 CoreReservation &rsrv,
101 ThreadScheduler *_scheduler);
102
103#ifdef REALM_USE_USER_THREADS
104 static Thread *create_user_thread_untyped(void *target, void (*entry_wrapper)(void *),
105 const ThreadLaunchParameters &params,
106 const CoreReservation *rsrv,
107 ThreadScheduler *_scheduler);
108#endif
109
110 public:
111 // for kernel threads, the scheduler is optional - however, a thread with no scheduler
112 // is not allowed to wait on a Realm Event or any internal object
113 // a kernel thread also requires a core reservation that tells it which core(s) it may
114 // use when executing
115 template <typename T, void (T::*START_MTHD)(void)>
116 static Thread *create_kernel_thread(T *target, const ThreadLaunchParameters &params,
117 CoreReservation &rsrv,
118 ThreadScheduler *_scheduler = 0);
119
120#ifdef REALM_USE_USER_THREADS
121 // user threads must specify a scheduler - the whole point is that the OS isn't
122 // controlling them...
123 template <typename T, void (T::*START_MTHD)(void)>
124 static Thread *create_user_thread(T *target, const ThreadLaunchParameters &params,
125 const CoreReservation *rsrv,
126 ThreadScheduler *_scheduler);
127#endif
128
129 virtual ~Thread(void);
130
143
145
152
153 // adds a signal to the thread's queue, triggering an asynchronous notification
154 // if 'asynchronous' is true
155 void signal(Signal sig, bool asynchronous);
156
157 // returns the next signal in the queue, if any
159
160 // pops and handles any signals in the queue
161 void process_signals(void);
162
163 virtual void join(void) = 0; // BLOCKS until the thread completes
164 virtual void detach(void) = 0;
165
166 // called from within a thread
167 static Thread *self(void);
168 static void abort(void);
169 static void yield(void);
170
171 // called from within a thread to indicate the association of an Operation with
172 // that thread
173 // (for cancellation reasons)
177
178 // changes the priority of the thread (and, by extension, the operation it
179 // is working on)
180 void set_priority(int new_priority);
181
182#ifdef REALM_USE_USER_THREADS
183 // perform a user-level thread switch
184 // if called from a kernel thread, that thread becomes the "host" for the user thread
185 // if called by a user thread with 'switch_to'==0, control returns to host
186 static void user_switch(Thread *switch_to);
187
188 // some systems do not appear to support user thread switching for
189 // reasons unknown, so allow code to test to see if it's working first
190 static bool test_user_switch_support(size_t stack_size = 1 << 20);
191#endif
192
193 template <typename CONDTYPE>
194 static void wait_for_condition(const CONDTYPE &cond, bool &poisoned);
195
196 // does this thread have exception handlers installed?
197 bool exceptions_permitted(void) const;
198
199 // put one of these in a try {} block to indicate that exceptions are allowed
205
206 // per-thread performance counters
211
212 protected:
213 friend class ThreadScheduler;
214
215 template <class CONDTYPE>
216 friend class ThreadWaker;
217
218 // atomically updates the thread's state, returning the old state
220
221 // updates the thread's state, but only if it's in the specified 'old_state' (i.e. an
222 // atomic compare and swap) - returns true on success and false on failure
223 bool try_update_state(Thread::State old_state, Thread::State new_state);
224
225 // send an asynchronous notification to the thread
226 virtual void alert_thread(void) = 0;
227
234 std::deque<Signal> signal_queue;
235
236#ifdef REALM_USE_PAPI
237 PAPICounters *papi_counters;
238#endif
239 };
240
242 public:
243 virtual ~ThreadScheduler(void);
244
245 // this can be used for logging or to hold a thread before it starts running
246 virtual void thread_starting(Thread *thread) = 0;
247
248 // callbacks from a thread when it wants to sleep (i.e. yielding on a
249 // co-routine interaction or blocking on some condition) or to
250 // terminate - either will generally result in some other thread
251 // being woken up
252 // virtual void thread_yielding(Thread *thread) = 0;
253 virtual void thread_blocking(Thread *thread) = 0;
254 virtual void thread_terminating(Thread *thread) = 0;
255
256 // notification that a thread is ready (this will generally
257 // come from some thread other than the one that is now
258 // ready)
259 virtual void thread_ready(Thread *thread) = 0;
260
261 virtual void set_thread_priority(Thread *thread, int new_priority) = 0;
262
263 protected:
264 // delegates friendship of Thread with subclasses
267 Thread::State new_state);
268 };
269
270 // any thread (user or kernel) will have its own stack and heap - the size of which can
271 // be controlled when the thread is launched - defaults are provided for all
272 // values, along with convenient mutators
274 public:
275 static const ptrdiff_t STACK_SIZE_DEFAULT = -1;
276 static const ptrdiff_t HEAP_SIZE_DEFAULT = -1;
277 static const ptrdiff_t ALTSTACK_SIZE_DEFAULT = -1;
278
282
284
285 ThreadLaunchParameters &set_stack_size(ptrdiff_t new_stack_size);
286 ThreadLaunchParameters &set_heap_size(ptrdiff_t new_heap_size);
287 ThreadLaunchParameters &set_alt_stack_size(ptrdiff_t new_alt_stack_size);
288 };
289
290 // Kernel threads will generally be much happier if they decide up front which core(s)
291 // each of them is going to use. Since this is a global optimization problem, we
292 // allow different parts of the system to create "reservations", which the runtime will
293 // then attempt to satisfy. A thread can be launched before this happens, but will not
294 // actually run until the reservations are satisfied.
295
296 // A reservation can request one or more cores, optionally restricted to a
297 // particular NUMA domain (as numbered by the OS).
298 // The reservation should also indicate how heavily
299 // (if at all) it intends to use the integer, floating-point, and load/store datapaths
300 // of the core(s). A reservation with EXCLUSIVE use is compatible with those expecting
301 // MINIMAL use of the same datapath, but not with any other reservation desiring
302 // EXCLUSIVE or SHARED access.
304 public:
312
313 static const int NUMA_DOMAIN_DONTCARE = -1;
314 static const ptrdiff_t STACK_SIZE_DEFAULT = -1;
315 static const ptrdiff_t HEAP_SIZE_DEFAULT = -1;
316 static const ptrdiff_t ALTSTACK_SIZE_DEFAULT = -1;
317
318 int num_cores = 1; // how many cores are requested
320 NUMA_DOMAIN_DONTCARE; // which NUMA domain the cores should come from
321 CoreUsage alu_usage = CORE_USAGE_SHARED; // "integer" datapath usage
322 CoreUsage fpu_usage = CORE_USAGE_MINIMAL; // floating-point usage
323 CoreUsage ldst_usage = CORE_USAGE_SHARED; // "memory" datapath usage
327
329
335 CoreReservationParameters &set_max_stack_size(ptrdiff_t new_max_stack_size);
336 CoreReservationParameters &set_max_heap_size(ptrdiff_t new_max_heap_size);
337 CoreReservationParameters &set_alt_stack_size(ptrdiff_t new_alt_stack_size);
338 };
339
340 class CoreReservationSet;
341
343 public:
344 CoreReservation(const std::string &_name, CoreReservationSet &crs,
345 const CoreReservationParameters &_params);
346
347 // eventually we'll get an Allocation, which is an opaque type because it's
348 // OS-dependent :(
349 struct Allocation;
350
351 // to be informed of the eventual allocation, you supply one of these:
353 public:
354 virtual ~NotificationListener(void) {}
355 virtual void notify_allocation(const CoreReservation &rsrv) = 0;
356 };
357
359
360 // This method is used to set affinity for the calling thread
361 // if begin == -1 or end == -1, the thread will be mapped to all the cores
362 bool set_affinity(int begin = -1, int end = -1);
363
364 friend std::ostream &operator<<(std::ostream &stream,
365 const CoreReservation &core_resv);
366
367 public:
368 std::string name;
370
371 // no locks needed here because we aren't multi-threaded until the allocation exists
372 Allocation *allocation;
373
374 protected:
375 friend class CoreReservationSet;
377
378 std::list<NotificationListener *> listeners;
379 };
380
381 // manages a set of core reservations and if/how they are satisfied
383 public:
385
387
388 const HardwareTopology *get_core_map(void) const;
389
391
392 // if 'dummy_reservation_ok' is set, a failed reservation will be "satisfied" with
393 // one that uses dummy (i.e. no assigned cores) reservations
394 bool satisfy_reservations(bool dummy_reservation_ok = false);
395
396 void report_reservations(std::ostream &os) const;
397
398 protected:
400 std::map<CoreReservation *, CoreReservation::Allocation *> allocations;
401 };
402
403#ifdef REALM_USE_PAPI
404 class PAPICounters { // per-thread PAPI counter state; Thread holds a PAPICounters* under REALM_USE_PAPI
405 protected:
406 PAPICounters(void); // protected: not constructible by outside code
407 ~PAPICounters(void); // protected: not destructible by outside code
408
409 public:
410 static PAPICounters *setup_counters(const ProfilingMeasurementCollection &pmc); // static factory; NOTE(review): ownership and null-return behavior are not visible in this header
411 void cleanup(void); // NOTE(review): presumably the public release path, since the destructor is protected - confirm
412
413 void start(void); // start/suspend/resume/stop are declared here only; definitions live elsewhere
414 void suspend(void);
415 void resume(void);
416 void stop(void);
417 void record(ProfilingMeasurementCollection &pmc); // takes 'pmc' by non-const reference - presumably writes results into it (TODO confirm)
418
419 protected:
420 int papi_event_set;
421 std::map<int, size_t> event_codes;
422 std::vector<long long> event_counts; // NOTE(review): no direct #include <vector> is visible in this listing - presumably pulled in transitively; confirm
423 };
424#endif
425
426 // no-op lock type: lock() and unlock() have empty bodies (TODO: move this somewhere else)
427
428 class DummyLock {
429 public:
430 void lock(void) {} // empty body - performs no synchronization
431 void unlock(void) {} // empty body - performs no synchronization
432 };
433
434} // namespace Realm
435
436#include "realm/threads.inl"
437
438#endif // REALM_THREADS_H
Definition threads.h:303
ptrdiff_t alt_stack_size
Definition threads.h:326
CoreReservationParameters & set_alt_stack_size(ptrdiff_t new_alt_stack_size)
CoreUsage ldst_usage
Definition threads.h:323
CoreUsage fpu_usage
Definition threads.h:322
CoreReservationParameters & set_max_stack_size(ptrdiff_t new_max_stack_size)
CoreReservationParameters & set_ldst_usage(CoreUsage new_ldst_usage)
CoreReservationParameters & set_fpu_usage(CoreUsage new_fpu_usage)
static const int NUMA_DOMAIN_DONTCARE
Definition threads.h:313
static const ptrdiff_t ALTSTACK_SIZE_DEFAULT
Definition threads.h:316
CoreUsage alu_usage
Definition threads.h:321
ptrdiff_t max_stack_size
Definition threads.h:324
int num_cores
Definition threads.h:318
static const ptrdiff_t STACK_SIZE_DEFAULT
Definition threads.h:314
int numa_domain
Definition threads.h:319
CoreUsage
Definition threads.h:306
@ CORE_USAGE_SHARED
Definition threads.h:309
@ CORE_USAGE_NONE
Definition threads.h:307
@ CORE_USAGE_EXCLUSIVE
Definition threads.h:310
@ CORE_USAGE_MINIMAL
Definition threads.h:308
ptrdiff_t max_heap_size
Definition threads.h:325
CoreReservationParameters & set_alu_usage(CoreUsage new_alu_usage)
CoreReservationParameters & set_num_cores(int new_num_cores)
CoreReservationParameters & set_numa_domain(int new_numa_domain)
static const ptrdiff_t HEAP_SIZE_DEFAULT
Definition threads.h:315
CoreReservationParameters & set_max_heap_size(ptrdiff_t new_max_heap_size)
Definition threads.h:382
bool satisfy_reservations(bool dummy_reservation_ok=false)
CoreReservationSet(const HardwareTopology *_cm)
void report_reservations(std::ostream &os) const
const HardwareTopology * get_core_map(void) const
std::map< CoreReservation *, CoreReservation::Allocation * > allocations
Definition threads.h:400
const HardwareTopology * cm
Definition threads.h:399
void add_reservation(CoreReservation &rsrv)
virtual void notify_allocation(const CoreReservation &rsrv)=0
virtual ~NotificationListener(void)
Definition threads.h:354
Definition threads.h:342
CoreReservation(const std::string &_name, CoreReservationSet &crs, const CoreReservationParameters &_params)
std::list< NotificationListener * > listeners
Definition threads.h:378
Allocation * allocation
Definition threads.h:372
friend std::ostream & operator<<(std::ostream &stream, const CoreReservation &core_resv)
bool set_affinity(int begin=-1, int end=-1)
CoreReservationParameters params
Definition threads.h:369
void add_listener(NotificationListener *listener)
std::string name
Definition threads.h:368
void notify_listeners(void)
Definition threads.h:428
void unlock(void)
Definition threads.h:431
void lock(void)
Definition threads.h:430
Represents the topology of the host processor cores and memory.
Definition hardware_topology.h:38
Definition operation.h:32
Definition profiling.h:393
Definition threads.h:273
static const ptrdiff_t STACK_SIZE_DEFAULT
Definition threads.h:275
ptrdiff_t alt_stack_size
Definition threads.h:281
ThreadLaunchParameters & set_stack_size(ptrdiff_t new_stack_size)
ptrdiff_t stack_size
Definition threads.h:279
ThreadLaunchParameters & set_heap_size(ptrdiff_t new_heap_size)
static const ptrdiff_t ALTSTACK_SIZE_DEFAULT
Definition threads.h:277
static const ptrdiff_t HEAP_SIZE_DEFAULT
Definition threads.h:276
ThreadLaunchParameters & set_alt_stack_size(ptrdiff_t new_alt_stack_size)
ptrdiff_t heap_size
Definition threads.h:280
Definition threads.h:241
bool try_update_thread_state(Thread *thread, Thread::State old_state, Thread::State new_state)
virtual void thread_terminating(Thread *thread)=0
virtual void thread_blocking(Thread *thread)=0
virtual ~ThreadScheduler(void)
virtual void thread_starting(Thread *thread)=0
Thread::State update_thread_state(Thread *thread, Thread::State new_state)
virtual void set_thread_priority(Thread *thread, int new_priority)=0
virtual void thread_ready(Thread *thread)=0
Definition threads.h:89
Thread::State update_state(Thread::State new_state)
atomic< int > signal_count
Definition threads.h:232
int exception_handler_count
Definition threads.h:231
friend class ThreadWaker
Definition threads.h:216
State
Definition threads.h:132
@ STATE_RUNNING
Definition threads.h:135
@ STATE_CREATED
Definition threads.h:133
@ STATE_ALERTED
Definition threads.h:138
@ STATE_BLOCKING
Definition threads.h:136
@ STATE_BLOCKED
Definition threads.h:137
@ STATE_FINISHED
Definition threads.h:140
@ STATE_DELETED
Definition threads.h:141
@ STATE_READY
Definition threads.h:139
@ STATE_STARTUP
Definition threads.h:134
static void abort(void)
Signal pop_signal(void)
State get_state(void)
virtual ~Thread(void)
void record_perf_counters(ProfilingMeasurementCollection &pmc)
static Thread * create_kernel_thread(T *target, const ThreadLaunchParameters &params, CoreReservation &rsrv, ThreadScheduler *_scheduler=0)
void set_priority(int new_priority)
void process_signals(void)
Operation * get_operation(void) const
atomic< State > state
Definition threads.h:228
Signal
Definition threads.h:147
@ TSIG_INTERRUPT
Definition threads.h:150
@ TSIG_NONE
Definition threads.h:148
@ TSIG_SHOW_BACKTRACE
Definition threads.h:149
static void wait_for_condition(const CONDTYPE &cond, bool &poisoned)
static Thread * self(void)
static void thread_entry_wrapper(void *obj)
virtual void detach(void)=0
static Thread * create_kernel_thread_untyped(void *target, void(*entry_wrapper)(void *), const ThreadLaunchParameters &params, CoreReservation &rsrv, ThreadScheduler *_scheduler)
void start_perf_counters(void)
Thread(ThreadScheduler *_scheduler)
bool try_update_state(Thread::State old_state, Thread::State new_state)
bool exceptions_permitted(void) const
std::deque< Signal > signal_queue
Definition threads.h:234
Mutex signal_mutex
Definition threads.h:233
void stop_operation(Operation *op)
ThreadScheduler * scheduler
Definition threads.h:229
static void yield(void)
virtual void alert_thread(void)=0
void signal(Signal sig, bool asynchronous)
void setup_perf_counters(const ProfilingMeasurementCollection &pmc)
Operation * current_op
Definition threads.h:230
virtual void join(void)=0
void stop_perf_counters(void)
void start_operation(Operation *op)
Definition mutex.h:223
Definition atomics.h:31
bool initialize(void)
bool cleanup(void)
Definition activemsg.h:38