Fix postgres notification support in the QPSQLDriver.
authorMatt Newell <newellm@blur.com>
Thu, 22 Mar 2012 17:42:56 +0000 (10:42 -0700)
committerQt by Nokia <qt-info@nokia.com>
Fri, 30 Mar 2012 21:51:11 +0000 (23:51 +0200)
This patch fixes a critical bug in the qsqlpsql driver where
notifications aren't delivered when received. Any blocking libpq
function(specifically PQexec) will read all the incoming data
from the socket, including any pending notifications. This would
cause the socket notifier to never be fired for incoming
notifications that are already queued inside libpq. The qsqldriver
test case was skipping the postgres notification test because of
this bug, now its enabled and passing. In order to fix this
bug I made a wrapper function for PQexec in QPSQLDriverPrivate
that calls _q_handleNotification via QMetaObject::callMethod
QueuedConnection in order to deliver pending notifications
when control returns to the event loop. I also added a flag
to ensure only one call is made each time the event loop is
entered.

Change-Id: I19f5297094ae7ae46bfb0717e4fca744d69f7b92
Reviewed-by: Honglei Zhang <honglei.zhang@nokia.com>
Reviewed-by: Mark Brand <mabrand@mabrand.nl>
src/sql/drivers/psql/qsql_psql.cpp
src/sql/kernel/qsqldriver.h
tests/auto/sql/kernel/qsqldatabase/tst_qsqldatabase.cpp

index ec31d54..10374bb 100644 (file)
@@ -123,14 +123,21 @@ inline void qPQfreemem(void *buffer)
 class QPSQLDriverPrivate
 {
 public:
-    QPSQLDriverPrivate() : connection(0), isUtf8(false), pro(QPSQLDriver::Version6), sn(0) {}
+    QPSQLDriverPrivate(QPSQLDriver *qq) : q(qq), connection(0), isUtf8(false), pro(QPSQLDriver::Version6), sn(0), pendingNotifyCheck(false) {}
+    QPSQLDriver *q;
     PGconn *connection;
     bool isUtf8;
     QPSQLDriver::Protocol pro;
     QSocketNotifier *sn;
     QStringList seid;
+    mutable bool pendingNotifyCheck;
 
     void appendTables(QStringList &tl, QSqlQuery &t, QChar type);
+    PGresult * exec(const char * stmt) const;
+    PGresult * exec(const QString & stmt) const;
+    QPSQLDriver::Protocol getPSQLVersion();
+    bool setEncodingUtf8();
+    void setDatestyle();
 };
 
 void QPSQLDriverPrivate::appendTables(QStringList &tl, QSqlQuery &t, QChar type)
@@ -157,6 +164,21 @@ void QPSQLDriverPrivate::appendTables(QStringList &tl, QSqlQuery &t, QChar type)
     }
 }
 
