Three fixes in one commit (sorry): a) Take care of the tcpbuf if it ends while queued...
authorSteinar H. Gunderson <sesse@google.com>
Fri, 28 Sep 2007 14:28:14 +0000 (14:28 +0000)
committerSteinar H. Gunderson <sesse@google.com>
Fri, 28 Sep 2007 14:28:14 +0000 (14:28 +0000)
ares__close_sockets.c
ares_cancel.c
ares_destroy.c
ares_init.c
ares_private.h
ares_process.c
ares_send.c

index 64eccdf..c258866 100644 (file)
@@ -35,6 +35,8 @@ void ares__close_sockets(ares_channel channel, struct server_state *server)
       /* Advance server->qhead; pull out query as we go. */
       sendreq = server->qhead;
       server->qhead = sendreq->next;
+      if (sendreq->data_storage != NULL)
+        free(sendreq->data_storage);
       free(sendreq);
     }
   server->qtail = NULL;
@@ -45,12 +47,16 @@ void ares__close_sockets(ares_channel channel, struct server_state *server)
   server->tcp_buffer = NULL;
   server->tcp_lenbuf_pos = 0;
 
+  /* Reset brokenness */
+  server->is_broken = 0;
+
   /* Close the TCP and UDP sockets. */
   if (server->tcp_socket != ARES_SOCKET_BAD)
     {
       SOCK_STATE_CALLBACK(channel, server->tcp_socket, 0, 0);
       closesocket(server->tcp_socket);
       server->tcp_socket = ARES_SOCKET_BAD;
+      server->tcp_connection_generation = ++channel->tcp_connection_generation;
     }
   if (server->udp_socket != ARES_SOCKET_BAD)
     {
index 9641dab..65f86b9 100644 (file)
@@ -33,7 +33,7 @@ void ares_cancel(ares_channel channel)
     next = query->next;
     query->callback(query->arg, ARES_ETIMEOUT, NULL, 0);
     free(query->tcpbuf);
-    free(query->skip_server);
+    free(query->server_info);
     free(query);
   }
   channel->queries = NULL;
index 8d9bdbc..e844ea6 100644 (file)
@@ -65,8 +65,8 @@ void ares_destroy(ares_channel channel)
     query->callback(query->arg, ARES_EDESTRUCTION, NULL, 0);
     if (query->tcpbuf)
       free(query->tcpbuf);
-    if (query->skip_server)
-      free(query->skip_server);
+    if (query->server_info)
+      free(query->server_info);
     free(query);
   }
 
index add3ffe..298dd7b 100644 (file)
@@ -136,6 +136,7 @@ int ares_init_options(ares_channel *channelptr, struct ares_options *options,
   channel->nservers = -1;
   channel->ndomains = -1;
   channel->nsort = -1;
+  channel->tcp_connection_generation = 0;
   channel->lookups = NULL;
   channel->queries = NULL;
   channel->domains = NULL;
@@ -201,10 +202,12 @@ int ares_init_options(ares_channel *channelptr, struct ares_options *options,
       server = &channel->servers[i];
       server->udp_socket = ARES_SOCKET_BAD;
       server->tcp_socket = ARES_SOCKET_BAD;
+      server->tcp_connection_generation = ++channel->tcp_connection_generation;
       server->tcp_lenbuf_pos = 0;
       server->tcp_buffer = NULL;
       server->qhead = NULL;
       server->qtail = NULL;
+      server->is_broken = 0;
     }
 
   init_id_key(&channel->id_key, ARES_ID_KEY_LEN);
index f031451..dd9070a 100644 (file)
@@ -89,6 +89,11 @@ struct send_request {
   const unsigned char *data;
   size_t len;
 
+  /* The query for which we're sending this data */
+  struct query* owner_query;
+  /* The buffer we're using, if we have our own copy of the packet */
+  unsigned char *data_storage;
+
   /* Next request in queue */
   struct send_request *next;
 };
@@ -110,6 +115,17 @@ struct server_state {
   /* TCP output queue */
   struct send_request *qhead;
   struct send_request *qtail;
+
+  /* Which incarnation of this connection is this? We don't want to
+   * retransmit requests into the very same socket, but if the server
+   * closes on us and we re-open the connection, then we do want to
+   * re-send. */
+  int tcp_connection_generation;
+
+  /* Is this server broken? We mark connections as broken when a
+   * request that is queued for sending times out.
+   */
+  int is_broken;
 };
 
 struct query {
@@ -130,7 +146,7 @@ struct query {
   /* Query status */
   int try;
   int server;
-  int *skip_server;
+  struct query_server_info *server_info;   /* per-server state */
   int using_tcp;
   int error_status;
 
@@ -138,6 +154,12 @@ struct query {
   struct query *next;
 };
 
+/* Per-server state for a query */
+struct query_server_info {
+  int skip_server;  /* should we skip server, due to errors, etc? */
+  int tcp_connection_generation;  /* into which TCP connection did we send? */
+};
+
 /* An IP address pattern; matches an IP address X if X & mask == addr */
 #define PATTERN_MASK 0x1
 #define PATTERN_CIDR 0x2
@@ -188,6 +210,9 @@ struct ares_channeldata {
   /* key to use when generating new ids */
   rc4_key id_key;
 
+  /* Generation number to use for the next TCP socket open/close */
+  int tcp_connection_generation;
+
   /* Active queries */
   struct query *queries;
 
@@ -220,4 +245,3 @@ short ares__generate_new_id(rc4_key* key);
 #endif
 
 #endif /* __ARES_PRIVATE_H */
-
index 1b58029..15aa06e 100644 (file)
@@ -62,6 +62,7 @@ static void read_tcp_data(ares_channel channel, fd_set *read_fds,
 static void read_udp_packets(ares_channel channel, fd_set *read_fds,
                              ares_socket_t read_fd, time_t now);
 static void process_timeouts(ares_channel channel, time_t now);
+static void process_broken_connections(ares_channel channel, time_t now);
 static void process_answer(ares_channel channel, unsigned char *abuf,
                            int alen, int whichserver, int tcp, time_t now);
 static void handle_error(ares_channel channel, int whichserver, time_t now);
@@ -87,6 +88,7 @@ void ares_process(ares_channel channel, fd_set *read_fds, fd_set *write_fds)
   read_tcp_data(channel, read_fds, ARES_SOCKET_BAD, now);
   read_udp_packets(channel, read_fds, ARES_SOCKET_BAD, now);
   process_timeouts(channel, now);
+  process_broken_connections(channel, now);
 }
 
 /* Something interesting happened on the wire, or there was a timeout.
@@ -157,7 +159,7 @@ static void write_tcp_data(ares_channel channel,
       /* Make sure server has data to send and is selected in write_fds or
          write_fd. */
       server = &channel->servers[i];
-      if (!server->qhead || server->tcp_socket == ARES_SOCKET_BAD)
+      if (!server->qhead || server->tcp_socket == ARES_SOCKET_BAD || server->is_broken)
         continue;
 
       if(write_fds) {
@@ -216,6 +218,8 @@ static void write_tcp_data(ares_channel channel,
                       SOCK_STATE_CALLBACK(channel, server->tcp_socket, 1, 0);
                       server->qtail = NULL;
                     }
+                  if (sendreq->data_storage != NULL)
+                    free(sendreq->data_storage);
                   free(sendreq);
                 }
               else
@@ -248,6 +252,8 @@ static void write_tcp_data(ares_channel channel,
                   SOCK_STATE_CALLBACK(channel, server->tcp_socket, 1, 0);
                   server->qtail = NULL;
                 }
+              if (sendreq->data_storage != NULL)
+                free(sendreq->data_storage);
               free(sendreq);
             }
           else
@@ -278,7 +284,7 @@ static void read_tcp_data(ares_channel channel, fd_set *read_fds,
     {
       /* Make sure the server has a socket and is selected in read_fds. */
       server = &channel->servers[i];
-      if (server->tcp_socket == ARES_SOCKET_BAD)
+      if (server->tcp_socket == ARES_SOCKET_BAD || server->is_broken)
         continue;
 
       if(read_fds) {
@@ -376,7 +382,7 @@ static void read_udp_packets(ares_channel channel, fd_set *read_fds,
       /* Make sure the server has a socket and is selected in read_fds. */
       server = &channel->servers[i];
 
-      if (server->udp_socket == ARES_SOCKET_BAD)
+      if (server->udp_socket == ARES_SOCKET_BAD || server->is_broken)
         continue;
 
       if(read_fds) {
@@ -492,6 +498,20 @@ static void process_answer(ares_channel channel, unsigned char *abuf,
   end_query(channel, query, ARES_SUCCESS, abuf, alen);
 }
 
+/* Close all the connections that are no longer usable. */
+static void process_broken_connections(ares_channel channel, time_t now)
+{
+  int i;
+  for (i = 0; i < channel->nservers; i++)
+    {
+      struct server_state *server = &channel->servers[i];
+      if (server->is_broken)
+        {
+          handle_error(channel, i, now);
+        }
+    }
+}
+
 static void handle_error(ares_channel channel, int whichserver, time_t now)
 {
   struct query *query, *next;
@@ -526,7 +546,7 @@ static void skip_server(ares_channel channel, struct query *query,
    */
   if (channel->nservers > 1)
     {
-      query->skip_server[whichserver] = 1;
+      query->server_info[whichserver].skip_server = 1;
     }
 }
 
@@ -538,10 +558,21 @@ static struct query *next_server(ares_channel channel, struct query *query, time
     {
       for (; query->server < channel->nservers; query->server++)
         {
-          if (!query->skip_server[query->server])
+          struct server_state *server = &channel->servers[query->server];
+          /* We don't want to use this server if (1) we decided this
+           * connection is broken, and thus about to be closed, (2)
+           * we've decided to skip this server because of earlier
+           * errors we encountered, or (3) we already sent this query
+           * over this exact connection.
+           */
+          if (!server->is_broken &&
+               !query->server_info[query->server].skip_server &&
+               !(query->using_tcp &&
+                 (query->server_info[query->server].tcp_connection_generation ==
+                  server->tcp_connection_generation)))
             {
-              ares__send_query(channel, query, now);
-              return (query->next);
+               ares__send_query(channel, query, now);
+               return (query->next);
             }
         }
       query->server = 0;
@@ -582,8 +613,16 @@ void ares__send_query(ares_channel channel, struct query *query, time_t now)
         end_query(channel, query, ARES_ENOMEM, NULL, 0);
           return;
         }
+      /* To make the common case fast, we avoid copies by using the
+       * query's tcpbuf for as long as the query is alive. In the rare
+       * case where the query ends while it's queued for transmission,
+       * then we give the sendreq its own copy of the request packet
+       * and put it in sendreq->data_storage.
+       */
+      sendreq->data_storage = NULL;
       sendreq->data = query->tcpbuf;
       sendreq->len = query->tcplen;
+      sendreq->owner_query = query;
       sendreq->next = NULL;
       if (server->qtail)
         server->qtail->next = sendreq;
@@ -594,6 +633,8 @@ void ares__send_query(ares_channel channel, struct query *query, time_t now)
         }
       server->qtail = sendreq;
       query->timeout = 0;
+      query->server_info[query->server].tcp_connection_generation =
+        server->tcp_connection_generation;
     }
   else
     {
@@ -721,6 +762,7 @@ static int open_tcp_socket(ares_channel channel, struct server_state *server)
   SOCK_STATE_CALLBACK(channel, s, 1, 0);
   server->tcp_buffer_pos = 0;
   server->tcp_socket = s;
+  server->tcp_connection_generation = ++channel->tcp_connection_generation;
   return 0;
 }
 
@@ -839,6 +881,61 @@ static struct query *end_query (ares_channel channel, struct query *query, int s
   struct query **q, *next;
   int i;
 
+  /* First we check to see if this query ended while one of our send
+   * queues still has pointers to it.
+   */
+  for (i = 0; i < channel->nservers; i++)
+    {
+      struct server_state *server = &channel->servers[i];
+      struct send_request *sendreq;
+      for (sendreq = server->qhead; sendreq; sendreq = sendreq->next)
+        if (sendreq->owner_query == query)
+          {
+            sendreq->owner_query = NULL;
+            assert(sendreq->data_storage == NULL);
+            if (status == ARES_SUCCESS)
+              {
+                /* We got a reply for this query, but this queued
+                 * sendreq points into this soon-to-be-gone query's
+                 * tcpbuf. Probably this means we timed out and queued
+                 * the query for retransmission, then received a
+                 * response before actually retransmitting. This is
+                 * perfectly fine, so we want to keep the connection
+                 * running smoothly if we can. But in the worst case
+                 * we may have sent only some prefix of the query,
+                 * with some suffix of the query left to send. Also,
+                 * the buffer may be queued on multiple queues. To
+                 * prevent dangling pointers to the query's tcpbuf and
+                 * handle these cases, we just give such sendreqs
+                 * their own copy of the query packet.
+                 */
+               sendreq->data_storage = malloc(sendreq->len);
+               if (sendreq->data_storage != NULL)
+                 {
+                   memcpy(sendreq->data_storage, sendreq->data, sendreq->len);
+                   sendreq->data = sendreq->data_storage;
+                 }
+              }
+            if ((status != ARES_SUCCESS) || (sendreq->data_storage == NULL))
+              {
+                /* We encountered an error (probably a timeout,
+                 * suggesting the DNS server we're talking to is
+                 * probably unreachable, wedged, or severely
+                 * overloaded) or we couldn't copy the request, so
+                 * mark the connection as broken. When we get to
+                 * process_broken_connections() we'll close the
+                 * connection and try to re-send requests to another
+                 * server.
+                 */
+               server->is_broken = 1;
+               /* Just to be paranoid, zero out this sendreq... */
+               sendreq->data = NULL;
+               sendreq->len = 0;
+             }
+          }
+    }
+  /* Invoke the callback */ 
   query->callback(query->arg, status, abuf, alen);
   for (q = &channel->queries; *q; q = &(*q)->next)
     {
@@ -851,7 +948,7 @@ static struct query *end_query (ares_channel channel, struct query *query, int s
   else
     next = NULL;
   free(query->tcpbuf);
-  free(query->skip_server);
+  free(query->server_info);
   free(query);
 
   /* Simple cleanup policy: if no queries are remaining, close all
index 7f4362c..fd1450b 100644 (file)
@@ -62,8 +62,9 @@ void ares_send(ares_channel channel, const unsigned char *qbuf, int qlen,
       callback(arg, ARES_ENOMEM, NULL, 0);
       return;
     }
-  query->skip_server = malloc(channel->nservers * sizeof(int));
-  if (!query->skip_server)
+  query->server_info = malloc(channel->nservers *
+                              sizeof(query->server_info[0]));
+  if (!query->server_info)
     {
       free(query->tcpbuf);
       free(query);
@@ -93,7 +94,10 @@ void ares_send(ares_channel channel, const unsigned char *qbuf, int qlen,
   query->try = 0;
   query->server = 0;
   for (i = 0; i < channel->nservers; i++)
-    query->skip_server[i] = 0;
+    {
+      query->server_info[i].skip_server = 0;
+      query->server_info[i].tcp_connection_generation = 0;
+    }
   query->using_tcp = (channel->flags & ARES_FLAG_USEVC) || qlen > PACKETSZ;
   query->error_status = ARES_ECONNREFUSED;