/workdir/bitcoin/src/checkqueue.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2012-2022 The Bitcoin Core developers |
2 | | // Distributed under the MIT software license, see the accompanying |
3 | | // file COPYING or http://www.opensource.org/licenses/mit-license.php. |
4 | | |
5 | | #ifndef BITCOIN_CHECKQUEUE_H |
6 | | #define BITCOIN_CHECKQUEUE_H |
7 | | |
8 | | #include <sync.h> |
9 | | #include <tinyformat.h> |
10 | | #include <util/threadnames.h> |
11 | | |
12 | | #include <algorithm> |
13 | | #include <iterator> |
14 | | #include <vector> |
15 | | |
16 | | /** |
17 | | * Queue for verifications that have to be performed. |
18 | | * The verifications are represented by a type T, which must provide an |
19 | | * operator(), returning a bool. |
20 | | * |
21 | | * One thread (the master) is assumed to push batches of verifications |
22 | | * onto the queue, where they are processed by N-1 worker threads. When |
23 | | * the master is done adding work, it temporarily joins the worker pool |
24 | | * as an N'th worker, until all jobs are done. |
25 | | */ |
26 | | template <typename T> |
27 | | class CCheckQueue |
28 | | { |
29 | | private: |
30 | | //! Mutex protecting every member annotated GUARDED_BY(m_mutex) below. |
31 | | Mutex m_mutex; |
32 | | |
33 | | //! Worker threads block on this when out of work; Add() and the destructor notify it. |
34 | | std::condition_variable m_worker_cv; |
35 | | |
36 | | //! Master thread blocks on this when out of work; a worker notifies it after finishing the last outstanding check. |
37 | | std::condition_variable m_master_cv; |
38 | | |
39 | | //! The queue of elements to be processed. |
40 | | //! Only the conjunction of the results is kept, so evaluation order is irrelevant and batches are taken from the back (LIFO). |
41 | | std::vector<T> queue GUARDED_BY(m_mutex); |
42 | | |
43 | | //! The number of workers (including the master) currently blocked in the wait inside Loop(). |
44 | | int nIdle GUARDED_BY(m_mutex){0}; |
45 | | |
46 | | //! The number of threads (including the master) running Loop(): incremented on the first pass of each thread, decremented when the master returns its result. |
47 | | int nTotal GUARDED_BY(m_mutex){0}; |
48 | | |
49 | | //! The temporary evaluation result: running conjunction of finished batches, reset to true when the master returns it. |
50 | | bool fAllOk GUARDED_BY(m_mutex){true}; |
51 | | |
52 | | /** |
53 | | * Number of verifications that haven't completed yet. |
54 | | * This includes elements that are no longer queued, but still in the |
55 | | * worker's own batches. |
 | | * Increased by Add(); decreased in Loop() when a finished batch is accounted for. |
56 | | */ |
57 | | unsigned int nTodo GUARDED_BY(m_mutex){0}; |
58 | | |
59 | | //! The maximum number of elements to be processed in one batch (set once by the constructor). |
60 | | const unsigned int nBatchSize; |
61 | | |
//! Worker threads started by the constructor and joined by the destructor.
62 | | std::vector<std::thread> m_worker_threads; |
//! Set to true by the destructor; Loop() returns false once it observes this flag.
63 | | bool m_request_stop GUARDED_BY(m_mutex){false}; |
64 | | |
65 | | /**
 * Internal function that does bulk of the verification work.
 *
 * Each worker thread runs it with fMaster == false (started by the
 * constructor); Wait() runs it on the calling thread with fMaster == true.
 * Every pass enters the m_mutex-protected scope, accounts for the batch
 * finished in the previous pass, waits until the queue is non-empty, moves a
 * batch of checks out of the queue, leaves the locked scope and only then
 * executes that batch.
 *
 * @param fMaster selects m_master_cv instead of m_worker_cv for waiting and
 *        enables the return path taken once the queue is empty and nTodo is 0.
 * @return for the master: the accumulated fAllOk (which is then reset to
 *         true). For any thread: false as soon as m_request_stop is observed.
 *         A worker has no other way out of the loop.
 */ |
66 | | bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
67 | 0 | { |
68 | 0 | std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; Branch (68:41): [True: 0, False: 0]
Branch (68:41): [True: 0, False: 0]
|
// vChecks is the thread-local batch buffer; its capacity is reused across passes.
69 | 0 | std::vector<T> vChecks; |
70 | 0 | vChecks.reserve(nBatchSize); |
// nNow is the size of the batch taken in the current pass (0 before the first pass).
71 | 0 | unsigned int nNow = 0; |
// fOk is the result of the current batch, folded into fAllOk at the start of the next pass.
72 | 0 | bool fOk = true; |
73 | 0 | do { |
74 | 0 | { |
75 | 0 | WAIT_LOCK(m_mutex, lock); |
76 | | // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) |
77 | 0 | if (nNow) { Branch (77:21): [True: 0, False: 0]
Branch (77:21): [True: 0, False: 0]
|
78 | 0 | fAllOk &= fOk; |
79 | 0 | nTodo -= nNow; |
80 | 0 | if (nTodo == 0 && !fMaster) Branch (80:25): [True: 0, False: 0]
Branch (80:39): [True: 0, False: 0]
Branch (80:25): [True: 0, False: 0]
Branch (80:39): [True: 0, False: 0]
|
81 | | // We processed the last element; inform the master it can exit and return the result |
82 | 0 | m_master_cv.notify_one(); |
83 | 0 | } else { |
84 | | // first iteration: count this thread as a participant |
85 | 0 | nTotal++; |
86 | 0 | } |
87 | | // logically, the do loop starts here |
88 | 2 | while (queue.empty() && !m_request_stop) { Branch (88:24): [True: 0, False: 0]
Branch (88:41): [True: 0, False: 0]
Branch (88:24): [True: 2, False: 18.4E]
Branch (88:41): [True: 0, False: 2]
|
// Nothing queued and nothing in flight: the master is done and hands back the result.
89 | 0 | if (fMaster && nTodo == 0) { Branch (89:25): [True: 0, False: 0]
Branch (89:36): [True: 0, False: 0]
Branch (89:25): [True: 0, False: 0]
Branch (89:36): [True: 0, False: 0]
|
90 | 0 | nTotal--; |
91 | 0 | bool fRet = fAllOk; |
92 | | // reset the status for new work later |
93 | 0 | fAllOk = true; |
94 | | // return the current status |
95 | 0 | return fRet; |
96 | 0 | } |
97 | 0 | nIdle++; |
98 | 0 | cond.wait(lock); // blocks until notified; the enclosing while re-checks its condition after every wake-up |
99 | 0 | nIdle--; |
100 | 0 | } |
// A stop request takes priority over any work that may still be queued.
101 | 2 | if (m_request_stop) { Branch (101:21): [True: 0, False: 0]
Branch (101:21): [True: 2, False: 18.4E]
|
102 | 2 | return false; |
103 | 2 | } |
104 | | |
105 | | // Decide how many work units to process now. |
106 | | // * Do not try to do everything at once, but aim for increasingly smaller batches so |
107 | | // all workers finish approximately simultaneously. |
108 | | // * Try to account for idle workers which will instantly start helping. |
109 | | // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. |
110 | 18.4E | nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); |
// Move the last nNow checks out of the queue into the local batch, then drop the moved-from tail.
111 | 18.4E | auto start_it = queue.end() - nNow; |
112 | 18.4E | vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); |
113 | 18.4E | queue.erase(start_it, queue.end()); |
114 | | // Check whether we need to do work at all: if fAllOk is already false, no check in the batch below is called |
115 | 18.4E | fOk = fAllOk; |
116 | 18.4E | } |
117 | | // execute work (outside the locked scope); no further check is called after the first failure |
118 | 0 | for (T& check : vChecks) Branch (118:27): [True: 0, False: 0]
Branch (118:27): [True: 0, False: 18.4E]
|
119 | 0 | if (fOk) Branch (119:21): [True: 0, False: 0]
Branch (119:21): [True: 0, False: 0]
|
120 | 0 | fOk = check(); |
121 | 18.4E | vChecks.clear(); |
122 | 18.4E | } while (true); Branch (122:18): [Folded - Ignored]
Branch (122:18): [Folded - Ignored]
|
123 | 0 | } Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::Loop(bool) Unexecuted instantiation: CCheckQueue<CScriptCheck>::Loop(bool) |
124 | | |
125 | | public: |
126 | | //! Mutex to ensure only one concurrent CCheckQueueControl (entered by its constructor, left by its destructor) |
127 | | Mutex m_control_mutex; |
128 | | |
129 | | //! Create a new check queue |
//! @param batch_size stored as nBatchSize, the upper bound on checks taken per batch.
//! @param worker_threads_num number of worker threads started here; thread n is
//!        renamed through util::ThreadRename using the pattern scriptch.n and then runs Loop(false).
130 | | explicit CCheckQueue(unsigned int batch_size, int worker_threads_num) |
131 | 0 | : nBatchSize(batch_size) |
132 | 0 | { |
133 | 0 | m_worker_threads.reserve(worker_threads_num); |
134 | 0 | for (int n = 0; n < worker_threads_num; ++n) { Branch (134:25): [True: 0, False: 0]
Branch (134:25): [True: 0, False: 0]
|
135 | 0 | m_worker_threads.emplace_back([this, n]() { |
136 | 0 | util::ThreadRename(strprintf("scriptch.%i", n)); |
137 | 0 | Loop(false /* worker thread */); |
138 | 0 | }); Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::CCheckQueue(unsigned int, int)::{lambda()#1}::operator()() const Unexecuted instantiation: CCheckQueue<CScriptCheck>::CCheckQueue(unsigned int, int)::{lambda()#1}::operator()() const |
139 | 0 | } |
140 | 0 | } Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::CCheckQueue(unsigned int, int) Unexecuted instantiation: CCheckQueue<CScriptCheck>::CCheckQueue(unsigned int, int) |
141 | | |
142 | | // Since this class manages its own resources, which is a thread |
143 | | // pool `m_worker_threads`, copy and move operations are not appropriate. |
144 | | CCheckQueue(const CCheckQueue&) = delete; |
145 | | CCheckQueue& operator=(const CCheckQueue&) = delete; |
146 | | CCheckQueue(CCheckQueue&&) = delete; |
147 | | CCheckQueue& operator=(CCheckQueue&&) = delete; |
148 | | |
149 | | //! Wait until execution finishes, and return whether all evaluations were successful. |
//! Runs Loop(true) on the calling thread, so the caller also processes queued checks.
//! Loop(true) returns false as well when m_request_stop has been set.
150 | | bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
151 | 0 | { |
152 | 0 | return Loop(true /* master thread */); |
153 | 0 | } Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::Wait() Unexecuted instantiation: CCheckQueue<CScriptCheck>::Wait() |
154 | | |
155 | | //! Add a batch of checks to the queue |
//! The checks are moved out of vChecks onto the back of the queue and nTodo grows
//! by their count. An empty vChecks is a no-op (no locking, no notification).
156 | | void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
157 | 0 | { |
158 | 0 | if (vChecks.empty()) { Branch (158:13): [True: 0, False: 0]
Branch (158:13): [True: 0, False: 0]
|
159 | 0 | return; |
160 | 0 | } |
161 | | |
162 | 0 | { |
163 | 0 | LOCK(m_mutex); |
164 | 0 | queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); |
// Moving the elements out leaves vChecks.size() unchanged, so it still counts the added checks.
165 | 0 | nTodo += vChecks.size(); |
166 | 0 | } |
167 | 
|
// Notify after leaving the locked scope: one worker for a single check, all workers otherwise.
168 | 0 | if (vChecks.size() == 1) { Branch (168:13): [True: 0, False: 0]
Branch (168:13): [True: 0, False: 0]
|
169 | 0 | m_worker_cv.notify_one(); |
170 | 0 | } else { |
171 | 0 | m_worker_cv.notify_all(); |
172 | 0 | } |
173 | 0 | } Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::Add(std::vector<(anonymous namespace)::DumbCheck, std::allocator<(anonymous namespace)::DumbCheck> >&&) Unexecuted instantiation: CCheckQueue<CScriptCheck>::Add(std::vector<CScriptCheck, std::allocator<CScriptCheck> >&&) |
174 | | |
//! Sets m_request_stop under m_mutex, wakes every thread waiting on m_worker_cv
//! and joins all worker threads. m_master_cv is not notified here.
175 | | ~CCheckQueue() |
176 | 1 | { |
177 | 1 | WITH_LOCK(m_mutex, m_request_stop = true); |
178 | 1 | m_worker_cv.notify_all(); |
179 | 2 | for (std::thread& t : m_worker_threads) { Branch (179:29): [True: 0, False: 0]
Branch (179:29): [True: 2, False: 1]
|
180 | 2 | t.join(); |
181 | 2 | } |
182 | 1 | } Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::~CCheckQueue() CCheckQueue<CScriptCheck>::~CCheckQueue() Line | Count | Source | 176 | 1 | { | 177 | 1 | WITH_LOCK(m_mutex, m_request_stop = true); | 178 | 1 | m_worker_cv.notify_all(); | 179 | 2 | for (std::thread& t : m_worker_threads) { Branch (179:29): [True: 2, False: 1]
| 180 | 2 | t.join(); | 181 | 2 | } | 182 | 1 | } |
|
183 | | |
//! Whether the constructor started at least one worker thread.
184 | 0 | bool HasThreads() const { return !m_worker_threads.empty(); } |
185 | | }; |
186 | | |
187 | | /** |
188 | | * RAII-style controller object for a CCheckQueue that guarantees the passed |
189 | | * queue is finished before continuing. |
190 | | */ |
191 | | template <typename T> |
192 | | class CCheckQueueControl |
193 | | { |
194 | | private: |
//! The controlled queue. May be nullptr, in which case Add() does nothing and Wait() returns true.
195 | | CCheckQueue<T> * const pqueue; |
//! True once Wait() has completed on a non-null queue; the destructor then skips its own Wait().
196 | | bool fDone; |
197 | | |
198 | | public: |
199 | | CCheckQueueControl() = delete; |
200 | | CCheckQueueControl(const CCheckQueueControl&) = delete; |
201 | | CCheckQueueControl& operator=(const CCheckQueueControl&) = delete; |
//! Attach to pqueueIn. For a non-null queue this enters pqueue->m_control_mutex through
//! ENTER_CRITICAL_SECTION; the matching LEAVE_CRITICAL_SECTION is in the destructor.
//! NOTE(review): both macros are defined outside this file; presumably they lock and
//! unlock the mutex, so a second controller on the same queue blocks here - confirm.
202 | 0 | explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false) |
203 | 0 | { |
204 | | // passed queue is supposed to be unused, or nullptr |
205 | 0 | if (pqueue != nullptr) { Branch (205:13): [True: 0, False: 0]
Branch (205:13): [True: 0, False: 0]
|
206 | 0 | ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); |
207 | 0 | } |
208 | 0 | } Unexecuted instantiation: checkqueue.cpp:CCheckQueueControl<(anonymous namespace)::DumbCheck>::CCheckQueueControl(CCheckQueue<(anonymous namespace)::DumbCheck>*) Unexecuted instantiation: CCheckQueueControl<CScriptCheck>::CCheckQueueControl(CCheckQueue<CScriptCheck>*) |
209 | | |
//! Forward to pqueue->Wait() and return its result, marking this controller as done.
//! With a null queue it returns true immediately and leaves fDone untouched.
210 | | bool Wait() |
211 | 0 | { |
212 | 0 | if (pqueue == nullptr) Branch (212:13): [True: 0, False: 0]
Branch (212:13): [True: 0, False: 0]
|
213 | 0 | return true; |
214 | 0 | bool fRet = pqueue->Wait(); |
215 | 0 | fDone = true; |
216 | 0 | return fRet; |
217 | 0 | } Unexecuted instantiation: checkqueue.cpp:CCheckQueueControl<(anonymous namespace)::DumbCheck>::Wait() Unexecuted instantiation: CCheckQueueControl<CScriptCheck>::Wait() |
218 | | |
//! Forward the checks to pqueue->Add(); a null queue makes this a no-op that leaves vChecks untouched.
219 | | void Add(std::vector<T>&& vChecks) |
220 | 0 | { |
221 | 0 | if (pqueue != nullptr) { Branch (221:13): [True: 0, False: 0]
Branch (221:13): [True: 0, False: 0]
|
222 | 0 | pqueue->Add(std::move(vChecks)); |
223 | 0 | } |
224 | 0 | } Unexecuted instantiation: checkqueue.cpp:CCheckQueueControl<(anonymous namespace)::DumbCheck>::Add(std::vector<(anonymous namespace)::DumbCheck, std::allocator<(anonymous namespace)::DumbCheck> >&&) Unexecuted instantiation: CCheckQueueControl<CScriptCheck>::Add(std::vector<CScriptCheck, std::allocator<CScriptCheck> >&&) |
225 | | |
//! Calls Wait() if it has not completed yet (its result is discarded), then leaves
//! pqueue->m_control_mutex for a non-null queue.
226 | | ~CCheckQueueControl() |
227 | 0 | { |
// With a null queue fDone is never set, so Wait() runs here and returns true at once.
228 | 0 | if (!fDone) Branch (228:13): [True: 0, False: 0]
Branch (228:13): [True: 0, False: 0]
|
229 | 0 | Wait(); |
230 | 0 | if (pqueue != nullptr) { Branch (230:13): [True: 0, False: 0]
Branch (230:13): [True: 0, False: 0]
|
231 | 0 | LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); |
232 | 0 | } |
233 | 0 | } Unexecuted instantiation: checkqueue.cpp:CCheckQueueControl<(anonymous namespace)::DumbCheck>::~CCheckQueueControl() Unexecuted instantiation: CCheckQueueControl<CScriptCheck>::~CCheckQueueControl() |
234 | | }; |
235 | | |
236 | | #endif // BITCOIN_CHECKQUEUE_H |