This is now a working example using libevent and curl_multi_socket() for really
authorDaniel Stenberg <daniel@haxx.se>
Thu, 3 Aug 2006 22:57:04 +0000 (22:57 +0000)
committerDaniel Stenberg <daniel@haxx.se>
Thu, 3 Aug 2006 22:57:04 +0000 (22:57 +0000)
fast treatment of many simultaneous transfers

hiper/hipev.c

index e99f9086ac7af4327ada11ed615f98f62e302294..936dfeebbd36caf3a2bf618f75adbd99908f66ae 100644 (file)
@@ -10,8 +10,7 @@
  * Connect N connections. Z are idle, and X are active. Transfer as fast as
  * possible.
  *
- * Run for a specific amount of time (10 secs for now). Output detailed timing
- * information.
+ * Output detailed timing information.
  *
  * Uses libevent.
  *
    when using asynch supported libcurl. */
 #define IDLE_TIME 10
 
-struct ourfdset {
-  /* __fds_bits is what the Linux glibc headers use when they declare the
-     fd_set struct so by using this we can actually avoid the typecase for the
-     FD_SET() macro usage but it would hardly be portable */
-  char __fds_bits[NCONNECTIONS/8];
-};
-#define FD2_ZERO(x) memset(x, 0, sizeof(struct ourfdset))
-
-typedef struct ourfdset fd2_set;
-
 struct globalinfo {
   size_t dlcounter;
 };
@@ -73,6 +62,8 @@ struct connection {
   char error[CURL_ERROR_SIZE];
 };
 
+/* this is the struct associated with each file descriptor libcurl tells us
+   it is dealing with */
 struct fdinfo {
   /* create a link list of fdinfo structs */
   struct fdinfo *next;
@@ -91,15 +82,64 @@ static struct fdinfo *allsocks;
 
 static int running_handles;
 
+/* we have the timerevent global so that when the final socket-based event is
+   done, we can remove the timerevent as well */
+static struct event timerevent;
+
 /* called from libevent on action on a particular socket ("event") */
 static void eventcallback(int fd, short type, void *userp)
 {
   struct fdinfo *fdp = (struct fdinfo *)userp;
+  CURLMcode rc;
+
+  fprintf(stderr, "EVENT callback type %d\n", type);
+
+  /* tell libcurl to deal with the transfer associated with this socket */
+  do {
+    rc = curl_multi_socket(fdp->multi, fd, fdp->running_handles);
+  } while (rc == CURLM_CALL_MULTI_PERFORM);
+
+  if(rc) {
+    fprintf(stderr, "curl_multi_socket() returned %d\n", (int)rc);
+  }
+
+  fprintf(stderr, "running_handles: %d\n", *fdp->running_handles);
+  if(!*fdp->running_handles) {
+    /* last transfer is complete, kill pending timeout */
+    fprintf(stderr, "last transfer done, kill timeout\n");
+    if(evtimer_pending(&timerevent, NULL))
+      evtimer_del(&timerevent);
+  }
+}
+
+/* called from libevent when our timer event expires */
+static void timercallback(int fd, short type, void *userp)
+{
+  (void)fd; /* not used for this */
+  (void)type; /* ignored in here */
+  CURLM *multi_handle = (CURLM *)userp;
+  long timeout_ms;
+  struct timeval timeout;
+  int running_handles;
+  CURLMcode rc;
 
-  fprintf(stderr, "EVENT callback\n");
+  fprintf(stderr, "EVENT timeout\n");
 
   /* tell libcurl to deal with the transfer associated with this socket */
-  curl_multi_socket(fdp->multi, fd, fdp->running_handles);
+  do {
+    rc = curl_multi_socket(multi_handle, CURL_SOCKET_TIMEOUT,
+                           &running_handles);
+  } while (rc == CURLM_CALL_MULTI_PERFORM);
+
+  if(running_handles) {
+    /* Get the current timeout value from libcurl and set a new timeout */
+    curl_multi_timeout(multi_handle, &timeout_ms);
+
+    /* convert ms to timeval */
+    timeout.tv_sec = timeout_ms/1000;
+    timeout.tv_usec = (timeout_ms%1000)*1000;
+    evtimer_add(&timerevent, &timeout);
+  }
 }
 
 static void remsock(struct fdinfo *f)
@@ -108,6 +148,9 @@ static void remsock(struct fdinfo *f)
     /* did not find socket to remove! */
     return;
 
+  if(f->evset)
+    event_del(&f->ev);
+
   if(f->prev)
     f->prev->next = f->next;
   if(f->next)
@@ -128,16 +171,22 @@ static void setsock(struct fdinfo *fdp, curl_socket_t s, CURL *easy,
     /* first remove the existing event if the old setup was used */
     event_del(&fdp->ev);
 
-  /* now use and add the current socket setup */
+  /* now use and add the current socket setup to libevent. The EV_PERSIST is
+     the key here as otherwise libevent will automatically remove the event
+     when it occurs the first time */
   event_set(&fdp->ev, fdp->sockfd,
             (action&CURL_POLL_IN?EV_READ:0)|
-            (action&CURL_POLL_OUT?EV_WRITE:0),
+            (action&CURL_POLL_OUT?EV_WRITE:0)| EV_PERSIST,
             eventcallback, fdp);
 
   fdp->evset=1;
 
   fprintf(stderr, "event_add() for fd %d\n", s);
-  event_add(&fdp->ev, NULL); /* no timeout */
+
+  /* We don't use any socket-specific timeout but intead we use a single
+     global one. This is (mostly) because libcurl doesn't expose any
+     particular socket- based timeout value. */
+  event_add(&fdp->ev, NULL);
 }
 
 static void addsock(curl_socket_t s, CURL *easy, int action, CURLM *multi)
@@ -162,55 +211,17 @@ static void addsock(curl_socket_t s, CURL *easy, int action, CURLM *multi)
   curl_multi_assign(multi, s, fdp);
 }
 
-static void fdinfo2fdset(fd2_set *fdread, fd2_set *fdwrite, int *maxfd)
-{
-  struct fdinfo *fdp = allsocks;
-  int writable=0;
-
-  FD2_ZERO(fdread);
-  FD2_ZERO(fdwrite);
-
-  *maxfd = 0;
-
-#if 0
-  printf("Wait for: ");
-#endif
-
-  while(fdp) {
-    if(fdp->action & CURL_POLL_IN) {
-      FD_SET(fdp->sockfd, (fd_set *)fdread);
-    }
-    if(fdp->action & CURL_POLL_OUT) {
-      FD_SET(fdp->sockfd, (fd_set *)fdwrite);
-      writable++;
-    }
-
-#if 0
-    printf("%d (%s%s) ",
-           fdp->sockfd,
-           (fdp->action & CURL_POLL_IN)?"r":"",
-           (fdp->action & CURL_POLL_OUT)?"w":"");
-#endif
-
-    if(fdp->sockfd > *maxfd)
-      *maxfd = fdp->sockfd;
-
-    fdp = fdp->next;
-  }
-#if 0
-  if(writable)
-    printf("Check for %d writable sockets\n", writable);
-#endif
-}
-
 /* on port 8999 we run a fork enabled sws that supports 'idle' and 'stream' */
 #define PORT "8999"
 
-#define HOST "192.168.1.13"
+#define HOST "127.0.0.1"
 
 #define URL_IDLE   "http://" HOST ":" PORT "/1000"
+#if 1
 #define URL_ACTIVE "http://" HOST ":" PORT "/1001"
