* add subscription feature - with reissuing
authorLennart Poettering <lennart@poettering.net>
Sat, 26 Mar 2005 13:58:11 +0000 (13:58 +0000)
committerLennart Poettering <lennart@poettering.net>
Sat, 26 Mar 2005 13:58:11 +0000 (13:58 +0000)
* interpret goodbye responses

git-svn-id: file:///home/lennart/svn/public/avahi/trunk@17 941a03a8-eaeb-0310-b9a0-b1bbd8fe43fe

13 files changed:
Makefile
cache.c
cache.h
iface.c
iface.h
main.c
psched.c
rr.c
rr.h
server.c
server.h
subscribe.c [new file with mode: 0644]
subscribe.h [new file with mode: 0644]

index 2731696..6798fe7 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ LIBS=$(shell pkg-config --libs glib-2.0)
 
 all: flexmdns prioq-test
 
-flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o psched.o announce.o
+flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o psched.o announce.o subscribe.o
        $(CC) -o $@ $^ $(LIBS)
 
 #test-llist: test-llist.o
diff --git a/cache.c b/cache.c
index b668d33..9d9590b 100644 (file)
--- a/cache.c
+++ b/cache.c
@@ -6,7 +6,7 @@ static void remove_entry(flxCache *c, flxCacheEntry *e, gboolean remove_from_has
     g_assert(c);
     g_assert(e);
 
-    g_message("remvoin from cache: %p %p", c, e);
+    g_message("removing from cache: %p %p", c, e);
     
     if (remove_from_hash_table) {
         flxCacheEntry *t;
@@ -18,10 +18,12 @@ static void remove_entry(flxCache *c, flxCacheEntry *e, gboolean remove_from_has
             g_hash_table_remove(c->hash_table, e->record->key);
     }
         
-    flx_record_unref(e->record);
-
     if (e->time_event)
         flx_time_event_queue_remove(c->server->time_event_queue, e->time_event);
+
+    flx_subscription_notify(c->server, c->interface, e->record, FLX_SUBSCRIPTION_REMOVE);
+
+    flx_record_unref(e->record);
     
     g_free(e);
 }
@@ -71,7 +73,7 @@ flxCacheEntry *flx_cache_lookup_record(flxCache *c, flxRecord *r) {
     g_assert(r);
 
     for (e = flx_cache_lookup_key(c, r->key); e; e = e->by_name_next)
-        if (e->record->size == r->size && !memcmp(e->record->data, r->data, r->size))
+        if (flx_record_equal_no_ttl(e->record, r))
             return e;
 
     return NULL;
@@ -127,6 +129,16 @@ static void elapse_func(flxTimeEvent *t, void *userdata) {
     }
 }
 
+static void update_time_event(flxCache *c, flxCacheEntry *e) {
+    g_assert(c);
+    g_assert(e);
+    
+    if (e->time_event)
+        flx_time_event_queue_update(c->server->time_event_queue, e->time_event, &e->expiry);
+    else
+        e->time_event = flx_time_event_queue_add(c->server->time_event_queue, &e->expiry, elapse_func, e);
+}
+
 static void next_expiry(flxCache *c, flxCacheEntry *e, guint percent) {
     gulong usec;
 
@@ -142,14 +154,10 @@ static void next_expiry(flxCache *c, flxCacheEntry *e, guint percent) {
     usec = g_random_int_range(usec*percent, usec*(percent+2));
     
     g_time_val_add(&e->expiry, usec);
-
-    if (e->time_event)
-        flx_time_event_queue_update(c->server->time_event_queue, e->time_event, &e->expiry);
-    else
-        e->time_event = flx_time_event_queue_add(c->server->time_event_queue, &e->expiry, elapse_func, e);
+    update_time_event(c, e);
 }
 
-flxCacheEntry *flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a) {
+void flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a) {
     flxCacheEntry *e, *t;
     gchar *txt;
     
@@ -159,55 +167,77 @@ flxCacheEntry *flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, cons
     g_message("cache update: %s", (txt = flx_record_to_string(r)));
     g_free(txt);
 
-    if ((t = e = flx_cache_lookup_key(c, r->key))) {
+    if (r->ttl == 0) {
 
-/*         g_message("found prev cache entry"); */
+        /* This is a goodbye request */
 
-        if (unique) {
-            /* Drop all entries but the first which we replace */
-            while (e->by_name_next)
-                remove_entry(c, e->by_name_next, TRUE);
+        if ((e = flx_cache_lookup_record(c, r))) {
 
-        } else {
-            /* Look for exactly the same entry */
-            for (; e; e = e->by_name_next)
-                if (flx_record_equal(e->record, r))
-                    break;
+            e->state = FLX_CACHE_FINAL;
+            g_get_current_time(&e->timestamp);
+            e->expiry = e->timestamp;
+            g_time_val_add(&e->expiry, 1000000); /* 1s */
+            update_time_event(c, e);
         }
-    }
-    
-    if (e) {
 
-/*         g_message("found matching cache entry"); */
+    } else {
 
-        /* We are the first in the linked list so let's replace the hash table key with the new one */
-        if (e->by_name_prev == NULL)
-            g_hash_table_replace(c->hash_table, r->key, e);
-        
-        /* Update the record */
-        flx_record_unref(e->record);
-        e->record = flx_record_ref(r);
+        /* This is an update request */
 
+        if ((t = e = flx_cache_lookup_key(c, r->key))) {
         
-    } else {
-        /* No entry found, therefore we create a new one */
-
+            if (unique) {
+                
+                /* For unique records, remove all entries but one */
+                while (e->by_name_next)
+                    remove_entry(c, e->by_name_next, TRUE);
+                
+            } else {
+                
+                /* For non-unique record, look for exactly the same entry */
+                for (; e; e = e->by_name_next)
+                    if (flx_record_equal_no_ttl(e->record, r))
+                        break;
+            }
+        }
+    
+        if (e) {
+            
+/*         g_message("found matching cache entry"); */
+            
+            /* We are the first in the linked list so let's replace the hash table key with the new one */
+            if (e->by_name_prev == NULL)
+                g_hash_table_replace(c->hash_table, r->key, e);
+            
+            /* Notify subscribers */
+            if (!flx_record_equal_no_ttl(e->record, r))
+                flx_subscription_notify(c->server, c->interface, r, FLX_SUBSCRIPTION_CHANGE);    
+            
+            /* Update the record */
+            flx_record_unref(e->record);
+            e->record = flx_record_ref(r);
+            
+        } else {
+            /* No entry found, therefore we create a new one */
+            
 /*         g_message("couldn't find matching cache entry"); */
+            
+            e = g_new(flxCacheEntry, 1);
+            e->cache = c;
+            e->time_event = NULL;
+            e->record = flx_record_ref(r);
+            FLX_LLIST_PREPEND(flxCacheEntry, by_name, t, e);
+            g_hash_table_replace(c->hash_table, e->record->key, t);
+            
+            /* Notify subscribers */
+            flx_subscription_notify(c->server, c->interface, e->record, FLX_SUBSCRIPTION_NEW);
+        } 
         
-        e = g_new(flxCacheEntry, 1);
-        e->cache = c;
-        e->time_event = NULL;
-        e->record = flx_record_ref(r);
-        FLX_LLIST_PREPEND(flxCacheEntry, by_name, t, e);
-        g_hash_table_replace(c->hash_table, e->record->key, t);
-    } 
-
-    e->origin = *a;
-    g_get_current_time(&e->timestamp);
-    next_expiry(c, e, 80);
-    e->state = FLX_CACHE_VALID;
-
-    return e;
+        e->origin = *a;
+        g_get_current_time(&e->timestamp);
+        next_expiry(c, e, 80);
+        e->state = FLX_CACHE_VALID;
+    }
 }
 
 void flx_cache_drop_key(flxCache *c, flxKey *k) {
diff --git a/cache.h b/cache.h
index f51e18e..e7c8e7e 100644 (file)
--- a/cache.h
+++ b/cache.h
@@ -33,7 +33,6 @@ struct flxCacheEntry {
     flxTimeEvent *time_event;
 
     FLX_LLIST_FIELDS(flxCacheEntry, by_name);
-    
 };
 
 struct _flxCache {
@@ -50,7 +49,7 @@ void flx_cache_free(flxCache *c);
 flxCacheEntry *flx_cache_lookup_key(flxCache *c, flxKey *k);
 flxCacheEntry *flx_cache_lookup_record(flxCache *c, flxRecord *r);
 
-flxCacheEntry *flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a);
+void flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a);
 
 void flx_cache_drop_key(flxCache *c, flxKey *k);
 void flx_cache_drop_record(flxCache *c,  flxRecord *r);
diff --git a/iface.c b/iface.c
index 5e6c94a..a7d210f 100644 (file)
--- a/iface.c
+++ b/iface.c
@@ -489,3 +489,33 @@ gboolean flx_interface_match(flxInterface *i, gint index, guchar protocol) {
     return TRUE;
 }
 
+
+void flx_interface_monitor_walk(flxInterfaceMonitor *m, gint interface, guchar protocol, flxInterfaceMonitorWalkCallback callback, gpointer userdata) {
+    g_assert(m);
+    g_assert(callback);
+    
+    if (interface > 0) {
+        if (protocol != AF_UNSPEC) {
+            flxInterface *i;
+            
+            if ((i = flx_interface_monitor_get_interface(m, interface, protocol)))
+                callback(m, i, userdata);
+            
+        } else {
+            flxHwInterface *hw;
+            flxInterface *i;
+
+            if ((hw = flx_interface_monitor_get_hw_interface(m, interface)))
+                for (i = hw->interfaces; i; i = i->by_hardware_next)
+                    if (flx_interface_match(i, interface, protocol))
+                        callback(m, i, userdata);
+        }
+        
+    } else {
+        flxInterface *i;
+        
+        for (i = m->interfaces; i; i = i->interface_next)
+            if (flx_interface_match(i, interface, protocol))
+                callback(m, i, userdata);
+    }
+}
diff --git a/iface.h b/iface.h
index 85e535c..b5c2708 100644 (file)
--- a/iface.h
+++ b/iface.h
@@ -92,4 +92,8 @@ gboolean flx_interface_address_relevant(flxInterfaceAddress *a);
 
 gboolean flx_interface_match(flxInterface *i, gint index, guchar protocol);
 
+typedef void (*flxInterfaceMonitorWalkCallback)(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata);
+    
+void flx_interface_monitor_walk(flxInterfaceMonitor *m, gint index, guchar protocol, flxInterfaceMonitorWalkCallback callback, gpointer userdata);
+
 #endif
diff --git a/main.c b/main.c
index 47f3cd7..367dcd3 100644 (file)
--- a/main.c
+++ b/main.c
@@ -4,6 +4,7 @@
 
 #include "flx.h"
 #include "server.h"
+#include "subscribe.h"
 
 static gboolean quit_timeout(gpointer data) {
     g_main_loop_quit(data);
@@ -31,15 +32,36 @@ static gboolean dump_timeout(gpointer data) {
     return TRUE;
 }
 
+static void subscription(flxSubscription *s, flxRecord *r, gint interface, guchar protocol, flxSubscriptionEvent event, gpointer userdata) {
+    gchar *t;
+    
+    g_assert(s);
+    g_assert(r);
+    g_assert(interface > 0);
+    g_assert(protocol != AF_UNSPEC);
+
+    g_message("SUBSCRIPTION: record [%s] on %i.%i is %s", t = flx_record_to_string(r), interface, protocol,
+              event == FLX_SUBSCRIPTION_NEW ? "new" : (event == FLX_SUBSCRIPTION_CHANGE ? "changed" : "removed"));
+
+    g_free(t);
+}
+
+
 int main(int argc, char *argv[]) {
     flxServer *flx;
     gchar *r;
     GMainLoop *loop = NULL;
+    flxSubscription *s;
+    flxKey *k;
 
     flx = flx_server_new(NULL);
 
     flx_server_add_text(flx, 0, 0, AF_UNSPEC, FALSE, NULL, "hallo");
 
+    k = flx_key_new("_http._tcp.local.", FLX_DNS_CLASS_IN, FLX_DNS_TYPE_PTR);
+    s = flx_subscription_new(flx, k, 0, AF_UNSPEC, subscription, NULL);
+    flx_key_unref(k);
+
     loop = g_main_loop_new(NULL, FALSE);
     
     g_timeout_add(1000*60, quit_timeout, loop);
@@ -50,7 +72,7 @@ int main(int argc, char *argv[]) {
 
     g_main_loop_unref(loop);
 
-
+    flx_subscription_free(s);
     flx_server_free(flx);
     
     return 0;
index ebae9d3..db72ec3 100644 (file)
--- a/psched.c
+++ b/psched.c
@@ -214,7 +214,7 @@ static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *recor
     g_assert(record);
 
     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
-        if (flx_record_equal(rj->record, record))
+        if (flx_record_equal_no_ttl(rj->record, record))
             return rj;
 
     return NULL;
@@ -270,7 +270,7 @@ void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record
     g_assert(record);
 
     for  (rj = s->response_jobs; rj; rj = rj->jobs_next)
-        if (flx_record_equal(rj->record, record)) {
+        if (flx_record_equal_no_ttl(rj->record, record)) {
 
             if (!rj->done) {
                 GTimeVal tv;
diff --git a/rr.c b/rr.c
index c984934..4108c54 100644 (file)
--- a/rr.c
+++ b/rr.c
@@ -50,12 +50,11 @@ flxRecord *flx_record_new(flxKey *k, gconstpointer data, guint16 size, guint32 t
     
     g_assert(k);
     g_assert(data);
-    g_assert(size > 0);
 
     r = g_new(flxRecord, 1);
     r->ref = 1;
     r->key = flx_key_ref(k);
-    r->data = g_memdup(data, size);
+    r->data = size > 0 ? g_memdup(data, size) : NULL;
     r->size = size;
     r->ttl = ttl;
 
@@ -154,10 +153,17 @@ gchar *flx_record_to_string(flxRecord *r) {
         }
 
         case FLX_DNS_TYPE_TXT: {
-            g_assert(((guchar*) r->data)[0] == r->size-1);
 
-            memcpy(t, r->data+1, ((guchar*) r->data)[0]);
-            t[((guchar*) r->data)[0]] = 0;
+            if (r->size == 0)
+                t[0] = 0;
+            else {
+                guchar l = ((guchar*) r->data)[0];
+
+                if ((size_t) l+1 <= r->size) {
+                    memcpy(t, r->data+1, ((guchar*) r->data)[0]);
+                    t[((guchar*) r->data)[0]] = 0;
+                }
+            }
             break;
         }
 
@@ -203,7 +209,7 @@ gchar *flx_record_to_string(flxRecord *r) {
     }
 
     p = flx_key_to_string(r->key);
-    s = g_strdup_printf("[%s %s ; ttl=%u]", p, t, r->ttl);
+    s = g_strdup_printf("%s %s ; ttl=%u", p, t, r->ttl);
     g_free(p);
     
     return s;
@@ -224,12 +230,12 @@ guint flx_key_hash(const flxKey *k) {
     return g_str_hash(k->name) + k->type + k->class;
 }
 
-gboolean flx_record_equal(const flxRecord *a, const flxRecord *b) {
+gboolean flx_record_equal_no_ttl(const flxRecord *a, const flxRecord *b) {
     g_assert(a);
     g_assert(b);
 
     return flx_key_equal(a->key, b->key) &&
-        a->ttl == b->ttl &&
+/*        a->ttl == b->ttl && */
         a->size == b->size &&
-        memcmp(a->data, b->data, a->size) == 0;
+        (a->size == 0 || memcmp(a->data, b->data, a->size) == 0);
 }
diff --git a/rr.h b/rr.h
index a31b632..f17c36f 100644 (file)
--- a/rr.h
+++ b/rr.h
@@ -56,6 +56,6 @@ const gchar *flx_dns_type_to_string(guint16 type);
 gchar *flx_key_to_string(flxKey *k); /* g_free() the result! */
 gchar *flx_record_to_string(flxRecord *r);  /* g_free() the result! */
 
-gboolean flx_record_equal(const flxRecord *a, const flxRecord *b);
+gboolean flx_record_equal_no_ttl(const flxRecord *a, const flxRecord *b);
 
 #endif
index 0aaaed1..94c7b1d 100644 (file)
--- a/server.c
+++ b/server.c
@@ -8,7 +8,7 @@
 #include "util.h"
 #include "iface.h"
 #include "socket.h"
-
+#include "subscribe.h"
 
 static void handle_query_key(flxServer *s, flxKey *k, flxInterface *i, const flxAddress *a) {
     flxServerEntry *e;
@@ -71,7 +71,9 @@ static void handle_response(flxServer *s, flxDnsPacket *p, flxInterface *i, cons
         g_free(txt);
 
         flx_cache_update(i->cache, record, cache_flush, a);
-        flx_packet_scheduler_drop_response(i->scheduler, record);
+
+        if (record->ttl != 0)
+            flx_packet_scheduler_drop_response(i->scheduler, record);
         flx_record_unref(record);
     }
 }
@@ -258,10 +260,13 @@ flxServer *flx_server_new(GMainContext *c) {
         s->context = g_main_context_default();
     
     s->current_id = 1;
+
+    FLX_LLIST_HEAD_INIT(flxServerEntry, s->entries);
     s->rrset_by_id = g_hash_table_new(g_int_hash, g_int_equal);
     s->rrset_by_key = g_hash_table_new((GHashFunc) flx_key_hash, (GEqualFunc) flx_key_equal);
 
-    FLX_LLIST_HEAD_INIT(flxServerEntry, s->entries);
+    FLX_LLIST_HEAD_INIT(flxSubscription, s->subscriptions);
+    s->subscription_hashtable = g_hash_table_new((GHashFunc) flx_key_hash, (GEqualFunc) flx_key_equal);
 
     s->monitor = flx_interface_monitor_new(s);
     s->time_event_queue = flx_time_event_queue_new(s->context);
@@ -300,6 +305,10 @@ void flx_server_free(flxServer* s) {
     flx_interface_monitor_free(s->monitor);
     
     flx_server_remove(s, 0);
+
+    while (s->subscriptions)
+        flx_subscription_free(s->subscriptions);
+    g_hash_table_destroy(s->subscription_hashtable);
     
     g_hash_table_destroy(s->rrset_by_id);
     g_hash_table_destroy(s->rrset_by_key);
@@ -532,60 +541,36 @@ void flx_server_add_text(
     flx_server_add_full(s, id, interface, protocol, unique, name, FLX_DNS_CLASS_IN, FLX_DNS_TYPE_TXT, buf, l+1, FLX_DEFAULT_TTL);
 }
 
+static void post_query_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) {
+    flxKey *k = userdata;
+
+    g_assert(m);
+    g_assert(i);
+    g_assert(k);
+
+    flx_interface_post_query(i, k);
+}
+
 void flx_server_post_query(flxServer *s, gint interface, guchar protocol, flxKey *key) {
     g_assert(s);
     g_assert(key);
-    
-    if (interface > 0) {
-        if (protocol != AF_UNSPEC) {
-            flxInterface *i;
-            
-            if ((i = flx_interface_monitor_get_interface(s->monitor, interface, protocol)))
-                flx_interface_post_query(i, key);
-        } else {
-            flxHwInterface *hw;
-            flxInterface *i;
-
-            if ((hw = flx_interface_monitor_get_hw_interface(s->monitor, interface)))
-                for (i = hw->interfaces; i; i = i->by_hardware_next)
-                    if (flx_interface_match(i, interface, protocol))
-                        flx_interface_post_query(i, key);
-        }
-        
-    } else {
-        flxInterface *i;
-        
-        for (i = s->monitor->interfaces; i; i = i->interface_next)
-            if (flx_interface_match(i, interface, protocol))
-                flx_interface_post_query(i, key);
-    }
+
+    flx_interface_monitor_walk(s->monitor, interface, protocol, post_query_callback, key);
+}
+
+static void post_response_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) {
+    flxRecord *r = userdata;
+
+    g_assert(m);
+    g_assert(i);
+    g_assert(r);
+
+    flx_interface_post_response(i, r);
 }
 
 void flx_server_post_response(flxServer *s, gint interface, guchar protocol, flxRecord *record) {
     g_assert(s);
     g_assert(record);
-    
-    if (interface > 0) {
-        if (protocol != AF_UNSPEC) {
-            flxInterface *i;
-            
-            if ((i = flx_interface_monitor_get_interface(s->monitor, interface, protocol)))
-                flx_interface_post_response(i, record);
-        } else {
-            flxHwInterface *hw;
-            flxInterface *i;
-
-            if ((hw = flx_interface_monitor_get_hw_interface(s->monitor, interface)))
-                for (i = hw->interfaces; i; i = i->by_hardware_next)
-                    if (flx_interface_match(i, interface, protocol))
-                        flx_interface_post_response(i, record);
-        }
-        
-    } else {
-        flxInterface *i;
-        
-        for (i = s->monitor->interfaces; i; i = i->interface_next)
-            if (flx_interface_match(i, interface, protocol))
-                flx_interface_post_response(i, record);
-    }
+
+    flx_interface_monitor_walk(s->monitor, interface, protocol, post_response_callback, record);
 }
index b7addf6..d050bcc 100644 (file)
--- a/server.h
+++ b/server.h
@@ -9,6 +9,7 @@ typedef struct _flxServerEntry flxServerEntry;
 #include "llist.h"
 #include "timeeventq.h"
 #include "announce.h"
+#include "subscribe.h"
 
 struct _flxServerEntry {
     flxRecord *record;
@@ -31,10 +32,12 @@ struct _flxServer {
 
     gint current_id;
     
+    FLX_LLIST_HEAD(flxServerEntry, entries);
     GHashTable *rrset_by_id;
     GHashTable *rrset_by_key;
 
-    FLX_LLIST_HEAD(flxServerEntry, entries);
+    FLX_LLIST_HEAD(flxSubscription, subscriptions);
+    GHashTable *subscription_hashtable;
 
     flxTimeEventQueue *time_event_queue;
     
diff --git a/subscribe.c b/subscribe.c
new file mode 100644 (file)
index 0000000..3abe464
--- /dev/null
@@ -0,0 +1,103 @@
+#include "subscribe.h"
+#include "util.h"
+
+static void elapse(flxTimeEvent *e, void *userdata) {
+    flxSubscription *s = userdata;
+    GTimeVal tv;
+    gchar *t;
+    
+    g_assert(s);
+
+    flx_server_post_query(s->server, s->interface, s->protocol, s->key);
+
+    if (s->n_query++ <= 8)
+        s->sec_delay *= 2;
+
+    g_message("%i. Continuous querying for %s", s->n_query, t = flx_key_to_string(s->key));
+    g_free(t);
+
+    
+    flx_elapse_time(&tv, s->sec_delay*1000, 0);
+    flx_time_event_queue_update(s->server->time_event_queue, s->time_event, &tv);
+}
+
+static void scan_cache_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) {
+    flxSubscription *s = userdata;
+    flxCacheEntry *e;
+
+    g_assert(m);
+    g_assert(i);
+    g_assert(s);
+
+    for (e = flx_cache_lookup_key(i->cache, s->key); e; e = e->by_name_next)
+        s->callback(s, e->record, i->hardware->index, i->protocol, FLX_SUBSCRIPTION_NEW, s->userdata);
+}
+
+flxSubscription *flx_subscription_new(flxServer *server, flxKey *key, gint interface, guchar protocol, flxSubscriptionCallback callback, gpointer userdata) {
+    flxSubscription *s, *t;
+    GTimeVal tv;
+
+    g_assert(server);
+    g_assert(key);
+    g_assert(callback);
+
+    s = g_new(flxSubscription, 1);
+    s->server = server;
+    s->key = flx_key_ref(key);
+    s->interface = interface;
+    s->protocol = protocol;
+    s->callback = callback;
+    s->userdata = userdata;
+    s->n_query = 1;
+    s->sec_delay = 1;
+
+    flx_server_post_query(s->server, s->interface, s->protocol, s->key);
+    
+    flx_elapse_time(&tv, s->sec_delay*1000, 0);
+    s->time_event = flx_time_event_queue_add(server->time_event_queue, &tv, elapse, s);
+
+    FLX_LLIST_PREPEND(flxSubscription, subscriptions, server->subscriptions, s);
+
+    /* Add the new entry to the subscription hash table */
+    t = g_hash_table_lookup(server->subscription_hashtable, key);
+    FLX_LLIST_PREPEND(flxSubscription, by_key, t, s);
+    g_hash_table_replace(server->subscription_hashtable, key, t);
+
+    /* Scan the caches */
+    flx_interface_monitor_walk(s->server->monitor, s->interface, s->protocol, scan_cache_callback, s);
+    
+    return s;
+}
+
+void flx_subscription_free(flxSubscription *s) {
+    flxSubscription *t;
+    
+    g_assert(s);
+
+    FLX_LLIST_REMOVE(flxSubscription, subscriptions, s->server->subscriptions, s);
+
+    t = g_hash_table_lookup(s->server->subscription_hashtable, s->key);
+    FLX_LLIST_REMOVE(flxSubscription, by_key, t, s);
+    if (t)
+        g_hash_table_replace(s->server->subscription_hashtable, t->key, t);
+    else
+        g_hash_table_remove(s->server->subscription_hashtable, s->key);
+    
+    flx_time_event_queue_remove(s->server->time_event_queue, s->time_event);
+    flx_key_unref(s->key);
+
+    
+    g_free(s);
+}
+
+void flx_subscription_notify(flxServer *server, flxInterface *i, flxRecord *record, flxSubscriptionEvent event) {
+    flxSubscription *s;
+    
+    g_assert(server);
+    g_assert(record);
+
+    for (s = g_hash_table_lookup(server->subscription_hashtable, record->key); s; s = s->by_key_next)
+        if (flx_interface_match(i, s->interface, s->protocol))
+            s->callback(s, record, i->hardware->index, i->protocol, event, s->userdata);
+    
+}
diff --git a/subscribe.h b/subscribe.h
new file mode 100644 (file)
index 0000000..75818c7
--- /dev/null
@@ -0,0 +1,39 @@
+#ifndef foosubscribehfoo
+#define foosubscribehfoo
+
+typedef struct _flxSubscription flxSubscription;
+
+#include "llist.h"
+#include "server.h"
+
+typedef enum {
+    FLX_SUBSCRIPTION_NEW,
+    FLX_SUBSCRIPTION_REMOVE,
+    FLX_SUBSCRIPTION_CHANGE
+} flxSubscriptionEvent;
+
+typedef void (*flxSubscriptionCallback)(flxSubscription *s, flxRecord *record, gint interface, guchar protocol, flxSubscriptionEvent event, gpointer userdata);
+
+struct _flxSubscription {
+    flxServer *server;
+    flxKey *key;
+    gint interface;
+    guchar protocol;
+    gint n_query;
+    guint sec_delay;
+
+    flxTimeEvent *time_event;
+
+    flxSubscriptionCallback callback;
+    gpointer userdata;
+
+    FLX_LLIST_FIELDS(flxSubscription, subscriptions);
+    FLX_LLIST_FIELDS(flxSubscription, by_key);
+};
+
+flxSubscription *flx_subscription_new(flxServer *s, flxKey *key, gint interface, guchar protocol, flxSubscriptionCallback callback, gpointer userdata);
+void flx_subscription_free(flxSubscription *s);
+
+void flx_subscription_notify(flxServer *s, flxInterface *i, flxRecord *record, flxSubscriptionEvent event);
+
+#endif