From d8f8be192e812adef80659cf9d1577f91ce39a5f Mon Sep 17 00:00:00 2001 From: =?utf8?q?Marc-Andr=C3=A9=20Moreau?= Date: Fri, 25 Jan 2013 12:08:00 -0500 Subject: [PATCH] xfreerdp: add asynchronous dequeuing of graphical messages --- client/X11/xfreerdp.c | 14 ++++++++ include/freerdp/freerdp.h | 3 ++ include/freerdp/update.h | 1 - libfreerdp/core/freerdp.c | 17 +++++++++- libfreerdp/core/message.c | 83 ++++++++++++++++++++++++++--------------------- libfreerdp/core/message.h | 3 +- libfreerdp/core/update.c | 9 +++-- libfreerdp/core/update.h | 2 ++ 8 files changed, 88 insertions(+), 44 deletions(-) diff --git a/client/X11/xfreerdp.c b/client/X11/xfreerdp.c index 3d20bc6..4bae7d3 100644 --- a/client/X11/xfreerdp.c +++ b/client/X11/xfreerdp.c @@ -1025,6 +1025,8 @@ int xfreerdp_run(freerdp* instance) void* wfds[32]; fd_set rfds_set; fd_set wfds_set; + int fd_msg_event; + HANDLE msg_event; int select_status; rdpChannels* channels; struct timeval timeout; @@ -1051,6 +1053,12 @@ int xfreerdp_run(freerdp* instance) xfi = ((xfContext*) instance->context)->xfi; channels = instance->context->channels; + fd_msg_event = -1; + msg_event = freerdp_get_message_queue_event_handle(instance); + + if (msg_event) + fd_msg_event = GetEventFileDescriptor(msg_event); + while (!xfi->disconnect && !freerdp_shall_disconnect(instance)) { rcount = 0; @@ -1075,6 +1083,9 @@ int xfreerdp_run(freerdp* instance) break; } + if (fd_msg_event > 0) + rfds[rcount++] = (void*) (long) fd_msg_event; + max_fds = 0; FD_ZERO(&rfds_set); FD_ZERO(&wfds_set); @@ -1130,6 +1141,9 @@ int xfreerdp_run(freerdp* instance) break; } xf_process_channel_event(channels, instance); + + if (fd_msg_event > 0) + freerdp_process_messages(instance); } FILE *fin = fopen("/tmp/tsmf.tid", "rt"); diff --git a/include/freerdp/freerdp.h b/include/freerdp/freerdp.h index 4b7e395..22343fe 100644 --- a/include/freerdp/freerdp.h +++ b/include/freerdp/freerdp.h @@ -195,6 +195,9 @@ FREERDP_API BOOL freerdp_disconnect(freerdp* instance); FREERDP_API BOOL freerdp_get_fds(freerdp* instance, void** rfds, int* rcount, void** wfds, int* wcount); FREERDP_API BOOL freerdp_check_fds(freerdp* instance); +FREERDP_API int freerdp_process_messages(freerdp* instance); +FREERDP_API HANDLE freerdp_get_message_queue_event_handle(freerdp* instance); + FREERDP_API UINT32 freerdp_error_info(freerdp* instance); FREERDP_API void freerdp_get_version(int* major, int* minor, int* revision); diff --git a/include/freerdp/update.h b/include/freerdp/update.h index 2eb7f2c..c33803f 100644 --- a/include/freerdp/update.h +++ b/include/freerdp/update.h @@ -203,7 +203,6 @@ struct rdp_update SURFACE_BITS_COMMAND surface_bits_command; SURFACE_FRAME_MARKER surface_frame_marker; - HANDLE thread; BOOL asynchronous; rdpMessage* message; wMessageQueue* queue; diff --git a/libfreerdp/core/freerdp.c b/libfreerdp/core/freerdp.c index d00db59..12ce810 100644 --- a/libfreerdp/core/freerdp.c +++ b/libfreerdp/core/freerdp.c @@ -35,7 +35,7 @@ #include #include -/* connectErrorCode is 'extern' in errorcodes.h. See comment there.*/ +/* connectErrorCode is 'extern' in error.h. See comment there.*/ /** Creates a new connection based on the settings found in the "instance" parameter * It will use the callbacks registered on the structure to process the pre/post connect operations @@ -184,6 +184,21 @@ BOOL freerdp_check_fds(freerdp* instance) return TRUE; } +HANDLE freerdp_get_message_queue_event_handle(freerdp* instance) +{ + HANDLE event = NULL; + + if (instance->update->queue) + event = MessageQueue_Event(instance->update->queue); + + return event; +} + +int freerdp_process_messages(freerdp* instance) +{ + return update_process_messages(instance->update); +} + static int freerdp_send_channel_data(freerdp* instance, int channel_id, BYTE* data, int size) { return rdp_send_channel_data(instance->context->rdp, channel_id, data, size); diff --git a/libfreerdp/core/message.c b/libfreerdp/core/message.c index be59f14..144793d 100644 --- a/libfreerdp/core/message.c +++ b/libfreerdp/core/message.c @@ -1483,65 +1483,74 @@ int message_process_pointer_update_class(rdpMessage* update, wMessage* msg, int return status; } -void* message_update_thread(void* arg) +int message_process_class(rdpMessage* update, wMessage* msg, int msgClass, int msgType) +{ + int status = 0; + + switch (msgClass) + { + case Update_Class: + status = message_process_update_class(update, msg, msgType); + break; + + case PrimaryUpdate_Class: + status = message_process_primary_update_class(update, msg, msgType); + break; + + case SecondaryUpdate_Class: + status = message_process_secondary_update_class(update, msg, msgType); + break; + + case AltSecUpdate_Class: + status = message_process_altsec_update_class(update, msg, msgType); + break; + + case WindowUpdate_Class: + status = message_process_window_update_class(update, msg, msgType); + break; + + case PointerUpdate_Class: + status = message_process_pointer_update_class(update, msg, msgType); + break; + + default: + status = -1; + break; + } + + if (status < 0) + printf("Unknown message: class: %d type: %d\n", msgClass, msgType); + + return status; +} + +int message_process_pending_updates(rdpUpdate* update) { int status; int msgClass; int msgType; wMessage message; wMessageQueue* queue; - rdpUpdate* update = (rdpUpdate*) arg; queue = update->queue; - while (MessageQueue_Wait(queue)) + while (1) { status = MessageQueue_Peek(queue, &message, TRUE); if (!status) - continue; + break; if (message.type == WMQ_QUIT) break; - status = -1; msgClass = GetMessageClass(message.type); msgType = GetMessageType(message.type); - printf("Message Class: %d Type: %d\n", msgClass, msgType); - - switch (msgClass) - { - case Update_Class: - status = message_process_update_class(update->message, &message, msgType); - break; - - case PrimaryUpdate_Class: - status = message_process_primary_update_class(update->message, &message, msgType); - break; - - case SecondaryUpdate_Class: - status = message_process_secondary_update_class(update->message, &message, msgType); - break; - - case AltSecUpdate_Class: - status = message_process_altsec_update_class(update->message, &message, msgType); - break; - - case WindowUpdate_Class: - status = message_process_window_update_class(update->message, &message, msgType); - break; - - case PointerUpdate_Class: - status = message_process_pointer_update_class(update->message, &message, msgType); - break; - } - - if (status < 0) - printf("Unknown message: class: %d type: %d\n", msgClass, msgType); + status = message_process_class(update->message, &message, msgClass, msgType); } - return NULL; + return 0; } rdpMessage* message_new() diff --git a/libfreerdp/core/message.h b/libfreerdp/core/message.h index 346a4a5..14f4282 100644 --- a/libfreerdp/core/message.h +++ b/libfreerdp/core/message.h @@ -225,10 +225,9 @@ struct rdp_message rdpUpdate* update; }; +int message_process_pending_updates(rdpUpdate* update); void message_register_interface(rdpMessage* message, rdpUpdate* update); -void* message_update_thread(void* arg); - rdpMessage* message_new(); void message_free(rdpMessage* message); diff --git a/libfreerdp/core/update.c b/libfreerdp/core/update.c index a85b9f2..cf845b3 100644 --- a/libfreerdp/core/update.c +++ b/libfreerdp/core/update.c @@ -720,6 +720,11 @@ void update_register_client_callbacks(rdpUpdate* update) update->SurfaceFrameAcknowledge = update_send_frame_acknowledge; } +int update_process_messages(rdpUpdate* update) +{ + return message_process_pending_updates(update); +} + rdpUpdate* update_new(rdpRdp* rdp) { rdpUpdate* update; @@ -759,13 +764,12 @@ rdpUpdate* update_new(rdpRdp* rdp) update->SuppressOutput = update_send_suppress_output; update->initialState = TRUE; - //update->asynchronous = TRUE; + update->asynchronous = TRUE; if (update->asynchronous) { update->queue = MessageQueue_New(); update->message = message_new(); - update->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) message_update_thread, update, 0, NULL); } } @@ -792,7 +796,6 @@ void update_free(rdpUpdate* update) if (update->asynchronous) { - CloseHandle(update->thread); message_free(update->message); MessageQueue_Free(update->queue); } diff --git a/libfreerdp/core/update.h b/libfreerdp/core/update.h index 95a6ee5..541490d 100644 --- a/libfreerdp/core/update.h +++ b/libfreerdp/core/update.h @@ -61,4 +61,6 @@ BOOL update_read_suppress_output(rdpUpdate* update, STREAM* s); void update_register_server_callbacks(rdpUpdate* update); void update_register_client_callbacks(rdpUpdate* update); +int update_process_messages(rdpUpdate* update); + #endif /* __UPDATE_H */ -- 2.7.4