信號量範例

演示使用 Qt 進行多綫程編程。

生産者把數據寫入緩衝直到達到緩衝末端為止,此時它從頭重新開始,覆蓋現有數據。消費者綫程讀取産生數據並將其寫入標準錯誤。

信號量使之相比互斥擁有更高級的並發成為可能。若對緩衝的訪問守衛通過 QMutex ,消費者綫程與生産者綫程無法同時訪問緩衝。然而,沒有壞處讓 2 綫程操控 不同部分 的緩衝在同一時間。

範例包含 2 個類: Producer and Consumer 。兩者繼承自 QThread 。在這 2 個類之間進行通信所使用的循環緩衝和保護它的信號量都是全局變量。

替代使用 QSemaphore 以解決生産者-消費者問題是使用 QWaitCondition and QMutex . This is what the 等待條件範例 does.

全局變量

讓我們從審查循環緩衝和關聯信號量開始:

const int DataSize = 100000;
const int BufferSize = 8192;
char buffer[BufferSize];
QSemaphore freeBytes(BufferSize);
QSemaphore usedBytes;
					

DataSize is the amout of data that the producer will generate. To keep the example as simple as possible, we make it a constant. BufferSize is the size of the circular buffer. It is less than DataSize , meaning that at some point the producer will reach the end of the buffer and restart from the beginning.

To synchronize the producer and the consumer, we need two semaphores. The freeBytes semaphore controls the "free" area of the buffer (the area that the producer hasn't filled with data yet or that the consumer has already read). The usedBytes semaphore controls the "used" area of the buffer (the area that the producer has filled but that the consumer hasn't read yet).

Together, the semaphores ensure that the producer is never more than BufferSize bytes ahead of the consumer, and that the consumer never reads data that the producer hasn't generated yet.

The freeBytes 信號量被初始化采用 BufferSize , because initially the entire buffer is empty. The usedBytes semaphore is initialized to 0 (the default value if none is specified).

Producer (生産者) 類

讓我們審查代碼為 Producer 類:

class Producer : public QThread
{
public:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            freeBytes.acquire();
            buffer[i % BufferSize] = "ACGT"[QRandomGenerator::global()->bounded(4)];
            usedBytes.release();
        }
    }
};
					

生産者生成 DataSize bytes of data. Before it writes a byte to the circular buffer, it must acquire a "free" byte using the freeBytes semaphore. The QSemaphore::acquire () call might block if the consumer hasn't kept up the pace with the producer.

At the end, the producer releases a byte using the usedBytes semaphore. The "free" byte has successfully been transformed into a "used" byte, ready to be read by the consumer.

Consumer (消費者) 類

讓我們現在轉到 Consumer 類:

class Consumer : public QThread
{
    Q_OBJECT
public:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            usedBytes.acquire();
            fprintf(stderr, "%c", buffer[i % BufferSize]);
            freeBytes.release();
        }
        fprintf(stderr, "\n");
    }
};
					

The code is very similar to the producer, except that this time we acquire a "used" byte and release a "free" byte, instead of the opposite.

main() 函數

main() ,我們創建 2 綫程並調用 QThread::wait () 以確保 2 綫程在退齣之前都有時間完成:

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return 0;
}
					

So what happens when we run the program? Initially, the producer thread is the only one that can do anything; the consumer is blocked waiting for the usedBytes semaphore to be released (its initial available() count is 0). Once the producer has put one byte in the buffer, freeBytes.available() is BufferSize - 1 and usedBytes.available() is 1. At that point, two things can happen: Either the consumer thread takes over and reads that byte, or the producer thread gets to produce a second byte.

The producer-consumer model presented in this example makes it possible to write highly concurrent multithreaded applications. On a multiprocessor machine, the program is potentially up to twice as fast as the equivalent mutex-based program, since the two threads can be active at the same time on different parts of the buffer.

Be aware though that these benefits aren't always realized. Acquiring and releasing a QSemaphore has a cost. In practice, it would probably be worthwhile to divide the buffer into chunks and to operate on chunks instead of individual bytes. The buffer size is also a parameter that must be selected carefully, based on experimentation.

文件: