domain-control: added domain-control plugin (modified decision-proto).
[profile/ivi/murphy.git] / src / plugins / decision-proto / decision.c
1 #include <errno.h>
2
3 #include <murphy/common/mm.h>
4 #include <murphy/common/log.h>
5
6 #include "message.h"
7 #include "proxy.h"
8 #include "table.h"
9 #include "notify.h"
10 #include "decision.h"
11
12 static int create_transports(pdp_t *pdp);
13 static void destroy_transports(pdp_t *pdp);
14
15 pdp_t *create_decision(mrp_context_t *ctx, const char *address)
16 {
17     pdp_t *pdp;
18
19     pdp = mrp_allocz(sizeof(*pdp));
20
21     if (pdp != NULL) {
22         pdp->ctx     = ctx;
23         pdp->address = address;
24
25         if (init_proxies(pdp) && init_tables(pdp) && create_transports(pdp))
26             return pdp;
27         else
28             destroy_decision(pdp);
29     }
30
31     return NULL;
32 }
33
34
35 void destroy_decision(pdp_t *pdp)
36 {
37     if (pdp != NULL) {
38         destroy_proxies(pdp);
39         destroy_tables(pdp);
40         destroy_transports(pdp);
41
42         mrp_free(pdp);
43     }
44 }
45
46
47 static void notify_cb(mrp_mainloop_t *ml, mrp_deferred_t *d, void *user_data)
48 {
49     pdp_t *pdp = (pdp_t *)user_data;
50
51     MRP_UNUSED(ml);
52
53     mrp_disable_deferred(d);
54     pdp->notify_scheduled = FALSE;
55     notify_table_changes(pdp);
56 }
57
58
59 void schedule_notification(pdp_t *pdp)
60 {
61
62     if (pdp->notify == NULL)
63         pdp->notify = mrp_add_deferred(pdp->ctx->ml, notify_cb, pdp);
64
65     if (!pdp->notify_scheduled) {
66         mrp_debug("scheduling client notification");
67         mrp_enable_deferred(pdp->notify);
68     }
69 }
70
71
72 static void send_ack_reply(mrp_transport_t *t, uint32_t seq)
73 {
74     mrp_msg_t *msg;
75
76     msg = create_ack_message(seq);
77
78     if (msg != NULL) {
79         mrp_transport_send(t, msg);
80         mrp_msg_unref(msg);
81     }
82 }
83
84
85 static void send_nak_reply(mrp_transport_t *t, uint32_t seq, int error,
86                            const char *errmsg)
87 {
88     mrp_msg_t *msg;
89
90     msg = create_nak_message(seq, error, errmsg);
91
92     if (msg != NULL) {
93         mrp_transport_send(t, msg);
94         mrp_msg_unref(msg);
95     }
96 }
97
98
99 static int process_register_request(pep_proxy_t *proxy, mrp_msg_t *req,
100                                     uint32_t seq)
101 {
102     mrp_transport_t *t = proxy->t;
103     char            *name;
104     uint16_t         utable, uwatch, ucolumn;
105     int              ntable, nwatch, ncolumn;
106     int              error;
107     const char      *errmsg;
108
109     if (mrp_msg_get(req,
110                     MRP_PEPMSG_STRING(NAME   , &name   ),
111                     MRP_PEPMSG_UINT16(NTABLE , &utable ),
112                     MRP_PEPMSG_UINT16(NWATCH , &uwatch ),
113                     MRP_PEPMSG_UINT16(NCOLDEF, &ucolumn),
114                     MRP_MSG_END)) {
115         mrp_pep_table_t  tables[utable], watches[uwatch];
116         mqi_column_def_t columns[ucolumn];
117
118         ntable  = utable;
119         nwatch  = uwatch;
120         ncolumn = ucolumn;
121
122         if (decode_register_message(req, tables, ntable, watches, nwatch,
123                                     columns, ncolumn)) {
124             if (register_proxy(proxy, name, tables, ntable, watches, nwatch,
125                                &error, &errmsg)) {
126                 send_ack_reply(t, seq);
127                 proxy->notify_all = TRUE;
128                 schedule_notification(proxy->pdp);
129
130                 return TRUE;
131             }
132         }
133         else
134             goto malformed;
135     }
136     else {
137     malformed:
138         error  = EINVAL;
139         errmsg = "malformed register message";
140     }
141
142     send_nak_reply(t, seq, error, errmsg);
143
144     return FALSE;
145 }
146
147
148 static void process_unregister_request(pep_proxy_t *proxy, uint32_t seq)
149 {
150     send_ack_reply(proxy->t, seq);
151     unregister_proxy(proxy);
152 }
153
154
155 static void process_set_request(pep_proxy_t *proxy, mrp_msg_t *req,
156                                 uint32_t seq)
157 {
158 #if 1
159     uint16_t    utable, uvalue, tblid, nrow;
160     int         ntable, nvalue, i;
161     int         error;
162     const char *errmsg;
163     void       *it;
164
165     it = NULL;
166
167     if (mrp_msg_iterate_get(req, &it,
168                     MRP_PEPMSG_UINT16(NCHANGE, &utable),
169                     MRP_PEPMSG_UINT16(NTOTAL , &uvalue),
170                     MRP_MSG_END)) {
171         mrp_pep_data_t  data[utable], *d;
172         mrp_pep_value_t values[uvalue], *v;
173
174         ntable = utable;
175         nvalue = uvalue;
176         d      = data;
177         v      = values;
178
179         for (i = 0; i < ntable; i++) {
180             if (!mrp_msg_iterate_get(req, &it,
181                                      MRP_PEPMSG_UINT16(TBLID, &tblid),
182                                      MRP_PEPMSG_UINT16(NROW , &nrow),
183                                      MRP_MSG_END)) {
184                 error  = EINVAL;
185                 errmsg = "malformed set message";
186                 goto reply_nak;
187             }
188
189             if (tblid >= proxy->ntable) {
190                 error  = ENOENT;
191                 errmsg = "invalid table id";
192                 goto reply_nak;
193             }
194
195             d->id      = tblid;
196             d->columns = v;
197             d->coldefs = proxy->tables[d->id].columns;
198             d->ncolumn = proxy->tables[d->id].ncolumn;
199             d->nrow    = nrow;
200
201             if (nvalue < d->ncolumn * d->nrow) {
202                 error  = EINVAL;
203                 errmsg = "invalid set message";
204                 goto reply_nak;
205             }
206
207             if (!decode_set_message(req, &it, d)) {
208                 error  = EINVAL;
209                 errmsg = "invalid set message";
210                 goto reply_nak;
211             }
212
213             v += d->ncolumn * d->nrow;
214             d++;
215         }
216
217         if (set_proxy_tables(proxy, data, ntable, &error, &errmsg)) {
218             send_ack_reply(proxy->t, seq);
219
220             return;
221         }
222     }
223
224  reply_nak:
225     send_nak_reply(proxy->t, seq, error, errmsg);
226 #else
227     uint16_t    utable, uvalue;
228     int         ntable, nvalue;
229     int         error;
230     const char *errmsg;
231
232     if (mrp_msg_get(req,
233                     MRP_PEPMSG_UINT16(NTABLE, &utable),
234                     MRP_PEPMSG_UINT16(NTOTAL, &uvalue),
235                     MRP_MSG_END)) {
236         mrp_pep_data_t  tables[utable];
237         mrp_pep_value_t values[uvalue];
238
239         ntable = utable;
240         nvalue = uvalue;
241
242         if (decode_set_message(req, tables, ntable, values, nvalue)) {
243             if (set_proxy_tables(proxy, tables, ntable, &error, &errmsg)) {
244                 send_ack_reply(proxy->t, seq);
245
246                 return;
247             }
248         }
249         else
250             goto malformed;
251     }
252     else {
253     malformed:
254         error  = EINVAL;
255         errmsg = "malformed set message";
256     }
257
258     send_nak_reply(proxy->t, seq, error, errmsg);
259 #endif
260 }
261
262
263 static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
264 {
265     pep_proxy_t *proxy = (pep_proxy_t *)user_data;
266     char        *name  = proxy && proxy->name ? proxy->name : "<unknown>";
267     uint16_t     type;
268     uint32_t     seq;
269
270     /*
271       mrp_log_info("Message from client %p:", proxy);
272       mrp_msg_dump(msg, stdout);
273     */
274
275     if (!mrp_msg_get(msg,
276                      MRP_PEPMSG_UINT16(MSGTYPE, &type),
277                      MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
278                      MRP_MSG_END)) {
279         mrp_log_error("Malformed message from client %s.", name);
280         send_nak_reply(t, 0, EINVAL, "malformed message");
281     }
282     else {
283         switch (type) {
284         case MRP_PEPMSG_REGISTER:
285             if (!process_register_request(proxy, msg, seq))
286                 destroy_proxy(proxy);
287             break;
288
289         case MRP_PEPMSG_UNREGISTER:
290             process_unregister_request(proxy, seq);
291             break;
292
293         case MRP_PEPMSG_SET:
294             process_set_request(proxy, msg, seq);
295             break;
296
297         default:
298             break;
299         }
300     }
301 }
302
303
304 static void connect_cb(mrp_transport_t *ext, void *user_data)
305 {
306     pdp_t       *pdp = (pdp_t *)user_data;
307     pep_proxy_t *proxy;
308     int          flags;
309
310     proxy = create_proxy(pdp);
311
312     if (proxy != NULL) {
313         flags    = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
314         proxy->t = mrp_transport_accept(ext, proxy, flags);
315
316         if (proxy->t != NULL)
317             mrp_log_info("Accepted new client connection.");
318         else {
319             mrp_log_error("Failed to accept new client connection.");
320             destroy_proxy(proxy);
321         }
322     }
323 }
324
325
326 static void closed_cb(mrp_transport_t *t, int error, void *user_data)
327 {
328     pep_proxy_t *proxy = (pep_proxy_t *)user_data;
329     char        *name  = proxy && proxy->name ? proxy->name : "<unknown>";
330
331     MRP_UNUSED(t);
332
333     if (error)
334         mrp_log_error("Transport to client %s closed (%d: %s).",
335                       name, error, strerror(error));
336     else
337         mrp_log_info("Transport to client %s closed.", name);
338
339     mrp_log_info("Destroying client %s.", name);
340     destroy_proxy(proxy);
341 }
342
343
344 static int create_ext_transport(pdp_t *pdp)
345 {
346     static mrp_transport_evt_t evt = {
347         .closed      = closed_cb,
348         .recvmsg     = recv_cb,
349         .recvmsgfrom = NULL,
350         .connection  = connect_cb,
351     };
352
353     mrp_transport_t *t;
354     mrp_sockaddr_t   addr;
355     socklen_t        addrlen;
356     int              flags;
357     const char      *type;
358
359     t       = NULL;
360     addrlen = mrp_transport_resolve(NULL, pdp->address,
361                                     &addr, sizeof(addr), &type);
362
363     if (addrlen > 0) {
364         flags = MRP_TRANSPORT_REUSEADDR;
365         t     = mrp_transport_create(pdp->ctx->ml, type, &evt, pdp, flags);
366
367         if (t != NULL) {
368             if (mrp_transport_bind(t, &addr, addrlen) &&
369                 mrp_transport_listen(t, 4)) {
370                 mrp_log_info("Listening on transport %s...", pdp->address);
371                 pdp->ext = t;
372
373                 return TRUE;
374             }
375             else
376                 mrp_log_error("Failed to bind transport to %s.", pdp->address);
377         }
378         else
379             mrp_log_error("Failed to create transport for %s.", pdp->address);
380     }
381     else
382         mrp_log_error("Invalid transport address %s.", pdp->address);
383
384     return FALSE;
385 }
386
387
388 static void destroy_ext_transport(pdp_t *pdp)
389 {
390     if (pdp != NULL) {
391         mrp_transport_destroy(pdp->ext);
392         pdp->ext = NULL;
393     }
394 }
395
396
397 static int create_transports(pdp_t *pdp)
398 {
399     return create_ext_transport(pdp);
400 }
401
402
403 static void destroy_transports(pdp_t *pdp)
404 {
405     destroy_ext_transport(pdp);
406 }