3 #include <murphy/common/mm.h>
4 #include <murphy/common/log.h>
/* Forward declarations: create_decision() and destroy_decision() below call
 * these helpers before their definitions appear later in the file. */
12 static int create_transports(pdp_t *pdp);
13 static void destroy_transports(pdp_t *pdp);
/*
 * Allocate and set up a decision point (pdp_t) for the given address.
 *
 * The object is allocated with mrp_allocz(), the address pointer is stored
 * in it, and then the proxies, the tables and the transports are set up.
 *
 * NOTE(review): only part of the body is visible in this excerpt; the
 * allocation check, the use of ctx and the return statements are not shown.
 */
15 pdp_t *create_decision(mrp_context_t *ctx, const char *address)
19 pdp = mrp_allocz(sizeof(*pdp));
/* The pointer itself is kept, the string is not duplicated here, so it has
 * to stay valid for as long as pdp->address is used (e.g. in log messages). */
23 pdp->address = address;
/* The branch is taken only if all three calls return non-zero; && stops at
 * the first call that returns zero. */
25 if (init_proxies(pdp) && init_tables(pdp) && create_transports(pdp))
/* NOTE(review): presumably the failure path, tearing down the partially
 * initialised object - confirm against the full source. */
28 destroy_decision(pdp);
/*
 * Tear down a decision point.
 *
 * NOTE(review): only the transport teardown is visible in this excerpt;
 * any NULL check on pdp and the release of other members are not shown.
 */
35 void destroy_decision(pdp_t *pdp)
40 destroy_transports(pdp);
/*
 * Deferred callback registered by schedule_notification().
 *
 * user_data is the pdp_t that was passed to mrp_add_deferred(). The callback
 * disables the deferred again, clears the notify_scheduled flag and then
 * calls notify_table_changes().
 */
47 static void notify_cb(mrp_mainloop_t *ml, mrp_deferred_t *d, void *user_data)
49 pdp_t *pdp = (pdp_t *)user_data;
/* Disable first and clear the flag before notifying, so the flag is already
 * FALSE while notify_table_changes() runs. */
53 mrp_disable_deferred(d);
54 pdp->notify_scheduled = FALSE;
55 notify_table_changes(pdp);
/*
 * Arrange for notify_cb() to be run from the mainloop of pdp->ctx.
 *
 * The deferred is created on first use and kept in pdp->notify; later calls
 * reuse it. Nothing is enabled again while notify_scheduled is already set.
 */
59 void schedule_notification(pdp_t *pdp)
/* Lazily create the deferred the first time a notification is requested. */
62 if (pdp->notify == NULL)
63 pdp->notify = mrp_add_deferred(pdp->ctx->ml, notify_cb, pdp);
/* NOTE(review): no check of the mrp_add_deferred() result is visible here;
 * confirm that a NULL pdp->notify cannot reach mrp_enable_deferred(). */
