libfreerdp-core: add asynchronous send queue
authorMarc-André Moreau <marcandre.moreau@gmail.com>
Wed, 28 Nov 2012 18:38:01 +0000 (13:38 -0500)
committerMarc-André Moreau <marcandre.moreau@gmail.com>
Wed, 28 Nov 2012 18:38:01 +0000 (13:38 -0500)
libfreerdp/core/gateway/rpc.c
libfreerdp/core/gateway/rpc.h
libfreerdp/core/gateway/rpc_client.c
libfreerdp/core/gateway/rpc_client.h
libfreerdp/core/gateway/rpc_fault.c
winpr/libwinpr/synch/wait.c

index c811a2f..aabc1be 100644 (file)
@@ -29,6 +29,7 @@
 
 #include <winpr/crt.h>
 #include <winpr/tchar.h>
+#include <winpr/synch.h>
 #include <winpr/dsparse.h>
 
 #include <openssl/rand.h>
@@ -457,7 +458,6 @@ int rpc_recv_pdu(rdpRpc* rpc)
 
 int rpc_tsg_write(rdpRpc* rpc, BYTE* data, int length, UINT16 opnum)
 {
-       int status;
        BYTE* buffer;
        UINT32 offset;
        rdpNtlm* ntlm;
@@ -549,12 +549,9 @@ int rpc_tsg_write(rdpRpc* rpc, BYTE* data, int length, UINT16 opnum)
        offset += Buffers[1].cbBuffer;
 
        rpc_send_enqueue_pdu(rpc, buffer, request_pdu->frag_length);
-       status = rpc_send_dequeue_pdu(rpc);
-
-       free(buffer);
 
-       if (status < 0)
-               return -1;
+       WaitForSingleObject(rpc->client->PduSentEvent, INFINITE);
+       ResetEvent(rpc->client->PduSentEvent);
 
        return length;
 }
@@ -715,6 +712,8 @@ rdpRpc* rpc_new(rdpTransport* transport)
                rpc->VirtualConnectionCookieTable = rpc_virtual_connection_cookie_table_new(rpc);
 
                rpc->call_id = 1;
+
+               rpc_client_start(rpc);
        }
 
        return rpc;
index bf78441..1ae147b 100644 (file)
@@ -705,11 +705,8 @@ struct rpc_client
        HANDLE Thread;
        HANDLE StopEvent;
 
-       HANDLE SendEvent;
+       HANDLE PduSentEvent;
        HANDLE SendSemaphore;
-
-       HANDLE ReceiveEvent;
-       HANDLE ReceiveSemaphore;
 };
 typedef struct rpc_client RpcClient;
 
index ce5ba63..0c5dad6 100644 (file)
@@ -40,6 +40,7 @@ int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length)
        PduEntry->Length = length;
 
        InterlockedPushEntrySList(rpc->SendQueue, &(PduEntry->ItemEntry));
+       ReleaseSemaphore(rpc->client->SendSemaphore, 1, NULL);
 
        return 0;
 }
@@ -51,6 +52,9 @@ int rpc_send_dequeue_pdu(rdpRpc* rpc)
 
        PduEntry = (RPC_PDU_ENTRY*) InterlockedPopEntrySList(rpc->SendQueue);
 
+       if (!PduEntry)
+               return 0;
+
        status = rpc_in_write(rpc, PduEntry->Buffer, PduEntry->Length);
 
        /*
@@ -62,8 +66,11 @@ int rpc_send_dequeue_pdu(rdpRpc* rpc)
        rpc->VirtualConnection->DefaultInChannel->BytesSent += status;
        rpc->VirtualConnection->DefaultInChannel->SenderAvailableWindow -= status;
 
+       free(PduEntry->Buffer);
        _aligned_free(PduEntry);
 
+       SetEvent(rpc->client->PduSentEvent);
+
        return status;
 }
 
@@ -71,30 +78,23 @@ static void* rpc_client_thread(void* arg)
 {
        rdpRpc* rpc;
        DWORD status;
-       HANDLE events[3];
+       DWORD nCount;
+       HANDLE events[2];
 
        rpc = (rdpRpc*) arg;
 
-       events[0] = rpc->client->StopEvent;
-       events[1] = rpc->client->SendEvent;
-       events[2] = rpc->client->ReceiveEvent;
+       nCount = 0;
+       events[nCount++] = rpc->client->StopEvent;
+       events[nCount++] = rpc->client->SendSemaphore;
 
        while (1)
        {
-               status = WaitForMultipleObjects(3, events, FALSE, INFINITE);
+               status = WaitForMultipleObjects(nCount, events, FALSE, INFINITE);
 
                if (WaitForSingleObject(rpc->client->StopEvent, 0) == WAIT_OBJECT_0)
                        break;
 
-               if (WaitForSingleObject(rpc->client->SendEvent, 0) == WAIT_OBJECT_0)
-               {
-                       rpc_send_dequeue_pdu(rpc);
-               }
-
-               if (WaitForSingleObject(rpc->client->ReceiveEvent, 0) == WAIT_OBJECT_0)
-               {
-
-               }
+               rpc_send_dequeue_pdu(rpc);
        }
 
        return NULL;
@@ -108,9 +108,9 @@ int rpc_client_start(rdpRpc* rpc)
                        (LPTHREAD_START_ROUTINE) rpc_client_thread,
                        rpc, CREATE_SUSPENDED, NULL);
 
-       rpc->client->SendEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
-       rpc->client->ReceiveEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        rpc->client->StopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+       rpc->client->SendSemaphore = CreateSemaphore(NULL, 0, 64, NULL);
+       rpc->client->PduSentEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
 
        ResumeThread(rpc->client->Thread);
 
index 2f0b4e8..113e69b 100644 (file)
@@ -27,4 +27,6 @@
 int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length);
 int rpc_send_dequeue_pdu(rdpRpc* rpc);
 
+int rpc_client_start(rdpRpc* rpc);
+
 #endif /* FREERDP_CORE_RPC_CLIENT_H */