+PGresult * QPSQLDriverPrivate::exec(const char * stmt) const
+{
+    PGresult *result = PQexec(connection, stmt);
+    if (seid.size() && !pendingNotifyCheck) {
+        pendingNotifyCheck = true;
+        QMetaObject::invokeMethod(q, "_q_handleNotification", Qt::QueuedConnection, Q_ARG(int,0));
+    }
+    return result;
+}
+
+PGresult * QPSQLDriverPrivate::exec(const QString & stmt) const
+{
+    return exec(isUtf8 ? stmt.toUtf8().constData() : stmt.toLocal8Bit().constData());
+}
+
 class QPSQLResultPrivate
 {
 public:
@@ -251,9 +273,7 @@ static QVariant::Type qDecodePSQLType(int t)
 static void qDeallocatePreparedStmt(QPSQLResultPrivate *d)
 {
     const QString stmt = QLatin1String("DEALLOCATE ") + d->preparedStmtId;
-    PGresult *result = PQexec(d->driver->connection,
-                              d->driver->isUtf8 ? stmt.toUtf8().constData()
-                                                : stmt.toLocal8Bit().constData());
+    PGresult *result = d->driver->exec(stmt);
 
     if (PQresultStatus(result) != PGRES_COMMAND_OK)
         qWarning("Unable to free statement: %s", PQerrorMessage(d->driver->connection));
@@ -431,9 +451,7 @@ bool QPSQLResult::reset (const QString& query)
         return false;
     if (!driver()->isOpen() || driver()->isOpenError())
         return false;
-    d->result = PQexec(d->driver->connection,
-                       d->driver->isUtf8 ? query.toUtf8().constData()
-                                         : query.toLocal8Bit().constData());
+    d->result = d->driver->exec(query);
     return d->processResults();
 }
 
@@ -564,9 +582,7 @@ bool QPSQLResult::prepare(const QString &query)
     const QString stmtId = qMakePreparedStmtId();
     const QString stmt = QString::fromLatin1("PREPARE %1 AS ").arg(stmtId).append(qReplacePlaceholderMarkers(query));
 
-    PGresult *result = PQexec(d->driver->connection,
-                              d->driver->isUtf8 ? stmt.toUtf8().constData()
-                                                : stmt.toLocal8Bit().constData());
+    PGresult *result = d->driver->exec(stmt);
 
     if (PQresultStatus(result) != PGRES_COMMAND_OK) {
         setLastError(qMakeError(QCoreApplication::translate("QPSQLResult",
@@ -595,26 +611,24 @@ bool QPSQLResult::exec()
     else
         stmt = QString::fromLatin1("EXECUTE %1 (%2)").arg(d->preparedStmtId).arg(params);
 
-    d->result = PQexec(d->driver->connection,
-                       d->driver->isUtf8 ? stmt.toUtf8().constData()
-                                         : stmt.toLocal8Bit().constData());
+    d->result = d->driver->exec(stmt);
 
     return d->processResults();
 }
 
 ///////////////////////////////////////////////////////////////////
 
-static bool setEncodingUtf8(PGconn* connection)
+bool QPSQLDriverPrivate::setEncodingUtf8()
 {
-    PGresult* result = PQexec(connection, "SET CLIENT_ENCODING TO 'UNICODE'");
+    PGresult* result = exec("SET CLIENT_ENCODING TO 'UNICODE'");
     int status = PQresultStatus(result);
     PQclear(result);
     return status == PGRES_COMMAND_OK;
 }
 
-static void setDatestyle(PGconn* connection)
+void QPSQLDriverPrivate::setDatestyle()
 {
-    PGresult* result = PQexec(connection, "SET DATESTYLE TO 'ISO'");
+    PGresult* result = exec("SET DATESTYLE TO 'ISO'");
     int status =  PQresultStatus(result);
     if (status != PGRES_COMMAND_OK)
         qWarning("%s", PQerrorMessage(connection));
@@ -665,10 +679,10 @@ static QPSQLDriver::Protocol qMakePSQLVersion(int vMaj, int vMin)
     return QPSQLDriver::VersionUnknown;
 }
 
-static QPSQLDriver::Protocol getPSQLVersion(PGconn* connection)
+QPSQLDriver::Protocol QPSQLDriverPrivate::getPSQLVersion()
 {
     QPSQLDriver::Protocol serverVersion = QPSQLDriver::Version6;
-    PGresult* result = PQexec(connection, "select version()");
+    PGresult* result = exec("select version()");
     int status = PQresultStatus(result);
     if (status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK) {
         QString val = QString::fromAscii(PQgetvalue(result, 0, 0));
@@ -691,7 +705,7 @@ static QPSQLDriver::Protocol getPSQLVersion(PGconn* connection)
                 //Client version before QPSQLDriver::Version9 only supports escape mode for bytea type,
                 //but bytea format is set to hex by default in PSQL 9 and above. So need to force the
                 //server use the old escape mode when connects to the new server with old client library.
-                result = PQexec(connection, "SET bytea_output=escape; ");
+                result = exec("SET bytea_output=escape; ");
                 status = PQresultStatus(result);
             } else if (serverVersion == QPSQLDriver::VersionUnknown) {
                 serverVersion = clientVersion;
@@ -726,7 +740,7 @@ QPSQLDriver::QPSQLDriver(PGconn *conn, QObject *parent)
     init();
     d->connection = conn;
     if (conn) {
-        d->pro = getPSQLVersion(d->connection);
+        d->pro = d->getPSQLVersion();
         setOpen(true);
         setOpenError(false);
     }
@@ -734,7 +748,7 @@ QPSQLDriver::QPSQLDriver(PGconn *conn, QObject *parent)
 
 void QPSQLDriver::init()
 {
-    d = new QPSQLDriverPrivate();
+    d = new QPSQLDriverPrivate(this);
 }
 
 QPSQLDriver::~QPSQLDriver()
@@ -826,9 +840,9 @@ bool QPSQLDriver::open(const QString & db,
         return false;
     }
 
-    d->pro = getPSQLVersion(d->connection);
-    d->isUtf8 = setEncodingUtf8(d->connection);
-    setDatestyle(d->connection);
+    d->pro = d->getPSQLVersion();
+    d->isUtf8 = d->setEncodingUtf8();
+    d->setDatestyle();
 
     setOpen(true);
     setOpenError(false);
@@ -865,7 +879,7 @@ bool QPSQLDriver::beginTransaction()
         qWarning("QPSQLDriver::beginTransaction: Database not open");
         return false;
     }
-    PGresult* res = PQexec(d->connection, "BEGIN");
+    PGresult* res = d->exec("BEGIN");
     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
         PQclear(res);
         setLastError(qMakeError(tr("Could not begin transaction"),
@@ -882,7 +896,7 @@ bool QPSQLDriver::commitTransaction()
         qWarning("QPSQLDriver::commitTransaction: Database not open");
         return false;
     }
-    PGresult* res = PQexec(d->connection, "COMMIT");
+    PGresult* res = d->exec("COMMIT");
 
     bool transaction_failed = false;
 
@@ -915,7 +929,7 @@ bool QPSQLDriver::rollbackTransaction()
         qWarning("QPSQLDriver::rollbackTransaction: Database not open");
         return false;
     }
-    PGresult* res = PQexec(d->connection, "ROLLBACK");
+    PGresult* res = d->exec("ROLLBACK");
     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
         setLastError(qMakeError(tr("Could not rollback transaction"),
                                 QSqlError::TransactionError, d));
@@ -1298,11 +1312,11 @@ bool QPSQLDriver::subscribeToNotificationImplementation(const QString &name)
 
     int socket = PQsocket(d->connection);
     if (socket) {
+        // Add the name to the list of subscriptions here so that QSQLDriverPrivate::exec knows
+        // to check for notifications immediately after executing the LISTEN
+        d->seid << name;
         QString query = QLatin1String("LISTEN ") + escapeIdentifier(name, QSqlDriver::TableName);
-        if (PQresultStatus(PQexec(d->connection,
-                                  d->isUtf8 ? query.toUtf8().constData()
-                                            : query.toLocal8Bit().constData())
-                          ) != PGRES_COMMAND_OK) {
+        if (PQresultStatus(d->exec(query)) != PGRES_COMMAND_OK) {
             setLastError(qMakeError(tr("Unable to subscribe"), QSqlError::StatementError, d));
             return false;
         }
@@ -1311,9 +1325,11 @@ bool QPSQLDriver::subscribeToNotificationImplementation(const QString &name)
             d->sn = new QSocketNotifier(socket, QSocketNotifier::Read);
             connect(d->sn, SIGNAL(activated(int)), this, SLOT(_q_handleNotification(int)));
         }
+    } else {
+        qWarning("QPSQLDriver::subscribeToNotificationImplementation: PQsocket didn't return a valid socket to listen on");
+        return false;
     }
 
-    d->seid << name;
     return true;
 }
 
@@ -1331,10 +1347,7 @@ bool QPSQLDriver::unsubscribeFromNotificationImplementation(const QString &name)
     }
 
     QString query = QLatin1String("UNLISTEN ") + escapeIdentifier(name, QSqlDriver::TableName);
-    if (PQresultStatus(PQexec(d->connection,
-                              d->isUtf8 ? query.toUtf8().constData()
-                                        : query.toLocal8Bit().constData())
-                      ) != PGRES_COMMAND_OK) {
+    if (PQresultStatus(d->exec(query)) != PGRES_COMMAND_OK) {
         setLastError(qMakeError(tr("Unable to unsubscribe"), QSqlError::StatementError, d));
         return false;
     }
@@ -1357,6 +1370,7 @@ QStringList QPSQLDriver::subscribedToNotificationsImplementation() const
 
 void QPSQLDriver::_q_handleNotification(int)
 {
+    d->pendingNotifyCheck = false;
     PQconsumeInput(d->connection);
 
     PGnotify *notify = 0;
@@ -1364,10 +1378,8 @@ void QPSQLDriver::_q_handleNotification(int)
         QString name(QLatin1String(notify->relname));
         if (d->seid.contains(name)) {
             emit notification(name);
-            if (notify->be_pid == PQbackendPID(d->connection))
-                emit notification(name, QSqlDriver::SelfSource);
-            else
-                emit notification(name, QSqlDriver::OtherSource);
+            QSqlDriver::NotificationSource source = (notify->be_pid == PQbackendPID(d->connection)) ? QSqlDriver::SelfSource : QSqlDriver::OtherSource;
+            emit notification(name, source);
         }
         else
             qWarning("QPSQLDriver: received notification for '%s' which isn't subscribed to.",
index 5fd7441..3ca8092 100644 (file)
@@ -122,7 +122,7 @@ public:
 
 Q_SIGNALS:
     void notification(const QString &name);
-    void notification(const QString &name, NotificationSource source);
+    void notification(const QString &name, QSqlDriver::NotificationSource source);
 
 protected:
     virtual void setOpen(bool o);
index 7acbf81..992df6e 100644 (file)
@@ -54,6 +54,8 @@
 
 #include "tst_databases.h"
 
+Q_DECLARE_METATYPE(QSqlDriver::NotificationSource)
+
 QT_FORWARD_DECLARE_CLASS(QSqlDatabase)
 struct FieldDef;
 
@@ -385,6 +387,7 @@ void tst_QSqlDatabase::populateTestTables(QSqlDatabase db)
 
 void tst_QSqlDatabase::initTestCase()
 {
+    qRegisterMetaType<QSqlDriver::NotificationSource>("QSqlDriver::NotificationSource");
     dbs.open();
 
     for (QStringList::ConstIterator it = dbs.dbNames.begin(); it != dbs.dbNames.end(); ++it) {
@@ -2064,21 +2067,17 @@ void tst_QSqlDatabase::eventNotificationPSQL()
     QSqlDatabase db = QSqlDatabase::database(dbName);
     CHECK_DATABASE(db);
 
-#if defined(Q_OS_LINUX)
-    QSKIP( "Event support doesn't work on linux");
-#endif
-
     QSqlQuery query(db);
     QString procedureName = qTableName("posteventProc", __FILE__);
-
     QSqlDriver &driver=*(db.driver());
     QVERIFY_SQL(driver, subscribeToNotification(procedureName));
-    QSignalSpy spy(db.driver(), SIGNAL(notification(const QString&)));
+    QSignalSpy spy(db.driver(), SIGNAL(notification(const QString&,QSqlDriver::NotificationSource)));
     query.exec(QString("NOTIFY \"%1\"").arg(procedureName));
     QCoreApplication::processEvents();
     QCOMPARE(spy.count(), 1);
     QList<QVariant> arguments = spy.takeFirst();
     QVERIFY(arguments.at(0).toString() == procedureName);
+    QVERIFY(qVariantValue<QSqlDriver::NotificationSource>(arguments.at(1)) == QSqlDriver::SelfSource);
     QVERIFY_SQL(driver, unsubscribeFromNotification(procedureName));
 }