#include <errno.h>
#include <pulse/utf8.h>
+#include <pulse/timeval.h>
#include <pulsecore/pulsecore-config.h>
#include <pulsecore/module.h>
#include <pulsecore/llist.h>
#define INVALID_SEQNO (~(uint32_t)0)
#define INVALID_REQUEST (~(uint16_t)0)
+#define DISCONNECTED -1
+#define CONNECTED 0
+#define CONNECTING 1
+
#define PUSH_VALUE(msg, tag, typ, val) \
mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val))
const char *atype;
pa_bool_t connected;
struct {
+ pa_time_event *evt;
+ pa_usec_t period;
+ } connect;
+ struct {
uint32_t request;
uint32_t reply;
} seqno;
#ifdef WITH_RESOURCES
static void resource_attribute_destroy(resource_interface *,
resource_attribute *);
-static pa_bool_t resource_transport_connect(resource_interface *);
+static int resource_transport_connect(resource_interface *);
static void resource_xport_closed_evt(mrp_transport_t *, int, void *);
static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t);
static pa_bool_t resource_send_message(resource_interface *, mrp_msg_t *,
uint32_t, uint16_t, uint32_t);
-static pa_bool_t resource_set_create(struct userdata *, uint32_t,
- mir_direction, const char *,
- const char *, uint32_t, pa_proplist *);
-static pa_bool_t resource_set_destroy(struct userdata *, uint32_t);
-static pa_bool_t resource_set_acquire(struct userdata *, uint32_t, uint32_t);
+static pa_bool_t resource_set_create_node(struct userdata *, mir_node *,
+ pa_bool_t);
+static pa_bool_t resource_set_create_all(struct userdata *);
+static pa_bool_t resource_set_destroy_node(struct userdata *, uint32_t);
+static pa_bool_t resource_set_destroy_all(struct userdata *);
static pa_bool_t resource_push_attributes(mrp_msg_t *, resource_interface *,
pa_proplist *);
mrp_sockaddr_t *, socklen_t, void *);
static void resource_set_create_response(struct userdata *, mir_node *,
mrp_msg_t *, void **);
+static void resource_set_create_response_abort(struct userdata *,
+ mrp_msg_t *, void **);
static pa_bool_t resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *);
static pa_bool_t resource_fetch_request(mrp_msg_t *, void **, uint16_t *);
mrp_resproto_state_t *);
static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *, void **,
mrp_resproto_state_t *);
+
+static pa_bool_t resource_transport_create(struct userdata *, pa_murphyif *);
+static void resource_transport_destroy(pa_murphyif *);
+
+static void connect_attempt(pa_mainloop_api *, pa_time_event *,
+ const struct timeval *, void *);
+static void schedule_connect(struct userdata *, resource_interface *);
+static void cancel_schedule(struct userdata *, resource_interface *);
#endif
const char *ctl_addr,
const char *res_addr)
{
-#ifdef WITH_RESOURCES
- static mrp_transport_evt_t ev = {
- { .recvmsg = resource_recv_msg },
- { .recvmsgfrom = resource_recvfrom_msg },
- .closed = resource_xport_closed_evt,
- .connection = NULL
- };
-#endif
-
pa_murphyif *murphyif;
domctl_interface *dif;
resource_interface *rif;
pa_log("can't resolve resource transport address '%s'", rif->addr);
}
else {
- rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
+ rif->connect.period = 1 * PA_USEC_PER_SEC;
- if ((rif->transp))
- resource_transport_connect(rif);
- else
+ if (!resource_transport_create(u, murphyif)) {
pa_log("failed to create resource transport");
+ schedule_connect(u, rif);
+ }
+ else {
+ if (resource_transport_connect(rif) == DISCONNECTED)
+ schedule_connect(u, rif);
+ }
}
rif->seqno.request = 1;
#endif
#ifdef WITH_RESOURCES
+ resource_transport_destroy(murphyif);
+
pa_xfree((void *)rif->atype);
pa_hashmap_free(rif->nodes, NULL, NULL);
pa_murphyif *murphyif;
resource_interface *rif;
const char *class;
- uint32_t audio_flags = 0;
- pa_proplist *proplist = NULL;
- pa_sink_input *sinp;
- pa_source_output *sout;
+ int state;
pa_assert(u);
pa_assert(node);
- pa_assert(node->implement = mir_stream);
+ pa_assert(node->implement == mir_stream);
pa_assert(node->direction == mir_input || node->direction == mir_output);
pa_assert(node->zone);
pa_assert(!node->rsetid);
pa_assert_se((murphyif = u->murphyif));
rif = &murphyif->resource;
- resource_transport_connect(rif);
+ state = resource_transport_connect(rif);
- if (node->direction == mir_output) {
- if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
- proplist = sout->proplist;
- }
- else {
- if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
- proplist = sinp->proplist;
- }
+ switch (state) {
+
+ case CONNECTING:
+ resource_set_create_all(u);
+ break;
+
+ case CONNECTED:
+ node->localrset = resource_set_create_node(u, node, TRUE);
+ break;
- node->localrset = resource_set_create(u, node->index, node->direction,
- class, node->zone, audio_flags,
- proplist);
+ case DISCONNECTED:
+ break;
+ }
}
void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node)
node->rsetid);
}
else {
- if (resource_set_destroy(u, rsetid))
+ if (resource_set_destroy_node(u, rsetid))
pa_log_debug("resource set %u destruction request", rsetid);
else {
pa_log("falied to destroy resourse set %u for node '%s'",
rsetid, node->amname);
}
+
+ pa_xfree(node->rsetid);
+
+ node->localrset = FALSE;
+ node->rsetid = NULL;
}
pa_murphyif_delete_node(u, node);
MRP_UNUSED(dc);
MRP_UNUSED(user_data);
- if (connected) {
+ if (connected)
pa_log_info("Successfully registered to Murphy.");
+ else {
+ pa_log_error("Domain control Connection to Murphy failed (%d: %s).",
+ errcode, errmsg);
}
- else
- pa_log_error("Connection to Murphy failed (%d: %s).", errcode, errmsg);
}
static void domctl_watch_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
}
}
-static pa_bool_t resource_transport_connect(resource_interface *rif)
+static int resource_transport_connect(resource_interface *rif)
{
+ int status;
+
pa_assert(rif);
- if (!rif->connected) {
- if (mrp_transport_connect(rif->transp, &rif->saddr, rif->alen)) {
+ if (rif->connected)
+ status = CONNECTED;
+ else {
+ if (!mrp_transport_connect(rif->transp, &rif->saddr, rif->alen))
+ status = DISCONNECTED;
+ else {
pa_log_info("resource transport connected to '%s'", rif->addr);
rif->connected = TRUE;
- }
- else {
- pa_log("can't connect resource transport to '%s'", rif->addr);
- return FALSE;
+ status = CONNECTING;
}
}
- return TRUE;
+ return status;
}
static void resource_xport_closed_evt(mrp_transport_t *transp, int error,
rif = &murphyif->resource;
if (!error)
- pa_log("peer has closed the resource transport connection");
+ pa_log("Resource transport connection closed by peer");
else {
- pa_log("resource transport connection closed with error %d (%s)",
+ pa_log("Resource transport connection closed with error %d (%s)",
error, strerror(error));
}
- rif->connected = FALSE;
+ resource_transport_destroy(murphyif);
+ resource_set_destroy_all(u);
+ schedule_connect(u, rif);
}
static mrp_msg_t *resource_create_request(uint32_t seqno,
return success;
}
-
-static pa_bool_t resource_set_create(struct userdata *u,
- uint32_t nodidx,
- mir_direction dir,
- const char *class,
- const char *zone,
- uint32_t audio_flags,
- pa_proplist *proplist)
+static pa_bool_t resource_set_create_node(struct userdata *u,
+ mir_node *node,
+ pa_bool_t acquire)
{
- static uint32_t rset_flags = 0 /* RESPROTO_RSETFLAG_AUTORELEASE */ ;
+ static uint32_t rset_flags = RESPROTO_RSETFLAG_NOEVENTS |
+ RESPROTO_RSETFLAG_AUTOACQUIRE ;
+ pa_core *core;
pa_murphyif *murphyif;
resource_interface *rif;
resource_request *req;
mrp_msg_t *msg;
uint16_t reqid;
uint32_t seqno;
+ const char *class;
+ pa_sink_input *sinp;
+ pa_source_output *sout;
const char *resnam;
+ uint32_t audio_flags = 0;
+ uint32_t priority = 0;
+ pa_proplist *proplist = NULL;
pa_bool_t success = TRUE;
pa_assert(u);
- pa_assert(nodidx != PA_IDXSET_INVALID);
- pa_assert(dir == mir_input || dir == mir_output);
- pa_assert(class);
- pa_assert(zone);
+ pa_assert(node);
+ pa_assert(node->index != PA_IDXSET_INVALID);
+ pa_assert(node->implement == mir_stream);
+ pa_assert(node->direction == mir_input || node->direction == mir_output);
+ pa_assert(node->zone);
+ pa_assert(!node->rsetid);
+
+ pa_assert_se((core = u->core));
+ pa_assert_se((class = pa_nodeset_get_class(u, node->type)));
+
+ if (node->direction == mir_output) {
+ if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
+ proplist = sout->proplist;
+ }
+ else {
+ if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
+ proplist = sinp->proplist;
+ }
pa_assert_se((murphyif = u->murphyif));
rif = &murphyif->resource;
reqid = RESPROTO_CREATE_RESOURCE_SET;
seqno = rif->seqno.request++;
- resnam = (dir == mir_input) ? rif->inpres : rif->outres;
+ resnam = (node->direction == mir_input) ? rif->inpres : rif->outres;
pa_assert(resnam);
msg = resource_create_request(seqno, reqid);
if (PUSH_VALUE(msg, RESOURCE_FLAGS , UINT32, rset_flags) &&
- PUSH_VALUE(msg, RESOURCE_PRIORITY, UINT32, 0) &&
+ PUSH_VALUE(msg, RESOURCE_PRIORITY, UINT32, priority) &&
PUSH_VALUE(msg, CLASS_NAME , STRING, class) &&
- PUSH_VALUE(msg, ZONE_NAME , STRING, zone) &&
+ PUSH_VALUE(msg, ZONE_NAME , STRING, node->zone) &&
PUSH_VALUE(msg, RESOURCE_NAME , STRING, resnam) &&
PUSH_VALUE(msg, RESOURCE_FLAGS , UINT32, audio_flags) &&
+ PUSH_VALUE(msg, ATTRIBUTE_NAME , STRING, "policy") &&
+ PUSH_VALUE(msg, ATTRIBUTE_VALUE , STRING, "strict") &&
PUSH_ATTRS(msg, rif, proplist) &&
PUSH_VALUE(msg, SECTION_END , UINT8 , 0) )
{
- success = resource_send_message(rif, msg, nodidx, reqid, seqno);
+ success = resource_send_message(rif, msg, node->index, reqid, seqno);
}
else {
success = FALSE;
mrp_msg_unref(msg);
}
+ if (success)
+ pa_log_debug("requested resource set for '%s'", node->amname);
+ else
+ pa_log_debug("failed to create resource set for '%s'", node->amname);
+
return success;
}
-static pa_bool_t resource_set_destroy(struct userdata *u, uint32_t rsetid)
+static pa_bool_t resource_set_create_all(struct userdata *u)
+{
+ uint32_t idx;
+ mir_node *node;
+ pa_bool_t success;
+
+ pa_assert(u);
+
+ success = TRUE;
+
+ idx = PA_IDXSET_INVALID;
+
+ while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
+ if (node->implement == mir_stream && !node->rsetid) {
+ node->localrset = resource_set_create_node(u, node, FALSE);
+ success &= node->localrset;
+ }
+ }
+
+ return success;
+}
+
+static pa_bool_t resource_set_destroy_node(struct userdata *u, uint32_t rsetid)
{
pa_murphyif *murphyif;
resource_interface *rif;
return success;
}
-static pa_bool_t resource_set_acquire(struct userdata *u,
- uint32_t nodidx,
- uint32_t rsetid)
+static pa_bool_t resource_set_destroy_all(struct userdata *u)
{
pa_murphyif *murphyif;
resource_interface *rif;
- mrp_msg_t *msg;
- uint16_t reqid;
- uint32_t seqno;
+ uint32_t idx;
+ mir_node *node;
+ uint32_t rsetid;
+ char *e;
pa_bool_t success;
pa_assert(u);
-
pa_assert_se((murphyif = u->murphyif));
+
rif = &murphyif->resource;
- reqid = RESPROTO_ACQUIRE_RESOURCE_SET;
- seqno = rif->seqno.request++;
- msg = resource_create_request(seqno, reqid);
+ success = TRUE;
- if (PUSH_VALUE(msg, RESOURCE_SET_ID, UINT32, rsetid))
- success = resource_send_message(rif, msg, nodidx, reqid, seqno);
- else {
- success = FALSE;
- mrp_msg_unref(msg);
+ idx = PA_IDXSET_INVALID;
+
+ while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
+ if (node->implement == mir_stream && node->localrset) {
+ pa_log_debug("destroying resource set for '%s'", node->amname);
+
+ if (rif->connected && node->rsetid) {
+ rsetid = strtoul(node->rsetid, &e, 10);
+
+ if (e == node->rsetid || *e)
+ success = FALSE;
+ else
+ success &= resource_set_destroy_node(u, rsetid);
+ }
+
+ pa_xfree(node->rsetid);
+
+ node->localrset = FALSE;
+ node->rsetid = NULL;
+ }
}
return success;
}
+
static pa_bool_t resource_push_attributes(mrp_msg_t *msg,
resource_interface *rif,
pa_proplist *proplist)
if (reqid != RESPROTO_DESTROY_RESOURCE_SET) {
pa_log("got response (reqid:%u seqno:%u) but can't "
"find the corresponding node", reqid, seqno);
+ resource_set_create_response_abort(u, msg, &curs);
}
}
else {
switch (reqid) {
case RESPROTO_CREATE_RESOURCE_SET:
- resource_set_create_response(u,node,msg,&curs);
+ resource_set_create_response(u, node, msg, &curs);
break;
-#if 0
- case RESPROTO_ACQUIRE_RESOURCE_SET:
- resource_set_acquire_response(u,node,msg,&curs);
+ case RESPROTO_DESTROY_RESOURCE_SET:
break;
- case RESPROTO_RELEASE_RESOURCE_SET:
- resource_set_release_response(u,node,msg,&curs);
- break;
- case RESPROTO_RESOURCES_EVENT:
- resource_event(u, seqno, msg, &curs);
- break;
-#endif
default:
pa_log("ignoring unsupported resource request "
"type %u", reqid);
}
}
+
static void resource_set_create_response(struct userdata *u, mir_node *node,
mrp_msg_t *msg, void **pcursor)
{
}
node->rsetid = pa_sprintf_malloc("%d", rsetid);
-
+
if (pa_murphyif_add_node(u, node) == 0) {
pa_log_debug("resource set was successfully created");
mir_node_print(node, buf, sizeof(buf));
pa_log_debug("modified node:\n%s", buf);
-
- if (resource_set_acquire(u, node->index, rsetid))
- pa_log_debug("acquire request sent");
- else
- pa_log("failed to send acquire request");
}
else {
pa_log("failed to create resource set: "
- "conflicting resource set id");
+ "conflicting resource set id");
+ }
+}
+
+static void resource_set_create_response_abort(struct userdata *u,
+ mrp_msg_t *msg, void **pcursor)
+{
+ int status;
+ uint32_t rsetid;
+
+ pa_assert(u);
+ pa_assert(msg);
+ pa_assert(pcursor);
+
+ if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
+ !resource_fetch_rset_id(msg, pcursor, &rsetid)))
+ {
+ pa_log("ignoring malformed response to resource set creation");
+ return;
+ }
+
+ if (status) {
+ pa_log("creation of resource set failed. error code %u", status);
+ return;
}
+
+ if (resource_set_destroy_node(u, rsetid))
+ pa_log_debug("destroying resource set %u", rsetid);
+ else
+ pa_log("attempt to destroy resource set %u failed", rsetid);
}
return TRUE;
}
+static pa_bool_t resource_transport_create(struct userdata *u,
+ pa_murphyif *murphyif)
+{
+ static mrp_transport_evt_t ev = {
+ { .recvmsg = resource_recv_msg },
+ { .recvmsgfrom = resource_recvfrom_msg },
+ .closed = resource_xport_closed_evt,
+ .connection = NULL
+ };
+
+ resource_interface *rif;
+
+ pa_assert(u);
+ pa_assert(murphyif);
+
+ rif = &murphyif->resource;
+
+ if (!rif->transp)
+ rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
+
+ return rif->transp ? TRUE : FALSE;
+}
+
+static void resource_transport_destroy(pa_murphyif *murphyif)
+{
+ resource_interface *rif;
+
+ pa_assert(murphyif);
+ rif = &murphyif->resource;
+
+ if (rif->transp)
+ mrp_transport_destroy(rif->transp);
+
+ rif->transp = NULL;
+ rif->connected = FALSE;
+}
+
+static void connect_attempt(pa_mainloop_api *a,
+ pa_time_event *e,
+ const struct timeval *t,
+ void *data)
+{
+ struct userdata *u = (struct userdata *)data;
+ pa_murphyif *murphyif;
+ resource_interface *rif;
+
+ int state;
+
+ pa_assert(u);
+ pa_assert_se((murphyif = u->murphyif));
+
+ rif = &murphyif->resource;
+
+ if (!resource_transport_create(u, murphyif))
+ schedule_connect(u, rif);
+ else {
+ state = resource_transport_connect(rif);
+
+ switch (state) {
+
+ case CONNECTING:
+ resource_set_create_all(u);
+ cancel_schedule(u, rif);
+ break;
+
+ case CONNECTED:
+ cancel_schedule(u, rif);
+ break;
+
+ case DISCONNECTED:
+ schedule_connect(u, rif);
+ break;
+ }
+ }
+}
+
+static void schedule_connect(struct userdata *u, resource_interface *rif)
+{
+ pa_core *core;
+ pa_mainloop_api *mainloop;
+ struct timeval when;
+ pa_time_event *tev;
+
+ pa_assert(u);
+ pa_assert(rif);
+ pa_assert_se((core = u->core));
+ pa_assert_se((mainloop = core->mainloop));
+
+ pa_gettimeofday(&when);
+ pa_timeval_add(&when, rif->connect.period);
+
+ if ((tev = rif->connect.evt))
+ mainloop->time_restart(tev, &when);
+ else {
+ rif->connect.evt = mainloop->time_new(mainloop, &when,
+ connect_attempt, u);
+ }
+}
+
+static void cancel_schedule(struct userdata *u, resource_interface *rif)
+{
+ pa_core *core;
+ pa_mainloop_api *mainloop;
+ pa_time_event *tev;
+
+ pa_assert(u);
+ pa_assert(rif);
+ pa_assert_se((core = u->core));
+ pa_assert_se((mainloop = core->mainloop));
+
+ if ((tev = rif->connect.evt)) {
+ mainloop->time_free(tev);
+ rif->connect.evt = NULL;
+ }
+}
#endif