efl_io_copier: add inactivity_timeout.
authorGustavo Sverzut Barbieri <barbieri@profusion.mobi>
Sat, 22 Oct 2016 12:52:22 +0000 (10:52 -0200)
committerGustavo Sverzut Barbieri <barbieri@profusion.mobi>
Sat, 22 Oct 2016 12:52:22 +0000 (10:52 -0200)
This is handful to error the copier with ETIMEDOUT if there are no
reads or writes in the given amount of time.

Since copiers are usable to download data or handle network clients,
it's easy to set a timeout and disconnect, let's say UDP clients that
are gone.

src/examples/ecore/efl_io_copier_example.c
src/examples/ecore/efl_net_server_example.c
src/lib/ecore/efl_io_copier.c
src/lib/ecore/efl_io_copier.eo

index d6b2907..cedd41b 100644 (file)
@@ -322,6 +322,8 @@ static const Ecore_Getopt options = {
                              "If set will limit buffer size to this limit of bytes. If used alongside with --line-delimiter and that delimiter was not found but bffer limit was reached, the line event will be triggered without the delimiter at the end."),
     ECORE_GETOPT_STORE_ULONG('c', "read-chunk-size",
                              "If set will change the base chunk size used while reading."),
+    ECORE_GETOPT_STORE_DOUBLE('t', "inactivity-timeout",
+                              "If greater than zero, specifies the number of seconds without any reads or writes that the copier will be timed out."),
     ECORE_GETOPT_VERSION('V', "version"),
     ECORE_GETOPT_COPYRIGHT('C', "copyright"),
     ECORE_GETOPT_LICENSE('L', "license"),
@@ -359,11 +361,13 @@ main(int argc, char **argv)
    char *line_delimiter = NULL;
    unsigned long buffer_limit = 0;
    unsigned long read_chunk_size = 0;
+   double timeout = 0.0;
    Eina_Bool quit_option = EINA_FALSE;
    Ecore_Getopt_Value values[] = {
      ECORE_GETOPT_VALUE_STR(line_delimiter),
      ECORE_GETOPT_VALUE_ULONG(buffer_limit),
      ECORE_GETOPT_VALUE_ULONG(read_chunk_size),
+     ECORE_GETOPT_VALUE_DOUBLE(timeout),
 
      /* standard block to provide version, copyright, license and help */
      ECORE_GETOPT_VALUE_BOOL(quit_option), /* -V/--version quits */
@@ -723,6 +727,7 @@ main(int argc, char **argv)
                     efl_io_copier_line_delimiter_set(efl_added, &line_delm_slice), /* optional */
                     efl_io_copier_buffer_limit_set(efl_added, buffer_limit), /* optional, defaults to unlimited */
                     efl_io_copier_read_chunk_size_set(efl_added, read_chunk_size), /* optional, defaults to 4096 */
+                    efl_io_copier_inactivity_timeout_set(efl_added, timeout), /* optional, defaults to 0.0 (disabled) */
                     efl_event_callback_array_add(efl_added, copier_cbs(), NULL) /* recommended, at least EFL_IO_COPIER_EVENT_DONE. */
                     );
    if (!copier)
index d5980c1..2028a42 100644 (file)
@@ -7,6 +7,7 @@
 
 static int retval = EXIT_SUCCESS;
 static Eina_Bool echo = EINA_FALSE;
+static double timeout = 10.0;
 
 /* NOTE: client i/o events are only used as debug, you can omit these */
 
@@ -67,6 +68,15 @@ _echo_copier_error(void *data EINA_UNUSED, const Efl_Event *event)
    Eo *copier = event->object;
    const Eina_Error *perr = event->info;
 
+   if (*perr == ETIMEDOUT)
+     {
+        Eo *client = efl_io_copier_source_get(copier);
+        fprintf(stderr, "INFO: client '%s' timed out, delete it.\n",
+                efl_net_socket_address_remote_get(client));
+        efl_del(copier);
+        return;
+     }
+
    retval = EXIT_FAILURE;
 
    fprintf(stderr, "ERROR: echo copier %p failed %d '%s', close and del.\n",
@@ -146,6 +156,12 @@ _send_copier_done(void *data, const Efl_Event *event)
              efl_net_socket_address_remote_get(client),
              slice.len, EINA_SLICE_STR_PRINT(slice));
 