-
+#else
+#define URL_ACTIVE "http://localhost/"
+#endif
 
 static int socket_callback(CURL *easy,      /* easy handle */
                            curl_socket_t s, /* socket */
@@ -219,15 +230,24 @@ static int socket_callback(CURL *easy,      /* easy handle */
                            void *socketp)   /* socket pointer */
 {
   struct fdinfo *fdp = (struct fdinfo *)socketp;
+  char *whatstr[]={
+    "none",
+    "IN",
+    "OUT",
+    "INOUT",
+    "REMOVE"};
 
-  fprintf(stderr, "socket %d easy %p what %d\n", s, easy, what);
+  fprintf(stderr, "socket %d easy %p what %s\n", s, easy,
+          whatstr[what]);
 
   if(what == CURL_POLL_REMOVE)
     remsock(fdp);
   else {
     if(!fdp) {
       /* not previously known, add it and set association */
-      printf("Add info for socket %d (%d)\n", s, what);
+      printf("Add info for socket %d %s%s\n", s,
+             what&CURL_POLL_IN?"READ":"",
+             what&CURL_POLL_OUT?"WRITE":"" );
       addsock(s, easy, what, cbp);
     }
     else {
@@ -250,135 +270,19 @@ writecallback(void *ptr, size_t size, size_t nmemb, void *data)
   c->dlcounter += realsize;
   c->global->dlcounter += realsize;
 
-#if 1
   printf("%02d: %d, total %d\n",
          c->id, c->dlcounter, c->global->dlcounter);
-#endif
-  return realsize;
-}
-
-/* return the diff between two timevals, in us */
-static long tvdiff(struct timeval *newer, struct timeval *older)
-{
-  return (newer->tv_sec-older->tv_sec)*1000000+
-    (newer->tv_usec-older->tv_usec);
-}
-
-
-/* store the start time of the program in this variable */
-static struct timeval timer;
 
-static void timer_start(void)
-{
-  /* capture the time of the start moment */
-  gettimeofday(&timer, NULL);
-}
-
-static struct timeval cont; /* at this moment we continued */
-
-int still_running; /* keep number of running handles */
-
-struct conncount {
-  long time_us;
-  long laps;
-  long maxtime;
-};
-
-static struct timeval timerpause;
-static void timer_pause(void)
-{
-  /* capture the time of the pause moment */
-  gettimeofday(&timerpause, NULL);
-
-  /* If we have a previous continue (all times except the first), we can now
-     store the time for a whole "lap" */
-  if(cont.tv_sec) {
-    long lap;
-
-    lap = tvdiff(&timerpause, &cont);
-  }
-}
-
-static long paused; /* amount of us we have been pausing */
-
-static void timer_continue(void)
-{
-  /* Capture the time of the restored operation moment, now calculate how long
-     time we were paused and added that to the 'paused' variable.
-   */
-  gettimeofday(&cont, NULL);
-
-  paused += tvdiff(&cont, &timerpause);
-}
-
-static long total; /* amount of us from start to stop */
-static void timer_total(void)
-{
-  struct timeval stop;
-  /* Capture the time of the operation stopped moment, now calculate how long
-     time we were running and how much of that pausing.
-   */
-  gettimeofday(&stop, NULL);
-
-  total = tvdiff(&stop, &timer);
+  return realsize;
 }
 
 struct globalinfo info;
 struct connection *conns;
 
-long selects;
-long timeouts;
-
-long multi_socket;
-long performalive;
-long performselect;
-long topselect;
-
 int num_total;
 int num_idle;
 int num_active;
 
-static void report(void)
-{
-  int i;
-  long active = total - paused;
-  long numdl = 0;
-
-  for(i=0; i < num_total; i++) {
-    if(conns[i].dlcounter)
-      numdl++;
-  }
-
-  printf("Summary from %d simultanoues transfers (%d active)\n",
-         num_total, num_active);
-  printf("%d out of %d connections provided data\n", numdl, num_total);
-
-  printf("Total time: %ldus paused: %ldus curl_multi_socket(): %ldus\n",
-         total, paused, active);
-
-  printf("%d calls to select() "
-         "Average time: %dus\n",
-         selects, paused/selects);
-  printf(" Average number of readable connections per select() return: %d\n",
-         performselect/selects);
-
-  printf(" Max number of readable connections for a single select() "
-         "return: %d\n",
-         topselect);
-
-  printf("%ld calls to multi_socket(), "
-         "Average time: %ldus\n",
-         multi_socket, active/multi_socket);
-
-  printf("%ld select() timeouts\n", timeouts);
-
-  printf("Downloaded %ld bytes in %ld bytes/sec, %ld usec/byte\n",
-         info.dlcounter,
-         info.dlcounter/(total/1000000),
-         total/info.dlcounter);
-
-}
-
 int main(int argc, char **argv)
 {
   CURLM *multi_handle;
@@ -387,11 +291,11 @@ int main(int argc, char **argv)
   CURLMcode mcode = CURLM_OK;
   int rc;
   int i;
-  fd2_set fdsizecheck;
   int selectmaxamount;
   struct fdinfo *fdp;
   char act;
   long timeout_ms;
+  struct timeval timeout;
 
   memset(&info, 0, sizeof(struct globalinfo));
 
@@ -462,45 +366,25 @@ int main(int argc, char **argv)
   curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_callback);
   curl_multi_setopt(multi_handle, CURLMOPT_SOCKETDATA, multi_handle);
 
-  /* we start the action by calling *socket() right away */
+  /* we start the action by calling *socket_all() */
   while(CURLM_CALL_MULTI_PERFORM == curl_multi_socket_all(multi_handle,
                                                           &running_handles));
 
-  /* event_dispatch() isn't good enough for us, since we need a global timeout
-     to occur after a given time of inactivity
-   */
-
-  /* get the timeout value from libcurl */
+  /* Since we need a global timeout to occur after a given time of inactivity,
+     we add a single timeout-event. Get the timeout value from libcurl */
   curl_multi_timeout(multi_handle, &timeout_ms);
+  /* convert ms to timeval */
+  timeout.tv_sec = timeout_ms/1000;
+  timeout.tv_usec = (timeout_ms%1000)*1000;
+  evtimer_set(&timerevent, timercallback, multi_handle);
+  evtimer_add(&timerevent, &timeout);
 
-  while(running_handles) {
-    struct timeval timeout;
+  /* event_dispatch() runs the event main loop. It ends when no events are
+     left to wait for. */
 
-    /* convert ms to timeval */
-    timeout.tv_sec = timeout_ms/1000;
-    timeout.tv_usec = (timeout_ms%1000)*1000;
-
-    event_loopexit(&timeout);
-
-    /* The event_loopexit() function may have taken a while and it may or may
-       not have invoked libcurl calls during that time. During those calls,
-       the timeout situation might very well have changed, so we check the
-       timeout time again to see if we really need to call curl_multi_socket()
-       at this point! */
-
-    /* get the timeout value from libcurl */
-    curl_multi_timeout(multi_handle, &timeout_ms);
+  event_dispatch();
 
-    if(timeout_ms <= 0) {
-      /* no time left */
-      curl_multi_socket(multi_handle, CURL_SOCKET_TIMEOUT, &running_handles);
-
-      /* and get the new timeout value again */
-      curl_multi_timeout(multi_handle, &timeout_ms);
-    }
-  }
-
-  if(still_running != num_total) {
+  {
     /* something made connections fail, extract the reason and tell */
     int msgs_left;
     struct connection *cptr;
@@ -508,10 +392,10 @@ int main(int argc, char **argv)
       if (msg->msg == CURLMSG_DONE) {
         curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &cptr);
 
-        printf("%d => (%d) %s", cptr->id, msg->data.result, cptr->error);
+        printf("%d => (%d) %s\n",
+               cptr->id, msg->data.result, cptr->error);
       }
     }
-
   }
 
   curl_multi_cleanup(multi_handle);
@@ -520,7 +404,5 @@ int main(int argc, char **argv)
   for(i=0; i< num_total; i++)
     curl_easy_cleanup(conns[i].e);
 
-  report();
-
   return code;
 }