index 368d2e2..712e36e 100644 (file)
@@ -132,6 +132,26 @@ const RPC_FAULT_CODE RPC_FAULT_CODES[] =
        DEFINE_RPC_FAULT_CODE(RPC_X_ENUM_VALUE_OUT_OF_RANGE)
        DEFINE_RPC_FAULT_CODE(RPC_X_BYTE_COUNT_TOO_SMALL)
        DEFINE_RPC_FAULT_CODE(RPC_X_BAD_STUB_DATA)
+       DEFINE_RPC_FAULT_CODE(RPC_S_NO_INTERFACES)
+       DEFINE_RPC_FAULT_CODE(RPC_S_CALL_CANCELLED)
+       DEFINE_RPC_FAULT_CODE(RPC_S_BINDING_INCOMPLETE)
+       DEFINE_RPC_FAULT_CODE(RPC_S_COMM_FAILURE)
+       DEFINE_RPC_FAULT_CODE(RPC_S_UNSUPPORTED_AUTHN_LEVEL)
+       DEFINE_RPC_FAULT_CODE(RPC_S_NO_PRINC_NAME)
+       DEFINE_RPC_FAULT_CODE(RPC_S_NOT_RPC_ERROR)
+       DEFINE_RPC_FAULT_CODE(RPC_S_UUID_LOCAL_ONLY)
+       DEFINE_RPC_FAULT_CODE(RPC_S_SEC_PKG_ERROR)
+       DEFINE_RPC_FAULT_CODE(RPC_S_NOT_CANCELLED)
+       DEFINE_RPC_FAULT_CODE(RPC_X_INVALID_ES_ACTION)
+       DEFINE_RPC_FAULT_CODE(RPC_X_WRONG_ES_VERSION)
+       DEFINE_RPC_FAULT_CODE(RPC_X_WRONG_STUB_VERSION)
+       DEFINE_RPC_FAULT_CODE(RPC_X_INVALID_PIPE_OBJECT)
+       DEFINE_RPC_FAULT_CODE(RPC_X_WRONG_PIPE_ORDER)
+       DEFINE_RPC_FAULT_CODE(RPC_X_WRONG_PIPE_VERSION)
+       DEFINE_RPC_FAULT_CODE(RPC_S_COOKIE_AUTH_FAILED)
+       DEFINE_RPC_FAULT_CODE(RPC_S_GROUP_MEMBER_NOT_FOUND)
+       DEFINE_RPC_FAULT_CODE(EPT_S_CANT_CREATE)
+       DEFINE_RPC_FAULT_CODE(RPC_S_INVALID_OBJECT)
        { 0, NULL }
 };
 
index 157ceda..91ea29c 100644 (file)
@@ -153,7 +153,6 @@ DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAl
        fd_set fds;
        ULONG Type;
        PVOID Object;
-       WINPR_EVENT* event;
        struct timeval timeout;
 
        maxfd = 0;
@@ -168,11 +167,22 @@ DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAl
                if (!winpr_Handle_GetInfo(lpHandles[index], &Type, &Object))
                        return WAIT_FAILED;
 
-               if (Type != HANDLE_TYPE_EVENT)
+               if (Type == HANDLE_TYPE_EVENT)
+               {
+                       fd = ((WINPR_EVENT*) Object)->pipe_fd[0];
+               }
+               else if (Type == HANDLE_TYPE_SEMAPHORE)
+               {
+#ifdef WINPR_PIPE_SEMAPHORE
+                       fd = ((WINPR_SEMAPHORE*) Object)->pipe_fd[0];
+#else
                        return WAIT_FAILED;
-
-               event = (WINPR_EVENT*) Object;
-               fd = event->pipe_fd[0];
+#endif
+               }
+               else
+               {
+                       return WAIT_FAILED;
+               }
 
                FD_SET(fd, &fds);
 
@@ -197,10 +207,24 @@ DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAl
        for (index = 0; index < nCount; index++)
        {
                winpr_Handle_GetInfo(lpHandles[index], &Type, &Object);
-               fd = ((WINPR_EVENT*) Object)->pipe_fd[0];
+
+               if (Type == HANDLE_TYPE_EVENT)
+                       fd = ((WINPR_EVENT*) Object)->pipe_fd[0];
+               else if (Type == HANDLE_TYPE_SEMAPHORE)
+                       fd = ((WINPR_SEMAPHORE*) Object)->pipe_fd[0];
 
                if (FD_ISSET(fd, &fds))
+               {
+                       if (Type == HANDLE_TYPE_SEMAPHORE)
+                       {
+                               int length = read(fd, &length, 1);
+
+                               if (length != 1)
+                                       return WAIT_FAILED;
+                       }
+
                        return (WAIT_OBJECT_0 + index);
+               }
        }
 
        return WAIT_FAILED;