* add todo list
authorLennart Poettering <lennart@poettering.net>
Sat, 26 Mar 2005 18:37:06 +0000 (18:37 +0000)
committerLennart Poettering <lennart@poettering.net>
Sat, 26 Mar 2005 18:37:06 +0000 (18:37 +0000)
* beef up packet scheduler

git-svn-id: file:///home/lennart/svn/public/avahi/trunk@18 941a03a8-eaeb-0310-b9a0-b1bbd8fe43fe

16 files changed:
Makefile
announce.c
announce.h
cache.c
dns.c
iface.c
iface.h
main.c
psched.c
psched.h
rr.c
server.c
subscribe.c
timeeventq.c
timeeventq.h
todo [new file with mode: 0644]

index 6798fe7..8f60172 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-CC=gcc
+#CC=gcc
 CFLAGS=-g -O0 -Wall -W -pipe $(shell pkg-config --cflags glib-2.0) -Wno-unused
 LIBS=$(shell pkg-config --libs glib-2.0)
 
index db983d4..5b1e4e7 100644 (file)
@@ -1,6 +1,8 @@
 #include "announce.h"
 #include "util.h"
 
+#define FLX_ANNOUNCEMENT_JITTER_MSEC 0
+
 static void remove_announcement(flxServer *s, flxAnnouncement *a) {
     g_assert(s);
     g_assert(a);
@@ -21,21 +23,22 @@ static void elapse_announce(flxTimeEvent *e, void *userdata) {
     g_assert(e);
     g_assert(a);
 
-    if (a->n_announced >= 3) {
-        g_message("Enough announcements for record [%s]", t = flx_record_to_string(a->entry->record));
-        g_free(t);
-        remove_announcement(a->server, a);
-        return;
-    }
+    flx_interface_post_response(a->interface, a->entry->record, FALSE);
 
-    flx_interface_post_response(a->interface, a->entry->record);
-    a->n_announced++;
+    if (a->n_announced++ <= 8)
+        a->sec_delay *= 2;
 
     g_message("Announcement #%i on interface %s.%i for entry [%s]", a->n_announced, a->interface->hardware->name, a->interface->protocol, t = flx_record_to_string(a->entry->record));
     g_free(t);
-    
-    flx_elapse_time(&tv, 1000, 100);
-    flx_time_event_queue_update(a->server->time_event_queue, a->time_event, &tv);
+
+    if (a->n_announced >= 4) {
+        g_message("Enough announcements for record [%s]", t = flx_record_to_string(a->entry->record));
+        g_free(t);
+        remove_announcement(a->server, a);
+    } else { 
+        flx_elapse_time(&tv, a->sec_delay*1000, FLX_ANNOUNCEMENT_JITTER_MSEC);
+        flx_time_event_queue_update(a->server->time_event_queue, a->time_event, &tv);
+    }
 }
 
 static void new_announcement(flxServer *s, flxInterface *i, flxServerEntry *e) {
@@ -47,7 +50,10 @@ static void new_announcement(flxServer *s, flxInterface *i, flxServerEntry *e) {
     g_assert(i);
     g_assert(e);
 
-    if (!flx_interface_match(i, e->interface, e->protocol) || !flx_interface_relevant(i))
+    g_message("NEW ANNOUNCEMENT: %s.%i [%s]", i->hardware->name, i->protocol, t = flx_record_to_string(e->record));
+    g_free(t);
+    
+    if (!flx_interface_match(i, e->interface, e->protocol) || !i->announcing)
         return;
 
     /* We don't want duplicates */
@@ -58,18 +64,19 @@ static void new_announcement(flxServer *s, flxInterface *i, flxServerEntry *e) {
     g_message("New announcement on interface %s.%i for entry [%s]", i->hardware->name, i->protocol, t = flx_record_to_string(e->record));
     g_free(t);
     
-    flx_interface_post_response(i, e->record);
+    flx_interface_post_response(i, e->record, FALSE);
     
     a = g_new(flxAnnouncement, 1);
     a->server = s;
     a->interface = i;
     a->entry = e;
     a->n_announced = 1;
+    a->sec_delay = 1;
     
     FLX_LLIST_PREPEND(flxAnnouncement, by_interface, i->announcements, a);
     FLX_LLIST_PREPEND(flxAnnouncement, by_entry, e->announcements, a);
     
-    flx_elapse_time(&tv, 1000, 100);
+    flx_elapse_time(&tv, a->sec_delay*1000, FLX_ANNOUNCEMENT_JITTER_MSEC);
     a->time_event = flx_time_event_queue_add(s->time_event_queue, &tv, elapse_announce, a);
 }
 
@@ -79,40 +86,32 @@ void flx_announce_interface(flxServer *s, flxInterface *i) {
     g_assert(s);
     g_assert(i);
 
-    if (!flx_interface_relevant(i))
+    if (!i->announcing)
         return;
+
+    g_message("ANNOUNCE INTERFACE");
     
     for (e = s->entries; e; e = e->entry_next)
         new_announcement(s, i, e);
 }
 
-void flx_announce_entry(flxServer *s, flxServerEntry *e) {
-    g_assert(s);
+static void announce_walk_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) {
+    flxServerEntry *e = userdata;
+    
+    g_assert(m);
+    g_assert(i);
     g_assert(e);
 
-    if (e->interface > 0) {
-
-        if (e->protocol != AF_UNSPEC) {
-            flxInterface *i;
-    
-            if ((i = flx_interface_monitor_get_interface(s->monitor, e->interface, e->protocol)))
-                new_announcement(s, i, e);
-        } else {
-            flxHwInterface *hw;
+    new_announcement(m->server, i, e);
+}
 
-            if ((hw = flx_interface_monitor_get_hw_interface(s->monitor, e->interface))) {
-                flxInterface *i;
+void flx_announce_entry(flxServer *s, flxServerEntry *e) {
+    g_assert(s);
+    g_assert(e);
 
-                for (i = hw->interfaces; i; i = i->by_hardware_next)
-                    new_announcement(s, i, e);
-            }
-        }
-    } else {
-        flxInterface *i;
+    g_message("ANNOUNCE ENTRY");
 
-        for (i = s->monitor->interfaces; i; i = i->interface_next)
-            new_announcement(s, i, e);
-    }
+    flx_interface_monitor_walk(s->monitor, e->interface, e->protocol, announce_walk_callback, e);
 }
 
 static flxRecord *make_goodbye_record(flxRecord *r) {
@@ -139,7 +138,7 @@ void flx_goodbye_interface(flxServer *s, flxInterface *i, gboolean goodbye) {
         for (e = s->entries; e; e = e->entry_next)
             if (flx_interface_match(i, e->interface, e->protocol)) {
                 flxRecord *g = make_goodbye_record(e->record);
-                flx_interface_post_response(i, g);
+                flx_interface_post_response(i, g, TRUE);
                 flx_record_unref(g);
             }
     }
index 0e65eae..49c5f13 100644 (file)
@@ -17,6 +17,7 @@ struct _flxAnnouncement {
     
     flxTimeEvent *time_event;
     guint n_announced;
+    guint sec_delay;
 
     FLX_LLIST_FIELDS(flxAnnouncement, by_interface);
     FLX_LLIST_FIELDS(flxAnnouncement, by_entry);
diff --git a/cache.c b/cache.c
index 9d9590b..99d79fb 100644 (file)
--- a/cache.c
+++ b/cache.c
@@ -122,7 +122,7 @@ static void elapse_func(flxTimeEvent *t, void *userdata) {
         g_message("Requesting cache entry update at %i%%.", percent);
 
         /* Request a cache update */
-        flx_interface_post_query(e->cache->interface, e->record->key);
+        flx_interface_post_query(e->cache->interface, e->record->key, TRUE);
 
         /* Check again later */
         next_expiry(e->cache, e, percent);
diff --git a/dns.c b/dns.c
index 726eb21..243c5a3 100644 (file)
--- a/dns.c
+++ b/dns.c
@@ -131,7 +131,7 @@ guint8 *flx_dns_packet_append_bytes(flxDnsPacket  *p, gconstpointer b, guint l)
     g_assert(p);
     g_assert(b);
     g_assert(l);
-    
+
     if (!(d = flx_dns_packet_extend(p, l)))
         return NULL;
 
@@ -373,9 +373,14 @@ flxRecord* flx_dns_packet_consume_record(flxDnsPacket *p, gboolean *ret_cache_fl
         }
             
         default:
-            if (!(data = flx_dns_packet_get_rptr(p)) ||
-                flx_dns_packet_skip(p, rdlength) < 0)
-                return NULL;
+
+            if (rdlength > 0) {
+
+                if (!(data = flx_dns_packet_get_rptr(p)) ||
+                    flx_dns_packet_skip(p, rdlength) < 0)
+                    return NULL;
+            } else
+                data = NULL;
 
             break;
     }
@@ -462,7 +467,7 @@ guint8* flx_dns_packet_append_record(flxDnsPacket *p, flxRecord *r, gboolean cac
 
         default:
             if (!flx_dns_packet_append_uint16(p, r->size) ||
-                !flx_dns_packet_append_bytes(p, r->data, r->size))
+                (r->size != 0 && !flx_dns_packet_append_bytes(p, r->data, r->size)))
                 return NULL;
     }
 
diff --git a/iface.c b/iface.c
index a7d210f..24d5a37 100644 (file)
--- a/iface.c
+++ b/iface.c
@@ -59,11 +59,16 @@ static void free_address(flxInterfaceMonitor *m, flxInterfaceAddress *a) {
     g_free(a);
 }
 
-static void free_interface(flxInterfaceMonitor *m, flxInterface *i) {
+static void free_interface(flxInterfaceMonitor *m, flxInterface *i, gboolean send_goodbye) {
     g_assert(m);
     g_assert(i);
 
-    flx_goodbye_interface(m->server, i, FALSE);
+    g_message("removing interface %s.%i", i->hardware->name, i->protocol);
+    flx_goodbye_interface(m->server, i, send_goodbye);
+    g_message("flushing...");
+    flx_packet_scheduler_flush_responses(i->scheduler);
+    g_message("done");
+    
     g_assert(!i->announcements);
 
     while (i->addresses)
@@ -78,12 +83,12 @@ static void free_interface(flxInterfaceMonitor *m, flxInterface *i) {
     g_free(i);
 }
 
-static void free_hw_interface(flxInterfaceMonitor *m, flxHwInterface *hw) {
+static void free_hw_interface(flxInterfaceMonitor *m, flxHwInterface *hw, gboolean send_goodbye) {
     g_assert(m);
     g_assert(hw);
 
     while (hw->interfaces)
-        free_interface(m, hw->interfaces);
+        free_interface(m, hw->interfaces, send_goodbye);
 
     FLX_LLIST_REMOVE(flxHwInterface, hardware, m->hw_interfaces, hw);
     g_hash_table_remove(m->hash_table, &hw->index);
@@ -136,7 +141,7 @@ static void new_interface(flxInterfaceMonitor *m, flxHwInterface *hw, guchar pro
     i->monitor = m;
     i->hardware = hw;
     i->protocol = protocol;
-    i->relevant = FALSE;
+    i->announcing = FALSE;
 
     FLX_LLIST_HEAD_INIT(flxInterfaceAddress, i->addresses);
     FLX_LLIST_HEAD_INIT(flxAnnouncement, i->announcements);
@@ -155,17 +160,17 @@ static void check_interface_relevant(flxInterfaceMonitor *m, flxInterface *i) {
 
     b = flx_interface_relevant(i);
 
-    if (b && !i->relevant) {
+    if (b && !i->announcing) {
         g_message("New relevant interface %s.%i", i->hardware->name, i->protocol);
 
+        i->announcing = TRUE;
         flx_announce_interface(m->server, i);
-    } else if (!b && i->relevant) {
+    } else if (!b && i->announcing) {
         g_message("Interface %s.%i no longer relevant", i->hardware->name, i->protocol);
 
+        i->announcing = FALSE;
         flx_goodbye_interface(m->server, i, FALSE);
     }
-
-    i->relevant = b;
 }
 
 static void check_hw_interface_relevant(flxInterfaceMonitor *m, flxHwInterface *hw) {
@@ -252,7 +257,7 @@ static void callback(flxNetlink *nl, struct nlmsghdr *n, gpointer userdata) {
             return;
 
         update_hw_interface_rr(m, hw, TRUE);
-        free_hw_interface(m, hw);
+        free_hw_interface(m, hw, FALSE);
         
     } else if (n->nlmsg_type == RTM_NEWADDR || n->nlmsg_type == RTM_DELADDR) {
 
@@ -336,8 +341,10 @@ static void callback(flxNetlink *nl, struct nlmsghdr *n, gpointer userdata) {
                 g_warning("NETLINK: Failed to list addrs: %s", strerror(errno));
             else
                 m->list = LIST_ADDR;
-        } else
+        } else {
             m->list = LIST_DONE;
+            g_message("Enumeration complete");
+        }
         
     } else if (n->nlmsg_type == NLMSG_ERROR && (n->nlmsg_seq == m->query_link_seq || n->nlmsg_seq == m->query_addr_seq)) {
         struct nlmsgerr *e = NLMSG_DATA (n);
@@ -375,13 +382,14 @@ fail:
 void flx_interface_monitor_free(flxInterfaceMonitor *m) {
     g_assert(m);
 
-    if (m->netlink)
-        flx_netlink_free(m->netlink);
-
     while (m->hw_interfaces)
-        free_hw_interface(m, m->hw_interfaces);
+        free_hw_interface(m, m->hw_interfaces, TRUE);
 
     g_assert(!m->interfaces);
+
+    
+    if (m->netlink)
+        flx_netlink_free(m->netlink);
     
     if (m->hash_table)
         g_hash_table_destroy(m->hash_table);
@@ -420,7 +428,7 @@ void flx_interface_send_packet(flxInterface *i, flxDnsPacket *p) {
     g_assert(i);
     g_assert(p);
 
-    if (i->relevant) {
+    if (flx_interface_relevant(i)) {
         g_message("sending on '%s.%i'", i->hardware->name, i->protocol);
 
         if (i->protocol == AF_INET && i->monitor->server->fd_ipv4 >= 0)
@@ -430,21 +438,21 @@ void flx_interface_send_packet(flxInterface *i, flxDnsPacket *p) {
     }
 }
 
-void flx_interface_post_query(flxInterface *i, flxKey *key) {
+void flx_interface_post_query(flxInterface *i, flxKey *key, gboolean immediately) {
     g_assert(i);
     g_assert(key);
 
-    if (i->relevant)
-        flx_packet_scheduler_post_query(i->scheduler, key);
+    if (flx_interface_relevant(i))
+        flx_packet_scheduler_post_query(i->scheduler, key, immediately);
 }
 
 
-void flx_interface_post_response(flxInterface *i, flxRecord *record) {
+void flx_interface_post_response(flxInterface *i, flxRecord *record, gboolean immediately) {
     g_assert(i);
     g_assert(record);
 
-    if (i->relevant)
-        flx_packet_scheduler_post_response(i->scheduler, record);
+    if (flx_interface_relevant(i))
+        flx_packet_scheduler_post_response(i->scheduler, record, immediately);
 }
 
 void flx_dump_caches(flxInterfaceMonitor *m, FILE *f) {
@@ -452,11 +460,12 @@ void flx_dump_caches(flxInterfaceMonitor *m, FILE *f) {
     g_assert(m);
 
     for (i = m->interfaces; i; i = i->interface_next) {
-        if (i->relevant) {
-            fprintf(f, ";;; INTERFACE %s.%i ;;;\n", i->hardware->name, i->protocol);
+        if (flx_interface_relevant(i)) {
+            fprintf(f, "\n;;; INTERFACE %s.%i ;;;\n", i->hardware->name, i->protocol);
             flx_cache_dump(i->cache, f);
         }
     }
+    fprintf(f, "\n");
 }
 
 gboolean flx_interface_relevant(flxInterface *i) {
diff --git a/iface.h b/iface.h
index b5c2708..16e9650 100644 (file)
--- a/iface.h
+++ b/iface.h
@@ -53,7 +53,7 @@ struct _flxInterface {
     
     flxHwInterface *hardware;
     guchar protocol;
-    gboolean relevant;
+    gboolean announcing;
 
     flxCache *cache;
     flxPacketScheduler *scheduler;
@@ -82,8 +82,8 @@ flxHwInterface* flx_interface_monitor_get_hw_interface(flxInterfaceMonitor *m, g
 
 void flx_interface_send_packet(flxInterface *i, flxDnsPacket *p);
 
-void flx_interface_post_query(flxInterface *i, flxKey *k);
-void flx_interface_post_response(flxInterface *i, flxRecord *rr);
+void flx_interface_post_query(flxInterface *i, flxKey *k, gboolean immediately);
+void flx_interface_post_response(flxInterface *i, flxRecord *rr, gboolean immediately);
 
 void flx_dump_caches(flxInterfaceMonitor *m, FILE *f);
 
diff --git a/main.c b/main.c
index 367dcd3..6f82d25 100644 (file)
--- a/main.c
+++ b/main.c
@@ -58,21 +58,21 @@ int main(int argc, char *argv[]) {
 
     flx_server_add_text(flx, 0, 0, AF_UNSPEC, FALSE, NULL, "hallo");
 
-    k = flx_key_new("_http._tcp.local.", FLX_DNS_CLASS_IN, FLX_DNS_TYPE_PTR);
-    s = flx_subscription_new(flx, k, 0, AF_UNSPEC, subscription, NULL);
-    flx_key_unref(k);
+/*     k = flx_key_new("_http._tcp.local.", FLX_DNS_CLASS_IN, FLX_DNS_TYPE_PTR); */
+/*     s = flx_subscription_new(flx, k, 0, AF_UNSPEC, subscription, NULL); */
+/*     flx_key_unref(k); */
 
     loop = g_main_loop_new(NULL, FALSE);
     
-    g_timeout_add(1000*60, quit_timeout, loop);
+    g_timeout_add(1000*30, quit_timeout, loop);
     g_timeout_add(1000, send_timeout, flx);
-    g_timeout_add(1000*10, dump_timeout, flx);
+    g_timeout_add(1000*20, dump_timeout, flx);
     
     g_main_loop_run(loop);
 
     g_main_loop_unref(loop);
 
-    flx_subscription_free(s);
+/*     flx_subscription_free(s); */
     flx_server_free(flx);
     
     return 0;
index db72ec3..162c1b2 100644 (file)
--- a/psched.c
+++ b/psched.c
@@ -1,6 +1,14 @@
+#include <string.h>
+
 #include "util.h"
 #include "psched.h"
 
+#define FLX_QUERY_HISTORY_MSEC 700
+#define FLX_QUERY_DEFER_MSEC 100
+#define FLX_RESPONSE_HISTORY_MSEC 700
+#define FLX_RESPONSE_DEFER_MSEC 20
+#define FLX_RESPONSE_JITTER_MSEC 100
+
 flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i) {
     flxPacketScheduler *s;
 
@@ -68,9 +76,11 @@ static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQ
 
         qj->done = 1;
 
-        /* Drop query after 100ms from history */
-        flx_elapse_time(&tv, 100, 0);
+        /* Drop query after some time from history from history */
+        flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0);
         flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
+
+        g_get_current_time(&qj->delivery);
     }
 
     return d;
@@ -127,25 +137,47 @@ static flxQueryJob* look_for_query(flxPacketScheduler *s, flxKey *key) {
     return NULL;
 }
 
-void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key) {
+flxQueryJob* query_job_new(flxPacketScheduler *s, flxKey *key) {
     flxQueryJob *qj;
-    GTimeVal tv;
     
     g_assert(s);
     g_assert(key);
 
-    if (look_for_query(s, key))
-        return;
-
     qj = g_new(flxQueryJob, 1);
+    qj->scheduler = s;
     qj->key = flx_key_ref(key);
     qj->done = FALSE;
+    qj->time_event = NULL;
+    
+    FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj);
 
-    flx_elapse_time(&tv, 100, 0);
-    qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
-    qj->scheduler = s;
+    return qj;
+}
 
-    FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj);
+void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key, gboolean immediately) {
+    GTimeVal tv;
+    flxQueryJob *qj;
+    
+    g_assert(s);
+    g_assert(key);
+
+    flx_elapse_time(&tv, immediately ? 0 : FLX_QUERY_DEFER_MSEC, 0);
+    
+    if ((qj = look_for_query(s, key))) {
+        glong d = flx_timeval_diff(&tv, &qj->delivery);
+
+        /* Duplicate questions suppression */
+        if (d >= 0 && d <= FLX_QUERY_HISTORY_MSEC*1000) {
+            g_message("WARNING! DUPLICATE QUERY SUPPRESSION ACTIVE!");
+            return;
+        }
+
+        query_job_free(s, qj);
+    }
+
+    qj = query_job_new(s, key);
+    qj->delivery = tv;
+    qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &qj->delivery, query_elapse, qj);
 }
 
 static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, flxResponseJob *rj) {
@@ -160,35 +192,32 @@ static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, f
 
         rj->done = 1;
 
-        /* Drop response after 1s from history */
-        flx_elapse_time(&tv, 1000, 0);
+        /* Drop response after some time from history */
+        flx_elapse_time(&tv, FLX_RESPONSE_HISTORY_MSEC, 0);
         flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
+
+        g_get_current_time(&rj->delivery);
     }
 
     return d;
 }
-                                 
 
-static void response_elapse(flxTimeEvent *e, gpointer data) {
-    flxResponseJob *rj = data;
-    flxPacketScheduler *s;
+static void send_response_packet(flxPacketScheduler *s, flxResponseJob *rj) {
     flxDnsPacket *p;
     guint n;
-    guint8 *d;
-
-    g_assert(rj);
-    s = rj->scheduler;
 
-    if (rj->done) {
-        /* Lets remove it  from the history */
-        response_job_free(s, rj);
-        return;
-    }
+    g_assert(s);
 
     p = flx_dns_packet_new_response(s->interface->hardware->mtu - 200);
-    d = packet_add_response_job(s, p, rj);
-    g_assert(d);
-    n = 1;
+    n = 0;
+
+    /* If a job was specified, put it in the packet. */
+    if (rj) {
+        guint8 *d;
+        d = packet_add_response_job(s, p, rj);
+        g_assert(d);
+        n++;
+    }
 
     /* Try to fill up packet with more responses, if available */
     for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
@@ -207,6 +236,22 @@ static void response_elapse(flxTimeEvent *e, gpointer data) {
     flx_dns_packet_free(p);
 }
 
+static void response_elapse(flxTimeEvent *e, gpointer data) {
+    flxResponseJob *rj = data;
+    flxPacketScheduler *s;
+
+    g_assert(rj);
+    s = rj->scheduler;
+
+    if (rj->done) {
+        /* Lets remove it  from the history */
+        response_job_free(s, rj);
+        return;
+    }
+
+    send_response_packet(s, rj);
+}
+
 static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *record) {
     flxResponseJob *rj;
 
@@ -220,67 +265,180 @@ static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *recor
     return NULL;
 }
 
-void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record) {
+static flxResponseJob* response_job_new(flxPacketScheduler *s, flxRecord *record) {
     flxResponseJob *rj;
-    GTimeVal tv;
     
     g_assert(s);
     g_assert(record);
 
-    if (look_for_response(s, record))
-        return;
-
     rj = g_new(flxResponseJob, 1);
+    rj->scheduler = s;
     rj->record = flx_record_ref(record);
     rj->done = FALSE;
+    rj->time_event = NULL;
+    
+    FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj);
 
-    flx_elapse_time(&tv, 20, 100);
-    rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
-    rj->scheduler = s;
+    return rj;
+}
 
-    FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj);
+void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record, gboolean immediately) {
+    flxResponseJob *rj;
+    GTimeVal tv;
+    gchar *t;
+    
+    g_assert(s);
+    g_assert(record);
+
+    flx_elapse_time(&tv, immediately ? 0 : FLX_RESPONSE_DEFER_MSEC, immediately ? 0 : FLX_RESPONSE_JITTER_MSEC);
+    
+    /* Don't send out duplicates */
+    
+    if ((rj = look_for_response(s, record))) {
+        glong d;
+
+        d = flx_timeval_diff(&tv, &rj->delivery);
+        
+        /* If there's already a matching packet in our history or in
+         * the schedule, we do nothing. */
+        if (!!record->ttl == !!rj->record->ttl &&
+            d >= 0 && d <= FLX_RESPONSE_HISTORY_MSEC*1000) {
+            g_message("WARNING! DUPLICATE RESPONSE SUPPRESSION ACTIVE!");
+            return;
+        }
+
+        /* Either one was a goodbye packet, but the other was not, so
+         * let's drop the older one. */
+        response_job_free(s, rj);
+    }
+
+    g_message("ACCEPTED NEW RESPONSE [%s]", t = flx_record_to_string(record));
+    g_free(t);
+
+    /* Create a new job and schedule it */
+    rj = response_job_new(s, record);
+    rj->delivery = tv;
+    rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &rj->delivery, response_elapse, rj);
 }
 
-void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key) {
+void flx_packet_scheduler_incoming_query(flxPacketScheduler *s, flxKey *key) {
+    GTimeVal tv;
     flxQueryJob *qj;
     
     g_assert(s);
     g_assert(key);
 
+    /* This function is called whenever an incoming query was
+     * receieved. We drop all scheduled queries which match here. The
+     * keyword is "DUPLICATE QUESTION SUPPRESION". */
+
     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
         if (flx_key_equal(qj->key, key)) {
 
-            if (!qj->done) {
-                GTimeVal tv;
-                qj->done = TRUE;
-                
-                /* Drop query after 100ms from history */
-                flx_elapse_time(&tv, 100, 0);
-                flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
-            }
+            if (qj->done)
+                return;
 
-            break;
+            goto mark_done;
         }
+
+
+    /* No matching job was found. Add the query to the history */
+    qj = query_job_new(s, key);
+
+mark_done:
+    qj->done = TRUE;
+
+    /* Drop the query after some time */
+    flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0);
+    qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
+
+    g_get_current_time(&qj->delivery);
+}
+
+void response_job_set_elapse_time(flxPacketScheduler *s, flxResponseJob *rj, guint msec, guint jitter) {
+    GTimeVal tv;
+
+    g_assert(s);
+    g_assert(rj);
+
+    flx_elapse_time(&tv, msec, jitter);
+
+    if (rj->time_event)
+        flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
+    else
+        rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
+    
 }
 
-void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record) {
+void flx_packet_scheduler_incoming_response(flxPacketScheduler *s, flxRecord *record) {
     flxResponseJob *rj;
     
     g_assert(s);
     g_assert(record);
 
-    for  (rj = s->response_jobs; rj; rj = rj->jobs_next)
+    /* This function is called whenever an incoming response was
+     * receieved. We drop all scheduled responses which match
+     * here. The keyword is "DUPLICATE ANSWER SUPPRESION". */
+    
+    for (rj = s->response_jobs; rj; rj = rj->jobs_next)
         if (flx_record_equal_no_ttl(rj->record, record)) {
 
-            if (!rj->done) {
-                GTimeVal tv;
-                rj->done = TRUE;
-                
-                /* Drop response after 100ms from history */
-                flx_elapse_time(&tv, 100, 0);
-                flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
+            if (rj->done) {
+
+                if (!!record->ttl == !!rj->record->ttl) {
+                    /* An entry like this is already in our history,
+                     * so let's get out of here! */
+                    
+                    return;
+                    
+                } else {
+                    /* Either one was a goodbye packet but other was
+                     * none. We remove the history entry, and add a
+                     * new one */
+                    
+                    response_job_free(s, rj);
+                    break;
+                }
+        
+            } else {
+
+                if (!!record->ttl == !!rj->record->ttl) {
+
+                    /* The incoming packet matches our scheduled
+                     * record, so let's mark that one as done */
+
+                    goto mark_done;
+                    
+                } else {
+
+                    /* Either one was a goodbye packet but other was
+                     * none. We ignore the incoming packet. */
+
+                    return;
+                }
             }
-
-            break;
         }
+
+    /* No matching job was found. Add the query to the history */
+    rj = response_job_new(s, record);
+
+mark_done:
+    rj->done = TRUE;
+                    
+    /* Drop response after 500ms from history */
+    response_job_set_elapse_time(s, rj, FLX_RESPONSE_HISTORY_MSEC, 0);
+
+    g_get_current_time(&rj->delivery);
+}
+
+void flx_packet_scheduler_flush_responses(flxPacketScheduler *s) {
+    flxResponseJob *rj;
+    
+    g_assert(s);
+
+    /* Send all scheduled responses, ignoring the scheduled time */
+    
+    for (rj = s->response_jobs; rj; rj = rj->jobs_next)
+        if (!rj->done)
+            send_response_packet(s, rj);
 }
index fae5d91..c312ce8 100644 (file)
--- a/psched.h
+++ b/psched.h
@@ -15,6 +15,7 @@ struct _flxQueryJob {
     flxTimeEvent *time_event;
     flxKey *key;
     gboolean done;
+    GTimeVal delivery;
     FLX_LLIST_FIELDS(flxQueryJob, jobs);
 };
 
@@ -23,6 +24,7 @@ struct _flxResponseJob {
     flxTimeEvent *time_event;
     flxRecord *record;
     gboolean done;
+    GTimeVal delivery;
     FLX_LLIST_FIELDS(flxResponseJob, jobs);
 };
 
@@ -38,10 +40,12 @@ struct _flxPacketScheduler {
 flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i);
 void flx_packet_scheduler_free(flxPacketScheduler *s);
 
-void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key);
-void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record);
+void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key, gboolean immediately);
+void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record, gboolean immediately);
 
-void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key);
-void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record);
+void flx_packet_scheduler_incoming_query(flxPacketScheduler *s, flxKey *key);
+void flx_packet_scheduler_incoming_response(flxPacketScheduler *s, flxRecord *record);
+
+void flx_packet_scheduler_flush_responses(flxPacketScheduler *s);
 
 #endif
diff --git a/rr.c b/rr.c
index 4108c54..294d409 100644 (file)
--- a/rr.c
+++ b/rr.c
@@ -49,8 +49,8 @@ flxRecord *flx_record_new(flxKey *k, gconstpointer data, guint16 size, guint32 t
     flxRecord *r;
     
     g_assert(k);
-    g_assert(data);
-
+    g_assert(size == 0 || data);
+    
     r = g_new(flxRecord, 1);
     r->ref = 1;
     r->key = flx_key_ref(k);
@@ -64,6 +64,9 @@ flxRecord *flx_record_new(flxKey *k, gconstpointer data, guint16 size, guint32 t
 flxRecord *flx_record_new_full(const gchar *name, guint16 class, guint16 type, gconstpointer data, guint16 size, guint32 ttl) {
     flxRecord *r;
     flxKey *k;
+
+    g_assert(name);
+    g_assert(size == 0 || data);
     
     k = flx_key_new(name, class, type);
     r = flx_record_new(k, data, size, ttl);
index 94c7b1d..6b3c2dd 100644 (file)
--- a/server.c
+++ b/server.c
@@ -22,9 +22,11 @@ static void handle_query_key(flxServer *s, flxKey *k, flxInterface *i, const flx
     g_message("Handling query: %s", txt = flx_key_to_string(k));
     g_free(txt);
 
+    flx_packet_scheduler_incoming_query(i->scheduler, k);
+
     for (e = g_hash_table_lookup(s->rrset_by_key, k); e; e = e->by_key_next)
         if (flx_interface_match(i, e->interface, e->protocol))
-            flx_interface_post_response(i, e->record);
+            flx_interface_post_response(i, e->record, FALSE);
 }
 
 static void handle_query(flxServer *s, flxDnsPacket *p, flxInterface *i, const flxAddress *a) {
@@ -72,8 +74,7 @@ static void handle_response(flxServer *s, flxDnsPacket *p, flxInterface *i, cons
 
         flx_cache_update(i->cache, record, cache_flush, a);
 
-        if (record->ttl != 0)
-            flx_packet_scheduler_drop_response(i->scheduler, record);
+        flx_packet_scheduler_incoming_response(i->scheduler, record);
         flx_record_unref(record);
     }
 }
@@ -548,7 +549,7 @@ static void post_query_callback(flxInterfaceMonitor *m, flxInterface *i, gpointe
     g_assert(i);
     g_assert(k);
 
-    flx_interface_post_query(i, k);
+    flx_interface_post_query(i, k, FALSE);
 }
 
 void flx_server_post_query(flxServer *s, gint interface, guchar protocol, flxKey *key) {
@@ -565,7 +566,7 @@ static void post_response_callback(flxInterfaceMonitor *m, flxInterface *i, gpoi
     g_assert(i);
     g_assert(r);
 
-    flx_interface_post_response(i, r);
+    flx_interface_post_response(i, r, FALSE);
 }
 
 void flx_server_post_response(flxServer *s, gint interface, guchar protocol, flxRecord *record) {
index 3abe464..7fd264d 100644 (file)
@@ -15,7 +15,6 @@ static void elapse(flxTimeEvent *e, void *userdata) {
 
     g_message("%i. Continuous querying for %s", s->n_query, t = flx_key_to_string(s->key));
     g_free(t);
-
     
     flx_elapse_time(&tv, s->sec_delay*1000, 0);
     flx_time_event_queue_update(s->server->time_event_queue, s->time_event, &tv);
index 0d4af97..ec1a074 100644 (file)
@@ -154,3 +154,5 @@ flxTimeEvent* flx_time_event_next(flxTimeEvent *e) {
 
     return e->node->next->data;
 }
+
+
index f663f1f..0add498 100644 (file)
@@ -30,4 +30,7 @@ void flx_time_event_queue_update(flxTimeEventQueue *q, flxTimeEvent *e, const GT
 flxTimeEvent* flx_time_event_queue_root(flxTimeEventQueue *q);
 flxTimeEvent* flx_time_event_next(flxTimeEvent *e);
 
+
+
+
 #endif
diff --git a/todo b/todo
new file mode 100644 (file)
index 0000000..2aea0e1
--- /dev/null
+++ b/todo
@@ -0,0 +1,8 @@
+* Unicast responses/queries
+* Known-Answer suppression
+* Truncation
+* Probing/Conflict resolution
+* Legacy unicast
+* really send goodbye packets
+* uniqueness
+* defend our entries on incoming goodbye