domain-control: added domain-control plugin (modified decision-proto).
[profile/ivi/murphy.git] / src / plugins / decision-proto / client.c
1 #include <errno.h>
2
3 #include <murphy/common/mm.h>
4 #include <murphy/common/log.h>
5 #include <murphy/common/mainloop.h>
6 #include <murphy/common/transport.h>
7
8 #include "decision-types.h"
9 #include "table.h"
10 #include "message.h"
11 #include "client.h"
12
13
14 /*
15  * mark an enforcement point busy (typically while executing a callback)
16  */
17
18 #define PEP_MARK_BUSY(pep, ...) do {                \
19         (pep)->busy++;                              \
20         __VA_ARGS__                                 \
21         (pep)->busy--;                              \
22         check_destroyed(pep);                       \
23     } while (0)
24
25
26 /*
27  * a pending request
28  */
29
30 typedef struct {
31     mrp_list_hook_t      hook;           /* hook to pending request queue */
32     uint32_t             seqno;          /* sequence number/request id */
33     mrp_pep_status_cb_t  cb;             /* callback to call upon completion */
34     void                *user_data;      /* opaque callback data */
35 } pending_request_t;
36
37
38 static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
39 static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
40                         mrp_sockaddr_t *addr, socklen_t addrlen,
41                         void *user_data);
42 static void closed_cb(mrp_transport_t *t, int error, void *user_data);
43
44
45 static int queue_pending(mrp_pep_t *pep, uint32_t seq,
46                          mrp_pep_status_cb_t cb, void *user_data);
47 static int notify_pending(mrp_pep_t *pep, uint32_t seq, int error,
48                           const char *msg);
49 static void purge_pending(mrp_pep_t *pep);
50
51
52
53
54 mrp_pep_t *mrp_pep_create(const char *name, mrp_mainloop_t *ml,
55                           mrp_pep_table_t *owned_tables, int nowned,
56                           mrp_pep_table_t *watched_tables, int nwatched,
57                           mrp_pep_connect_cb_t connect_cb,
58                           mrp_pep_data_cb_t data_cb, void *user_data)
59 {
60     mrp_pep_t *pep;
61
62     pep = mrp_allocz(sizeof(*pep));
63
64     if (pep != NULL) {
65         mrp_list_init(&pep->pending);
66         pep->ml = ml;
67
68         pep->name    = mrp_strdup(name);
69         pep->owned   = mrp_allocz_array(typeof(*pep->owned), nowned);
70         pep->watched = mrp_allocz_array(typeof(*pep->watched), nwatched);
71
72         if (pep->name != NULL && pep->owned != NULL && pep->watched != NULL) {
73             if (copy_pep_tables(owned_tables, pep->owned, nowned)) {
74                 pep->nowned = nowned;
75                 if (copy_pep_tables(watched_tables, pep->watched, nwatched)) {
76                     pep->nwatched   = nwatched;
77                     pep->connect_cb = connect_cb;
78                     pep->data_cb    = data_cb;
79                     pep->user_data  = user_data;
80                     pep->seqno      = 1;
81
82                     return pep;
83                 }
84             }
85         }
86
87         mrp_pep_destroy(pep);
88     }
89
90     return NULL;
91 }
92
93
94 static void destroy_pep(mrp_pep_t *pep)
95 {
96     mrp_free(pep->name);
97
98     free_pep_tables(pep->owned, pep->nowned);
99     free_pep_tables(pep->watched, pep->nwatched);
100
101     purge_pending(pep);
102
103     mrp_free(pep);
104 }
105
106
107 static inline void check_destroyed(mrp_pep_t *pep)
108 {
109     if (pep->destroyed && pep->busy <= 0) {
110         destroy_pep(pep);
111     }
112 }
113
114
115 void mrp_pep_destroy(mrp_pep_t *pep)
116 {
117     if (pep != NULL) {
118         mrp_pep_disconnect(pep);
119
120         if (pep->busy <= 0)
121             destroy_pep(pep);
122         else
123             pep->destroyed = TRUE;
124     }
125 }
126
127
128 static void notify_disconnect(mrp_pep_t *pep, uint32_t errcode,
129                               const char *errmsg)
130 {
131     PEP_MARK_BUSY(pep, {
132             pep->connected = FALSE;
133             pep->connect_cb(pep, FALSE, errcode, errmsg, pep->user_data);
134         });
135 }
136
137
138 static void notify_connect(mrp_pep_t *pep)
139 {
140     PEP_MARK_BUSY(pep, {
141             pep->connected = TRUE;
142             pep->connect_cb(pep, TRUE, 0, NULL, pep->user_data);
143         });
144 }
145
146
147 static int pep_register(mrp_pep_t *pep)
148 {
149     mrp_msg_t *msg;
150     int        success;
151
152     msg = create_register_message(pep);
153
154     if (msg != NULL) {
155         success = mrp_transport_send(pep->t, msg);
156         mrp_msg_unref(msg);
157     }
158     else
159         success = FALSE;
160
161     return success;
162 }
163
164
165 int mrp_pep_connect(mrp_pep_t *pep, const char *address)
166 {
167     static mrp_transport_evt_t evt = {
168         .closed      = closed_cb,
169         .recvmsg     = recv_cb,
170         .recvmsgfrom = recvfrom_cb,
171     };
172
173     mrp_sockaddr_t  addr;
174     socklen_t       addrlen;
175     const char     *type;
176
177     if (pep == NULL)
178         return FALSE;
179
180     addrlen = mrp_transport_resolve(NULL, address, &addr, sizeof(addr), &type);
181
182     if (addrlen > 0) {
183         pep->t = mrp_transport_create(pep->ml, type, &evt, pep, 0);
184
185         if (pep->t != NULL) {
186             if (mrp_transport_connect(pep->t, &addr, addrlen))
187                 if (pep_register(pep))
188                     return TRUE;
189
190             mrp_transport_destroy(pep->t);
191             pep->t = NULL;
192         }
193     }
194
195     return FALSE;
196 }
197
198
199 void mrp_pep_disconnect(mrp_pep_t *pep)
200 {
201     if (pep->t != NULL) {
202         mrp_transport_destroy(pep->t);
203         pep->t         = NULL;
204         pep->connected = FALSE;
205     }
206 }
207
208
209 int mrp_pep_set_data(mrp_pep_t *pep, mrp_pep_data_t *data, int ntable,
210                      mrp_pep_status_cb_t cb, void *user_data)
211 {
212     mrp_msg_t *msg;
213     uint32_t   seq = pep->seqno++;
214     int        success, i;
215
216     if (!pep->connected)
217         return FALSE;
218
219     for (i = 0; i < ntable; i++) {
220         if (data[i].id < 0 || data[i].id >= pep->nowned)
221             return FALSE;
222
223         data[i].coldefs = pep->owned[data[i].id].columns;
224         data[i].ncolumn = pep->owned[data[i].id].ncolumn;
225     }
226
227     msg = create_set_message(seq, data, ntable);
228
229     if (msg != NULL) {
230         success = mrp_transport_send(pep->t, msg);
231         mrp_msg_unref(msg);
232
233         if (success)
234             queue_pending(pep, seq, cb, user_data);
235
236         return success;
237     }
238     else
239         return FALSE;
240 }
241
242
243 static void process_ack(mrp_pep_t *pep, uint32_t seq)
244 {
245     if (seq != 0)
246         notify_pending(pep, seq, 0, NULL);
247     else
248         notify_connect(pep);
249 }
250
251
252 static void process_nak(mrp_pep_t *pep, uint32_t seq, int32_t err,
253                         const char *msg)
254 {
255     if (seq != 0)
256         notify_pending(pep, seq, err, msg);
257     else
258         notify_disconnect(pep, err, msg);
259 }
260
261
262 static void process_notify(mrp_pep_t *pep, mrp_msg_t *msg, uint32_t seq,
263                            int ntable, int ncolumn)
264 {
265     mrp_pep_table_t  *tbl;
266     mrp_pep_data_t    data[ntable], *d;
267     mrp_pep_value_t   values[ncolumn], *v;
268     mqi_column_def_t *cols;
269     void             *it;
270     int               ncol, i, j;
271     uint16_t          tblid;
272     uint16_t          nrow;
273
274     it = NULL;
275     d  = data;
276     v  = values;
277
278     for (i = 0; i < ntable; i++) {
279         if (!mrp_msg_iterate_get(msg, &it,
280                                  MRP_PEPMSG_UINT16(TBLID, &tblid),
281                                  MRP_PEPMSG_UINT16(NROW , &nrow ),
282                                  MRP_MSG_END))
283             return;
284
285         if (tblid >= pep->nwatched)
286             return;
287
288         tbl  = pep->watched + tblid;
289         cols = tbl->columns;
290         ncol = tbl->ncolumn;
291
292         d->id      = tblid;
293         d->columns = v;
294         d->coldefs = tbl->columns;
295         d->ncolumn = ncol;
296         d->nrow    = nrow;
297
298         if (!decode_notify_message(msg, &it, d))
299             return;
300
301         d++;
302         v += ncol * nrow;
303     }
304
305     pep->data_cb(pep, data, ntable, pep->user_data);
306 }
307
308
309 static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
310 {
311     mrp_pep_t  *pep = (mrp_pep_t *)user_data;
312     uint16_t    type, nchange, ntotal;
313     uint32_t    seq;
314     int         ntable, ncolumn;
315     int32_t     error;
316     const char *errmsg;
317
318     MRP_UNUSED(t);
319
320     /*
321       mrp_log_info("Received message:");
322       mrp_msg_dump(msg, stdout);
323     */
324
325     if (!mrp_msg_get(msg,
326                      MRP_PEPMSG_UINT16(MSGTYPE, &type),
327                      MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
328                      MRP_MSG_END)) {
329         mrp_pep_disconnect(pep);
330         notify_disconnect(pep, EINVAL, "malformed message from client");
331         return;
332     }
333
334     switch (type) {
335     case MRP_PEPMSG_ACK:
336         process_ack(pep, seq);
337         break;
338
339     case MRP_PEPMSG_NAK:
340         error  = EINVAL;
341         errmsg = "request failed, unknown error";
342
343         mrp_msg_get(msg,
344                     MRP_PEPMSG_SINT32(ERRCODE, &error),
345                     MRP_PEPMSG_STRING(ERRMSG , &errmsg),
346                     MRP_MSG_END);
347
348         process_nak(pep, seq, error, errmsg);
349         break;
350
351     case MRP_PEPMSG_NOTIFY:
352         if (mrp_msg_get(msg,
353                         MRP_PEPMSG_UINT16(NCHANGE, &nchange),
354                         MRP_PEPMSG_UINT16(NTOTAL , &ntotal),
355                         MRP_MSG_END)) {
356             ntable  = nchange;
357             ncolumn = ntotal;
358
359             process_notify(pep, msg, seq, ntable, ncolumn);
360         }
361         break;
362
363     default:
364         break;
365     }
366
367 }
368
369
370 static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
371                         mrp_sockaddr_t *addr, socklen_t addrlen,
372                         void *user_data)
373 {
374     MRP_UNUSED(addr);
375     MRP_UNUSED(addrlen);
376
377     /* XXX TODO:
378      *    This should neither be called nor be necessary to specify.
379      *    However, currently the transport layer mandates having to
380      *    give both recv and recvfrom event callbacks if no connection
381      *    event callback is given. However this is not correct because
382      *    on a client side one wants to be able to create a connection-
383      *    oriented transport without both connection and recvfrom event
384      *    callbacks. This needs to be fixed in transport by moving the
385      *    appropriate callback checks lower in the stack to the actual
386      *    transport backends.
387      */
388
389     mrp_log_error("Whoa... recvfrom called for a connected transport.");
390     exit(1);
391 }
392
393
394 static void closed_cb(mrp_transport_t *t, int error, void *user_data)
395 {
396     mrp_pep_t *pep = (mrp_pep_t *)user_data;
397
398     MRP_UNUSED(t);
399     MRP_UNUSED(pep);
400
401     if (error)
402         notify_disconnect(pep, error, strerror(error));
403     else
404         notify_disconnect(pep, ECONNRESET, "server has closed the connection");
405 }
406
407
408 static int queue_pending(mrp_pep_t *pep, uint32_t seq,
409                          mrp_pep_status_cb_t cb, void *user_data)
410 {
411     pending_request_t *pending;
412
413     pending = mrp_allocz(sizeof(*pending));
414
415     if (pending != NULL) {
416         mrp_list_init(&pending->hook);
417
418         pending->seqno     = seq;
419         pending->cb        = cb;
420         pending->user_data = user_data;
421
422         mrp_list_append(&pep->pending, &pending->hook);
423
424         return TRUE;
425     }
426     else
427         return FALSE;
428 }
429
430
431 static int notify_pending(mrp_pep_t *pep, uint32_t seq, int error,
432                           const char *msg)
433 {
434     mrp_list_hook_t   *p, *n;
435     pending_request_t *pending;
436
437     mrp_list_foreach(&pep->pending, p, n) {
438         pending = mrp_list_entry(p, typeof(*pending), hook);
439
440         if (pending->seqno == seq) {
441             PEP_MARK_BUSY(pep, {
442                     pending->cb(pep, error, msg, pending->user_data);
443                     mrp_list_delete(&pending->hook);
444                     mrp_free(pending);
445                 });
446
447             return TRUE;
448         }
449     }
450
451     return FALSE;
452 }
453
454
455 static void purge_pending(mrp_pep_t *pep)
456 {
457     mrp_list_hook_t   *p, *n;
458     pending_request_t *pending;
459
460     mrp_list_foreach(&pep->pending, p, n) {
461         pending = mrp_list_entry(p, typeof(*pending), hook);
462
463         mrp_list_delete(&pending->hook);
464         mrp_free(pending);
465     }
466 }