Implement watches for GIOChannels for write file descriptors on Win32
authorMarcus Brinkmann <mb@g10code.de>
Sat, 11 Mar 2006 21:03:00 +0000 (21:03 +0000)
committerTor Lillqvist <tml@src.gnome.org>
Sat, 11 Mar 2006 21:03:00 +0000 (21:03 +0000)
2006-03-02  Marcus Brinkmann  <mb@g10code.de>

Implement watches for GIOChannels for write file descriptors on
Win32 (#333098).

* glib/giowin32.c (GIOWin32Channel): Add a new direction field.
(read_thread): Initialize direction.
(write_thread): New function.
(buffer_write): New function.
(g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the
write direction.
(g_io_win32_fd_write): Call buffer_write() if there is a writer
thread.
(g_io_win32_fd_close): Set space_avail_event for writer threads.
(g_io_win32_fd_create_watch): Create the writer thread if
condition is G_IO_OUT.
(g_io_channel_win32_make_pollfd): Likewise here.

ChangeLog
ChangeLog.pre-2-10
ChangeLog.pre-2-12
glib/giowin32.c

index f0c1ea3..b8bdfc3 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,21 @@
+2006-03-02  Marcus Brinkmann  <mb@g10code.de>
+
+       Implement watches for GIOChannels for write file descriptors on
+       Win32 (#333098).
+       
+       * glib/giowin32.c (GIOWin32Channel): Add a new direction field.
+       (read_thread): Initialize direction.
+       (write_thread): New function.
+       (buffer_write): New function.
+       (g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the
+       write direction.
+       (g_io_win32_fd_write): Call buffer_write() if there is a writer
+       thread.
+       (g_io_win32_fd_close): Set space_avail_event for writer threads.
+       (g_io_win32_fd_create_watch): Create the writer thread if
+       condition is G_IO_OUT.
+       (g_io_channel_win32_make_pollfd): Likewise here.
+
 2006-03-09  Matthias Clasen  <mclasen@redhat.com>
 
        * Makefile.am: Add ChangeLog.pre-2.8 to EXTRA_DIST.
index f0c1ea3..b8bdfc3 100644 (file)
@@ -1,3 +1,21 @@
+2006-03-02  Marcus Brinkmann  <mb@g10code.de>
+
+       Implement watches for GIOChannels for write file descriptors on
+       Win32 (#333098).
+       
+       * glib/giowin32.c (GIOWin32Channel): Add a new direction field.
+       (read_thread): Initialize direction.
+       (write_thread): New function.
+       (buffer_write): New function.
+       (g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the
+       write direction.
+       (g_io_win32_fd_write): Call buffer_write() if there is a writer
+       thread.
+       (g_io_win32_fd_close): Set space_avail_event for writer threads.
+       (g_io_win32_fd_create_watch): Create the writer thread if
+       condition is G_IO_OUT.
+       (g_io_channel_win32_make_pollfd): Likewise here.
+
 2006-03-09  Matthias Clasen  <mclasen@redhat.com>
 
        * Makefile.am: Add ChangeLog.pre-2.8 to EXTRA_DIST.
index f0c1ea3..b8bdfc3 100644 (file)
@@ -1,3 +1,21 @@
+2006-03-02  Marcus Brinkmann  <mb@g10code.de>
+
+       Implement watches for GIOChannels for write file descriptors on
+       Win32 (#333098).
+       
+       * glib/giowin32.c (GIOWin32Channel): Add a new direction field.
+       (read_thread): Initialize direction.
+       (write_thread): New function.
+       (buffer_write): New function.
+       (g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the
+       write direction.
+       (g_io_win32_fd_write): Call buffer_write() if there is a writer
+       thread.
+       (g_io_win32_fd_close): Set space_avail_event for writer threads.
+       (g_io_win32_fd_create_watch): Create the writer thread if
+       condition is G_IO_OUT.
+       (g_io_channel_win32_make_pollfd): Likewise here.
+
 2006-03-09  Matthias Clasen  <mclasen@redhat.com>
 
        * Makefile.am: Add ChangeLog.pre-2.8 to EXTRA_DIST.
index ff164a1..7a9b6d7 100644 (file)
@@ -77,6 +77,10 @@ struct _GIOWin32Channel {
   /* Following fields are used by fd channels. */
   CRITICAL_SECTION mutex;
 
+  int direction;               /* 0 means we read from it,
+                                * 1 means we write to it.
+                                */
+
   gboolean running;            /* Is reader thread running. FALSE if
                                 * EOF has been reached.
                                 */
@@ -391,7 +395,8 @@ read_thread (void *parameter)
             channel->fd,
             (guint) channel->data_avail_event,
             (guint) channel->space_avail_event);
-  
+
+  channel->direction = 0;
   channel->buffer = g_malloc (BUFFER_SIZE);
   channel->rdp = channel->wrp = 0;
   channel->running = TRUE;
@@ -486,6 +491,117 @@ read_thread (void *parameter)
   return 0;
 }
 
+static unsigned __stdcall
+write_thread (void *parameter)
+{
+  GIOWin32Channel *channel = parameter;
+  guchar *buffer;
+  guint nbytes;
+
+  g_io_channel_ref ((GIOChannel *)channel);
+
+  if (channel->debug)
+    g_print ("write_thread %#x: start fd=%d, data_avail=%#x space_avail=%#x\n",
+            channel->thread_id,
+            channel->fd,
+            (guint) channel->data_avail_event,
+            (guint) channel->space_avail_event);
+  
+  channel->direction = 1;
+  channel->buffer = g_malloc (BUFFER_SIZE);
+  channel->rdp = channel->wrp = 0;
+  channel->running = TRUE;
+
+  SetEvent (channel->space_avail_event);
+
+  /* We use the same event objects as for a reader thread, but with
+   * reversed meaning. So, space_avail is used if data is available
+   * for writing, and data_avail is used if space is available in the
+   * write buffer.
+   */
+
+  LOCK (channel->mutex);
+  while (channel->running || channel->rdp != channel->wrp)
+    {
+      if (channel->debug)
+       g_print ("write_thread %#x: rdp=%d, wrp=%d\n",
+                channel->thread_id, channel->rdp, channel->wrp);
+      if (channel->wrp == channel->rdp)
+       {
+         /* Buffer is empty. */
+         if (channel->debug)
+           g_print ("write_thread %#x: resetting space_avail\n",
+                    channel->thread_id);
+         ResetEvent (channel->space_avail_event);
+         if (channel->debug)
+           g_print ("write_thread %#x: waiting for data\n",
+                    channel->thread_id);
+         channel->revents = G_IO_OUT;
+         SetEvent (channel->data_avail_event);
+         UNLOCK (channel->mutex);
+         WaitForSingleObject (channel->space_avail_event, INFINITE);
+
+         LOCK (channel->mutex);
+         if (channel->rdp == channel->wrp)
+           break;
+
+         if (channel->debug)
+           g_print ("write_thread %#x: rdp=%d, wrp=%d\n",
+                    channel->thread_id, channel->rdp, channel->wrp);
+       }
+      
+      buffer = channel->buffer + channel->rdp;
+      if (channel->rdp < channel->wrp)
+       nbytes = channel->wrp - channel->rdp;
+      else
+       nbytes = BUFFER_SIZE - channel->rdp;
+
+      if (channel->debug)
+       g_print ("write_thread %#x: calling write() for %d bytes\n",
+                channel->thread_id, nbytes);
+
+      UNLOCK (channel->mutex);
+      nbytes = write (channel->fd, buffer, nbytes);
+      LOCK (channel->mutex);
+
+      if (channel->debug)
+       g_print ("write_thread %#x: write(%i) returned %d, rdp=%d, wrp=%d\n",
+                channel->thread_id, channel->fd, nbytes, channel->rdp, channel->wrp);
+
+      channel->revents = 0;
+      if (nbytes > 0)
+       channel->revents |= G_IO_OUT;
+      else if (nbytes <= 0)
+       channel->revents |= G_IO_ERR;
+
+      channel->rdp = (channel->rdp + nbytes) % BUFFER_SIZE;
+
+      if (nbytes <= 0)
+       break;
+
+      if (channel->debug)
+       g_print ("write_thread: setting data_avail for thread %#x\n",
+                channel->thread_id);
+      SetEvent (channel->data_avail_event);
+    }
+  
+  channel->running = FALSE;
+  if (channel->needs_close)
+    {
+      if (channel->debug)
+       g_print ("write_thread %#x: channel fd %d needs closing\n",
+                channel->thread_id, channel->fd);
+      close (channel->fd);
+      channel->fd = -1;
+    }
+
+  UNLOCK (channel->mutex);
+  
+  g_io_channel_unref ((GIOChannel *)channel);
+  
+  return 0;
+}
+
 static void
 create_thread (GIOWin32Channel     *channel,
               GIOCondition         condition,
@@ -575,6 +691,78 @@ buffer_read (GIOWin32Channel *channel,
   return (*bytes_read > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF;
 }
 
+
+static GIOStatus
+buffer_write (GIOWin32Channel *channel,
+             const guchar    *dest,
+             gsize            count,
+             gsize           *bytes_written,
+             GError         **err)
+{
+  guint nbytes;
+  guint left = count;
+  
+  LOCK (channel->mutex);
+  if (channel->debug)
+    g_print ("buffer_write: writing to thread %#x %d bytes, rdp=%d, wrp=%d\n",
+            channel->thread_id, count, channel->rdp, channel->wrp);
+  
+  if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
+    {
+      /* Buffer is full */
+      if (channel->debug)
+       g_print ("buffer_write: tid %#x: resetting data_avail\n",
+                channel->thread_id);
+      ResetEvent (channel->data_avail_event);
+      if (channel->debug)
+       g_print ("buffer_write: tid %#x: waiting for space\n",
+                channel->thread_id);
+      UNLOCK (channel->mutex);
+      WaitForSingleObject (channel->data_avail_event, INFINITE);
+      LOCK (channel->mutex);
+      if (channel->debug)
+       g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d\n",
+                channel->thread_id, channel->rdp, channel->wrp);
+    }
+   
+  nbytes = MIN ((channel->rdp + BUFFER_SIZE - channel->wrp - 1) % BUFFER_SIZE,
+               BUFFER_SIZE - channel->wrp);
+
+  UNLOCK (channel->mutex);
+  nbytes = MIN (left, nbytes);
+  if (channel->debug)
+    g_print ("buffer_write: tid %#x: writing %d bytes\n",
+            channel->thread_id, nbytes);
+  memcpy (channel->buffer + channel->wrp, dest, nbytes);
+  dest += nbytes;
+  left -= nbytes;
+  LOCK (channel->mutex);
+
+  channel->wrp = (channel->wrp + nbytes) % BUFFER_SIZE;
+  if (channel->debug)
+    g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d, setting space_avail\n",
+            channel->thread_id, channel->rdp, channel->wrp);
+  SetEvent (channel->space_avail_event);
+
+  if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
+    {
+      /* Buffer is full */
+      if (channel->debug)
+       g_print ("buffer_write: tid %#x: resetting data_avail\n",
+                channel->thread_id);
+      ResetEvent (channel->data_avail_event);
+    }
+
+  UNLOCK (channel->mutex);
+  
+  /* We have no way to indicate any errors form the actual
+   * write() call in the writer thread. Should we have?
+   */
+  *bytes_written = count - left;
+  return (*bytes_written > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF;
+}
+
+
 static gboolean
 g_io_win32_prepare (GSource *source,
                    gint    *timeout)
@@ -601,13 +789,27 @@ g_io_win32_prepare (GSource *source,
                 condition_to_string (channel->revents));
       
       LOCK (channel->mutex);
-      if (channel->running && channel->wrp == channel->rdp)
+      if (channel->running)
        {
-         if (channel->debug)
-           g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = 0\n",
-                    channel->thread_id);
-         channel->revents = 0;
+         if (channel->direction == 0 && channel->wrp == channel->rdp)
+           {
+             if (channel->debug)
+               g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = 0\n",
+                        channel->thread_id);
+             channel->revents = 0;
+           }
        }
+      else
+       {
+         if (channel->direction == 1
+             && (channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
+           {
+             if (channel->debug)
+               g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = %i\n",
+                        channel->thread_id, 0);
+             channel->revents = 0;
+           }
+       }         
       UNLOCK (channel->mutex);
       break;
 
@@ -964,6 +1166,11 @@ g_io_win32_fd_write (GIOChannel  *channel,
 {
   GIOWin32Channel *win32_channel = (GIOWin32Channel *)channel;
   gint result;
+
+  if (win32_channel->thread_id)
+    {
+      return buffer_write (win32_channel, buf, count, bytes_written, err);
+    }
   
   result = write (win32_channel->fd, buf, count);
   if (win32_channel->debug)
@@ -1061,7 +1268,10 @@ g_io_win32_fd_close (GIOChannel *channel,
                 win32_channel->thread_id, win32_channel->fd);
       win32_channel->running = FALSE;
       win32_channel->needs_close = TRUE;
-      SetEvent (win32_channel->data_avail_event);
+      if (win32_channel->direction == 0)
+       SetEvent (win32_channel->data_avail_event);
+      else
+       SetEvent (win32_channel->space_avail_event);
     }
   else
     {
@@ -1105,7 +1315,12 @@ g_io_win32_fd_create_watch (GIOChannel    *channel,
 
   LOCK (win32_channel->mutex);
   if (win32_channel->thread_id == 0)
-    create_thread (win32_channel, condition, read_thread);
+    {
+      if (condition & G_IO_IN)
+       create_thread (win32_channel, condition, read_thread);
+      else if (condition & G_IO_OUT)
+       create_thread (win32_channel, condition, write_thread);
+    }
 
   g_source_add_poll (source, &watch->pollfd);
   UNLOCK (win32_channel->mutex);
@@ -1720,7 +1935,12 @@ g_io_channel_win32_make_pollfd (GIOChannel   *channel,
       fd->fd = (gint) win32_channel->data_avail_event;
 
       if (win32_channel->thread_id == 0 && (condition & G_IO_IN))
-       create_thread (win32_channel, condition, read_thread);
+       {
+         if (condition & G_IO_IN)
+           create_thread (win32_channel, condition, read_thread);
+         else if (condition & G_IO_OUT)
+           create_thread (win32_channel, condition, write_thread);
+       }
       break;
 
     case G_IO_WIN32_SOCKET: