libfreerdp-core: add non-blocking read.
authorVic Lee <llyzs@163.com>
Mon, 1 Aug 2011 04:43:53 +0000 (12:43 +0800)
committerVic Lee <llyzs@163.com>
Mon, 1 Aug 2011 06:45:28 +0000 (14:45 +0800)
libfreerdp-core/activation.c
libfreerdp-core/connection.c
libfreerdp-core/freerdp.c
libfreerdp-core/rdp.c
libfreerdp-core/rdp.h
libfreerdp-core/transport.c
libfreerdp-core/transport.h

index 95cf9d9..5121dff 100644 (file)
@@ -170,7 +170,6 @@ void rdp_send_client_font_list_pdu(rdpRdp* rdp, uint16 flags)
 void rdp_recv_server_font_map_pdu(rdpRdp* rdp, STREAM* s, rdpSettings* settings)
 {
        rdp->activated = True;
-       rdp->transport->tcp->set_blocking_mode(rdp->transport->tcp, False);
 }
 
 void rdp_recv_deactivate_all(rdpRdp* rdp, STREAM* s)
@@ -184,6 +183,5 @@ void rdp_recv_deactivate_all(rdpRdp* rdp, STREAM* s)
        stream_seek(s, lengthSourceDescriptor); /* sourceDescriptor (should be 0x00) */
 
        rdp->activated = False;
-       rdp->transport->tcp->set_blocking_mode(rdp->transport->tcp, True);
 }
 
index 5b027af..e893407 100644 (file)
@@ -95,6 +95,7 @@ boolean rdp_client_connect(rdpRdp* rdp)
        rdp->licensed = True;
 
        rdp_client_activate(rdp);
+       rdp_set_blocking_mode(rdp, False);
 
        return True;
 }
index ce75572..d46c9be 100644 (file)
@@ -54,10 +54,13 @@ boolean freerdp_get_fds(freerdp* instance, void** rfds, int* rcount, void** wfds
 boolean freerdp_check_fds(freerdp* instance)
 {
        rdpRdp* rdp;
+       int status;
 
        rdp = (rdpRdp*) instance->rdp;
 
-       rdp_recv(rdp);
+       status = rdp_check_fds(rdp);
+       if (status < 0)
+               return False;
 
        return True;
 }
index 3729ef9..4948f96 100644 (file)
@@ -344,13 +344,13 @@ void rdp_read_data_pdu(rdpRdp* rdp, STREAM* s)
 }
 
 /**
- * Receive an RDP packet.\n
+ * Process an RDP packet.\n
  * @param rdp RDP module
+ * @param s stream
  */
 
-void rdp_recv(rdpRdp* rdp)
+void rdp_process_pdu(rdpRdp* rdp, STREAM* s)
 {
-       STREAM* s;
        int length;
        uint16 pduType;
        uint16 pduLength;
@@ -359,9 +359,6 @@ void rdp_recv(rdpRdp* rdp)
        uint16 sec_flags;
        enum DomainMCSPDU MCSPDU;
 
-       s = transport_recv_stream_init(rdp->transport, 4096);
-       transport_read(rdp->transport, s);
-
        MCSPDU = DomainMCSPDU_SendDataIndication;
        mcs_read_domain_mcspdu_header(s, &MCSPDU, &length);
 
@@ -422,6 +419,47 @@ void rdp_recv(rdpRdp* rdp)
 }
 
 /**
+ * Receive an RDP packet.\n
+ * @param rdp RDP module
+ */
+
+void rdp_recv(rdpRdp* rdp)
+{
+       STREAM* s;
+
+       s = transport_recv_stream_init(rdp->transport, 4096);
+       transport_read(rdp->transport, s);
+
+       rdp_process_pdu(rdp, s);
+}
+
+static int rdp_recv_callback(rdpTransport* transport, STREAM* s, void* extra)
+{
+       rdpRdp* rdp = (rdpRdp*) extra;
+
+       rdp_process_pdu(rdp, s);
+
+       return 1;
+}
+
+/**
+ * Set non-blocking mode information.
+ * @param rdp RDP module
+ * @param blocking blocking mode
+ */
+void rdp_set_blocking_mode(rdpRdp* rdp, boolean blocking)
+{
+       rdp->transport->recv_callback = rdp_recv_callback;
+       rdp->transport->recv_extra = rdp;
+       transport_set_blocking_mode(rdp->transport, blocking);
+}
+
+int rdp_check_fds(rdpRdp* rdp)
+{
+       return transport_check_fds(rdp->transport);
+}
+
+/**
  * Instantiate new RDP module.
  * @return new RDP module
  */
index 162c40a..52b0b38 100644 (file)
@@ -236,6 +236,9 @@ void rdp_send_data_pdu(rdpRdp* rdp, STREAM* s, uint16 type, uint16 channel_id);
 void rdp_send(rdpRdp* rdp, STREAM* s);
 void rdp_recv(rdpRdp* rdp);
 
+void rdp_set_blocking_mode(rdpRdp* rdp, boolean blocking);
+int rdp_check_fds(rdpRdp* rdp);
+
 rdpRdp* rdp_new();
 void rdp_free(rdpRdp* rdp);
 
index 45cebd7..2a03a4b 100644 (file)
@@ -135,6 +135,21 @@ int transport_read(rdpTransport* transport, STREAM* s)
        return status;
 }
 
+static int transport_read_nonblocking(rdpTransport* transport)
+{
+       int status;
+
+       stream_check_size(transport->recv_buffer, 4096);
+       status = transport_read(transport, transport->recv_buffer);
+
+       if (status <= 0)
+               return status;
+
+       stream_seek(transport->recv_buffer, status);
+
+       return status;
+}
+
 int transport_write(rdpTransport* transport, STREAM* s)
 {
        int status = -1;
@@ -149,6 +164,63 @@ int transport_write(rdpTransport* transport, STREAM* s)
 
 int transport_check_fds(rdpTransport* transport)
 {
+       int pos;
+       int status;
+       uint8 header;
+       uint16 length;
+       STREAM* received;
+
+       status = transport_read_nonblocking(transport);
+       if (status <= 0)
+               return status;
+
+       while ((pos = stream_get_pos(transport->recv_buffer)) > 0)
+       {
+               /* Ensure the TPKT or Fast Path header is available. */
+               if (pos <= 4)
+                       return 0;
+
+               stream_set_pos(transport->recv_buffer, 0);
+               stream_peek_uint8(transport->recv_buffer, header);
+               if (header == 0x03) /* TPKT */
+                       length = tpkt_read_header(transport->recv_buffer);
+               else /* TODO: Fast Path */
+                       length = 0;
+
+               if (length == 0)
+               {
+                       printf("transport_check_fds: protocol error, not a TPKT header (%d).\n", header);
+                       return -1;
+               }
+
+               if (pos < length)
+               {
+                       stream_set_pos(transport->recv_buffer, pos);
+                       return 0; /* Packet is not yet completely received. */
+               }
+
+               /*
+                * A complete packet has been received. In case there are trailing data
+                * for the next packet, we copy it to the new receive buffer.
+                */
+               received = transport->recv_buffer;
+               transport->recv_buffer = stream_new(BUFFER_SIZE);
+
+               if (pos > length)
+               {
+                       stream_set_pos(received, length);
+                       stream_check_size(transport->recv_buffer, pos - length);
+                       stream_copy(transport->recv_buffer, received, pos - length);
+               }
+
+               stream_set_pos(received, 0);
+               status = transport->recv_callback(transport, received, transport->recv_extra);
+               stream_free(received);
+
+               if (status < 0)
+                       return status;
+       }
+
        return 0;
 }
 
@@ -158,6 +230,12 @@ void transport_init(rdpTransport* transport)
        transport->state = TRANSPORT_STATE_NEGO;
 }
 
+boolean transport_set_blocking_mode(rdpTransport* transport, boolean blocking)
+{
+       transport->blocking = blocking;
+       return transport->tcp->set_blocking_mode(transport->tcp, blocking);
+}
+
 rdpTransport* transport_new(rdpSettings* settings)
 {
        rdpTransport* transport;
@@ -179,6 +257,8 @@ rdpTransport* transport_new(rdpSettings* settings)
                /* buffers for blocking read/write */
                transport->recv_stream = stream_new(BUFFER_SIZE);
                transport->send_stream = stream_new(BUFFER_SIZE);
+
+               transport->blocking = True;
        }
 
        return transport;
index 23c97cb..0f591ab 100644 (file)
@@ -63,6 +63,7 @@ struct rdp_transport
        void* recv_extra;
        STREAM* recv_buffer;
        TransportRecv recv_callback;
+       boolean blocking;
 };
 
 STREAM* transport_recv_stream_init(rdpTransport* transport, int size);
@@ -75,6 +76,7 @@ boolean transport_connect_nla(rdpTransport* transport);
 int transport_read(rdpTransport* transport, STREAM* s);
 int transport_write(rdpTransport* transport, STREAM* s);
 int transport_check_fds(rdpTransport* transport);
+boolean transport_set_blocking_mode(rdpTransport* transport, boolean blocking);
 rdpTransport* transport_new(rdpSettings* settings);
 void transport_free(rdpTransport* transport);