From e2f377ae11de27514f2a0197c49e099e9f2115f8 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Marc-Andr=C3=A9=20Moreau?= Date: Mon, 15 Dec 2014 09:42:04 -0500 Subject: [PATCH] libfreerdp-core: fix TSG thread shutdown and input freeze problem --- client/X11/xf_client.c | 44 +++++++++++++++++++----------------- libfreerdp/core/gateway/rpc.c | 6 ++++- libfreerdp/core/gateway/rpc.h | 1 + libfreerdp/core/gateway/rpc_client.c | 32 +++++++++++++++----------- libfreerdp/core/gateway/tsg.c | 2 +- libfreerdp/core/transport.c | 33 +++++++++++++-------------- libfreerdp/core/transport.h | 1 + 7 files changed, 66 insertions(+), 53 deletions(-) diff --git a/client/X11/xf_client.c b/client/X11/xf_client.c index 71534ab..41c6625 100644 --- a/client/X11/xf_client.c +++ b/client/X11/xf_client.c @@ -1275,27 +1275,37 @@ void xf_window_free(xfContext* xfc) void* xf_input_thread(void *arg) { - xfContext* xfc; DWORD status; - HANDLE event[2]; + DWORD nCount; + HANDLE events[2]; XEvent xevent; - wMessageQueue *queue; wMessage msg; + wMessageQueue *queue; int pending_status = 1; int process_status = 1; - freerdp *instance = (freerdp*) arg; - assert(NULL != instance); - xfc = (xfContext *) instance->context; - assert(NULL != xfc); + freerdp* instance = (freerdp*) arg; + xfContext* xfc = (xfContext*) instance->context; + queue = freerdp_get_message_queue(instance, FREERDP_INPUT_MESSAGE_QUEUE); - event[0] = MessageQueue_Event(queue); - event[1] = CreateFileDescriptorEvent(NULL, FALSE, FALSE, xfc->xfds); + + nCount = 0; + events[nCount++] = MessageQueue_Event(queue); + events[nCount++] = CreateFileDescriptorEvent(NULL, FALSE, FALSE, xfc->xfds); while(1) { - status = WaitForMultipleObjects(2, event, FALSE, INFINITE); + status = WaitForMultipleObjects(nCount, events, FALSE, INFINITE); - if(status == WAIT_OBJECT_0 + 1) + if (WaitForSingleObject(events[0], 0) == WAIT_OBJECT_0) + { + if (MessageQueue_Peek(queue, &msg, FALSE)) + { + if (msg.id == WMQ_QUIT) + break; + } + } + + if (WaitForSingleObject(events[1], 0) == WAIT_OBJECT_0) { do { @@ -1324,18 +1334,10 @@ void* xf_input_thread(void *arg) break; } - else if(status == WAIT_OBJECT_0) - { - if(MessageQueue_Peek(queue, &msg, FALSE)) - { - if(msg.id == WMQ_QUIT) - break; - } - } - else - break; } + CloseHandle(events[1]); + MessageQueue_PostQuit(queue, 0); ExitThread(0); return NULL; diff --git a/libfreerdp/core/gateway/rpc.c b/libfreerdp/core/gateway/rpc.c index 039fc5d..3df1c73 100644 --- a/libfreerdp/core/gateway/rpc.c +++ b/libfreerdp/core/gateway/rpc.c @@ -320,6 +320,7 @@ BOOL rpc_get_stub_data_info(rdpRpc* rpc, BYTE* buffer, UINT32* offset, UINT32* l int rpc_out_read(rdpRpc* rpc, BYTE* data, int length) { int status; + status = BIO_read(rpc->TlsOut->bio, data, length); if (status > 0) @@ -558,8 +559,11 @@ rdpRpc* rpc_new(rdpTransport* transport) return NULL; rpc->State = RPC_CLIENT_STATE_INITIAL; + rpc->transport = transport; rpc->settings = transport->settings; + rpc->context = transport->context; + rpc->SendSeqNum = 0; rpc->ntlm = ntlm_new(); @@ -637,7 +641,7 @@ void rpc_free(rdpRpc* rpc) { if (rpc) { - rpc_client_stop(rpc); + rpc_client_free(rpc); if (rpc->ntlm) { diff --git a/libfreerdp/core/gateway/rpc.h b/libfreerdp/core/gateway/rpc.h index f201aae..0ceb841 100644 --- a/libfreerdp/core/gateway/rpc.h +++ b/libfreerdp/core/gateway/rpc.h @@ -733,6 +733,7 @@ struct rdp_rpc rdpNtlmHttp* NtlmHttpIn; rdpNtlmHttp* NtlmHttpOut; + rdpContext* context; rdpSettings* settings; rdpTransport* transport; diff --git a/libfreerdp/core/gateway/rpc_client.c b/libfreerdp/core/gateway/rpc_client.c index 1bec237..883180d 100644 --- a/libfreerdp/core/gateway/rpc_client.c +++ b/libfreerdp/core/gateway/rpc_client.c @@ -97,8 +97,6 @@ int rpc_client_on_fragment_received_event(rdpRpc* rpc) UINT32 StubLength; wStream* fragment; rpcconn_hdr_t* header; - freerdp* instance; - instance = (freerdp*)rpc->transport->settings->instance; if (!rpc->client->pdu) rpc->client->pdu = rpc_client_receive_pool_take(rpc); @@ -166,11 +164,11 @@ int rpc_client_on_fragment_received_event(rdpRpc* rpc) if ((header->common.call_id == rpc->PipeCallId) && (header->common.pfc_flags & PFC_LAST_FRAG)) { TerminateEventArgs e; - instance->context->rdp->disconnect = TRUE; + rpc->context->rdp->disconnect = TRUE; rpc->transport->tsg->state = TSG_STATE_TUNNEL_CLOSE_PENDING; EventArgsInit(&e, "freerdp"); e.code = 0; - PubSub_OnTerminate(instance->context->pubSub, instance->context, &e); + PubSub_OnTerminate(rpc->context->pubSub, rpc->context, &e); } rpc_client_fragment_pool_return(rpc, fragment); @@ -237,7 +235,7 @@ int rpc_client_on_read_event(rdpRpc* rpc) while (Stream_GetPosition(rpc->client->RecvFrag) < RPC_COMMON_FIELDS_LENGTH) { status = rpc_out_read(rpc, Stream_Pointer(rpc->client->RecvFrag), - RPC_COMMON_FIELDS_LENGTH - Stream_GetPosition(rpc->client->RecvFrag)); + RPC_COMMON_FIELDS_LENGTH - Stream_GetPosition(rpc->client->RecvFrag)); if (status < 0) { @@ -267,7 +265,7 @@ int rpc_client_on_read_event(rdpRpc* rpc) while (Stream_GetPosition(rpc->client->RecvFrag) < header->frag_length) { status = rpc_out_read(rpc, Stream_Pointer(rpc->client->RecvFrag), - header->frag_length - Stream_GetPosition(rpc->client->RecvFrag)); + header->frag_length - Stream_GetPosition(rpc->client->RecvFrag)); if (status < 0) { @@ -297,6 +295,9 @@ int rpc_client_on_read_event(rdpRpc* rpc) if (rpc_client_on_fragment_received_event(rpc) < 0) return -1; } + + if (WaitForSingleObject(Queue_Event(rpc->client->SendQueue), 0) == WAIT_OBJECT_0) + break; } return 0; @@ -395,6 +396,7 @@ int rpc_send_dequeue_pdu(rdpRpc* rpc) RpcClientCall* clientCall; rpcconn_common_hdr_t* header; RpcInChannel* inChannel; + pdu = (RPC_PDU*) Queue_Dequeue(rpc->client->SendQueue); if (!pdu) @@ -479,15 +481,16 @@ RPC_PDU* rpc_recv_peek_pdu(rdpRpc* rpc) static void* rpc_client_thread(void* arg) { - rdpRpc* rpc; + int fd; DWORD status; DWORD nCount; HANDLE events[3]; HANDLE ReadEvent; - int fd; - rpc = (rdpRpc*) arg; + rdpRpc* rpc = (rdpRpc*) arg; + fd = BIO_get_fd(rpc->TlsOut->bio, NULL); ReadEvent = CreateFileDescriptorEvent(NULL, TRUE, FALSE, fd); + nCount = 0; events[nCount++] = rpc->client->StopEvent; events[nCount++] = Queue_Event(rpc->client->SendQueue); @@ -500,7 +503,7 @@ static void* rpc_client_thread(void* arg) */ if (rpc_client_on_read_event(rpc) < 0) { - WLog_ERR(TAG, "an error occured when treating first packet"); + WLog_ERR(TAG, "an error occurred when treating first packet"); goto out; } @@ -624,17 +627,18 @@ int rpc_client_stop(rdpRpc* rpc) rpc->client->Thread = NULL; } - return rpc_client_free(rpc); + return 0; } int rpc_client_free(rdpRpc* rpc) { - RpcClient* client; - client = rpc->client; + RpcClient* client = rpc->client; if (!client) return 0; + rpc_client_stop(rpc); + if (client->SendQueue) Queue_Free(client->SendQueue); @@ -669,5 +673,7 @@ int rpc_client_free(rdpRpc* rpc) CloseHandle(client->Thread); free(client); + rpc->client = NULL; + return 0; } diff --git a/libfreerdp/core/gateway/tsg.c b/libfreerdp/core/gateway/tsg.c index 923f2a3..cd585ac 100644 --- a/libfreerdp/core/gateway/tsg.c +++ b/libfreerdp/core/gateway/tsg.c @@ -1549,9 +1549,9 @@ void tsg_free(rdpTsg* tsg) { if (tsg) { + rpc_free(tsg->rpc); free(tsg->Hostname); free(tsg->MachineName); - rpc_free(tsg->rpc); free(tsg); } } diff --git a/libfreerdp/core/transport.c b/libfreerdp/core/transport.c index abcbfe1..f2f61e2 100644 --- a/libfreerdp/core/transport.c +++ b/libfreerdp/core/transport.c @@ -1108,20 +1108,16 @@ static void* transport_client_thread(void* arg) DWORD status; DWORD nCount; HANDLE handles[8]; - freerdp* instance; - rdpContext* context; - rdpTransport* transport; - transport = (rdpTransport*) arg; - assert(NULL != transport); - assert(NULL != transport->settings); - instance = (freerdp*) transport->settings->instance; - assert(NULL != instance); - context = instance->context; - assert(NULL != instance->context); + rdpTransport* transport = (rdpTransport*) arg; + rdpContext* context = transport->context; + freerdp* instance = context->instance; + WLog_Print(transport->log, WLOG_DEBUG, "Starting transport thread"); + nCount = 0; handles[nCount++] = transport->stopEvent; handles[nCount++] = transport->connectedEvent; + status = WaitForMultipleObjects(nCount, handles, FALSE, INFINITE); if (WaitForSingleObject(transport->stopEvent, 0) == WAIT_OBJECT_0) @@ -1165,7 +1161,8 @@ static void* transport_client_thread(void* arg) rdpTransport* transport_new(rdpSettings* settings) { rdpTransport* transport; - transport = (rdpTransport*)calloc(1, sizeof(rdpTransport)); + + transport = (rdpTransport*) calloc(1, sizeof(rdpTransport)); if (!transport) return NULL; @@ -1182,6 +1179,8 @@ rdpTransport* transport_new(rdpSettings* settings) goto out_free; transport->settings = settings; + transport->context = ((freerdp*) settings->instance)->context; + /* a small 0.1ms delay when transport is blocking. */ transport->SleepInterval = 100; transport->ReceivePool = StreamPool_New(TRUE, BUFFER_SIZE); @@ -1240,6 +1239,12 @@ void transport_free(rdpTransport* transport) transport_stop(transport); + if (transport->tsg) + { + tsg_free(transport->tsg); + transport->tsg = NULL; + } + if (transport->ReceiveBuffer) Stream_Release(transport->ReceiveBuffer); @@ -1265,12 +1270,6 @@ void transport_free(rdpTransport* transport) transport->TcpIn = NULL; transport->TcpOut = NULL; - if (transport->tsg) - { - tsg_free(transport->tsg); - transport->tsg = NULL; - } - if (transport->TsgTls) { tls_free(transport->TsgTls); diff --git a/libfreerdp/core/transport.h b/libfreerdp/core/transport.h index 142d350..3868a8c 100644 --- a/libfreerdp/core/transport.h +++ b/libfreerdp/core/transport.h @@ -62,6 +62,7 @@ struct rdp_transport rdpTls* TlsIn; rdpTls* TlsOut; rdpTls* TsgTls; + rdpContext* context; rdpCredssp* credssp; rdpSettings* settings; UINT32 SleepInterval; -- 2.7.4