+   if (d->recv_copier)
+     {
+        /* only start the reader inactivity timeout once the sender is done */
+        efl_io_copier_inactivity_timeout_set(d->recv_copier, efl_io_copier_inactivity_timeout_get(copier));
+     }
+
    fprintf(stderr, "INFO: send copier done, check if should close %p\n", copier);
    _send_recv_done(d, copier);
 }
@@ -161,6 +177,14 @@ _send_copier_error(void *data, const Efl_Event *event)
    uint64_t offset;
    Eina_Slice slice;
 
+   if (*perr == ETIMEDOUT)
+     {
+        fprintf(stderr, "INFO: client '%s' timed out send, delete it.\n",
+                efl_net_socket_address_remote_get(client));
+        efl_io_closer_close(copier); /* forces client to be closed, thus closes the recv copier as an effect */
+        return;
+     }
+
    retval = EXIT_FAILURE;
 
    offset = efl_io_buffer_position_read_get(buffer);
@@ -235,12 +259,20 @@ static void
 _recv_copier_error(void *data, const Efl_Event *event)
 {
    Eo *copier = event->object;
-   Eo *buffer = efl_io_copier_source_get(copier);
-   Eo *client = efl_io_copier_destination_get(copier);
+   Eo *buffer = efl_io_copier_destination_get(copier);
+   Eo *client = efl_io_copier_source_get(copier);
    const Eina_Error *perr = event->info;
    Send_Recv_Data *d = data;
    Eina_Slice slice;
 
+   if (*perr == ETIMEDOUT)
+     {
+        fprintf(stderr, "INFO: client '%s' timed out recv, delete it.\n",
+                efl_net_socket_address_remote_get(client));
+        efl_io_closer_close(copier); /* forces client to be closed, thus closes the send copier as an effect */
+        return;
+     }
+
    retval = EXIT_FAILURE;
 
    if (!efl_io_buffer_slice_get(buffer, &slice))
@@ -303,6 +335,7 @@ _server_client_add(void *data EINA_UNUSED, const Efl_Event *event)
         Eo *echo_copier = efl_add(EFL_IO_COPIER_CLASS, efl_parent_get(client),
                                   efl_io_copier_source_set(efl_added, client),
                                   efl_io_copier_destination_set(efl_added, client),
+                                  efl_io_copier_inactivity_timeout_set(efl_added, timeout),
                                   efl_event_callback_array_add(efl_added, echo_copier_cbs(), client),
                                   efl_io_closer_close_on_destructor_set(efl_added, EINA_TRUE) /* we want to auto-close as we have a single copier */
                                   );
@@ -345,6 +378,7 @@ _server_client_add(void *data EINA_UNUSED, const Efl_Event *event)
         d->send_copier = efl_add(EFL_IO_COPIER_CLASS, efl_parent_get(client),
                                  efl_io_copier_source_set(efl_added, send_buffer),
                                  efl_io_copier_destination_set(efl_added, client),
+                                 efl_io_copier_inactivity_timeout_set(efl_added, timeout),
                                  efl_event_callback_array_add(efl_added, send_copier_cbs(), d),
                                  efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE) /* we must wait both copiers to finish before we close! */
                                  );
@@ -361,6 +395,7 @@ _server_client_add(void *data EINA_UNUSED, const Efl_Event *event)
         d->recv_copier = efl_add(EFL_IO_COPIER_CLASS, efl_parent_get(client),
                                  efl_io_copier_source_set(efl_added, client),
                                  efl_io_copier_destination_set(efl_added, recv_buffer),
+                                 efl_io_copier_inactivity_timeout_set(efl_added, 0.0), /* we'll only set an inactivity timeout once the sender is done */
                                  efl_event_callback_array_add(efl_added, recv_copier_cbs(), d),
                                  efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE) /* we must wait both copiers to finish before we close! */
                                  );
@@ -433,6 +468,9 @@ static const Ecore_Getopt options = {
                             "If true, excess clients will be immediately rejected."),
     ECORE_GETOPT_STORE_BOOL(0, "ipv6-only",
                             "If true (default), only IPv6 clients will be allowed for a server if an IPv6 was used, otherwise IPv4 clients will be automatically converted into IPv6 and handled transparently."),
+    ECORE_GETOPT_STORE_DOUBLE('t', "inactivity-timeout",
+                              "The timeout in seconds to disconnect a client. The timeout is restarted for each client when there is some activity. It's particularly useful for UDP where there is no disconnection event."),
+
     ECORE_GETOPT_VERSION('V', "version"),
     ECORE_GETOPT_COPYRIGHT('C', "copyright"),
     ECORE_GETOPT_LICENSE('L', "license"),
@@ -463,6 +501,7 @@ main(int argc, char **argv)
      ECORE_GETOPT_VALUE_UINT(clients_limit),
      ECORE_GETOPT_VALUE_BOOL(clients_reject_excess),
      ECORE_GETOPT_VALUE_BOOL(ipv6_only),
+     ECORE_GETOPT_VALUE_DOUBLE(timeout),
 
      /* standard block to provide version, copyright, license and help */
      ECORE_GETOPT_VALUE_BOOL(quit_option), /* -V/--version quits */
index 5a1fd79..80b1b62 100644 (file)
@@ -12,6 +12,7 @@ typedef struct _Efl_Io_Copier_Data
 {
    Efl_Io_Reader *source;
    Efl_Io_Writer *destination;
+   Efl_Future *inactivity_timer;
    Efl_Future *job;
    Eina_Binbuf *buf;
    uint8_t *read_chunk; /* TODO: method to grow Eina_Binbuf so we can expand it and read directly to that */
@@ -21,6 +22,7 @@ typedef struct _Efl_Io_Copier_Data
    struct {
       uint64_t read, written, total;
    } progress;
+   double inactivity_timeout;
    Eina_Bool closed;
    Eina_Bool done;
    Eina_Bool close_on_exec;
@@ -68,10 +70,32 @@ static void _efl_io_copier_read(Eo *o, Efl_Io_Copier_Data *pd);
   while (0)
 
 static void
+_efl_io_copier_inactivity_timeout_cb(void *data, const Efl_Event *ev EINA_UNUSED)
+{
+   Eo *o = data;
+   Eina_Error err = ETIMEDOUT;
+   efl_event_callback_call(o, EFL_IO_COPIER_EVENT_ERROR, &err);
+}
+
+static void
+_efl_io_copier_inactivity_timeout_reschedule(Eo *o, Efl_Io_Copier_Data *pd)
+{
+   if (pd->inactivity_timer) efl_future_cancel(pd->inactivity_timer);
+   if (pd->inactivity_timeout <= 0.0) return;
+
+   efl_future_use(&pd->inactivity_timer, efl_loop_timeout(efl_loop_get(o), pd->inactivity_timeout, o));
+   efl_future_then(pd->inactivity_timer, _efl_io_copier_inactivity_timeout_cb, NULL, NULL, o);
+   efl_future_link(o, pd->inactivity_timer);
+}
+
+static void
 _efl_io_copier_job(void *data, const Efl_Event *ev EINA_UNUSED)
 {
    Eo *o = data;
    Efl_Io_Copier_Data *pd = efl_data_scope_get(o, MY_CLASS);
+   uint64_t old_read = pd->progress.read;
+   uint64_t old_written = pd->progress.written;
+   uint64_t old_total = pd->progress.total;
 
    _COPIER_DBG(o, pd);
    efl_ref(o);
@@ -82,7 +106,13 @@ _efl_io_copier_job(void *data, const Efl_Event *ev EINA_UNUSED)
    if (pd->destination && efl_io_writer_can_write_get(pd->destination))
      _efl_io_copier_write(o, pd);
 
-   efl_event_callback_call(o, EFL_IO_COPIER_EVENT_PROGRESS, NULL);
+   if ((old_read != pd->progress.read) ||
+       (old_written != pd->progress.written) ||
+       (old_total != pd->progress.total))
+     {
+        efl_event_callback_call(o, EFL_IO_COPIER_EVENT_PROGRESS, NULL);
+        _efl_io_copier_inactivity_timeout_reschedule(o, pd);
+     }
 
    if (!pd->source || efl_io_reader_eos_get(pd->source))
      {
@@ -90,6 +120,7 @@ _efl_io_copier_job(void *data, const Efl_Event *ev EINA_UNUSED)
             ((!pd->destination) || (eina_binbuf_length_get(pd->buf) == 0)))
           {
              pd->done = EINA_TRUE;
+             if (pd->inactivity_timer) efl_future_cancel(pd->inactivity_timer);
              efl_event_callback_call(o, EFL_IO_COPIER_EVENT_DONE, NULL);
           }
      }
@@ -430,12 +461,14 @@ _efl_io_copier_destination_closed(void *data, const Efl_Event *event EINA_UNUSED
         if (!pd->done)
           {
              pd->done = EINA_TRUE;
+             if (pd->inactivity_timer) efl_future_cancel(pd->inactivity_timer);
              efl_event_callback_call(o, EFL_IO_COPIER_EVENT_DONE, NULL);
           }
      }
    else
      {
         Eina_Error err = EBADF;
+        if (pd->inactivity_timer) efl_future_cancel(pd->inactivity_timer);
         efl_event_callback_call(o, EFL_IO_COPIER_EVENT_ERROR, &err);
      }
 }
