演示使用 Qt 進行多綫程編程。
生産者把數據寫入緩衝直到達到緩衝末端為止,此時它從頭重新開始,覆蓋現有數據。消費者綫程讀取産生數據並將其寫入標準錯誤。
相比單獨采用互斥,等待條件使之擁有更高級的並發成為可能。若對緩衝區的訪問僅僅被守衛通過 QMutex ,消費者綫程與生産者綫程無法同時訪問緩衝。然而,沒有壞處讓 2 綫程操控 不同部分 的緩衝在同一時間。
範例包含 2 個類:
Producer
and
Consumer
。兩者繼承自
QThread
。在這 2 個類之間進行通信所使用的循環緩衝和保護它的同步工具都是全局變量。
替代使用 QWaitCondition and QMutex 以解決生産者-消費者問題是使用 QSemaphore . This is what the 信號量範例 does.
讓我們從審查循環緩衝和關聯同步工具開始:
const int DataSize = 100000; const int BufferSize = 8192; char buffer[BufferSize]; QWaitCondition bufferNotEmpty; QWaitCondition bufferNotFull; QMutex mutex; int numUsedBytes = 0;
DataSize
is the amount of data that the producer will generate. To keep the example as simple as possible, we make it a constant.
BufferSize
is the size of the circular buffer. It is less than
DataSize
, meaning that at some point the producer will reach the end of the buffer and restart from the beginning.
To synchronize the producer and the consumer, we need two wait conditions and one mutex. The
bufferNotEmpty
condition is signalled when the producer has generated some data, telling the consumer that it can start reading it. The
bufferNotFull
condition is signalled when the consumer has read some data, telling the producer that it can generate more. The
numUsedBytes
is the number of bytes in the buffer that contain data.
Together, the wait conditions, the mutex, and the
numUsedBytes
counter ensure that the producer is never more than
BufferSize
bytes ahead of the consumer, and that the consumer never reads data that the producer hasn't generated yet.
讓我們審查代碼為
Producer
類:
class Producer : public QThread { public: Producer(QObject *parent = NULL) : QThread(parent) { } void run() override { for (int i = 0; i < DataSize; ++i) { mutex.lock(); if (numUsedBytes == BufferSize) bufferNotFull.wait(&mutex); mutex.unlock(); buffer[i % BufferSize] = "ACGT"[QRandomGenerator::global()->bounded(4)]; mutex.lock(); ++numUsedBytes; bufferNotEmpty.wakeAll(); mutex.unlock(); } } };
生産者生成
DataSize
bytes of data. Before it writes a byte to the circular buffer, it must first check whether the buffer is full (i.e.,
numUsedBytes
等於
BufferSize
). If the buffer is full, the thread waits on the
bufferNotFull
條件。
At the end, the producer increments
numUsedBytes
and signalls that the condition
bufferNotEmpty
is true, since
numUsedBytes
is necessarily greater than 0.
We guard all accesses to the
numUsedBytes
variable with a mutex. In addition, the
QWaitCondition::wait
() function accepts a mutex as its argument. This mutex is unlocked before the thread is put to sleep and locked when the thread wakes up. Furthermore, the transition from the locked state to the wait state is atomic, to prevent race conditions from occurring.
讓我們轉到
Consumer
類:
class Consumer : public QThread { Q_OBJECT public: Consumer(QObject *parent = NULL) : QThread(parent) { } void run() override { for (int i = 0; i < DataSize; ++i) { mutex.lock(); if (numUsedBytes == 0) bufferNotEmpty.wait(&mutex); mutex.unlock(); fprintf(stderr, "%c", buffer[i % BufferSize]); mutex.lock(); --numUsedBytes; bufferNotFull.wakeAll(); mutex.unlock(); } fprintf(stderr, "\n"); } signals: void stringConsumed(const QString &text); };
The code is very similar to the producer. Before we read the byte, we check whether the buffer is empty (
numUsedBytes
is 0) instead of whether it's full and wait on the
bufferNotEmpty
condition if it's empty. After we've read the byte, we decrement
numUsedBytes
(instead of incrementing it), and we signal the
bufferNotFull
condition (instead of the
bufferNotEmpty
condition).
在
main()
,我們創建 2 綫程並調用
QThread::wait
() 以確保 2 綫程在退齣之前都有時間完成:
int main(int argc, char *argv[]) { QCoreApplication app(argc, argv); Producer producer; Consumer consumer; producer.start(); consumer.start(); producer.wait(); consumer.wait(); return 0; }
So what happens when we run the program? Initially, the producer thread is the only one that can do anything; the consumer is blocked waiting for the
bufferNotEmpty
condition to be signalled (
numUsedBytes
is 0). Once the producer has put one byte in the buffer,
numUsedBytes
is
BufferSize
- 1 and the
bufferNotEmpty
condition is signalled. At that point, two things can happen: Either the consumer thread takes over and reads that byte, or the producer gets to produce a second byte.
The producer-consumer model presented in this example makes it possible to write highly concurrent multithreaded applications. On a multiprocessor machine, the program is potentially up to twice as fast as the equivalent mutex-based program, since the two threads can be active at the same time on different parts of the buffer.
Be aware though that these benefits aren't always realized. Locking and unlocking a QMutex has a cost. In practice, it would probably be worthwhile to divide the buffer into chunks and to operate on chunks instead of individual bytes. The buffer size is also a parameter that must be selected carefully, based on experimentation.
文件: