extract QWindowsPipeReader from qlocalsocket_win.cpp
authorJoerg Bornemann <joerg.bornemann@nokia.com>
Sun, 11 Dec 2011 16:09:10 +0000 (17:09 +0100)
committerQt by Nokia <qt-info@nokia.com>
Fri, 16 Dec 2011 14:14:12 +0000 (15:14 +0100)
The code for reading named pipes can now be used in
other places as well.

Change-Id: Id734617a3927e369491a6c5daf965169ceb01f74
Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@nokia.com>
src/corelib/io/io.pri
src/corelib/io/qwindowspipereader.cpp [new file with mode: 0644]
src/corelib/io/qwindowspipereader_p.h [new file with mode: 0644]
src/network/socket/qlocalsocket.h
src/network/socket/qlocalsocket_p.h
src/network/socket/qlocalsocket_win.cpp

index ef11621..84bc6f3 100644 (file)
@@ -75,6 +75,8 @@ win32 {
 
         SOURCES += io/qfilesystemwatcher_win.cpp
         HEADERS += io/qfilesystemwatcher_win_p.h
+        HEADERS += io/qwindowspipereader_p.h
+        SOURCES += io/qwindowspipereader.cpp
         HEADERS += io/qwindowspipewriter_p.h
         SOURCES += io/qwindowspipewriter.cpp
         SOURCES += io/qfilesystemengine_win.cpp
diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp
new file mode 100644 (file)
index 0000000..0c471e0
--- /dev/null
@@ -0,0 +1,315 @@
+/****************************************************************************
+**
+** Copyright (C) 2011 Nokia Corporation and/or its subsidiary(-ies).
+** All rights reserved.
+** Contact: Nokia Corporation (qt-info@nokia.com)
+**
+** This file is part of the QtCore module of the Qt Toolkit.
+**
+** $QT_BEGIN_LICENSE:LGPL$
+** GNU Lesser General Public License Usage
+** This file may be used under the terms of the GNU Lesser General Public
+** License version 2.1 as published by the Free Software Foundation and
+** appearing in the file LICENSE.LGPL included in the packaging of this
+** file. Please review the following information to ensure the GNU Lesser
+** General Public License version 2.1 requirements will be met:
+** http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
+**
+** In addition, as a special exception, Nokia gives you certain additional
+** rights. These rights are described in the Nokia Qt LGPL Exception
+** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
+**
+** GNU General Public License Usage
+** Alternatively, this file may be used under the terms of the GNU General
+** Public License version 3.0 as published by the Free Software Foundation
+** and appearing in the file LICENSE.GPL included in the packaging of this
+** file. Please review the following information to ensure the GNU General
+** Public License version 3.0 requirements will be met:
+** http://www.gnu.org/copyleft/gpl.html.
+**
+** Other Usage
+** Alternatively, this file may be used in accordance with the terms and
+** conditions contained in a signed written agreement between you and Nokia.
+**
+**
+**
+**
+**
+** $QT_END_LICENSE$
+**
+****************************************************************************/
+
+#include "qwindowspipereader_p.h"
+#include <qdebug.h>
+#include <qelapsedtimer.h>
+#include <qeventloop.h>
+#include <qtimer.h>
+#include <qwineventnotifier.h>
+
+QT_BEGIN_NAMESPACE
+
+QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
+    : QObject(parent),
+      handle(INVALID_HANDLE_VALUE),
+      readBufferMaxSize(0),
+      actualReadBufferSize(0),
+      emitReadyReadTimer(new QTimer(this)),
+      pipeBroken(false)
+{
+    emitReadyReadTimer->setSingleShot(true);
+    connect(emitReadyReadTimer, SIGNAL(timeout()), SIGNAL(readyRead()));
+
+    ZeroMemory(&overlapped, sizeof(overlapped));
+    overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+    dataReadNotifier = new QWinEventNotifier(overlapped.hEvent, this);
+    connect(dataReadNotifier, SIGNAL(activated(HANDLE)), SLOT(readEventSignalled()));
+}
+
+QWindowsPipeReader::~QWindowsPipeReader()
+{
+    CloseHandle(overlapped.hEvent);
+}
+
+/*!
+    Sets the handle to read from. The handle must be valid.
+ */
+void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
+{
+    readBuffer.clear();
+    actualReadBufferSize = 0;
+    handle = hPipeReadEnd;
+    pipeBroken = false;
+    dataReadNotifier->setEnabled(true);
+}
+
+/*!
+    Stops the asynchronous read sequence.
+    This function assumes that the file already has been closed.
+    It does not cancel any I/O operation.
+ */
+void QWindowsPipeReader::stop()
+{
+    dataReadNotifier->setEnabled(false);
+    readSequenceStarted = false;
+    handle = INVALID_HANDLE_VALUE;
+    ResetEvent(overlapped.hEvent);
+}
+
+/*!
+    Returns the number of bytes we've read so far.
+ */
+qint64 QWindowsPipeReader::bytesAvailable() const
+{
+    return actualReadBufferSize;
+}
+
+/*!
+    Stops the asynchronous read sequence.
+ */
+qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
+{
+    if (pipeBroken && actualReadBufferSize == 0)
+        return -1;  // signal EOF
+
+    qint64 readSoFar;
+    // If startAsyncRead() has read data, copy it to its destination.
+    if (maxlen == 1 && actualReadBufferSize > 0) {
+        *data = readBuffer.getChar();
+        actualReadBufferSize--;
+        readSoFar = 1;
+    } else {
+        qint64 bytesToRead = qMin(qint64(actualReadBufferSize), maxlen);
+        readSoFar = 0;
+        while (readSoFar < bytesToRead) {
+            const char *ptr = readBuffer.readPointer();
+            int bytesToReadFromThisBlock = qMin(bytesToRead - readSoFar,
+                                                qint64(readBuffer.nextDataBlockSize()));
+            memcpy(data + readSoFar, ptr, bytesToReadFromThisBlock);
+            readSoFar += bytesToReadFromThisBlock;
+            readBuffer.free(bytesToReadFromThisBlock);
+            actualReadBufferSize -= bytesToReadFromThisBlock;
+        }
+    }
+
+    if (!pipeBroken) {
+        if (!actualReadBufferSize)
+            emitReadyReadTimer->stop();
+        if (!readSequenceStarted)
+            startAsyncRead();
+    }
+
+    return readSoFar;
+}
+
+bool QWindowsPipeReader::canReadLine() const
+{
+    return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
+}
+
+/*!
+    \internal
+    Will be called whenever the read operation completes.
+    Returns true, if readyRead() has been emitted.
+ */
+bool QWindowsPipeReader::readEventSignalled()
+{
+    if (!completeAsyncRead()) {
+        pipeBroken = true;
+        emit pipeClosed();
+        return false;
+    }
+    startAsyncRead();
+    emitReadyReadTimer->stop();
+    emit readyRead();
+    return true;
+}
+
+/*!
+    \internal
+    Reads data from the socket into the readbuffer
+ */
+void QWindowsPipeReader::startAsyncRead()
+{
+    do {
+        DWORD bytesToRead = checkPipeState();
+        if (pipeBroken)
+            return;
+
+        if (bytesToRead == 0) {
+            // There are no bytes in the pipe but we need to
+            // start the overlapped read with some buffer size.
+            bytesToRead = initialReadBufferSize;
+        }
+
+        if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
+            bytesToRead = readBufferMaxSize - readBuffer.size();
+            if (bytesToRead == 0) {
+                // Buffer is full. User must read data from the buffer
+                // before we can read more from the pipe.
+                return;
+            }
+        }
+
+        char *ptr = readBuffer.reserve(bytesToRead);
+
+        readSequenceStarted = true;
+        if (ReadFile(handle, ptr, bytesToRead, NULL, &overlapped)) {
+            completeAsyncRead();
+        } else {
+            switch (GetLastError()) {
+                case ERROR_IO_PENDING:
+                    // This is not an error. We're getting notified, when data arrives.
+                    return;
+                case ERROR_MORE_DATA:
+                    // This is not an error. The synchronous read succeeded.
+                    // We're connected to a message mode pipe and the message
+                    // didn't fit into the pipe's system buffer.
+                    completeAsyncRead();
+                    break;
+                case ERROR_PIPE_NOT_CONNECTED:
+                    {
+                        // It may happen, that the other side closes the connection directly
+                        // after writing data. Then we must set the appropriate socket state.
+                        pipeBroken = true;
+                        emit pipeClosed();
+                        return;
+                    }
+                default:
+                    emit winError(GetLastError(), QLatin1String("QWindowsPipeReader::startAsyncRead"));
+                    return;
+            }
+        }
+    } while (!readSequenceStarted);
+}
+
+/*!
+    \internal
+    Sets the correct size of the read buffer after a read operation.
+    Returns false, if an error occurred or the connection dropped.
+ */
+bool QWindowsPipeReader::completeAsyncRead()
+{
+    ResetEvent(overlapped.hEvent);
+    readSequenceStarted = false;
+
+    DWORD bytesRead;
+    if (!GetOverlappedResult(handle, &overlapped, &bytesRead, TRUE)) {
+        switch (GetLastError()) {
+        case ERROR_MORE_DATA:
+            // This is not an error. We're connected to a message mode
+            // pipe and the message didn't fit into the pipe's system
+            // buffer. We will read the remaining data in the next call.
+            break;
+        case ERROR_BROKEN_PIPE:
+        case ERROR_PIPE_NOT_CONNECTED:
+            return false;
+        default:
+            emit winError(GetLastError(), QLatin1String("QWindowsPipeReader::completeAsyncRead"));
+            return false;
+        }
+    }
+
+    actualReadBufferSize += bytesRead;
+    readBuffer.truncate(actualReadBufferSize);
+    if (!emitReadyReadTimer->isActive())
+        emitReadyReadTimer->start();
+    return true;
+}
+
+/*!
+    \internal
+    Returns the number of available bytes in the pipe.
+    Sets QWindowsPipeReader::pipeBroken to true if the connection is broken.
+ */
+DWORD QWindowsPipeReader::checkPipeState()
+{
+    DWORD bytes;
+    if (PeekNamedPipe(handle, NULL, 0, NULL, &bytes, NULL)) {
+        return bytes;
+    } else {
+        if (!pipeBroken) {
+            pipeBroken = true;
+            emit pipeClosed();
+        }
+    }
+    return 0;
+}
+
+/*!
+    Waits for the completion of the asynchronous read operation.
+    Returns true, if we've emitted the readyRead signal.
+ */
+bool QWindowsPipeReader::waitForReadyRead(int msecs)
+{
+    Q_ASSERT(readSequenceStarted);
+    DWORD result = WaitForSingleObject(overlapped.hEvent, msecs == -1 ? INFINITE : msecs);
+    switch (result) {
+        case WAIT_OBJECT_0:
+            return readEventSignalled();
+        case WAIT_TIMEOUT:
+            return false;
+    }
+
+    qWarning("QWindowsPipeReader::waitForReadyRead WaitForSingleObject failed with error code %d.", int(GetLastError()));
+    return false;
+}
+
+/*!
+    Waits until the pipe is closed.
+ */
+bool QWindowsPipeReader::waitForPipeClosed(int msecs)
+{
+    const int sleepTime = 10;
+    QElapsedTimer stopWatch;
+    stopWatch.start();
+    forever {
+        checkPipeState();
+        if (pipeBroken)
+            return true;
+        if (stopWatch.hasExpired(msecs - sleepTime))
+            return false;
+        Sleep(sleepTime);
+    }
+}
+
+QT_END_NAMESPACE
diff --git a/src/corelib/io/qwindowspipereader_p.h b/src/corelib/io/qwindowspipereader_p.h
new file mode 100644 (file)
index 0000000..12dd593
--- /dev/null
@@ -0,0 +1,122 @@
+/****************************************************************************
+**
+** Copyright (C) 2011 Nokia Corporation and/or its subsidiary(-ies).
+** All rights reserved.
+** Contact: Nokia Corporation (qt-info@nokia.com)
+**
+** This file is part of the QtCore module of the Qt Toolkit.
+**
+** $QT_BEGIN_LICENSE:LGPL$
+** GNU Lesser General Public License Usage
+** This file may be used under the terms of the GNU Lesser General Public
+** License version 2.1 as published by the Free Software Foundation and
+** appearing in the file LICENSE.LGPL included in the packaging of this
+** file. Please review the following information to ensure the GNU Lesser
+** General Public License version 2.1 requirements will be met:
+** http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
+**
+** In addition, as a special exception, Nokia gives you certain additional
+** rights. These rights are described in the Nokia Qt LGPL Exception
+** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
+**
+** GNU General Public License Usage
+** Alternatively, this file may be used under the terms of the GNU General
+** Public License version 3.0 as published by the Free Software Foundation
+** and appearing in the file LICENSE.GPL included in the packaging of this
+** file. Please review the following information to ensure the GNU General
+** Public License version 3.0 requirements will be met:
+** http://www.gnu.org/copyleft/gpl.html.
+**
+** Other Usage
+** Alternatively, this file may be used in accordance with the terms and
+** conditions contained in a signed written agreement between you and Nokia.
+**
+**
+**
+**
+**
+** $QT_END_LICENSE$
+**
+****************************************************************************/
+
+#ifndef QWINDOWSPIPEREADER_P_H
+#define QWINDOWSPIPEREADER_P_H
+
+//
+//  W A R N I N G
+//  -------------
+//
+// This file is not part of the Qt API.  It exists purely as an
+// implementation detail.  This header file may change from version to
+// version without notice, or even be removed.
+//
+// We mean it.
+//
+
+#include <qbytearray.h>
+#include <qobject.h>
+#include <qtimer.h>
+#include <qt_windows.h>
+
+#include <private/qringbuffer_p.h>
+
+QT_BEGIN_HEADER
+
+QT_BEGIN_NAMESPACE
+
+QT_MODULE(Core)
+
+class QWinEventNotifier;
+
+class Q_CORE_EXPORT QWindowsPipeReader : public QObject
+{
+    Q_OBJECT
+public:
+    explicit QWindowsPipeReader(QObject *parent = 0);
+    ~QWindowsPipeReader();
+
+    void setHandle(HANDLE hPipeReadEnd);
+    void stop();
+
+    void setMaxReadBufferSize(qint64 size) { readBufferMaxSize = size; }
+    qint64 maxReadBufferSize() const { return readBufferMaxSize; }
+
+    bool isPipeClosed() const { return pipeBroken; }
+    qint64 bytesAvailable() const;
+    qint64 read(char *data, qint64 maxlen);
+    bool canReadLine() const;
+    bool waitForReadyRead(int msecs);
+    bool waitForPipeClosed(int msecs);
+
+    void startAsyncRead();
+    bool completeAsyncRead();
+
+Q_SIGNALS:
+    void winError(ulong, const QString &);
+    void readyRead();
+    void pipeClosed();
+
+private Q_SLOTS:
+    bool readEventSignalled();
+
+private:
+    DWORD checkPipeState();
+
+private:
+    HANDLE handle;
+    OVERLAPPED overlapped;
+    QWinEventNotifier *dataReadNotifier;
+    qint64 readBufferMaxSize;
+    QRingBuffer readBuffer;
+    int actualReadBufferSize;
+    bool readSequenceStarted;
+    QTimer *emitReadyReadTimer;
+    bool pipeBroken;
+    static const qint64 initialReadBufferSize = 4096;
+};
+
+QT_END_NAMESPACE
+
+QT_END_HEADER
+
+#endif // QWINDOWSPIPEREADER_P_H
index a30f370..74c54bf 100644 (file)
@@ -131,9 +131,9 @@ private:
     Q_PRIVATE_SLOT(d_func(), void _q_stateChanged(QAbstractSocket::SocketState))
     Q_PRIVATE_SLOT(d_func(), void _q_error(QAbstractSocket::SocketError))
 #elif defined(Q_OS_WIN)
-    Q_PRIVATE_SLOT(d_func(), void _q_notified())
     Q_PRIVATE_SLOT(d_func(), void _q_canWrite())
     Q_PRIVATE_SLOT(d_func(), void _q_pipeClosed())
+    Q_PRIVATE_SLOT(d_func(), void _q_winError(ulong, const QString &))
 #else
     Q_PRIVATE_SLOT(d_func(), void _q_stateChanged(QAbstractSocket::SocketState))
     Q_PRIVATE_SLOT(d_func(), void _q_error(QAbstractSocket::SocketError))
index 3278178..b256f84 100644 (file)
@@ -63,8 +63,8 @@
 #if defined(QT_LOCALSOCKET_TCP)
 #   include "qtcpsocket.h"
 #elif defined(Q_OS_WIN)
+#   include "private/qwindowspipereader_p.h"
 #   include "private/qwindowspipewriter_p.h"
-#   include "private/qringbuffer_p.h"
 #   include <qwineventnotifier.h>
 #else
 #   include "private/qabstractsocketengine_p.h"
@@ -131,25 +131,13 @@ public:
     ~QLocalSocketPrivate();
     void destroyPipeHandles();
     void setErrorString(const QString &function);
-    void _q_notified();
     void _q_canWrite();
     void _q_pipeClosed();
-    DWORD checkPipeState();
-    void startAsyncRead();
-    bool completeAsyncRead();
-    void checkReadyRead();
+    void _q_winError(ulong windowsError, const QString &function);
     HANDLE handle;
-    OVERLAPPED overlapped;
     QWindowsPipeWriter *pipeWriter;
-    qint64 readBufferMaxSize;
-    QRingBuffer readBuffer;
-    int actualReadBufferSize;
-    QWinEventNotifier *dataReadNotifier;
+    QWindowsPipeReader *pipeReader;
     QLocalSocket::LocalSocketError error;
-    bool readSequenceStarted;
-    QTimer *emitReadyReadTimer;
-    bool pipeClosed;
-    static const qint64 initialReadBufferSize = 4096;
 #else
     QLocalUnixSocket unixSocket;
     QString generateErrorString(QLocalSocket::LocalSocketError, const QString &function) const;
index 9d7fb4e..1b0ee0d 100644 (file)
@@ -50,19 +50,21 @@ QT_BEGIN_NAMESPACE
 void QLocalSocketPrivate::init()
 {
     Q_Q(QLocalSocket);
-    emitReadyReadTimer = new QTimer(q);
-    emitReadyReadTimer->setSingleShot(true);
-    QObject::connect(emitReadyReadTimer, SIGNAL(timeout()), q, SIGNAL(readyRead()));
-    memset(&overlapped, 0, sizeof(overlapped));
-    overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
-    dataReadNotifier = new QWinEventNotifier(overlapped.hEvent, q);
-    q->connect(dataReadNotifier, SIGNAL(activated(HANDLE)), q, SLOT(_q_notified()));
+    pipeReader = new QWindowsPipeReader(q);
+    q->connect(pipeReader, SIGNAL(readyRead()), SIGNAL(readyRead()));
+    q->connect(pipeReader, SIGNAL(pipeClosed()), SLOT(_q_pipeClosed()), Qt::QueuedConnection);
+    q->connect(pipeReader, SIGNAL(winError(ulong, const QString &)), SLOT(_q_winError(ulong, const QString &)));
 }
 
 void QLocalSocketPrivate::setErrorString(const QString &function)
 {
+    DWORD windowsError = GetLastError();
+    _q_winError(windowsError, function);
+}
+
+void QLocalSocketPrivate::_q_winError(ulong windowsError, const QString &function)
+{
     Q_Q(QLocalSocket);
-    BOOL windowsError = GetLastError();
     QLocalSocket::LocalSocketState currentState = state;
 
     // If the connectToServer fails due to WaitNamedPipe() time-out, assume ConnectionError  
@@ -106,13 +108,9 @@ void QLocalSocketPrivate::setErrorString(const QString &function)
 
 QLocalSocketPrivate::QLocalSocketPrivate() : QIODevicePrivate(),
        handle(INVALID_HANDLE_VALUE),
+       pipeReader(0),
        pipeWriter(0),
-       readBufferMaxSize(0),
-       actualReadBufferSize(0),
        error(QLocalSocket::UnknownSocketError),
-       readSequenceStarted(false),
-       emitReadyReadTimer(0),
-       pipeClosed(false),
        state(QLocalSocket::UnconnectedState)
 {
 }
@@ -120,7 +118,6 @@ QLocalSocketPrivate::QLocalSocketPrivate() : QIODevicePrivate(),
 QLocalSocketPrivate::~QLocalSocketPrivate()
 {
     destroyPipeHandles();
-    CloseHandle(overlapped.hEvent);
 }
 
 void QLocalSocketPrivate::destroyPipeHandles()
@@ -200,129 +197,7 @@ qint64 QLocalSocket::readData(char *data, qint64 maxSize)
 {
     Q_D(QLocalSocket);
 
-    if (d->pipeClosed && d->actualReadBufferSize == 0)
-        return -1;  // signal EOF
-
-    qint64 readSoFar;
-    // If startAsyncRead() read data, copy it to its destination.
-    if (maxSize == 1 && d->actualReadBufferSize > 0) {
-        *data = d->readBuffer.getChar();
-        d->actualReadBufferSize--;
-        readSoFar = 1;
-    } else {
-        qint64 bytesToRead = qMin(qint64(d->actualReadBufferSize), maxSize);
-        readSoFar = 0;
-        while (readSoFar < bytesToRead) {
-            const char *ptr = d->readBuffer.readPointer();
-            int bytesToReadFromThisBlock = qMin(bytesToRead - readSoFar,
-                                                qint64(d->readBuffer.nextDataBlockSize()));
-            memcpy(data + readSoFar, ptr, bytesToReadFromThisBlock);
-            readSoFar += bytesToReadFromThisBlock;
-            d->readBuffer.free(bytesToReadFromThisBlock);
-            d->actualReadBufferSize -= bytesToReadFromThisBlock;
-        }
-    }
-
-    if (!d->pipeClosed) {
-        if (!d->actualReadBufferSize)
-            d->emitReadyReadTimer->stop();
-        if (!d->readSequenceStarted)
-            d->startAsyncRead();
-    }
-
-    return readSoFar;
-}
-
-/*!
-    \internal
-    Reads data from the socket into the readbuffer
- */
-void QLocalSocketPrivate::startAsyncRead()
-{
-    do {
-        DWORD bytesToRead = checkPipeState();
-        if (pipeClosed)
-            return;
-
-        if (bytesToRead == 0) {
-            // There are no bytes in the pipe but we need to
-            // start the overlapped read with some buffer size.
-            bytesToRead = initialReadBufferSize;
-        }
-
-        if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
-            bytesToRead = readBufferMaxSize - readBuffer.size();
-            if (bytesToRead == 0) {
-                // Buffer is full. User must read data from the buffer
-                // before we can read more from the pipe.
-                return;
-            }
-        }
-
-        char *ptr = readBuffer.reserve(bytesToRead);
-
-        readSequenceStarted = true;
-        if (ReadFile(handle, ptr, bytesToRead, NULL, &overlapped)) {
-            completeAsyncRead();
-        } else {
-            switch (GetLastError()) {
-                case ERROR_IO_PENDING:
-                    // This is not an error. We're getting notified, when data arrives.
-                    return;
-                case ERROR_MORE_DATA:
-                    // This is not an error. The synchronous read succeeded.
-                    // We're connected to a message mode pipe and the message
-                    // didn't fit into the pipe's system buffer.
-                    completeAsyncRead();
-                    break;
-                case ERROR_PIPE_NOT_CONNECTED:
-                    {
-                        // It may happen, that the other side closes the connection directly
-                        // after writing data. Then we must set the appropriate socket state.
-                        pipeClosed = true;
-                        Q_Q(QLocalSocket);
-                        QTimer::singleShot(0, q, SLOT(_q_pipeClosed()));
-                        return;
-                    }
-                default:
-                    setErrorString(QLatin1String("QLocalSocketPrivate::startAsyncRead"));
-                    return;
-            }
-        }
-    } while (!readSequenceStarted);
-}
-
-/*!
-    \internal
-    Sets the correct size of the read buffer after a read operation.
-    Returns false, if an error occurred or the connection dropped.
- */
-bool QLocalSocketPrivate::completeAsyncRead()
-{
-    ResetEvent(overlapped.hEvent);
-    readSequenceStarted = false;
-
-    DWORD bytesRead;
-    if (!GetOverlappedResult(handle, &overlapped, &bytesRead, TRUE)) {
-        switch (GetLastError()) {
-        case ERROR_MORE_DATA:
-            // This is not an error. We're connected to a message mode
-            // pipe and the message didn't fit into the pipe's system
-            // buffer. We will read the remaining data in the next call.
-            break;
-        case ERROR_PIPE_NOT_CONNECTED:
-            return false;
-        default:
-            setErrorString(QLatin1String("QLocalSocketPrivate::completeAsyncRead"));
-            return false;
-        }
-    }
-
-    actualReadBufferSize += bytesRead;
-    readBuffer.truncate(actualReadBufferSize);
-    if (!emitReadyReadTimer->isActive())
-        emitReadyReadTimer->start();
-    return true;
+    return d->pipeReader->read(data, maxSize);
 }
 
 qint64 QLocalSocket::writeData(const char *data, qint64 maxSize)
@@ -347,26 +222,6 @@ void QLocalSocket::abort()
     close();
 }
 
-/*!
-    \internal
-    Returns the number of available bytes in the pipe.
-    Sets QLocalSocketPrivate::pipeClosed to true if the connection is broken.
- */
-DWORD QLocalSocketPrivate::checkPipeState()
-{
-    Q_Q(QLocalSocket);
-    DWORD bytes;
-    if (PeekNamedPipe(handle, NULL, 0, NULL, &bytes, NULL)) {
-        return bytes;
-    } else {
-        if (!pipeClosed) {
-            pipeClosed = true;
-            QTimer::singleShot(0, q, SLOT(_q_pipeClosed()));
-        }
-    }
-    return 0;
-}
-
 void QLocalSocketPrivate::_q_pipeClosed()
 {
     Q_Q(QLocalSocket);
@@ -384,10 +239,9 @@ void QLocalSocketPrivate::_q_pipeClosed()
     emit q->stateChanged(state);
     emit q->disconnected();
 
-    readSequenceStarted = false;
+    pipeReader->stop();
     destroyPipeHandles();
     handle = INVALID_HANDLE_VALUE;
-    ResetEvent(overlapped.hEvent);
 
     if (pipeWriter) {
         delete pipeWriter;
@@ -399,7 +253,7 @@ qint64 QLocalSocket::bytesAvailable() const
 {
     Q_D(const QLocalSocket);
     qint64 available = QIODevice::bytesAvailable();
-    available += (qint64) d->actualReadBufferSize;
+    available += d->pipeReader->bytesAvailable();
     return available;
 }
 
@@ -412,8 +266,7 @@ qint64 QLocalSocket::bytesToWrite() const
 bool QLocalSocket::canReadLine() const
 {
     Q_D(const QLocalSocket);
-    return (QIODevice::canReadLine()
-            || d->readBuffer.indexOf('\n', d->actualReadBufferSize) != -1);
+    return QIODevice::canReadLine() || d->pipeReader->canReadLine();
 }
 
 void QLocalSocket::close()
@@ -475,15 +328,14 @@ bool QLocalSocket::setSocketDescriptor(quintptr socketDescriptor,
               LocalSocketState socketState, OpenMode openMode)
 {
     Q_D(QLocalSocket);
-    d->readBuffer.clear();
-    d->actualReadBufferSize = 0;
-    QIODevice::open(openMode);
-    d->handle = (int*)socketDescriptor;
+    d->pipeReader->stop();
+    d->handle = reinterpret_cast<HANDLE>(socketDescriptor);
     d->state = socketState;
-    d->pipeClosed = false;
+    d->pipeReader->setHandle(d->handle);
+    QIODevice::open(openMode);
     emit stateChanged(d->state);
     if (d->state == ConnectedState && openMode.testFlag(QIODevice::ReadOnly))
-        d->startAsyncRead();
+        d->pipeReader->startAsyncRead();
     return true;
 }
 
@@ -494,19 +346,6 @@ void QLocalSocketPrivate::_q_canWrite()
         q->close();
 }
 
-void QLocalSocketPrivate::_q_notified()
-{
-    Q_Q(QLocalSocket);
-    if (!completeAsyncRead()) {
-        pipeClosed = true;
-        QTimer::singleShot(0, q, SLOT(_q_pipeClosed()));
-        return;
-    }
-    startAsyncRead();
-    emitReadyReadTimer->stop();
-    emit q->readyRead();
-}
-
 quintptr QLocalSocket::socketDescriptor() const
 {
     Q_D(const QLocalSocket);
@@ -516,13 +355,13 @@ quintptr QLocalSocket::socketDescriptor() const
 qint64 QLocalSocket::readBufferSize() const
 {
     Q_D(const QLocalSocket);
-    return d->readBufferMaxSize;
+    return d->pipeReader->maxReadBufferSize();
 }
 
 void QLocalSocket::setReadBufferSize(qint64 size)
 {
     Q_D(QLocalSocket);
-    d->readBufferMaxSize = size;
+    d->pipeReader->setMaxReadBufferSize(size);
 }
 
 bool QLocalSocket::waitForConnected(int msecs)
@@ -540,18 +379,10 @@ bool QLocalSocket::waitForDisconnected(int msecs)
         qWarning("QLocalSocket::waitForDisconnected isn't supported for write only pipes.");
         return false;
     }
-    QIncrementalSleepTimer timer(msecs);
-    forever {
-        d->checkPipeState();
-        if (d->pipeClosed)
-            d->_q_pipeClosed();
-        if (state() == UnconnectedState)
-            return true;
-        Sleep(timer.nextSleepTime());
-        if (timer.hasTimedOut())
-            break;
+    if (d->pipeReader->waitForPipeClosed(msecs)) {
+        d->_q_pipeClosed();
+        return true;
     }
-
     return false;
 }
 
@@ -572,28 +403,18 @@ bool QLocalSocket::waitForReadyRead(int msecs)
         return false;
 
     // We already know that the pipe is gone, but did not enter the event loop yet.
-    if (d->pipeClosed) {
+    if (d->pipeReader->isPipeClosed()) {
         d->_q_pipeClosed();
         return false;
     }
 
-    Q_ASSERT(d->readSequenceStarted);
-    DWORD result = WaitForSingleObject(d->overlapped.hEvent, msecs == -1 ? INFINITE : msecs);
-    switch (result) {
-        case WAIT_OBJECT_0:
-            d->_q_notified();
-            // We just noticed that the pipe is gone.
-            if (d->pipeClosed) {
-                d->_q_pipeClosed();
-                return false;
-            }
-            return true;
-        case WAIT_TIMEOUT:
-            return false;
-    }
+    bool result = d->pipeReader->waitForReadyRead(msecs);
 
-    qWarning("QLocalSocket::waitForReadyRead WaitForSingleObject failed with error code %d.", int(GetLastError()));
-    return false;
+    // We just noticed that the pipe is gone.
+    if (d->pipeReader->isPipeClosed())
+        d->_q_pipeClosed();
+
+    return result;
 }
 
 bool QLocalSocket::waitForBytesWritten(int msecs)