3 #include <murphy/common/mm.h>
4 #include <murphy/common/log.h>
5 #include <murphy/common/mainloop.h>
6 #include <murphy/common/transport.h>
8 #include "decision-types.h"
15 * mark an enforcement point busy (typically while executing a callback)
18 #define PEP_MARK_BUSY(pep, ...) do { \
22 check_destroyed(pep); \
31 mrp_list_hook_t hook; /* hook to pending request queue */
32 uint32_t seqno; /* sequence number/request id */
33 mrp_pep_status_cb_t cb; /* callback to call upon completion */
34 void *user_data; /* opaque callback data */
/*
 * Forward declarations for helpers defined later in this file.
 */

/* transport event callbacks */
38 static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
39 static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
40 mrp_sockaddr_t *addr, socklen_t addrlen,
42 static void closed_cb(mrp_transport_t *t, int error, void *user_data);
/* pending request queue helpers (keyed by request sequence number) */
45 static int queue_pending(mrp_pep_t *pep, uint32_t seq,
46 mrp_pep_status_cb_t cb, void *user_data);
47 static int notify_pending(mrp_pep_t *pep, uint32_t seq, int error,
49 static void purge_pending(mrp_pep_t *pep);
/*
 * Create an enforcement point.
 *
 * Allocates a zero-initialized mrp_pep_t, initializes its pending request
 * list, duplicates @name, and allocates arrays of @nowned and @nwatched
 * entries into which the @owned_tables and @watched_tables definitions are
 * copied with copy_pep_tables(). @connect_cb, @data_cb and @user_data are
 * stored in the pep for later callback invocations.
 *
 * NOTE(review): the failure/cleanup path and the return statements are not
 * visible in this excerpt - presumably NULL is returned on failure; confirm
 * against the full function.
 */
54 mrp_pep_t *mrp_pep_create(const char *name, mrp_mainloop_t *ml,
55 mrp_pep_table_t *owned_tables, int nowned,
56 mrp_pep_table_t *watched_tables, int nwatched,
57 mrp_pep_connect_cb_t connect_cb,
58 mrp_pep_data_cb_t data_cb, void *user_data)
62 pep = mrp_allocz(sizeof(*pep));
65 mrp_list_init(&pep->pending);
/* private copies: the name string and one table slot per owned/watched table */
68 pep->name = mrp_strdup(name);
69 pep->owned = mrp_allocz_array(typeof(*pep->owned), nowned);
70 pep->watched = mrp_allocz_array(typeof(*pep->watched), nwatched);
/* continue only if all three allocations above succeeded */
72 if (pep->name != NULL && pep->owned != NULL && pep->watched != NULL) {
73 if (copy_pep_tables(owned_tables, pep->owned, nowned)) {
75 if (copy_pep_tables(watched_tables, pep->watched, nwatched)) {
76 pep->nwatched = nwatched;
77 pep->connect_cb = connect_cb;
78 pep->data_cb = data_cb;
79 pep->user_data = user_data;
/*
 * Release the table definitions held by a pep: the pep->nowned entries of
 * pep->owned and the pep->nwatched entries of pep->watched.
 *
 * NOTE(review): any further cleanup (name, pending queue, the pep itself)
 * is outside this excerpt.
 */
94 static void destroy_pep(mrp_pep_t *pep)
98 free_pep_tables(pep->owned, pep->nowned);
99 free_pep_tables(pep->watched, pep->nwatched);
/*
 * Check whether a pep has been flagged as destroyed and is no longer busy
 * (busy counter at or below zero).
 *
 * NOTE(review): the action taken when the condition holds is not visible
 * here - presumably the deferred destroy_pep(); confirm.
 */
107 static inline void check_destroyed(mrp_pep_t *pep)
109 if (pep->destroyed && pep->busy <= 0) {
/*
 * Destroy an enforcement point: disconnect it and flag it as destroyed.
 *
 * pep->destroyed is the flag check_destroyed() tests together with the busy
 * counter. NOTE(review): this suggests the actual release may be deferred
 * while the pep is busy - confirm against the lines not shown here.
 */
115 void mrp_pep_destroy(mrp_pep_t *pep)
118 mrp_pep_disconnect(pep);
123 pep->destroyed = TRUE;
/*
 * Mark the pep as not connected and report the disconnect to the user by
 * invoking connect_cb with FALSE, @errcode, the error message and the pep's
 * user_data.
 */
128 static void notify_disconnect(mrp_pep_t *pep, uint32_t errcode,
132 pep->connected = FALSE;
133 pep->connect_cb(pep, FALSE, errcode, errmsg, pep->user_data);
/*
 * Mark the pep as connected and report it to the user by invoking
 * connect_cb with TRUE, error code 0 and no error message.
 */
138 static void notify_connect(mrp_pep_t *pep)
141 pep->connected = TRUE;
142 pep->connect_cb(pep, TRUE, 0, NULL, pep->user_data);
/*
 * Build a register message for the pep with create_register_message() and
 * send it over the pep's transport; `success` receives the result of
 * mrp_transport_send().
 *
 * NOTE(review): message release and the return statement are outside this
 * excerpt.
 */
147 static int pep_register(mrp_pep_t *pep)
152 msg = create_register_message(pep);
155 success = mrp_transport_send(pep->t, msg);
/*
 * Connect an enforcement point to the given address.
 *
 * Resolves @address with mrp_transport_resolve(), creates a transport of
 * the resolved type on the pep's mainloop with the pep itself as callback
 * user data, connects it and then registers the pep with pep_register().
 *
 * NOTE(review): return values are not visible in this excerpt.
 */
165 int mrp_pep_connect(mrp_pep_t *pep, const char *address)
/* transport event table; static, so it is shared by every call */
167 static mrp_transport_evt_t evt = {
170 .recvmsgfrom = recvfrom_cb,
180 addrlen = mrp_transport_resolve(NULL, address, &addr, sizeof(addr), &type);
183 pep->t = mrp_transport_create(pep->ml, type, &evt, pep, 0);
185 if (pep->t != NULL) {
186 if (mrp_transport_connect(pep->t, &addr, addrlen))
187 if (pep_register(pep))
/* NOTE(review): presumably the cleanup path when connect/register fails */
190 mrp_transport_destroy(pep->t);
/*
 * Disconnect an enforcement point: destroy its transport if it has one and
 * mark the pep as not connected.
 */
199 void mrp_pep_disconnect(mrp_pep_t *pep)
201 if (pep->t != NULL) {
202 mrp_transport_destroy(pep->t);
204 pep->connected = FALSE;
/*
 * Send a set-data request for @ntable tables described by @data.
 *
 * Each data[i].id is validated against the pep's owned tables, and the
 * column definitions and column count of the matching owned table are
 * written into data[i] (the caller's array is modified). A set message
 * carrying a fresh sequence number is then created and sent, and @cb with
 * @user_data is queued under that sequence number via queue_pending().
 *
 * NOTE(review): return values and message release are outside this excerpt.
 */
209 int mrp_pep_set_data(mrp_pep_t *pep, mrp_pep_data_t *data, int ntable,
210 mrp_pep_status_cb_t cb, void *user_data)
/* request id: taken from the pep's counter, which is post-incremented */
213 uint32_t seq = pep->seqno++;
219 for (i = 0; i < ntable; i++) {
/* table id must index into pep->owned */
220 if (data[i].id < 0 || data[i].id >= pep->nowned)
223 data[i].coldefs = pep->owned[data[i].id].columns;
224 data[i].ncolumn = pep->owned[data[i].id].ncolumn;
227 msg = create_set_message(seq, data, ntable);
230 success = mrp_transport_send(pep->t, msg);
/* NOTE(review): queue_pending() returns int but the result is ignored here */
234 queue_pending(pep, seq, cb, user_data);
/*
 * Handle a positive acknowledgement: complete the pending request with
 * sequence number @seq, reporting error 0 and no error message.
 */
243 static void process_ack(mrp_pep_t *pep, uint32_t seq)
246 notify_pending(pep, seq, 0, NULL);
/*
 * Handle a negative acknowledgement carrying error code @err and an error
 * message: the failure is delivered through notify_pending() for request
 * @seq and/or through notify_disconnect().
 *
 * NOTE(review): the condition selecting between the two calls is not
 * visible in this excerpt; confirm against the full function.
 */
252 static void process_nak(mrp_pep_t *pep, uint32_t seq, int32_t err,
256 notify_pending(pep, seq, err, msg);
258 notify_disconnect(pep, err, msg);
/*
 * Handle a data change notification covering @ntable tables.
 *
 * For each table a table id and a row count are read from @msg through the
 * message iterator, the id is checked against the pep's watched tables, the
 * matching watched table's column definitions are attached to the table's
 * data entry, and the table contents are decoded with
 * decode_notify_message(). The collected data is then handed to the user's
 * data_cb together with @ntable and the pep's user_data.
 *
 * NOTE(review): data[] and values[] are variable-length arrays sized by
 * @ntable and @ncolumn; the caller's arguments appear to originate from the
 * received message - confirm they are bounded and non-zero before use.
 */
262 static void process_notify(mrp_pep_t *pep, mrp_msg_t *msg, uint32_t seq,
263 int ntable, int ncolumn)
265 mrp_pep_table_t *tbl;
266 mrp_pep_data_t data[ntable], *d;
267 mrp_pep_value_t values[ncolumn], *v;
268 mqi_column_def_t *cols;
278 for (i = 0; i < ntable; i++) {
279 if (!mrp_msg_iterate_get(msg, &it,
280 MRP_PEPMSG_UINT16(TBLID, &tblid),
281 MRP_PEPMSG_UINT16(NROW , &nrow ),
/* the table id indexes pep->watched */
285 if (tblid >= pep->nwatched)
288 tbl = pep->watched + tblid;
294 d->coldefs = tbl->columns;
298 if (!decode_notify_message(msg, &it, d))
305 pep->data_cb(pep, data, ntable, pep->user_data);
/*
 * Transport receive callback; @user_data is the mrp_pep_t the transport was
 * created for.
 *
 * Extracts the message type and sequence number from @msg. If they cannot
 * be read, the pep is disconnected and the user is notified with EINVAL.
 * Otherwise the message is passed on to process_ack(), process_nak() or
 * process_notify().
 */
309 static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
311 mrp_pep_t *pep = (mrp_pep_t *)user_data;
312 uint16_t type, nchange, ntotal;
321 mrp_log_info("Received message:");
322 mrp_msg_dump(msg, stdout);
/* every message must carry a type and a sequence number */
325 if (!mrp_msg_get(msg,
326 MRP_PEPMSG_UINT16(MSGTYPE, &type),
327 MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
329 mrp_pep_disconnect(pep);
/* NOTE(review): text says "client" but closed_cb calls the peer "server" - confirm */
330 notify_disconnect(pep, EINVAL, "malformed message from client");
336 process_ack(pep, seq);
/* error text set before the error code and message are read from the message */
341 errmsg = "request failed, unknown error";
344 MRP_PEPMSG_SINT32(ERRCODE, &error),
345 MRP_PEPMSG_STRING(ERRMSG , &errmsg),
348 process_nak(pep, seq, error, errmsg);
351 case MRP_PEPMSG_NOTIFY:
353 MRP_PEPMSG_UINT16(NCHANGE, &nchange),
354 MRP_PEPMSG_UINT16(NTOTAL , &ntotal),
359 process_notify(pep, msg, seq, ntable, ncolumn);
/*
 * Transport recvfrom callback. It is not expected to be invoked (see the
 * explanation inside the function); if it is, an error is logged.
 */
370 static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
371 mrp_sockaddr_t *addr, socklen_t addrlen,
378 * This should neither be called nor be necessary to specify.
379 * However, currently the transport layer mandates having to
380 * give both recv and recvfrom event callbacks if no connection
381 * event callback is given. However this is not correct because
382 * on a client side one wants to be able to create a connection-
383 * oriented transport without both connection and recvfrom event
384 * callbacks. This needs to be fixed in transport by moving the
385 * appropriate callback checks lower in the stack to the actual
386 * transport backends.
389 mrp_log_error("Whoa... recvfrom called for a connected transport.");
/*
 * Transport closed callback; @user_data is the mrp_pep_t the transport was
 * created for.
 *
 * Reports the disconnect to the user either with @error and its strerror()
 * text, or with ECONNRESET and a fixed message.
 *
 * NOTE(review): the condition selecting between the two is not visible
 * here - presumably whether @error is non-zero; confirm.
 */
394 static void closed_cb(mrp_transport_t *t, int error, void *user_data)
396 mrp_pep_t *pep = (mrp_pep_t *)user_data;
402 notify_disconnect(pep, error, strerror(error));
404 notify_disconnect(pep, ECONNRESET, "server has closed the connection");
/*
 * Queue a pending request: allocate a zero-initialized pending_request_t,
 * record the sequence number @seq and @user_data in it and append it to
 * the tail of the pep's pending list.
 *
 * NOTE(review): the line storing @cb and the return statements are outside
 * this excerpt.
 */
408 static int queue_pending(mrp_pep_t *pep, uint32_t seq,
409 mrp_pep_status_cb_t cb, void *user_data)
411 pending_request_t *pending;
413 pending = mrp_allocz(sizeof(*pending));
415 if (pending != NULL) {
416 mrp_list_init(&pending->hook);
418 pending->seqno = seq;
420 pending->user_data = user_data;
422 mrp_list_append(&pep->pending, &pending->hook);
/*
 * Complete a pending request: walk the pep's pending list for the entry
 * whose sequence number equals @seq, invoke its callback with @error, the
 * message and the stored user_data, and unlink the entry from the list.
 *
 * NOTE(review): release of the entry and the return values are outside
 * this excerpt.
 */
431 static int notify_pending(mrp_pep_t *pep, uint32_t seq, int error,
434 mrp_list_hook_t *p, *n;
435 pending_request_t *pending;
437 mrp_list_foreach(&pep->pending, p, n) {
/* recover the request that embeds this list hook */
438 pending = mrp_list_entry(p, typeof(*pending), hook);
440 if (pending->seqno == seq) {
/* the user callback runs while the entry is still linked */
442 pending->cb(pep, error, msg, pending->user_data);
443 mrp_list_delete(&pending->hook);
/*
 * Purge the pending request queue: unlink every entry from the pep's
 * pending list.
 *
 * NOTE(review): release of the unlinked entries is outside this excerpt.
 */
455 static void purge_pending(mrp_pep_t *pep)
457 mrp_list_hook_t *p, *n;
458 pending_request_t *pending;
460 mrp_list_foreach(&pep->pending, p, n) {
461 pending = mrp_list_entry(p, typeof(*pending), hook);
463 mrp_list_delete(&pending->hook);