libfreerdp-core: fix TSG thread shutdown and input freeze problem
authorMarc-André Moreau <marcandre.moreau@gmail.com>
Mon, 15 Dec 2014 14:42:04 +0000 (09:42 -0500)
committerMarc-André Moreau <marcandre.moreau@gmail.com>
Mon, 15 Dec 2014 14:42:04 +0000 (09:42 -0500)
client/X11/xf_client.c
libfreerdp/core/gateway/rpc.c
libfreerdp/core/gateway/rpc.h
libfreerdp/core/gateway/rpc_client.c
libfreerdp/core/gateway/tsg.c
libfreerdp/core/transport.c
libfreerdp/core/transport.h

index 71534ab..41c6625 100644 (file)
@@ -1275,27 +1275,37 @@ void xf_window_free(xfContext* xfc)
 
 void* xf_input_thread(void *arg)
 {
-       xfContext* xfc;
        DWORD status;
-       HANDLE event[2];
+       DWORD nCount;
+       HANDLE events[2];
        XEvent xevent;
-       wMessageQueue *queue;
        wMessage msg;
+       wMessageQueue *queue;
        int pending_status = 1;
        int process_status = 1;
-       freerdp *instance = (freerdp*) arg;
-       assert(NULL != instance);
-       xfc = (xfContext *) instance->context;
-       assert(NULL != xfc);
+       freerdp* instance = (freerdp*) arg;
+       xfContext* xfc = (xfContext*) instance->context;
+
        queue = freerdp_get_message_queue(instance, FREERDP_INPUT_MESSAGE_QUEUE);
-       event[0] = MessageQueue_Event(queue);
-       event[1] = CreateFileDescriptorEvent(NULL, FALSE, FALSE, xfc->xfds);
+
+       nCount = 0;
+       events[nCount++] = MessageQueue_Event(queue);
+       events[nCount++] = CreateFileDescriptorEvent(NULL, FALSE, FALSE, xfc->xfds);
 
        while(1)
        {
-               status = WaitForMultipleObjects(2, event, FALSE, INFINITE);
+               status = WaitForMultipleObjects(nCount, events, FALSE, INFINITE);
                
-               if(status == WAIT_OBJECT_0 + 1)
+               if (WaitForSingleObject(events[0], 0) == WAIT_OBJECT_0)
+               {
+                       if (MessageQueue_Peek(queue, &msg, FALSE))
+                       {
+                               if (msg.id == WMQ_QUIT)
+                                       break;
+                       }
+               }
+
+               if (WaitForSingleObject(events[1], 0) == WAIT_OBJECT_0)
                {
                        do
                        {
@@ -1324,18 +1334,10 @@ void* xf_input_thread(void *arg)
                                break;
 
                }
-               else if(status == WAIT_OBJECT_0)
-               {
-                       if(MessageQueue_Peek(queue, &msg, FALSE))
-                       {
-                               if(msg.id == WMQ_QUIT)
-                                       break;
-                       }
-               }
-               else
-                       break;
        }
 
+       CloseHandle(events[1]);
+
        MessageQueue_PostQuit(queue, 0);
        ExitThread(0);
        return NULL;
index 039fc5d..3df1c73 100644 (file)
@@ -320,6 +320,7 @@ BOOL rpc_get_stub_data_info(rdpRpc* rpc, BYTE* buffer, UINT32* offset, UINT32* l
 int rpc_out_read(rdpRpc* rpc, BYTE* data, int length)
 {
        int status;
+
        status = BIO_read(rpc->TlsOut->bio, data, length);
 
        if (status > 0)
@@ -558,8 +559,11 @@ rdpRpc* rpc_new(rdpTransport* transport)
                return NULL;
 
        rpc->State = RPC_CLIENT_STATE_INITIAL;
+
        rpc->transport = transport;
        rpc->settings = transport->settings;
+       rpc->context = transport->context;
+
        rpc->SendSeqNum = 0;
 
        rpc->ntlm = ntlm_new();
@@ -637,7 +641,7 @@ void rpc_free(rdpRpc* rpc)
 {
        if (rpc)
        {
-               rpc_client_stop(rpc);
+               rpc_client_free(rpc);
 
                if (rpc->ntlm)
                {
index f201aae..0ceb841 100644 (file)
@@ -733,6 +733,7 @@ struct rdp_rpc
        rdpNtlmHttp* NtlmHttpIn;
        rdpNtlmHttp* NtlmHttpOut;
 
+       rdpContext* context;
        rdpSettings* settings;
        rdpTransport* transport;
 
index 1bec237..883180d 100644 (file)
@@ -97,8 +97,6 @@ int rpc_client_on_fragment_received_event(rdpRpc* rpc)
        UINT32 StubLength;
        wStream* fragment;
        rpcconn_hdr_t* header;
-       freerdp* instance;
-       instance = (freerdp*)rpc->transport->settings->instance;
 
        if (!rpc->client->pdu)
                rpc->client->pdu = rpc_client_receive_pool_take(rpc);
@@ -166,11 +164,11 @@ int rpc_client_on_fragment_received_event(rdpRpc* rpc)
                if ((header->common.call_id == rpc->PipeCallId) && (header->common.pfc_flags & PFC_LAST_FRAG))
                {
                        TerminateEventArgs e;
-                       instance->context->rdp->disconnect = TRUE;
+                       rpc->context->rdp->disconnect = TRUE;
                        rpc->transport->tsg->state = TSG_STATE_TUNNEL_CLOSE_PENDING;
                        EventArgsInit(&e, "freerdp");
                        e.code = 0;
-                       PubSub_OnTerminate(instance->context->pubSub, instance->context, &e);
+                       PubSub_OnTerminate(rpc->context->pubSub, rpc->context, &e);
                }
 
                rpc_client_fragment_pool_return(rpc, fragment);
@@ -237,7 +235,7 @@ int rpc_client_on_read_event(rdpRpc* rpc)
                while (Stream_GetPosition(rpc->client->RecvFrag) < RPC_COMMON_FIELDS_LENGTH)
                {
                        status = rpc_out_read(rpc, Stream_Pointer(rpc->client->RecvFrag),
-                                                                 RPC_COMMON_FIELDS_LENGTH - Stream_GetPosition(rpc->client->RecvFrag));
+                                       RPC_COMMON_FIELDS_LENGTH - Stream_GetPosition(rpc->client->RecvFrag));
 
                        if (status < 0)
                        {
@@ -267,7 +265,7 @@ int rpc_client_on_read_event(rdpRpc* rpc)
                while (Stream_GetPosition(rpc->client->RecvFrag) < header->frag_length)
                {
                        status = rpc_out_read(rpc, Stream_Pointer(rpc->client->RecvFrag),
-                                                                 header->frag_length - Stream_GetPosition(rpc->client->RecvFrag));
+                                       header->frag_length - Stream_GetPosition(rpc->client->RecvFrag));
 
                        if (status < 0)
                        {
@@ -297,6 +295,9 @@ int rpc_client_on_read_event(rdpRpc* rpc)
                        if (rpc_client_on_fragment_received_event(rpc) < 0)
                                return -1;
                }
+
+               if (WaitForSingleObject(Queue_Event(rpc->client->SendQueue), 0) == WAIT_OBJECT_0)
+                       break;
        }
 
        return 0;
@@ -395,6 +396,7 @@ int rpc_send_dequeue_pdu(rdpRpc* rpc)
        RpcClientCall* clientCall;
        rpcconn_common_hdr_t* header;
        RpcInChannel* inChannel;
+
        pdu = (RPC_PDU*) Queue_Dequeue(rpc->client->SendQueue);
 
        if (!pdu)
@@ -479,15 +481,16 @@ RPC_PDU* rpc_recv_peek_pdu(rdpRpc* rpc)
 
 static void* rpc_client_thread(void* arg)
 {
-       rdpRpc* rpc;
+       int fd;
        DWORD status;
        DWORD nCount;
        HANDLE events[3];
        HANDLE ReadEvent;
-       int fd;
-       rpc = (rdpRpc*) arg;
+       rdpRpc* rpc = (rdpRpc*) arg;
+
        fd = BIO_get_fd(rpc->TlsOut->bio, NULL);
        ReadEvent = CreateFileDescriptorEvent(NULL, TRUE, FALSE, fd);
+
        nCount = 0;
        events[nCount++] = rpc->client->StopEvent;
        events[nCount++] = Queue_Event(rpc->client->SendQueue);
@@ -500,7 +503,7 @@ static void* rpc_client_thread(void* arg)
         */
        if (rpc_client_on_read_event(rpc) < 0)
        {
-               WLog_ERR(TAG, "an error occured when treating first packet");
+               WLog_ERR(TAG, "an error occurred when treating first packet");
                goto out;
        }
 
@@ -624,17 +627,18 @@ int rpc_client_stop(rdpRpc* rpc)
                rpc->client->Thread = NULL;
        }
 
-       return rpc_client_free(rpc);
+       return 0;
 }
 
 int rpc_client_free(rdpRpc* rpc)
 {
-       RpcClient* client;
-       client = rpc->client;
+       RpcClient* client = rpc->client;
 
        if (!client)
                return 0;
 
+       rpc_client_stop(rpc);
+
        if (client->SendQueue)
                Queue_Free(client->SendQueue);
 
@@ -669,5 +673,7 @@ int rpc_client_free(rdpRpc* rpc)
                CloseHandle(client->Thread);
 
        free(client);
+       rpc->client = NULL;
+
        return 0;
 }
index 923f2a3..cd585ac 100644 (file)
@@ -1549,9 +1549,9 @@ void tsg_free(rdpTsg* tsg)
 {
        if (tsg)
        {
+               rpc_free(tsg->rpc);
                free(tsg->Hostname);
                free(tsg->MachineName);
-               rpc_free(tsg->rpc);
                free(tsg);
        }
 }
index abcbfe1..f2f61e2 100644 (file)
@@ -1108,20 +1108,16 @@ static void* transport_client_thread(void* arg)
        DWORD status;
        DWORD nCount;
        HANDLE handles[8];
-       freerdp* instance;
-       rdpContext* context;
-       rdpTransport* transport;
-       transport = (rdpTransport*) arg;
-       assert(NULL != transport);
-       assert(NULL != transport->settings);
-       instance = (freerdp*) transport->settings->instance;
-       assert(NULL != instance);
-       context = instance->context;
-       assert(NULL != instance->context);
+       rdpTransport* transport = (rdpTransport*) arg;
+       rdpContext* context = transport->context;
+       freerdp* instance = context->instance;
+
        WLog_Print(transport->log, WLOG_DEBUG, "Starting transport thread");
+
        nCount = 0;
        handles[nCount++] = transport->stopEvent;
        handles[nCount++] = transport->connectedEvent;
+
        status = WaitForMultipleObjects(nCount, handles, FALSE, INFINITE);
 
        if (WaitForSingleObject(transport->stopEvent, 0) == WAIT_OBJECT_0)
@@ -1165,7 +1161,8 @@ static void* transport_client_thread(void* arg)
 rdpTransport* transport_new(rdpSettings* settings)
 {
        rdpTransport* transport;
-       transport = (rdpTransport*)calloc(1, sizeof(rdpTransport));
+
+       transport = (rdpTransport*) calloc(1, sizeof(rdpTransport));
 
        if (!transport)
                return NULL;
@@ -1182,6 +1179,8 @@ rdpTransport* transport_new(rdpSettings* settings)
                goto out_free;
 
        transport->settings = settings;
+       transport->context = ((freerdp*) settings->instance)->context;
+
        /* a small 0.1ms delay when transport is blocking. */
        transport->SleepInterval = 100;
        transport->ReceivePool = StreamPool_New(TRUE, BUFFER_SIZE);
@@ -1240,6 +1239,12 @@ void transport_free(rdpTransport* transport)
 
        transport_stop(transport);
 
+       if (transport->tsg)
+       {
+               tsg_free(transport->tsg);
+               transport->tsg = NULL;
+       }
+
        if (transport->ReceiveBuffer)
                Stream_Release(transport->ReceiveBuffer);
 
@@ -1265,12 +1270,6 @@ void transport_free(rdpTransport* transport)
        transport->TcpIn = NULL;
        transport->TcpOut = NULL;
 
-       if (transport->tsg)
-       {
-               tsg_free(transport->tsg);
-               transport->tsg = NULL;
-       }
-
        if (transport->TsgTls)
        {
                tls_free(transport->TsgTls);
index 142d350..3868a8c 100644 (file)
@@ -62,6 +62,7 @@ struct rdp_transport
        rdpTls* TlsIn;
        rdpTls* TlsOut;
        rdpTls* TsgTls;
+       rdpContext* context;
        rdpCredssp* credssp;
        rdpSettings* settings;
        UINT32 SleepInterval;