65 if (!pdp->notify_scheduled) {
66 mrp_debug("scheduling client notification");
67 mrp_enable_deferred(pdp->notify);
/* NOTE(review): the line setting notify_scheduled is not visible in this
 * excerpt; presumably it follows the enable call. */
/*
 * Build an ACK message for sequence number seq and send it over transport t.
 *
 * NOTE(review): the check of the create_ack_message() result and the release
 * of msg are not visible in this excerpt.
 */
72 static void send_ack_reply(mrp_transport_t *t, uint32_t seq)
76 msg = create_ack_message(seq);
79 mrp_transport_send(t, msg);
/*
 * Build a NAK message carrying seq, an error code and an error string, and
 * send it over transport t.
 *
 * NOTE(review): the rest of the parameter list (the errmsg parameter), the
 * check of the create_nak_message() result and the release of msg are not
 * visible in this excerpt.
 */
85 static void send_nak_reply(mrp_transport_t *t, uint32_t seq, int error,
90 msg = create_nak_message(seq, error, errmsg);
93 mrp_transport_send(t, msg);
/*
 * Handle a register request received from a client proxy.
 *
 * Reads the client name and the table, watch and column-definition counts
 * from the message, decodes the message into local arrays, and registers
 * the proxy. On success an ACK is sent and a notification is scheduled with
 * notify_all set; otherwise a NAK with an error code and message is sent.
 *
 * The caller (recv_cb) destroys the proxy when this function returns zero.
 *
 * NOTE(review): the return statements and the exact branch structure are
 * not visible in this excerpt.
 */
99 static int process_register_request(pep_proxy_t *proxy, mrp_msg_t *req,
102 mrp_transport_t *t = proxy->t;
/* The u* variables receive the 16-bit counts read from the message; the
 * int n* counterparts are what is passed on to the decode/register calls. */
104 uint16_t utable, uwatch, ucolumn;
105 int ntable, nwatch, ncolumn;
110 MRP_PEPMSG_STRING(NAME , &name ),
111 MRP_PEPMSG_UINT16(NTABLE , &utable ),
112 MRP_PEPMSG_UINT16(NWATCH , &uwatch ),
113 MRP_PEPMSG_UINT16(NCOLDEF, &ucolumn),
/* Variable-length arrays sized by the counts taken from the message.
 * NOTE(review): these counts come from the peer. A count of 0 makes the
 * array size zero, which C does not permit for a VLA, and large counts put
 * a correspondingly large array on the stack - confirm whether the counts
 * are validated on lines not shown here. */
115 mrp_pep_table_t tables[utable], watches[uwatch];
116 mqi_column_def_t columns[ucolumn];
122 if (decode_register_message(req, tables, ntable, watches, nwatch,
124 if (register_proxy(proxy, name, tables, ntable, watches, nwatch,
/* Acknowledge, then mark the proxy for a full notification and schedule it
 * on the owning decision point. */
126 send_ack_reply(t, seq);
127 proxy->notify_all = TRUE;
128 schedule_notification(proxy->pdp);
139 errmsg = "malformed register message";
/* Failure reply: error and errmsg are set on the failing path above. */
142 send_nak_reply(t, seq, error, errmsg);
/*
 * Handle an unregister request: acknowledge it on the proxy's transport,
 * then unregister the proxy.
 */
148 static void process_unregister_request(pep_proxy_t *proxy, uint32_t seq)
/* The ACK is sent before unregister_proxy() is called. */
150 send_ack_reply(proxy->t, seq);
151 unregister_proxy(proxy);
/*
 * Handle a set request received from a client proxy.
 *
 * Reads the number of changed tables and the total number of values from
 * the message, then for each table reads its id and row count, validates
 * the id against the tables of the proxy, decodes the table data and
 * finally applies everything with set_proxy_tables(). An ACK is sent on
 * success, a NAK with error and errmsg otherwise.
 *
 * NOTE(review): only part of the body is visible in this excerpt; the
 * assignments to ntable/nvalue/d/v, the error codes and the jumps to the
 * failure path are not shown.
 */
155 static void process_set_request(pep_proxy_t *proxy, mrp_msg_t *req,
159 uint16_t utable, uvalue, tblid, nrow;
160 int ntable, nvalue, i;
167 if (mrp_msg_iterate_get(req, &it,
168 MRP_PEPMSG_UINT16(NCHANGE, &utable),
169 MRP_PEPMSG_UINT16(NTOTAL , &uvalue),
/* Variable-length arrays sized by the counts taken from the message.
 * NOTE(review): these counts come from the peer. A count of 0 makes the
 * array size zero, which C does not permit for a VLA, and large counts put
 * a correspondingly large array on the stack - confirm whether the counts
 * are validated on lines not shown here. */
171 mrp_pep_data_t data[utable], *d;
172 mrp_pep_value_t values[uvalue], *v;
/* One iteration per changed table. */
179 for (i = 0; i < ntable; i++) {
180 if (!mrp_msg_iterate_get(req, &it,
181 MRP_PEPMSG_UINT16(TBLID, &tblid),
182 MRP_PEPMSG_UINT16(NROW , &nrow),
185 errmsg = "malformed set message";
/* Reject table ids outside the range of tables known for this proxy. */
189 if (tblid >= proxy->ntable) {
191 errmsg = "invalid table id";
/* Column layout is taken from the table of the proxy that d->id refers to.
 * NOTE(review): the assignment of d->id is not visible; presumably tblid. */
197 d->coldefs = proxy->tables[d->id].columns;
198 d->ncolumn = proxy->tables[d->id].ncolumn;
/* The table needs ncolumn * nrow values; fail if fewer are available. */
201 if (nvalue < d->ncolumn * d->nrow) {
203 errmsg = "invalid set message";
207 if (!decode_set_message(req, &it, d)) {
209 errmsg = "invalid set message";
/* Advance the value cursor past the values consumed by this table. */
213 v += d->ncolumn * d->nrow;
217 if (set_proxy_tables(proxy, data, ntable, &error, &errmsg)) {
218 send_ack_reply(proxy->t, seq);
225 send_nak_reply(proxy->t, seq, error, errmsg);
/* NOTE(review): the lines below look like a second variant of the set
 * request handling whose header is not visible in this excerpt. It reads
 * NTABLE instead of NCHANGE and calls decode_set_message() with a different
 * argument list than the code above, so the two variants presumably are not
 * compiled together - confirm against the full source. */
227 uint16_t utable, uvalue;
233 MRP_PEPMSG_UINT16(NTABLE, &utable),
234 MRP_PEPMSG_UINT16(NTOTAL, &uvalue),
/* Same VLA sizing by message-supplied counts as above; the same review
 * caveat about zero and large counts applies. */
236 mrp_pep_data_t tables[utable];
237 mrp_pep_value_t values[uvalue];
242 if (decode_set_message(req, tables, ntable, values, nvalue)) {
243 if (set_proxy_tables(proxy, tables, ntable, &error, &errmsg)) {
244 send_ack_reply(proxy->t, seq);
255 errmsg = "malformed set message";
258 send_nak_reply(proxy->t, seq, error, errmsg);
/*
 * Transport receive callback for messages coming from a client.
 *
 * user_data is the pep_proxy_t of the client. The message is logged and
 * dumped to stdout, its type and sequence number are extracted, and the
 * message is then dispatched to the matching process_*_request() handler.
 * A message without these fields is logged as an error and answered with a
 * NAK carrying sequence number 0 and EINVAL.
 *
 * NOTE(review): the switch statement, the case label of the set request
 * and any default case are not visible in this excerpt.
 */
263 static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
265 pep_proxy_t *proxy = (pep_proxy_t *)user_data;
/* Name used in log messages; falls back to "<unknown>" when there is no
 * proxy or the proxy has no name yet. */
266 char *name = proxy && proxy->name ? proxy->name : "<unknown>";
271 mrp_log_info("Message from client %p:", proxy);
272 mrp_msg_dump(msg, stdout);
275 if (!mrp_msg_get(msg,
276 MRP_PEPMSG_UINT16(MSGTYPE, &type),
277 MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
/* The sequence number could not be read, so 0 is used in the reply. */
279 mrp_log_error("Malformed message from client %s.", name);
280 send_nak_reply(t, 0, EINVAL, "malformed message");
284 case MRP_PEPMSG_REGISTER:
/* A failed registration destroys the proxy. */
285 if (!process_register_request(proxy, msg, seq))
286 destroy_proxy(proxy);
289 case MRP_PEPMSG_UNREGISTER:
290 process_unregister_request(proxy, seq);
294 process_set_request(proxy, msg, seq);
/*
 * Connection callback of the listening transport.
 *
 * user_data is the pdp_t the listening transport was created with. A new
 * proxy is created for the client and the connection is accepted with the
 * proxy as the user data of the accepted transport. If accepting fails, the
 * error is logged and the proxy is destroyed again.
 *
 * NOTE(review): the check of the create_proxy() result and the else
 * keyword/braces are not visible in this excerpt.
 */
304 static void connect_cb(mrp_transport_t *ext, void *user_data)
306 pdp_t *pdp = (pdp_t *)user_data;
310 proxy = create_proxy(pdp);
313 flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
/* The proxy becomes the user data of the accepted transport; recv_cb() and
 * closed_cb() cast their user_data back to pep_proxy_t. */
314 proxy->t = mrp_transport_accept(ext, proxy, flags);
316 if (proxy->t != NULL)
317 mrp_log_info("Accepted new client connection.");
319 mrp_log_error("Failed to accept new client connection.");
320 destroy_proxy(proxy);
/*
 * Transport closed callback for a client connection.
 *
 * user_data is the pep_proxy_t of the client. The closure is logged, either
 * as an error including the error code and its strerror() text or as a
 * plain informational message, and the proxy is then destroyed.
 *
 * NOTE(review): the condition selecting between the two log messages is
 * not visible in this excerpt; presumably it tests error for non-zero.
 */
326 static void closed_cb(mrp_transport_t *t, int error, void *user_data)
328 pep_proxy_t *proxy = (pep_proxy_t *)user_data;
/* Name used in log messages; falls back to "<unknown>" when there is no
 * proxy or the proxy has no name. */
329 char *name = proxy && proxy->name ? proxy->name : "<unknown>";
334 mrp_log_error("Transport to client %s closed (%d: %s).",
335 name, error, strerror(error));
337 mrp_log_info("Transport to client %s closed.", name);
339 mrp_log_info("Destroying client %s.", name);
340 destroy_proxy(proxy);
/*
 * Create the listening transport for client connections.
 *
 * pdp->address is resolved, a transport of the resolved type is created on
 * the mainloop of pdp->ctx with pdp as user data, and the transport is then
 * bound to the address and put into listening state. Each failing step has
 * its own error message.
 *
 * NOTE(review): the return values, the remaining members of evt, the
 * assignment of the transport to pdp and the cleanup on failure are not
 * visible in this excerpt.
 */
344 static int create_ext_transport(pdp_t *pdp)
/* Event table handed to mrp_transport_create(); incoming connections are
 * routed to connect_cb(). */
346 static mrp_transport_evt_t evt = {
350 .connection = connect_cb,
360 addrlen = mrp_transport_resolve(NULL, pdp->address,
361 &addr, sizeof(addr), &type);
364 flags = MRP_TRANSPORT_REUSEADDR;
365 t = mrp_transport_create(pdp->ctx->ml, type, &evt, pdp, flags);
/* Both bind and listen must succeed; listen is not attempted when bind
 * fails. The 4 is presumably the connection backlog - TODO confirm. */
368 if (mrp_transport_bind(t, &addr, addrlen) &&
369 mrp_transport_listen(t, 4)) {
370 mrp_log_info("Listening on transport %s...", pdp->address);
376 mrp_log_error("Failed to bind transport to %s.", pdp->address);
379 mrp_log_error("Failed to create transport for %s.", pdp->address);
382 mrp_log_error("Invalid transport address %s.", pdp->address);
/*
 * Destroy the transport stored in pdp->ext.
 *
 * NOTE(review): any NULL check or reset of pdp->ext is not visible in this
 * excerpt.
 */
388 static void destroy_ext_transport(pdp_t *pdp)
391 mrp_transport_destroy(pdp->ext);
/*
 * Create the transports of the decision point. The visible code delegates
 * to create_ext_transport() and passes its result through.
 */
397 static int create_transports(pdp_t *pdp)
399 return create_ext_transport(pdp);
/*
 * Destroy the transports of the decision point. The visible code delegates
 * to destroy_ext_transport().
 */
403 static void destroy_transports(pdp_t *pdp)
405 destroy_ext_transport(pdp);