Realm
A distributed, event-based tasking library
Loading...
Searching...
No Matches
pri_queue.h
Go to the documentation of this file.
1/*
2 * Copyright 2025 Stanford University, NVIDIA Corporation
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18// templated priority queue
19
20#ifndef REALM_PRI_QUEUE_H
21#define REALM_PRI_QUEUE_H
22
23#include <deque>
24#include <map>
25
26#include "realm/sampling.h"
27
28namespace Realm {
29
30 // a priority queue is templated on the type of thing stored in the queue
31 // and on the kind of lock used to protect it (e.g. you can use a DummyLock
32 // if mutual exclusion is provided outside of these calls)
33 template <typename T, typename LT>
35 public:
38
39 typedef T ITEMTYPE;
40
41 // we use most of the signed integer range for priorities - we do borrow a
42 // few of the extreme values to make sure we have "infinity" and "negative infinity"
43 // and that we don't run into problems with -INT_MIN
44 // attempts to enqueue an item with a priority outside the "finite" range will result
45 // in the priority being silently clamped to the finite range
46 typedef int priority_t;
47 static const priority_t PRI_MAX_FINITE = INT_MAX - 1;
48 static const priority_t PRI_MIN_FINITE = -(INT_MAX - 1);
51
52 // two ways to add an item -
53 // 1) add it to the end of the list at that priority (i.e. FIFO order)
54 // 2) "unget" adds it to the front of the list (i.e. LIFO order)
55 void put(T item, priority_t priority, bool add_to_back = true);
56
57 // getting an item is always from the front of the list and can be filtered to
58 // ignore things that aren't above a specified priority
59 // the priority of the retrieved item (if any) is returned in *item_priority
60 T get(priority_t *item_priority, priority_t higher_than = PRI_NEG_INF);
61
62 // peek is like get, but doesn't remove the element (this is only really useful when
63 // you don't have multiple getters)
64 T peek(priority_t *item_priority, priority_t higher_than = PRI_NEG_INF) const;
65
66 // similarly, the empty-ness query can also ignore things below a certain priority
67 // this call is lock-free (and is again of questionable utility with multiple readers)
68 bool empty(priority_t higher_than = PRI_NEG_INF) const;
69
70 // it is possible to subscribe to queue updates - notifications are sent when
71 // a new item arrives at a higher priority level than what is already available
72 // and offers the item for immediate retrieval - if a callback returns true, the
73 // item is considered to have been consumed
74 // note that these callbacks are performed with the queue's lock HELD - watch out for
75 // deadlock scenarios
77 public:
78 virtual bool item_available(T item, priority_t item_priority) = 0;
79 };
80
81 // adds (or modifies) a subscription - only items above the specified priority will
82 // result in callbacks
84 priority_t higher_than = PRI_NEG_INF);
86
88
89 protected:
90 // helper that performs notifications for a new item - returns true if a callback
91 // consumes the item
92 bool perform_notifications(T item, priority_t item_priority);
93
94 // 'highest_priority' may be read without the lock held, but only written with the
95 // lock
97
98 // this lock protects everything else
99 mutable LT lock;
100
101 // the actual queue - priorities are negated here so that queue.begin() gives us the
102 // "highest" priority
103 std::map<priority_t, std::deque<T>> queue;
104
105 // notification subscriptions
106 std::map<NotificationCallback *, priority_t> subscriptions;
107
109
110 template <typename T2, typename LT2>
111 friend std::ostream &operator<<(std::ostream &os, const PriorityQueue<T2, LT2> &pq);
112 };
113
114 template <typename T, typename LT>
115 std::ostream &operator<<(std::ostream &os, const PriorityQueue<T, LT> &pq)
116 {
117 pq.lock.lock();
118 os << "PQ{\n";
119 for(typename std::map<typename PriorityQueue<T, LT>::priority_t,
120 std::deque<T>>::const_iterator it = pq.queue.begin();
121 it != pq.queue.end(); ++it) {
122 os << " [" << -(it->first) << "]: ";
123 typename std::deque<T>::const_iterator it2 = it->second.begin();
124 assert(it2 != it->second.end());
125 os << ((const void *)(*it2));
126 while((++it2) != it->second.end())
127 os << ", " << ((const void *)(*it2));
128 os << "\n";
129 }
130 os << "}\n";
131 pq.lock.unlock();
132 return os;
133 }
134
135}; // namespace Realm
136
137#include "realm/pri_queue.inl"
138
139#endif // ifndef REALM_PRI_QUEUE_H
virtual bool item_available(T item, priority_t item_priority)=0
Definition pri_queue.h:34
void remove_subscription(NotificationCallback *callback)
static const priority_t PRI_MAX_FINITE
Definition pri_queue.h:47
void add_subscription(NotificationCallback *callback, priority_t higher_than=PRI_NEG_INF)
void set_gauge(ProfilingGauges::AbsoluteRangeGauge< int > *new_gauge)
static const priority_t PRI_NEG_INF
Definition pri_queue.h:50
priority_t highest_priority
Definition pri_queue.h:96
std::map< priority_t, std::deque< T > > queue
Definition pri_queue.h:103
bool empty(priority_t higher_than=PRI_NEG_INF) const
static const priority_t PRI_POS_INF
Definition pri_queue.h:49
int priority_t
Definition pri_queue.h:46
bool perform_notifications(T item, priority_t item_priority)
void put(T item, priority_t priority, bool add_to_back=true)
T peek(priority_t *item_priority, priority_t higher_than=PRI_NEG_INF) const
LT lock
Definition pri_queue.h:99
static const priority_t PRI_MIN_FINITE
Definition pri_queue.h:48
T ITEMTYPE
Definition pri_queue.h:39
std::map< NotificationCallback *, priority_t > subscriptions
Definition pri_queue.h:106
friend std::ostream & operator<<(std::ostream &os, const PriorityQueue< T2, LT2 > &pq)
T get(priority_t *item_priority, priority_t higher_than=PRI_NEG_INF)
ProfilingGauges::AbsoluteRangeGauge< int > * entries_in_queue
Definition pri_queue.h:108
Definition activemsg.h:38
std::ostream & operator<<(std::ostream &os, const DenseRectangleList< N, T > &drl)