Coverage Report

Created: 2024-08-21 05:08

/workdir/bitcoin/src/checkqueue.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) 2012-2022 The Bitcoin Core developers
2
// Distributed under the MIT software license, see the accompanying
3
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5
#ifndef BITCOIN_CHECKQUEUE_H
6
#define BITCOIN_CHECKQUEUE_H
7
8
#include <sync.h>
9
#include <tinyformat.h>
10
#include <util/threadnames.h>
11
12
#include <algorithm>
13
#include <iterator>
14
#include <vector>
15
16
/**
17
 * Queue for verifications that have to be performed.
18
  * The verifications are represented by a type T, which must provide an
19
  * operator(), returning a bool.
20
  *
21
  * One thread (the master) is assumed to push batches of verifications
22
  * onto the queue, where they are processed by N-1 worker threads. When
23
  * the master is done adding work, it temporarily joins the worker pool
24
  * as an N'th worker, until all jobs are done.
25
  */
26
template <typename T>
27
class CCheckQueue
28
{
29
private:
30
    //! Mutex to protect the inner state
31
    Mutex m_mutex;
32
33
    //! Worker threads block on this when out of work
34
    std::condition_variable m_worker_cv;
35
36
    //! Master thread blocks on this when out of work
37
    std::condition_variable m_master_cv;
38
39
    //! The queue of elements to be processed.
40
    //! As the order in which the checks are evaluated doesn't matter, it is used as a LIFO (stack)
41
    std::vector<T> queue GUARDED_BY(m_mutex);
42
43
    //! The number of workers (including the master) that are idle.
44
    int nIdle GUARDED_BY(m_mutex){0};
45
46
    //! The total number of workers (including the master).
47
    int nTotal GUARDED_BY(m_mutex){0};
48
49
    //! The temporary evaluation result.
50
    bool fAllOk GUARDED_BY(m_mutex){true};
51
52
    /**
53
     * Number of verifications that haven't completed yet.
54
     * This includes elements that are no longer queued, but still in the
55
     * worker's own batches.
56
     */
57
    unsigned int nTodo GUARDED_BY(m_mutex){0};
58
59
    //! The maximum number of elements to be processed in one batch
60
    const unsigned int nBatchSize;
61
62
    std::vector<std::thread> m_worker_threads;
63
    bool m_request_stop GUARDED_BY(m_mutex){false};
64
65
    /** Internal function that does bulk of the verification work. */
66
    bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
67
0
    {
68
0
        std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
  Branch (68:41): [True: 0, False: 0]
  Branch (68:41): [True: 0, False: 0]
69
0
        std::vector<T> vChecks;
70
0
        vChecks.reserve(nBatchSize);
71
0
        unsigned int nNow = 0;
72
0
        bool fOk = true;
73
0
        do {
74
0
            {
75
0
                WAIT_LOCK(m_mutex, lock);
76
                // first do the clean-up of the previous loop run (allowing us to do it in the same critical section)
77
0
                if (nNow) {
  Branch (77:21): [True: 0, False: 0]
  Branch (77:21): [True: 0, False: 0]
78
0
                    fAllOk &= fOk;
79
0
                    nTodo -= nNow;
80
0
                    if (nTodo == 0 && !fMaster)
  Branch (80:25): [True: 0, False: 0]
  Branch (80:39): [True: 0, False: 0]
  Branch (80:25): [True: 0, False: 0]
  Branch (80:39): [True: 0, False: 0]
81
                        // We processed the last element; inform the master it can exit and return the result
82
0
                        m_master_cv.notify_one();
83
0
                } else {
84
                    // first iteration
85
0
                    nTotal++;
86
0
                }
87
                // logically, the do loop starts here
88
2
                while (queue.empty() && !m_request_stop) {
  Branch (88:24): [True: 0, False: 0]
  Branch (88:41): [True: 0, False: 0]
  Branch (88:24): [True: 2, False: 18.4E]
  Branch (88:41): [True: 0, False: 2]
89
0
                    if (fMaster && nTodo == 0) {
  Branch (89:25): [True: 0, False: 0]
  Branch (89:36): [True: 0, False: 0]
  Branch (89:25): [True: 0, False: 0]
  Branch (89:36): [True: 0, False: 0]
90
0
                        nTotal--;
91
0
                        bool fRet = fAllOk;
92
                        // reset the status for new work later
93
0
                        fAllOk = true;
94
                        // return the current status
95
0
                        return fRet;
96
0
                    }
97
0
                    nIdle++;
98
0
                    cond.wait(lock); // wait
99
0
                    nIdle--;
100
0
                }
101
2
                if (m_request_stop) {
  Branch (101:21): [True: 0, False: 0]
  Branch (101:21): [True: 2, False: 18.4E]
102
2
                    return false;
103
2
                }
104
105
                // Decide how many work units to process now.
106
                // * Do not try to do everything at once, but aim for increasingly smaller batches so
107
                //   all workers finish approximately simultaneously.
108
                // * Try to account for idle workers, which will instantly start helping.
109
                // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
110
18.4E
                nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
111
18.4E
                auto start_it = queue.end() - nNow;
112
18.4E
                vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
113
18.4E
                queue.erase(start_it, queue.end());
114
                // Check whether we need to do work at all
115
18.4E
                fOk = fAllOk;
116
18.4E
            }
117
            // execute work
118
0
            for (T& check : vChecks)
  Branch (118:27): [True: 0, False: 0]
  Branch (118:27): [True: 0, False: 18.4E]
119
0
                if (fOk)
  Branch (119:21): [True: 0, False: 0]
  Branch (119:21): [True: 0, False: 0]
120
0
                    fOk = check();
121
18.4E
            vChecks.clear();
122
18.4E
        } while (true);
  Branch (122:18): [Folded - Ignored]
  Branch (122:18): [Folded - Ignored]
123
0
    }
Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::Loop(bool)
Unexecuted instantiation: CCheckQueue<CScriptCheck>::Loop(bool)
124
125
public:
126
    //! Mutex to ensure only one concurrent CCheckQueueControl
127
    Mutex m_control_mutex;
128
129
    //! Create a new check queue
130
    explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
131
0
        : nBatchSize(batch_size)
132
0
    {
133
0
        m_worker_threads.reserve(worker_threads_num);
134
0
        for (int n = 0; n < worker_threads_num; ++n) {
  Branch (134:25): [True: 0, False: 0]
  Branch (134:25): [True: 0, False: 0]
135
0
            m_worker_threads.emplace_back([this, n]() {
136
0
                util::ThreadRename(strprintf("scriptch.%i", n));
137
0
                Loop(false /* worker thread */);
138
0
            });
Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::CCheckQueue(unsigned int, int)::{lambda()#1}::operator()() const
Unexecuted instantiation: CCheckQueue<CScriptCheck>::CCheckQueue(unsigned int, int)::{lambda()#1}::operator()() const
139
0
        }
140
0
    }
Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::CCheckQueue(unsigned int, int)
Unexecuted instantiation: CCheckQueue<CScriptCheck>::CCheckQueue(unsigned int, int)
141
142
    // Since this class manages its own resource, the thread pool
143
    // `m_worker_threads`, copy and move operations are not appropriate.
144
    CCheckQueue(const CCheckQueue&) = delete;
145
    CCheckQueue& operator=(const CCheckQueue&) = delete;
146
    CCheckQueue(CCheckQueue&&) = delete;
147
    CCheckQueue& operator=(CCheckQueue&&) = delete;
148
149
    //! Wait until execution finishes, and return whether all evaluations were successful.
150
    bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
151
0
    {
152
0
        return Loop(true /* master thread */);
153
0
    }
Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::Wait()
Unexecuted instantiation: CCheckQueue<CScriptCheck>::Wait()
154
155
    //! Add a batch of checks to the queue
156
    void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
157
0
    {
158
0
        if (vChecks.empty()) {
  Branch (158:13): [True: 0, False: 0]
  Branch (158:13): [True: 0, False: 0]
159
0
            return;
160
0
        }
161
162
0
        {
163
0
            LOCK(m_mutex);
164
0
            queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
165
0
            nTodo += vChecks.size();
166
0
        }
167
168
0
        if (vChecks.size() == 1) {
  Branch (168:13): [True: 0, False: 0]
  Branch (168:13): [True: 0, False: 0]
169
0
            m_worker_cv.notify_one();
170
0
        } else {
171
0
            m_worker_cv.notify_all();
172
0
        }
173
0
    }
Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::Add(std::vector<(anonymous namespace)::DumbCheck, std::allocator<(anonymous namespace)::DumbCheck> >&&)
Unexecuted instantiation: CCheckQueue<CScriptCheck>::Add(std::vector<CScriptCheck, std::allocator<CScriptCheck> >&&)
174
175
    ~CCheckQueue()
176
1
    {
177
1
        WITH_LOCK(m_mutex, m_request_stop = true);
178
1
        m_worker_cv.notify_all();
179
2
        for (std::thread& t : m_worker_threads) {
  Branch (179:29): [True: 0, False: 0]
  Branch (179:29): [True: 2, False: 1]
180
2
            t.join();
181
2
        }
182
1
    }
Unexecuted instantiation: checkqueue.cpp:CCheckQueue<(anonymous namespace)::DumbCheck>::~CCheckQueue()
CCheckQueue<CScriptCheck>::~CCheckQueue()
Line
Count
Source
176
1
    {
177
1
        WITH_LOCK(m_mutex, m_request_stop = true);
178
1
        m_worker_cv.notify_all();
179
2
        for (std::thread& t : m_worker_threads) {
  Branch (179:29): [True: 2, False: 1]
180
2
            t.join();
181
2
        }
182
1
    }
183
184
0
    bool HasThreads() const { return !m_worker_threads.empty(); }
185
};
186
187
/**
188
 * RAII-style controller object for a CCheckQueue that guarantees the passed
189
 * queue is finished before continuing.
190
 */
191
template <typename T>
192
class CCheckQueueControl
193
{
194
private:
195
    CCheckQueue<T> * const pqueue;
196
    bool fDone;
197
198
public:
199
    CCheckQueueControl() = delete;
200
    CCheckQueueControl(const CCheckQueueControl&) = delete;
201
    CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
202
0
    explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
203
0
    {
204
        // passed queue is supposed to be unused, or nullptr
205
0
        if (pqueue != nullptr) {
  Branch (205:13): [True: 0, False: 0]
  Branch (205:13): [True: 0, False: 0]
206
0
            ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
207
0
        }
208
0
    }
Unexecuted instantiation: checkqueue.cpp:CCheckQueueControl<(anonymous namespace)::DumbCheck>::CCheckQueueControl(CCheckQueue<(anonymous namespace)::DumbCheck>*)
Unexecuted instantiation: CCheckQueueControl<CScriptCheck>::CCheckQueueControl(CCheckQueue<CScriptCheck>*)
209
210
    bool Wait()
211
0
    {
212
0
        if (pqueue == nullptr)
  Branch (212:13): [True: 0, False: 0]
  Branch (212:13): [True: 0, False: 0]
213
0
            return true;
214
0
        bool fRet = pqueue->Wait();
215
0
        fDone = true;
216
0
        return fRet;
217
0
    }
Unexecuted instantiation: checkqueue.cpp:CCheckQueueControl<(anonymous namespace)::DumbCheck>::Wait()
Unexecuted instantiation: CCheckQueueControl<CScriptCheck>::Wait()
218
219
    void Add(std::vector<T>&& vChecks)
220
0
    {
221
0
        if (pqueue != nullptr) {
  Branch (221:13): [True: 0, False: 0]
  Branch (221:13): [True: 0, False: 0]
222
0
            pqueue->Add(std::move(vChecks));
223
0
        }
224
0
    }
Unexecuted instantiation: checkqueue.cpp:CCheckQueueControl<(anonymous namespace)::DumbCheck>::Add(std::vector<(anonymous namespace)::DumbCheck, std::allocator<(anonymous namespace)::DumbCheck> >&&)
Unexecuted instantiation: CCheckQueueControl<CScriptCheck>::Add(std::vector<CScriptCheck, std::allocator<CScriptCheck> >&&)
225
226
    ~CCheckQueueControl()
227
0
    {
228
0
        if (!fDone)
  Branch (228:13): [True: 0, False: 0]
  Branch (228:13): [True: 0, False: 0]
229
0
            Wait();
230
0
        if (pqueue != nullptr) {
  Branch (230:13): [True: 0, False: 0]
  Branch (230:13): [True: 0, False: 0]
231
0
            LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
232
0
        }
233
0
    }
Unexecuted instantiation: checkqueue.cpp:CCheckQueueControl<(anonymous namespace)::DumbCheck>::~CCheckQueueControl()
Unexecuted instantiation: CCheckQueueControl<CScriptCheck>::~CCheckQueueControl()
234
};
235
236
#endif // BITCOIN_CHECKQUEUE_H