summaryrefslogtreecommitdiff
path: root/src/kdpipeiodevice.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/kdpipeiodevice.cpp')
-rw-r--r--src/kdpipeiodevice.cpp951
1 files changed, 951 insertions, 0 deletions
diff --git a/src/kdpipeiodevice.cpp b/src/kdpipeiodevice.cpp
new file mode 100644
index 0000000..5661790
--- /dev/null
+++ b/src/kdpipeiodevice.cpp
@@ -0,0 +1,951 @@
+/*
+ Copyright (C) 2007 Klarälvdalens Datakonsult AB
+
+ KDPipeIODevice is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Library General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ KDPipeIODevice is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Library General Public License for more details.
+
+ You should have received a copy of the GNU Library General Public License
+ along with KDPipeIODevice; see the file COPYING.LIB. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+#include "kdpipeiodevice.h"
+
+#include <QtCore>
+
+#include <cassert>
+#include <memory>
+#include <algorithm>
+
+#ifdef Q_OS_WIN32
+# ifndef NOMINMAX
+# define NOMINMAX
+# endif
+# include <windows.h>
+# include <io.h>
+#else
+# include <unistd.h>
+# include <errno.h>
+#endif
+
+using namespace _gpgme_;
+
+#ifndef KDAB_CHECK_THIS
+# define KDAB_CHECK_CTOR (void)1
+# define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
+# define KDAB_CHECK_THIS KDAB_CHECK_CTOR
+#endif
+
+#define LOCKED( d ) const QMutexLocker locker( &d->mutex )
+#define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )
+
+const unsigned int BUFFER_SIZE = 4096;
+const bool ALLOW_QIODEVICE_BUFFERING = true;
+
+// comment to get trace output:
+//#define qDebug if(1){}else qDebug
+
+namespace {
+class Reader : public QThread {
+ Q_OBJECT
+public:
+ Reader( int fd, Qt::HANDLE handle );
+ ~Reader();
+
+ qint64 readData( char * data, qint64 maxSize );
+
+ unsigned int bytesInBuffer() const {
+ return ( wptr + sizeof buffer - rptr ) % sizeof buffer ;
+ }
+
+ bool bufferFull() const {
+ return bytesInBuffer() == sizeof buffer - 1;
+ }
+
+ bool bufferEmpty() const {
+ return bytesInBuffer() == 0;
+ }
+
+ bool bufferContains( char ch ) {
+ const unsigned int bib = bytesInBuffer();
+ for ( unsigned int i = rptr ; i < rptr + bib ; ++i )
+ if ( buffer[i%sizeof buffer] == ch )
+ return true;
+ return false;
+ }
+
+ void notifyReadyRead();
+
+Q_SIGNALS:
+ void readyRead();
+
+protected:
+ /* reimp */ void run();
+
+private:
+ int fd;
+ Qt::HANDLE handle;
+public:
+ QMutex mutex;
+ QWaitCondition waitForCancelCondition;
+ QWaitCondition bufferNotFullCondition;
+ QWaitCondition bufferNotEmptyCondition;
+ QWaitCondition hasStarted;
+ QWaitCondition readyReadSentCondition;
+ QWaitCondition blockedConsumerIsDoneCondition;
+ bool cancel;
+ bool eof;
+ bool error;
+ bool eofShortCut;
+ int errorCode;
+ bool isReading;
+ bool consumerBlocksOnUs;
+
+private:
+ unsigned int rptr, wptr;
+ char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state
+};
+
+
+Reader::Reader( int fd_, Qt::HANDLE handle_ )
+ : QThread(),
+ fd( fd_ ),
+ handle( handle_ ),
+ mutex(),
+ bufferNotFullCondition(),
+ bufferNotEmptyCondition(),
+ hasStarted(),
+ cancel( false ),
+ eof( false ),
+ error( false ),
+ eofShortCut( false ),
+ errorCode( 0 ),
+ isReading( false ),
+ consumerBlocksOnUs( false ),
+ rptr( 0 ), wptr( 0 )
+{
+
+}
+
+Reader::~Reader() {}
+
+
+class Writer : public QThread {
+ Q_OBJECT
+public:
+ Writer( int fd, Qt::HANDLE handle );
+ ~Writer();
+
+ qint64 writeData( const char * data, qint64 size );
+
+ unsigned int bytesInBuffer() const { return numBytesInBuffer; }
+
+ bool bufferFull() const {
+ return numBytesInBuffer == sizeof buffer;
+ }
+
+ bool bufferEmpty() const {
+ return numBytesInBuffer == 0;
+ }
+
+Q_SIGNALS:
+ void bytesWritten( qint64 );
+
+protected:
+ /* reimp */ void run();
+
+private:
+ int fd;
+ Qt::HANDLE handle;
+public:
+ QMutex mutex;
+ QWaitCondition bufferEmptyCondition;
+ QWaitCondition bufferNotEmptyCondition;
+ QWaitCondition hasStarted;
+ bool cancel;
+ bool error;
+ int errorCode;
+private:
+ unsigned int numBytesInBuffer;
+ char buffer[BUFFER_SIZE];
+};
+}
+
+Writer::Writer( int fd_, Qt::HANDLE handle_ )
+ : QThread(),
+ fd( fd_ ),
+ handle( handle_ ),
+ mutex(),
+ bufferEmptyCondition(),
+ bufferNotEmptyCondition(),
+ hasStarted(),
+ cancel( false ),
+ error( false ),
+ errorCode( 0 ),
+ numBytesInBuffer( 0 )
+{
+
+}
+
+Writer::~Writer() {}
+
+
+class KDPipeIODevice::Private : public QObject {
+Q_OBJECT
+ friend class ::KDPipeIODevice;
+ KDPipeIODevice * const q;
+public:
+ explicit Private( KDPipeIODevice * qq );
+ ~Private();
+
+ bool doOpen( int, Qt::HANDLE, OpenMode );
+ bool startReaderThread();
+ bool startWriterThread();
+ void stopThreads();
+
+public Q_SLOTS:
+ void emitReadyRead();
+
+private:
+ int fd;
+ Qt::HANDLE handle;
+ Reader * reader;
+ Writer * writer;
+ bool triedToStartReader;
+ bool triedToStartWriter;
+};
+
+KDPipeIODevice::Private::Private( KDPipeIODevice * qq )
+ : QObject( qq ), q( qq ),
+ fd( -1 ),
+ handle( 0 ),
+ reader( 0 ),
+ writer( 0 ),
+ triedToStartReader( false ), triedToStartWriter( false )
+{
+
+}
+
+KDPipeIODevice::Private::~Private() {
+ qDebug( "KDPipeIODevice::~Private(): Destroying %p", q );
+}
+
+KDPipeIODevice::KDPipeIODevice( QObject * p )
+ : QIODevice( p ), d( new Private( this ) )
+{
+ KDAB_CHECK_CTOR;
+}
+
+KDPipeIODevice::KDPipeIODevice( int fd, OpenMode mode, QObject * p )
+ : QIODevice( p ), d( new Private( this ) )
+{
+ KDAB_CHECK_CTOR;
+ open( fd, mode );
+}
+
+KDPipeIODevice::KDPipeIODevice( Qt::HANDLE handle, OpenMode mode, QObject * p )
+ : QIODevice( p ), d( new Private( this ) )
+{
+ KDAB_CHECK_CTOR;
+ open( handle, mode );
+}
+
+KDPipeIODevice::~KDPipeIODevice() { KDAB_CHECK_DTOR;
+ if ( isOpen() )
+ close();
+ delete d; d = 0;
+}
+
+
+bool KDPipeIODevice::open( int fd, OpenMode mode ) { KDAB_CHECK_THIS;
+
+#ifdef Q_OS_WIN32
+ return d->doOpen( fd, (HANDLE)_get_osfhandle( fd ), mode );
+#else
+ return d->doOpen( fd, 0, mode );
+#endif
+
+}
+
+bool KDPipeIODevice::open( Qt::HANDLE h, OpenMode mode ) { KDAB_CHECK_THIS;
+
+#ifdef Q_OS_WIN32
+ return d->doOpen( -1, h, mode );
+#else
+ Q_UNUSED( h );
+ Q_UNUSED( mode );
+ assert( !"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." );
+#endif
+
+}
+
+bool KDPipeIODevice::Private::startReaderThread()
+{
+ if ( triedToStartReader )
+ return true;
+ triedToStartReader = true;
+ if ( reader && !reader->isRunning() && !reader->isFinished() ) {
+ qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
+ LOCKED( reader );
+ qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
+ reader->start( QThread::HighestPriority );
+ qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
+ const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 );
+ qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
+
+ return hasStarted;
+ }
+ return true;
+}
+
+bool KDPipeIODevice::Private::startWriterThread()
+{
+ if ( triedToStartWriter )
+ return true;
+ triedToStartWriter = true;
+ if ( writer && !writer->isRunning() && !writer->isFinished() ) {
+ LOCKED( writer );
+
+ writer->start( QThread::HighestPriority );
+ if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) )
+ return false;
+ }
+ return true;
+}
+
+void KDPipeIODevice::Private::emitReadyRead()
+{
+ QPointer<Private> thisPointer( this );
+ qDebug( "KDPipeIODevice::Private::emitReadyRead %p", this );
+
+ emit q->readyRead();
+
+ if ( !thisPointer )
+ return;
+
+ bool mustNotify = false;
+
+ if ( reader ) {
+ qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", this );
+ synchronized( reader ) {
+ qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", this );
+ reader->readyReadSentCondition.wakeAll();
+ mustNotify = !reader->bufferEmpty() && reader->isReading;
+ qDebug( "KDPipeIODevice::emitReadyRead %p: bufferEmpty: %d reader in ReadFile: %d", this, reader->bufferEmpty(), reader->isReading );
+ }
+ }
+ if ( mustNotify )
+ QTimer::singleShot( 100, this, SLOT( emitReadyRead() ) );
+ qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving", this );
+
+}
+
+bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode_ ) {
+
+ if ( q->isOpen() )
+ return false;
+
+#ifdef Q_OS_WIN32
+ if ( !handle_ )
+ return false;
+#else
+ if ( fd_ < 0 )
+ return false;
+#endif
+
+ if ( !(mode_ & ReadWrite) )
+ return false; // need to have at least read -or- write
+
+
+ std::auto_ptr<Reader> reader_;
+ std::auto_ptr<Writer> writer_;
+
+ if ( mode_ & ReadOnly ) {
+ reader_.reset( new Reader( fd_, handle_ ) );
+ qDebug( "KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", this, reader_.get(), fd_ );
+ connect( reader_.get(), SIGNAL(readyRead()), this, SLOT(emitReadyRead()),
+Qt::QueuedConnection );
+ }
+ if ( mode_ & WriteOnly ) {
+ writer_.reset( new Writer( fd_, handle_ ) );
+ qDebug( "KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d", this, writer_.get(), fd_ );
+ connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)),
+Qt::QueuedConnection );
+ }
+
+ // commit to *this:
+ fd = fd_;
+ handle = handle_;
+ reader = reader_.release();
+ writer = writer_.release();
+
+ q->setOpenMode( mode_|Unbuffered );
+ return true;
+}
+
+int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS;
+ return d->fd;
+}
+
+
+Qt::HANDLE KDPipeIODevice::handle() const { KDAB_CHECK_THIS;
+ return d->handle;
+}
+
+qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS;
+ const qint64 base = QIODevice::bytesAvailable();
+ if ( !d->triedToStartReader ) {
+ d->startReaderThread();
+ return base;
+ }
+ if ( d->reader )
+ synchronized( d->reader ) {
+ const qint64 inBuffer = d->reader->bytesInBuffer();
+ return base + inBuffer;
+ }
+ return base;
+}
+
+qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS;
+ d->startWriterThread();
+ const qint64 base = QIODevice::bytesToWrite();
+ if ( d->writer )
+ synchronized( d->writer ) return base + d->writer->bytesInBuffer();
+ return base;
+}
+
+bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS;
+ d->startReaderThread();
+ if ( QIODevice::canReadLine() )
+ return true;
+ if ( d->reader )
+ synchronized( d->reader ) return d->reader->bufferContains( '\n' );
+ return true;
+}
+
+bool KDPipeIODevice::isSequential() const {
+ return true;
+}
+
+bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS;
+ d->startReaderThread();
+ if ( !QIODevice::atEnd() ) {
+ qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", this, static_cast<long>(bytesAvailable()) );
+ return false;
+ }
+ if ( !isOpen() )
+ return true;
+ if ( d->reader->eofShortCut )
+ return true;
+ LOCKED( d->reader );
+ const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty();
+ if ( !eof ) {
+ if ( !d->reader->error && !d->reader->eof )
+ qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof", this );
+ if ( !d->reader->bufferEmpty() )
+ qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()", this );
+ }
+ return eof;
+}
+
+bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS;
+ d->startWriterThread();
+ Writer * const w = d->writer;
+ if ( !w )
+ return true;
+ LOCKED( w );
+ qDebug( "KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area", this, w
+);
+ return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait( &w->mutex, msecs ) ;
+}
+
+bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
+ qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", this);
+ d->startReaderThread();
+ if ( ALLOW_QIODEVICE_BUFFERING ) {
+ if ( bytesAvailable() > 0 )
+ return true;
+ }
+ Reader * const r = d->reader;
+ if ( !r || r->eofShortCut )
+ return true;
+ LOCKED( r );
+ if ( r->bytesInBuffer() != 0 || r->eof || r->error )
+ return true;
+
+ return msecs >= 0 ? r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) : r->bufferNotEmptyCondition.wait( &r->mutex );
+}
+
+template <typename T>
+class TemporaryValue {
+public:
+ TemporaryValue( T& var_, const T& tv ) : var( var_ ), oldValue( var_ ) { var = tv; }
+ ~TemporaryValue() { var = oldValue; }
+private:
+ T& var;
+ const T oldValue;
+};
+
+
+bool KDPipeIODevice::readWouldBlock() const
+{
+ d->startReaderThread();
+ LOCKED( d->reader );
+ return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
+}
+
+bool KDPipeIODevice::writeWouldBlock() const
+{
+ d->startWriterThread();
+ LOCKED( d->writer );
+ return !d->writer->bufferEmpty() && !d->writer->error;
+}
+
+
+qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS;
+ qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize );
+ d->startReaderThread();
+ Reader * const r = d->reader;
+
+ assert( r );
+
+
+ //assert( r->isRunning() ); // wrong (might be eof, error)
+ assert( data || maxSize == 0 );
+ assert( maxSize >= 0 );
+
+ if ( r->eofShortCut ) {
+ qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", this );
+ return 0;
+ }
+
+ if ( maxSize < 0 )
+ maxSize = 0;
+
+ if ( ALLOW_QIODEVICE_BUFFERING ) {
+ if ( bytesAvailable() > 0 )
+ maxSize = std::min( maxSize, bytesAvailable() ); // don't block
+ }
+ qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", this );
+ LOCKED( r );
+ qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", this );
+
+ r->readyReadSentCondition.wakeAll();
+ if ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0?
+ qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", this );
+ const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true );
+ r->bufferNotEmptyCondition.wait( &r->mutex );
+ r->blockedConsumerIsDoneCondition.wakeAll();
+ qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", this );
+ }
+
+ if ( r->bufferEmpty() ) {
+ qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", this );
+ // woken with an empty buffer must mean either EOF or error:
+ assert( r->eof || r->error );
+ r->eofShortCut = true;
+ return r->eof ? 0 : -1 ;
+ }
+
+ qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", this, maxSize );
+ const qint64 bytesRead = r->readData( data, maxSize );
+ qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead );
+ qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data );
+
+ return bytesRead;
+}
+
+qint64 Reader::readData( char * data, qint64 maxSize ) {
+ qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr ;
+ if ( numRead > maxSize )
+ numRead = maxSize;
+
+ qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld", this,
+ data, maxSize, rptr, wptr, bytesInBuffer(), numRead );
+
+ std::memcpy( data, buffer + rptr, numRead );
+
+ rptr = ( rptr + numRead ) % sizeof buffer ;
+
+ if ( !bufferFull() ) {
+ qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", this );
+ bufferNotFullCondition.wakeAll();
+ }
+
+ return numRead;
+}
+
+qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_THIS;
+ d->startWriterThread();
+ Writer * const w = d->writer;
+
+ assert( w );
+ assert( w->error || w->isRunning() );
+ assert( data || size == 0 );
+ assert( size >= 0 );
+
+ LOCKED( w );
+
+ while ( !w->error && !w->bufferEmpty() ) {
+ qDebug( "%p: KDPipeIODevice::writeData: wait for empty buffer", this );
+ w->bufferEmptyCondition.wait( &w->mutex );
+ qDebug( "%p: KDPipeIODevice::writeData: empty buffer signaled", this );
+
+ }
+ if ( w->error )
+ return -1;
+
+ assert( w->bufferEmpty() );
+
+ return w->writeData( data, size );
+}
+
+qint64 Writer::writeData( const char * data, qint64 size ) {
+ assert( bufferEmpty() );
+
+ if ( size > static_cast<qint64>( sizeof buffer ) )
+ size = sizeof buffer;
+
+ std::memcpy( buffer, data, size );
+
+ numBytesInBuffer = size;
+
+ if ( !bufferEmpty() ) {
+ bufferNotEmptyCondition.wakeAll();
+ }
+ return size;
+}
+
+void KDPipeIODevice::Private::stopThreads()
+{
+ if ( triedToStartWriter )
+ {
+ if ( writer && q->bytesToWrite() > 0 )
+ q->waitForBytesWritten( -1 );
+
+ assert( q->bytesToWrite() == 0 );
+ }
+ if ( Reader * & r = reader ) {
+ disconnect( r, SIGNAL( readyRead() ), this, SLOT( emitReadyRead() ) );
+ synchronized( r ) {
+ // tell thread to cancel:
+ r->cancel = true;
+ // and wake it, so it can terminate:
+ r->waitForCancelCondition.wakeAll();
+ r->bufferNotFullCondition.wakeAll();
+ r->readyReadSentCondition.wakeAll();
+ }
+ }
+ if ( Writer * & w = writer ) {
+ synchronized( w ) {
+ // tell thread to cancel:
+ w->cancel = true;
+ // and wake it, so it can terminate:
+ w->bufferNotEmptyCondition.wakeAll();
+ }
+ }
+}
+
+void KDPipeIODevice::close() { KDAB_CHECK_THIS;
+ qDebug( "KDPipeIODevice::close(%p)", this );
+ if ( !isOpen() )
+ return;
+
+ // tell clients we're about to close:
+ emit aboutToClose();
+ d->stopThreads();
+
+#define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
+ qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer );
+ waitAndDelete( d->writer );
+ qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader );
+ if ( d->reader ) {
+ LOCKED( d->reader );
+ d->reader->readyReadSentCondition.wakeAll();
+ }
+ waitAndDelete( d->reader );
+#undef waitAndDelete
+#ifdef Q_OS_WIN32
+ if ( d->fd != -1 )
+ _close( d->fd );
+ else
+ CloseHandle( d->handle );
+#else
+ ::close( d->fd );
+#endif
+
+ setOpenMode( NotOpen );
+ d->fd = -1;
+ d->handle = 0;
+}
+
+void Reader::run() {
+
+ LOCKED( this );
+
+ // too bad QThread doesn't have that itself; a signal isn't enough
+ hasStarted.wakeAll();
+
+ qDebug( "%p: Reader::run: started", this );
+
+ while ( true ) {
+ if ( !cancel && ( eof || error ) ) {
+ //notify the client until the buffer is empty and then once
+ //again so he receives eof/error. After that, wait for him
+ //to cancel
+ const bool wasEmpty = bufferEmpty();
+ qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", this, eof, error );
+ notifyReadyRead();
+ if ( !cancel && wasEmpty )
+ waitForCancelCondition.wait( &mutex );
+ } else if ( !cancel && !bufferFull() && !bufferEmpty() ) {
+ qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this );
+ notifyReadyRead();
+ }
+
+ while ( !cancel && !error && bufferFull() ) {
+ notifyReadyRead();
+ if ( !cancel && bufferFull() ) {
+ qDebug( "%p: Reader::run: buffer is full, going to sleep", this );
+ bufferNotFullCondition.wait( &mutex );
+ }
+ }
+
+ if ( cancel ) {
+ qDebug( "%p: Reader::run: detected cancel", this );
+ goto leave;
+ }
+
+ if ( !eof && !error ) {
+ if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty
+ rptr = wptr = 0;
+
+ unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
+ if ( numBytes > sizeof buffer - wptr )
+ numBytes = sizeof buffer - wptr;
+
+ qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes );
+
+ assert( numBytes > 0 );
+
+ qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes );
+#ifdef Q_OS_WIN32
+ isReading = true;
+ mutex.unlock();
+ DWORD numRead;
+ const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
+ mutex.lock();
+ isReading = false;
+ if ( ok ) {
+ if ( numRead == 0 ) {
+ qDebug( "%p: Reader::run: got eof (numRead==0)", this );
+ eof = true;
+ }
+ } else { // !ok
+ errorCode = static_cast<int>( GetLastError() );
+ if ( errorCode == ERROR_BROKEN_PIPE ) {
+ assert( numRead == 0 );
+ qDebug( "%p: Reader::run: got eof (broken pipe)", this );
+ eof = true;
+ } else {
+ assert( numRead == 0 );
+ qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode );
+ error = true;
+ }
+ }
+#else
+ qint64 numRead;
+ mutex.unlock();
+ do {
+ numRead = ::read( fd, buffer + wptr, numBytes );
+ } while ( numRead == -1 && errno == EINTR );
+ mutex.lock();
+
+ if ( numRead < 0 ) {
+ errorCode = errno;
+ error = true;
+ qDebug( "%p: Reader::run: got error: %d", this, errorCode );
+ } else if ( numRead == 0 ) {
+ qDebug( "%p: Reader::run: eof detected", this );
+ eof = true;
+ }
+#endif
+ qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) );
+ qDebug( "%p: Reader::run(fd=%d): %s", this, fd, buffer );
+
+ if ( numRead > 0 ) {
+ qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr );
+ wptr = ( wptr + numRead ) % sizeof buffer;
+ qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", this, rptr, wptr );
+ }
+ }
+ }
+ leave:
+ qDebug( "%p: Reader::run: terminated", this );
+}
+
+void Reader::notifyReadyRead()
+{
+ qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() );
+ assert( !cancel );
+
+ if ( consumerBlocksOnUs ) {
+ bufferNotEmptyCondition.wakeAll();
+ blockedConsumerIsDoneCondition.wait( &mutex );
+ return;
+ }
+ qDebug( "notifyReadyRead: emit signal" );
+ emit readyRead();
+ readyReadSentCondition.wait( &mutex );
+ qDebug( "notifyReadyRead: returning from waiting, leave" );
+}
+
+void Writer::run() {
+
+ LOCKED( this );
+
+ // too bad QThread doesn't have that itself; a signal isn't enough
+ hasStarted.wakeAll();
+
+ qDebug( "%p: Writer::run: started", this );
+
+ while ( true ) {
+
+ while ( !cancel && bufferEmpty() ) {
+ qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
+ bufferEmptyCondition.wakeAll();
+ emit bytesWritten( 0 );
+ qDebug( "%p: Writer::run: buffer is empty, going to sleep", this );
+ bufferNotEmptyCondition.wait( &mutex );
+ qDebug( "%p: Writer::run: woke up", this );
+ }
+
+ if ( cancel ) {
+ qDebug( "%p: Writer::run: detected cancel", this );
+ goto leave;
+ }
+
+ assert( numBytesInBuffer > 0 );
+
+ qDebug( "%p: Writer::run: Trying to write %u bytes", this, numBytesInBuffer );
+ qint64 totalWritten = 0;
+ do {
+ mutex.unlock();
+#ifdef Q_OS_WIN32
+ DWORD numWritten;
+ qDebug( "%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer, buffer );
+ qDebug( "%p (fd=%d): Writer::run: Going into WriteFile", this, fd );
+ if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) {
+ mutex.lock();
+ errorCode = static_cast<int>( GetLastError() );
+ qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
+ error = true;
+ goto leave;
+ }
+#else
+ qint64 numWritten;
+ do {
+ numWritten = ::write( fd, buffer + totalWritten, numBytesInBuffer - totalWritten );
+ } while ( numWritten == -1 && errno == EINTR );
+
+ if ( numWritten < 0 ) {
+ mutex.lock();
+ errorCode = errno;
+ qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
+ error = true;
+ goto leave;
+ }
+#endif
+ qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", this, fd, numBytesInBuffer, buffer );
+ totalWritten += numWritten;
+ mutex.lock();
+ } while ( totalWritten < numBytesInBuffer );
+
+ qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten );
+
+ numBytesInBuffer = 0;
+
+ qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
+ bufferEmptyCondition.wakeAll();
+ emit bytesWritten( totalWritten );
+ }
+ leave:
+ qDebug( "%p: Writer::run: terminating", this );
+ numBytesInBuffer = 0;
+ qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
+ bufferEmptyCondition.wakeAll();
+ emit bytesWritten( 0 );
+}
+
+// static
+std::pair<KDPipeIODevice*,KDPipeIODevice*> KDPipeIODevice::makePairOfConnectedPipes() {
+ KDPipeIODevice * read = 0;
+ KDPipeIODevice * write = 0;
+#ifdef Q_OS_WIN32
+ HANDLE rh;
+ HANDLE wh;
+ SECURITY_ATTRIBUTES sa;
+ memset( &sa, 0, sizeof(sa) );
+ sa.nLength = sizeof(sa);
+ sa.bInheritHandle = TRUE;
+ if ( CreatePipe( &rh, &wh, &sa, BUFFER_SIZE ) ) {
+ read = new KDPipeIODevice;
+ read->open( rh, ReadOnly );
+ write = new KDPipeIODevice;
+ write->open( wh, WriteOnly );
+ }
+#else
+ int fds[2];
+ if ( pipe( fds ) == 0 ) {
+ read = new KDPipeIODevice;
+ read->open( fds[0], ReadOnly );
+ write = new KDPipeIODevice;
+ write->open( fds[1], WriteOnly );
+ }
+#endif
+ return std::make_pair( read, write );
+}
+
+#ifdef KDAB_DEFINE_CHECKS
+KDAB_DEFINE_CHECKS( KDPipeIODevice ) {
+ if ( !isOpen() ) {
+ assert( openMode() == NotOpen );
+ assert( !d->reader );
+ assert( !d->writer );
+#ifdef Q_OS_WIN32
+ assert( !d->handle );
+#else
+ assert( d->fd < 0 );
+#endif
+ } else {
+ assert( openMode() != NotOpen );
+ assert( openMode() & ReadWrite );
+ if ( openMode() & ReadOnly ) {
+ assert( d->reader );
+ synchronized( d->reader )
+ assert( d->reader->eof || d->reader->error || d->reader->isRunning() );
+ }
+ if ( openMode() & WriteOnly ) {
+ assert( d->writer );
+ synchronized( d->writer )
+ assert( d->writer->error || d->writer->isRunning() );
+ }
+#ifdef Q_OS_WIN32
+ assert( d->handle );
+#else
+ assert( d->fd >= 0 );
+#endif
+ }
+}
+#endif // KDAB_DEFINE_CHECKS
+
+#include "moc_kdpipeiodevice.cpp"
+#include "kdpipeiodevice.moc"