libfreerdp-core: refactor RPC receiving as a synchronized queue
authorMarc-André Moreau <marcandre.moreau@gmail.com>
Thu, 29 Nov 2012 03:03:18 +0000 (22:03 -0500)
committerMarc-André Moreau <marcandre.moreau@gmail.com>
Thu, 29 Nov 2012 03:03:18 +0000 (22:03 -0500)
libfreerdp/core/gateway/rpc.c
libfreerdp/core/gateway/rpc.h
libfreerdp/core/gateway/rpc_bind.c
libfreerdp/core/gateway/rpc_client.c
libfreerdp/core/gateway/rpc_client.h
libfreerdp/core/gateway/rts.c
libfreerdp/core/gateway/tsg.c

index b67b706..9143b95 100644 (file)
@@ -738,6 +738,7 @@ rdpRpc* rpc_new(rdpTransport* transport)
                rpc_client_new(rpc);
 
                rpc->client->SynchronousSend = TRUE;
+               rpc->client->SynchronousReceive = TRUE;
        }
 
        return rpc;
index be77484..3a12e8d 100644 (file)
@@ -792,7 +792,6 @@ int rpc_in_write(rdpRpc* rpc, BYTE* data, int length);
 BOOL rpc_get_stub_data_info(rdpRpc* rpc, BYTE* header, UINT32* offset, UINT32* length);
 int rpc_recv_pdu_header(rdpRpc* rpc, BYTE* header);
 
-int rpc_recv_pdu_fragment(rdpRpc* rpc);
 RPC_PDU* rpc_recv_pdu(rdpRpc* rpc);
 
 int rpc_write(rdpRpc* rpc, BYTE* data, int length, UINT16 opnum);
index a4cd7e0..40cdc62 100644 (file)
@@ -27,6 +27,8 @@
 
 #include <winpr/crt.h>
 
+#include "rpc_client.h"
+
 #include "rpc_bind.h"
 
 /**
@@ -217,7 +219,7 @@ int rpc_recv_bind_ack_pdu(rdpRpc* rpc)
        BYTE* auth_data;
        rpcconn_hdr_t* header;
 
-       pdu = rpc_recv_pdu(rpc);
+       pdu = rpc_recv_dequeue_pdu(rpc);
 
        if (!pdu)
                return -1;
index 1c85227..d5f64b6 100644 (file)
@@ -113,21 +113,27 @@ int rpc_recv_enqueue_pdu(rdpRpc* rpc)
        InterlockedPushEntrySList(rpc->ReceiveQueue, &(pdu->ItemEntry));
        ReleaseSemaphore(rpc->client->ReceiveSemaphore, 1, NULL);
 
-       if (rpc->client->SynchronousReceive)
-       {
-               WaitForSingleObject(rpc->client->PduReceivedEvent, INFINITE);
-               ResetEvent(rpc->client->PduReceivedEvent);
-       }
-
        return 0;
 }
 
-int rpc_recv_dequeue_pdu(rdpRpc* rpc)
+RPC_PDU* rpc_recv_dequeue_pdu(rdpRpc* rpc)
 {
+       RPC_PDU* pdu;
+       DWORD dwMilliseconds;
+
+       pdu = NULL;
+       dwMilliseconds = rpc->client->SynchronousReceive ? INFINITE : 0;
+
        if (rpc->client->SynchronousReceive)
-               SetEvent(rpc->client->PduReceivedEvent);
+               rpc_recv_enqueue_pdu(rpc);
 
-       return 0;
+       if (WaitForSingleObject(rpc->client->ReceiveSemaphore, dwMilliseconds) == WAIT_OBJECT_0)
+       {
+               pdu = (RPC_PDU*) InterlockedPopEntrySList(rpc->ReceiveQueue);
+               return pdu;
+       }
+
+       return pdu;
 }
 
 static void* rpc_client_thread(void* arg)
@@ -158,7 +164,8 @@ static void* rpc_client_thread(void* arg)
 
                if (WaitForSingleObject(ReadEvent, 0) == WAIT_OBJECT_0)
                {
-
+                       if (!rpc->client->SynchronousReceive)
+                               rpc_recv_enqueue_pdu(rpc);
                }
 
                rpc_send_dequeue_pdu(rpc);
index 5a3deed..400c426 100644 (file)
@@ -27,6 +27,9 @@
 int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length);
 int rpc_send_dequeue_pdu(rdpRpc* rpc);
 
+int rpc_recv_enqueue_pdu(rdpRpc* rpc);
+RPC_PDU* rpc_recv_dequeue_pdu(rdpRpc* rpc);
+
 int rpc_client_new(rdpRpc* rpc);
 int rpc_client_start(rdpRpc* rpc);
 
index ceac54a..7ef506e 100644 (file)
@@ -24,6 +24,7 @@
 #include <winpr/crt.h>
 
 #include "ncacn_http.h"
+#include "rpc_client.h"
 
 #include "rts.h"
 
@@ -56,7 +57,6 @@
 
 BOOL rts_connect(rdpRpc* rpc)
 {
-       int status;
        RPC_PDU* pdu;
        rpcconn_rts_hdr_t* rts;
        HttpResponse* http_response;
@@ -174,7 +174,7 @@ BOOL rts_connect(rdpRpc* rpc)
         *
         */
 
-       pdu = rpc_recv_pdu(rpc);
+       pdu = rpc_recv_dequeue_pdu(rpc);
 
        if (!pdu)
                return FALSE;
@@ -213,7 +213,7 @@ BOOL rts_connect(rdpRpc* rpc)
         *
         */
 
-       pdu = rpc_recv_pdu(rpc);
+       pdu = rpc_recv_dequeue_pdu(rpc);
 
        if (!pdu)
                return FALSE;
index 8a0dac5..63f3358 100644 (file)
@@ -35,6 +35,8 @@
 #include <winpr/ndr.h>
 #include <winpr/error.h>
 
+#include "rpc_client.h"
+
 #include "tsg.h"
 
 /**
@@ -212,7 +214,7 @@ BOOL TsProxyCreateTunnelReadResponse(rdpTsg* tsg)
        PTSG_PACKET_CAPS_RESPONSE packetCapsResponse;
        PTSG_PACKET_QUARENC_RESPONSE packetQuarEncResponse;
 
-       pdu = rpc_recv_pdu(rpc);
+       pdu = rpc_recv_dequeue_pdu(rpc);
 
        if (!pdu)
                return FALSE;
@@ -560,7 +562,7 @@ BOOL TsProxyAuthorizeTunnelReadResponse(rdpTsg* tsg)
        rdpRpc* rpc = tsg->rpc;
        PTSG_PACKET_RESPONSE packetResponse;
 
-       pdu = rpc_recv_pdu(rpc);
+       pdu = rpc_recv_dequeue_pdu(rpc);
 
        if (!pdu)
                return FALSE;
@@ -793,7 +795,7 @@ BOOL TsProxyCreateChannelReadResponse(rdpTsg* tsg)
        UINT32 length;
        rdpRpc* rpc = tsg->rpc;
 
-       pdu = rpc_recv_pdu(rpc);
+       pdu = rpc_recv_dequeue_pdu(rpc);
 
        if (!pdu)
                return FALSE;
@@ -1104,7 +1106,7 @@ int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length)
        }
        else
        {
-               tsg->pdu = rpc_recv_pdu(rpc);
+               tsg->pdu = rpc_recv_dequeue_pdu(rpc);
 
                if ((tsg->pdu->Flags & RPC_PDU_FLAG_STUB) && (tsg->pdu->Length == 4))
                {
@@ -1114,7 +1116,7 @@ int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length)
                }
 
                tsg->PendingPdu = TRUE;
-               tsg->BytesAvailable = rpc->pdu->Length;
+               tsg->BytesAvailable = tsg->pdu->Length;
                tsg->BytesRead = 0;
 
                CopyLength = (tsg->BytesAvailable > length) ? length : tsg->BytesAvailable;