@@ -574,6 +607,9 @@ _efl_io_copier_efl_io_closer_close(Eo *o, Efl_Io_Copier_Data *pd)
    if (pd->job)
      efl_future_cancel(pd->job);
 
+   if (pd->inactivity_timer)
+     efl_future_cancel(pd->inactivity_timer);
+
    if (pd->source)
      {
         if (efl_isa(pd->source, EFL_IO_SIZER_MIXIN))
@@ -646,12 +682,26 @@ _efl_io_copier_binbuf_steal(Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
    return ret;
 }
 
+EOLIAN static void
+_efl_io_copier_inactivity_timeout_set(Eo *o, Efl_Io_Copier_Data *pd, double seconds)
+{
+   pd->inactivity_timeout = seconds;
+   _efl_io_copier_inactivity_timeout_reschedule(o, pd);
+}
+
+EOLIAN static double
+_efl_io_copier_inactivity_timeout_get(Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
+{
+   return pd->inactivity_timeout;
+}
+
 EOLIAN static Eo *
 _efl_io_copier_efl_object_constructor(Eo *o, Efl_Io_Copier_Data *pd)
 {
    pd->buf = eina_binbuf_new();
    pd->close_on_exec = EINA_TRUE;
    pd->close_on_destructor = EINA_TRUE;
+   pd->inactivity_timeout = 0.0;
 
    EINA_SAFETY_ON_NULL_RETURN_VAL(pd->buf, NULL);
 
@@ -687,6 +737,9 @@ _efl_io_copier_efl_object_destructor(Eo *o, Efl_Io_Copier_Data *pd)
    if (pd->job)
      efl_future_cancel(pd->job);
 
+   if (pd->inactivity_timer)
+     efl_future_cancel(pd->inactivity_timer);
+
    if (efl_io_closer_close_on_destructor_get(o) &&
        (!efl_io_closer_closed_get(o)))
      efl_io_closer_close(o);
index 28dcf82..2967120 100644 (file)
@@ -88,6 +88,21 @@ class Efl.Io.Copier (Efl.Loop_User, Efl.Io.Closer) {
            }
         }
 
+        @property inactivity_timeout {
+            [[Terminate the copier with ETIMEDOUT if it becomes inactive for some time.
+
+              If the copier cannot do any read or write in the given
+              amount of seconds, then the copier will emit "error"
+              event with ETIMEDOUT value.
+
+              This is specified in seconds and is only active for
+              greater-than zero. Defaults to inactive.
+            ]]
+            values {
+                seconds: double; [[Number inactive seconds to timeout this copier. If zero or less, it will be disabled.]]
+            }
+        }
+
         binbuf_steal {
            [[Steals the internal binbuf and return it to caller.