/* Following fields are used by fd channels. */
CRITICAL_SECTION mutex;
+ int direction; /* 0 means we read from it,
+ * 1 means we write to it.
+ */
+
gboolean running; /* Is reader thread running. FALSE if
* EOF has been reached.
*/
channel->fd,
(guint) channel->data_avail_event,
(guint) channel->space_avail_event);
-
+
+ channel->direction = 0;
channel->buffer = g_malloc (BUFFER_SIZE);
channel->rdp = channel->wrp = 0;
channel->running = TRUE;
return 0;
}
+static unsigned __stdcall
+write_thread (void *parameter)
+{
+ GIOWin32Channel *channel = parameter;
+ guchar *buffer;
+ guint nbytes;
+
+ g_io_channel_ref ((GIOChannel *)channel);
+
+ if (channel->debug)
+ g_print ("write_thread %#x: start fd=%d, data_avail=%#x space_avail=%#x\n",
+ channel->thread_id,
+ channel->fd,
+ (guint) channel->data_avail_event,
+ (guint) channel->space_avail_event);
+
+ channel->direction = 1;
+ channel->buffer = g_malloc (BUFFER_SIZE);
+ channel->rdp = channel->wrp = 0;
+ channel->running = TRUE;
+
+ SetEvent (channel->space_avail_event);
+
+ /* We use the same event objects as for a reader thread, but with
+ * reversed meaning. So, space_avail is used if data is available
+ * for writing, and data_avail is used if space is available in the
+ * write buffer.
+ */
+
+ LOCK (channel->mutex);
+ while (channel->running || channel->rdp != channel->wrp)
+ {
+ if (channel->debug)
+ g_print ("write_thread %#x: rdp=%d, wrp=%d\n",
+ channel->thread_id, channel->rdp, channel->wrp);
+ if (channel->wrp == channel->rdp)
+ {
+ /* Buffer is empty. */
+ if (channel->debug)
+ g_print ("write_thread %#x: resetting space_avail\n",
+ channel->thread_id);
+ ResetEvent (channel->space_avail_event);
+ if (channel->debug)
+ g_print ("write_thread %#x: waiting for data\n",
+ channel->thread_id);
+ channel->revents = G_IO_OUT;
+ SetEvent (channel->data_avail_event);
+ UNLOCK (channel->mutex);
+ WaitForSingleObject (channel->space_avail_event, INFINITE);
+
+ LOCK (channel->mutex);
+ if (channel->rdp == channel->wrp)
+ break;
+
+ if (channel->debug)
+ g_print ("write_thread %#x: rdp=%d, wrp=%d\n",
+ channel->thread_id, channel->rdp, channel->wrp);
+ }
+
+ buffer = channel->buffer + channel->rdp;
+ if (channel->rdp < channel->wrp)
+ nbytes = channel->wrp - channel->rdp;
+ else
+ nbytes = BUFFER_SIZE - channel->rdp;
+
+ if (channel->debug)
+ g_print ("write_thread %#x: calling write() for %d bytes\n",
+ channel->thread_id, nbytes);
+
+ UNLOCK (channel->mutex);
+ nbytes = write (channel->fd, buffer, nbytes);
+ LOCK (channel->mutex);
+
+ if (channel->debug)
+ g_print ("write_thread %#x: write(%i) returned %d, rdp=%d, wrp=%d\n",
+ channel->thread_id, channel->fd, nbytes, channel->rdp, channel->wrp);
+
+ channel->revents = 0;
+ if (nbytes > 0)
+ channel->revents |= G_IO_OUT;
+ else if (nbytes <= 0)
+ channel->revents |= G_IO_ERR;
+
+ channel->rdp = (channel->rdp + nbytes) % BUFFER_SIZE;
+
+ if (nbytes <= 0)
+ break;
+
+ if (channel->debug)
+ g_print ("write_thread: setting data_avail for thread %#x\n",
+ channel->thread_id);
+ SetEvent (channel->data_avail_event);
+ }
+
+ channel->running = FALSE;
+ if (channel->needs_close)
+ {
+ if (channel->debug)
+ g_print ("write_thread %#x: channel fd %d needs closing\n",
+ channel->thread_id, channel->fd);
+ close (channel->fd);
+ channel->fd = -1;
+ }
+
+ UNLOCK (channel->mutex);
+
+ g_io_channel_unref ((GIOChannel *)channel);
+
+ return 0;
+}
+
static void
create_thread (GIOWin32Channel *channel,
GIOCondition condition,
return (*bytes_read > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF;
}
+
+static GIOStatus
+buffer_write (GIOWin32Channel *channel,
+ const guchar *dest,
+ gsize count,
+ gsize *bytes_written,
+ GError **err)
+{
+ guint nbytes;
+ guint left = count;
+
+ LOCK (channel->mutex);
+ if (channel->debug)
+ g_print ("buffer_write: writing to thread %#x %d bytes, rdp=%d, wrp=%d\n",
+ channel->thread_id, count, channel->rdp, channel->wrp);
+
+ if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
+ {
+ /* Buffer is full */
+ if (channel->debug)
+ g_print ("buffer_write: tid %#x: resetting data_avail\n",
+ channel->thread_id);
+ ResetEvent (channel->data_avail_event);
+ if (channel->debug)
+ g_print ("buffer_write: tid %#x: waiting for space\n",
+ channel->thread_id);
+ UNLOCK (channel->mutex);
+ WaitForSingleObject (channel->data_avail_event, INFINITE);
+ LOCK (channel->mutex);
+ if (channel->debug)
+ g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d\n",
+ channel->thread_id, channel->rdp, channel->wrp);
+ }
+
+ nbytes = MIN ((channel->rdp + BUFFER_SIZE - channel->wrp - 1) % BUFFER_SIZE,
+ BUFFER_SIZE - channel->wrp);
+
+ UNLOCK (channel->mutex);
+ nbytes = MIN (left, nbytes);
+ if (channel->debug)
+ g_print ("buffer_write: tid %#x: writing %d bytes\n",
+ channel->thread_id, nbytes);
+ memcpy (channel->buffer + channel->wrp, dest, nbytes);
+ dest += nbytes;
+ left -= nbytes;
+ LOCK (channel->mutex);
+
+ channel->wrp = (channel->wrp + nbytes) % BUFFER_SIZE;
+ if (channel->debug)
+ g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d, setting space_avail\n",
+ channel->thread_id, channel->rdp, channel->wrp);
+ SetEvent (channel->space_avail_event);
+
+ if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
+ {
+ /* Buffer is full */
+ if (channel->debug)
+ g_print ("buffer_write: tid %#x: resetting data_avail\n",
+ channel->thread_id);
+ ResetEvent (channel->data_avail_event);
+ }
+
+ UNLOCK (channel->mutex);
+
+ /* We have no way to indicate any errors form the actual
+ * write() call in the writer thread. Should we have?
+ */
+ *bytes_written = count - left;
+ return (*bytes_written > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF;
+}
+
+
static gboolean
g_io_win32_prepare (GSource *source,
gint *timeout)
condition_to_string (channel->revents));
LOCK (channel->mutex);
- if (channel->running && channel->wrp == channel->rdp)
+ if (channel->running)
{
- if (channel->debug)
- g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = 0\n",
- channel->thread_id);
- channel->revents = 0;
+ if (channel->direction == 0 && channel->wrp == channel->rdp)
+ {
+ if (channel->debug)
+ g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = 0\n",
+ channel->thread_id);
+ channel->revents = 0;
+ }
}
+ else
+ {
+ if (channel->direction == 1
+ && (channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
+ {
+ if (channel->debug)
+ g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = %i\n",
+ channel->thread_id, 0);
+ channel->revents = 0;
+ }
+ }
UNLOCK (channel->mutex);
break;
{
GIOWin32Channel *win32_channel = (GIOWin32Channel *)channel;
gint result;
+
+ if (win32_channel->thread_id)
+ {
+ return buffer_write (win32_channel, buf, count, bytes_written, err);
+ }
result = write (win32_channel->fd, buf, count);
if (win32_channel->debug)
win32_channel->thread_id, win32_channel->fd);
win32_channel->running = FALSE;
win32_channel->needs_close = TRUE;
- SetEvent (win32_channel->data_avail_event);
+ if (win32_channel->direction == 0)
+ SetEvent (win32_channel->data_avail_event);
+ else
+ SetEvent (win32_channel->space_avail_event);
}
else
{
LOCK (win32_channel->mutex);
if (win32_channel->thread_id == 0)
- create_thread (win32_channel, condition, read_thread);
+ {
+ if (condition & G_IO_IN)
+ create_thread (win32_channel, condition, read_thread);
+ else if (condition & G_IO_OUT)
+ create_thread (win32_channel, condition, write_thread);
+ }
g_source_add_poll (source, &watch->pollfd);
UNLOCK (win32_channel->mutex);
fd->fd = (gint) win32_channel->data_avail_event;
if (win32_channel->thread_id == 0 && (condition & G_IO_IN))
- create_thread (win32_channel, condition, read_thread);
+ {
+ if (condition & G_IO_IN)
+ create_thread (win32_channel, condition, read_thread);
+ else if (condition & G_IO_OUT)
+ create_thread (win32_channel, condition, write_thread);
+ }
break;
case G_IO_WIN32_SOCKET: