#include "utils.h"
#ifdef WITH_RESOURCES
-#define INVALID_ID (~(uint32_t)0)
-#define INVALID_INDEX (~(uint32_t)0)
-#define INVALID_SEQNO (~(uint32_t)0)
-#define INVALID_REQUEST (~(uint16_t)0)
+#define INVALID_ID (~(uint32_t)0)
+#define INVALID_INDEX (~(uint32_t)0)
+#define INVALID_SEQNO (~(uint32_t)0)
+#define INVALID_REQUEST (~(uint16_t)0)
#define DISCONNECTED -1
#define CONNECTED 0
#define CONNECTING 1
-#define RESCOL_NAMES "rsetid,autorel,state,grant,pid,policy"
-#define RESCOL_RSETID 0
-#define RESCOL_AUTOREL 1
-#define RESCOL_STATE 2
-#define RESCOL_GRANT 3
-#define RESCOL_PID 4
-#define RESCOL_POLICY 5
+#define RESCOL_NAMES "rsetid,autorel,state,grant,pid,policy,name"
+#define RESCOL_RSETID 0
+#define RESCOL_AUTOREL 1
+#define RESCOL_STATE 2
+#define RESCOL_GRANT 3
+#define RESCOL_PID 4
+#define RESCOL_POLICY 5
+#define RESCOL_RSETNAME 6
-#define RSET_RELEASE 1
-#define RSET_ACQUIRE 2
+#define RSET_RELEASE 1
+#define RSET_ACQUIRE 2
+
+#define RSET_INPUT 0
+#define RSET_OUTPUT 1
#define PUSH_VALUE(msg, tag, typ, val) \
mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val))
typedef struct {
const char *name;
+ const char *tblnam;
int tblidx;
} audio_resource_t;
struct {
pa_hashmap *rsetid;
pa_hashmap *pid;
+ unsigned nres[2];
} nodes;
PA_LLIST_HEAD(resource_attribute, attrs);
PA_LLIST_HEAD(resource_request, reqs);
#ifdef WITH_RESOURCES
typedef struct {
+ const char *hashkey;
+ bool dead;
const char *id;
- bool autorel;
+ bool autorel;
int state;
- bool grant;
+ bool grant;
const char *policy;
+ const char *name;
+ const char *pid;
} rset_data;
typedef struct {
size_t nnode;
mir_node **nodes;
rset_data *rset;
+ bool type[2];
+ uint32_t updid;
} rset_hash;
#endif
static void resource_xport_closed_evt(mrp_transport_t *, int, void *);
static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t);
-static bool resource_send_message(resource_interface *, mrp_msg_t *,
+static bool resource_send_message(resource_interface *, mrp_msg_t *,
uint32_t, uint16_t, uint32_t);
-static bool resource_set_create_node(struct userdata *, mir_node *,
+static bool resource_set_create_node(struct userdata *, mir_node *,
pa_nodeset_resdef *, bool);
-static bool resource_set_create_all(struct userdata *);
-static bool resource_set_destroy_node(struct userdata *, uint32_t);
-static bool resource_set_destroy_all(struct userdata *);
+static bool resource_set_create_all(struct userdata *);
+static bool resource_set_destroy_node(struct userdata *, uint32_t);
+static bool resource_set_destroy_all(struct userdata *);
static void resource_set_notification(struct userdata *, const char *,
int, mrp_domctl_value_t **);
+static void resource_set_enforce_policy(struct userdata *, rset_hash *);
static bool resource_push_attributes(mrp_msg_t *, resource_interface *,
pa_proplist *);
-static void resource_recv_msg(mrp_transport_t *, mrp_msg_t *, void *);
-static void resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *,
- mrp_sockaddr_t *, socklen_t, void *);
-static void resource_set_create_response(struct userdata *, mir_node *,
- mrp_msg_t *, void **);
-static void resource_set_create_response_abort(struct userdata *,
- mrp_msg_t *, void **);
+static void resource_recv_msg(mrp_transport_t *, mrp_msg_t *, void *);
+static void resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *,
+ mrp_sockaddr_t *, socklen_t, void *);
+static void resource_set_create_response(struct userdata *, mir_node *,
+ mrp_msg_t *, void **);
+static void resource_set_create_response_abort(struct userdata *,
+ mrp_msg_t *, void **);
static bool resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *);
static bool resource_fetch_request(mrp_msg_t *, void **, uint16_t *);
static rset_data *pid_hashmap_remove_rset(struct userdata *, const char *);
static void rset_hashmap_free(void *, void *);
-static rset_hash *rset_hashmap_put(struct userdata *, const char *, mir_node *);
+static rset_hash *rset_hashmap_put(struct userdata *, const char *,
+ int, mir_node *);
static rset_hash *rset_hashmap_get(struct userdata *u, const char *rsetid);
static int rset_hashmap_remove(struct userdata *,const char *,mir_node*);
res->name = pa_xstrdup(name);
#ifdef WITH_DOMCTL
snprintf(table, sizeof(table), "%s_users", name);
+ res->tblnam = pa_xstrdup(table);
res->tblidx = pa_murphyif_add_watch(u, table, columns, NULL, maxrow);
#endif
}
const char *pid;
rset_data *rset;
rset_hash *rh;
+ int type;
char buf[64];
pa_assert(u);
}
}
else {
- if ((rh = rset_hashmap_put(u, node->rsetid, node))) {
+ type = (node->direction == mir_input) ? RSET_INPUT : RSET_OUTPUT;
+
+ if ((rh = rset_hashmap_put(u, node->rsetid, type, node))) {
rset = rh->rset;
- pa_log_debug("enforce policies on node %u '%s' rsetid:%s autorel:%s "
- "state:%s grant:%s policy:%s", node->paidx, node->amname,
- rset->id, rset->autorel ? "yes":"no",
+ pa_log_debug("enforce policies on node %u '%s' hashkey:%s "
+ "autorel:%s state:%s grant:%s policy:%s",
+ node->paidx, node->amname,
+ rset->hashkey, rset->autorel ? "yes":"no",
rset->state == RSET_ACQUIRE ? "acquire":"release",
rset->grant ? "yes":"no", rset->policy);
int nrow,
mrp_domctl_value_t **values)
{
+ static uint32_t updid;
+
pa_murphyif *murphyif;
resource_interface *rif;
+ int type;
int r;
mrp_domctl_value_t *row;
mrp_domctl_value_t *crsetid;
mrp_domctl_value_t *cgrant;
mrp_domctl_value_t *cpid;
mrp_domctl_value_t *cpolicy;
+ mrp_domctl_value_t *crsetname;
char rsetid[32];
+ char name[256];
const char *pid;
mir_node *node, **nodes;
rset_hash *rh;
rset_data rset, *rs;
size_t i, size;
+ unsigned nrset;
+ void *it;
pa_assert(u);
pa_assert(table);
pa_assert_se((murphyif = u->murphyif));
rif = &murphyif->resource;
- for (r = 0; r < nrow; r++) {
+ if (pa_streq(table, rif->inpres.tblnam))
+ type = RSET_INPUT;
+ else if (pa_streq(table, rif->outres.tblnam))
+ type = RSET_OUTPUT;
+ else {
+ pa_log_debug("ignoring unregistered table '%s'", table);
+ return;
+ }
+
+ updid++;
+
+ for (r = 0, nrset = 0; r < nrow; r++) {
row = values[r];
- crsetid = row + RESCOL_RSETID;
- cautorel = row + RESCOL_AUTOREL;
- cstate = row + RESCOL_STATE;
- cgrant = row + RESCOL_GRANT;
- cpid = row + RESCOL_PID;
- cpolicy = row + RESCOL_POLICY;
-
- if (crsetid->type != MRP_DOMCTL_UNSIGNED ||
- cautorel->type != MRP_DOMCTL_INTEGER ||
- cstate->type != MRP_DOMCTL_INTEGER ||
- cgrant->type != MRP_DOMCTL_INTEGER ||
- cpid->type != MRP_DOMCTL_STRING ||
- cpolicy->type != MRP_DOMCTL_STRING )
+ crsetid = row + RESCOL_RSETID;
+ cautorel = row + RESCOL_AUTOREL;
+ cstate = row + RESCOL_STATE;
+ cgrant = row + RESCOL_GRANT;
+ cpid = row + RESCOL_PID;
+ cpolicy = row + RESCOL_POLICY;
+ crsetname = row + RESCOL_RSETNAME;
+
+ if (crsetid->type != MRP_DOMCTL_UNSIGNED ||
+ cautorel->type != MRP_DOMCTL_INTEGER ||
+ cstate->type != MRP_DOMCTL_INTEGER ||
+ cgrant->type != MRP_DOMCTL_INTEGER ||
+ cpid->type != MRP_DOMCTL_STRING ||
+ cpolicy->type != MRP_DOMCTL_STRING ||
+ crsetname->type != MRP_DOMCTL_STRING )
{
pa_log("invalid field type in '%s' (%d|%d|%d|%d|%d|%d)", table,
crsetid->type, cautorel->type, cstate->type,
continue;
}
- snprintf(rsetid, sizeof(rsetid), "%d", crsetid->s32);
+ snprintf(rsetid, sizeof(rsetid), "%d" , crsetid->s32);
+ if (crsetname->str[0] && !pa_streq(crsetname->str, "<unknown>"))
+ snprintf(name, sizeof(name), "#%s", crsetname->str);
+ else
+ name[0] = 0;
+
pid = cpid->str;
+ memset(&rset, 0, sizeof(rset));
rset.id = rsetid;
rset.autorel = cautorel->s32;
rset.state = cstate->s32;
rset.grant = cgrant->s32;
rset.policy = cpolicy->str;
-
+ rset.name = name;
+ rset.pid = pid;
if (rset.autorel != 0 && rset.autorel != 1) {
pa_log_debug("invalid autorel %d in table '%s'",
pa_log_debug("invalid grant %d in table '%s'", rset.grant, table);
continue;
}
+ if (!rset.policy) {
+ pa_log_debug("invalid 'policy' string in table '%s'", table);
+ continue;
+ }
+
+ if (rset.name[0] == '#') {
+ rset.hashkey = rset.name;
- if (!(rh = rset_hashmap_get(u, rset.id))) {
- if (!pid) {
- pa_log_debug("can't find node for resource set %s "
- "(pid in resource set unknown)", rset.id);
+ if (!(rh = rset_hashmap_put(u, rset.hashkey, type, NULL))) {
+ pa_log_debug("can't add to hashmap '%s' resource set",
+ rset.hashkey);
continue;
}
+ }
+ else {
+ rset.hashkey = rset.id;
- if ((node = pid_hashmap_remove_node(u, pid))) {
- pa_log_debug("found node %s for resource-set '%s'",
- node->amname, rset.id);
-
- if (!(rh = node_put_rset(u, node, &rset))) {
- pa_log("can't register resource set for node '%s': "
- "failed to set rsetid", node->amname);
+ if (!(rh = rset_hashmap_get(u, rset.hashkey))) {
+ if (!pid) {
+ pa_log_debug("can't find node for resource set %s "
+ "(pid in resource set unknown)", rset.id);
continue;
}
- }
- else {
- if (pid_hashmap_put(u, pid, NULL, rset_data_dup(&rset)) < 0) {
- if (!(rs = pid_hashmap_get_rset(u, pid)))
- pa_log("failed to add resource set to pid hash");
- else {
- if (!pa_streq(rs->id, rset.id)) {
- pa_log("process %s appears to have multiple resour"
- "ce sets (%s and %s)", pid, rs->id,rset.id);
- }
- pa_log_debug("update resource-set %s data in "
- "pid hash (pid %s)", rs->id, pid);
- rset_data_copy(rs, &rset);
+
+ if ((node = pid_hashmap_remove_node(u, pid))) {
+ pa_log_debug("found node %s for resource-set '%s'",
+ node->amname, rset.id);
+
+ if (!(rh = node_put_rset(u, node, &rset))) {
+ pa_log("can't register resource set for node '%s': "
+ "failed to set rsetid", node->amname);
+ continue;
}
}
else {
- pa_log_debug("can't find node for resource set %s. "
- "Beleive the stream will appear later on",
- rset.id);
- }
+ if (pid_hashmap_put(u,pid,NULL,rset_data_dup(&rset)) < 0) {
+ if (!(rs = pid_hashmap_get_rset(u, pid)))
+ pa_log("failed to add resource set to pid hash");
+ else {
+ if (!pa_streq(rs->id, rset.id)) {
+ pa_log("process %s appears to have multiple "
+ "resource sets (%s and %s)",
+ pid, rs->id,rset.id);
+ }
+ pa_log_debug("update resource-set %s data in "
+ "pid hash (pid %s)", rs->id, pid);
+ rset_data_copy(rs, &rset);
+ }
+ }
+ else {
+ pa_log_debug("can't find node for resource set %s. "
+ "Beleive the stream will appear later on",
+ rset.id);
+ }
- continue;
+ continue;
+ }
}
}
rset_data_update(rh->rset, &rset);
- /* we need to make a copy of this as node_enforce_resource_policy()
- will delete/modify it */
- size = sizeof(mir_node *) * (rh->nnode + 1);
- nodes = alloca(size);
- memcpy(nodes, rh->nodes, size);
+ rh->updid = updid;
+ nrset++;
+
+ resource_set_enforce_policy(u, rh);
+
+ } /* for each row */
- for (i = 0; (node = nodes[i]); i++) {
- pa_log_debug("%u: resource notification for node '%s' autorel:%s "
- "state:%s grant:%s pid:%s policy:%s", i,
- node->amname, rset.autorel ? "yes":"no",
- rset.state == RSET_ACQUIRE ? "acquire":"release",
- rset.grant ? "yes":"no", pid, rset.policy);
+ if (nrset < rif->nodes.nres[type]) {
+ pa_log_debug("some of the resource sets were not updated => "
+ "find the %u resource sets that need to be deleted",
+ pa_hashmap_size(rif->nodes.rsetid) - nrset);
- node_enforce_resource_policy(u, node, &rset);
+ PA_HASHMAP_FOREACH(rh, rif->nodes.rsetid, it) {
+ if (rh->type[type] && rh->updid != updid) {
+ pa_log_debug("'%s' was not updated => assume it's gone "
+ "and delete it", rh->rset->hashkey);
+
+ rh->rset->dead = true;
+ rh->rset->grant = false;
+
+ resource_set_enforce_policy(u, rh);
+ }
}
}
}
+static void resource_set_enforce_policy(struct userdata *u, rset_hash *rh) {
+ size_t i, size;
+ mir_node *node, **nodes;
+
+ /* we need to make a copy of this as node_enforce_resource_policy()
+ will delete/modify it */
+ size = sizeof(mir_node *) * (rh->nnode + 1);
+ nodes = alloca(size);
+ memcpy(nodes, rh->nodes, size);
+
+ for (i = 0; (node = nodes[i]); i++) {
+ pa_log_debug("%u: resource notification for node '%s' hashkey=%s "
+ "autorel:%s state:%s grant:%s pid:%s policy:%s",
+ i, node->amname, rh->rset->hashkey,
+ rh->rset->autorel ? "yes":"no",
+ rh->rset->state == RSET_ACQUIRE ? "acquire":"release",
+ rh->rset->grant ? "yes":"no",
+ rh->rset->pid, rh->rset->policy);
+
+ node_enforce_resource_policy(u, node, rh->rset);
+ }
+}
+
static bool resource_push_attributes(mrp_msg_t *msg,
resource_interface *rif,
resource_interface *rif;
pa_proplist *pl;
rset_hash *rh;
+ int type;
pa_assert(u);
pa_assert(node);
pa_assert_se((murphyif = u->murphyif));
rif = &murphyif->resource;
+ type = (node->direction == mir_input) ? RSET_INPUT : RSET_OUTPUT;
pa_log_debug("setting rsetid %s for node %s", rset->id, node->amname);
return NULL;
}
- if (!(rh = rset_hashmap_put(u, node->rsetid, node))) {
+ if (!(rh = rset_hashmap_put(u, node->rsetid, type, node))) {
pa_log("conflicting rsetid %s for %s", node->rsetid, node->amname);
return NULL;
}
pa_assert(src->id);
pa_assert(src->policy);
- pa_assert_se(pa_streq(src->id, dst->id));
+ pa_assert(pa_streq(src->hashkey, dst->hashkey));
+ pa_xfree((void *)dst->id);
pa_xfree((void *)dst->policy);
+ pa_xfree((void *)dst->name);
+ pa_xfree((void *)dst->pid);
+ dst->id = pa_xstrdup(src->id);
dst->autorel = src->autorel;
dst->state = src->state;
dst->grant = src->grant;
dst->policy = pa_xstrdup(src->policy);
+ dst->name = pa_xstrdup(src->name);
+ dst->pid = pa_xstrdup(src->pid);
}
static void rset_data_free(rset_data *rset)
{
if (rset) {
+ pa_xfree((void *)rset->hashkey);
pa_xfree((void *)rset->id);
pa_xfree((void *)rset->policy);
+ pa_xfree((void *)rset->name);
pa_xfree(rset);
}
}
static rset_hash *rset_hashmap_put(struct userdata *u,
const char *rsetid,
+ int type,
mir_node *node)
{
pa_murphyif *murphyif;
resource_interface *rif;
rset_hash *rh;
rset_data *rset;
- size_t i;
+ size_t i, size;
pa_assert(u);
pa_assert(rsetid);
- pa_assert(node);
+ pa_assert(type == RSET_INPUT || type == RSET_OUTPUT);
pa_assert_se((murphyif = u->murphyif));
rif = &murphyif->resource;
if ((rh = pa_hashmap_get(rif->nodes.rsetid, rsetid))) {
- for (i = 0; i < rh->nnode; i++) {
- if (rh->nodes[i] == node)
- return NULL;
+ if (rh->rset->dead) {
+ pa_log_debug("attempt to add dead rset '%s' to hashmap",
+ rh->rset->hashkey);
+ return NULL;
}
- i = rh->nnode++;
- rh->nodes = pa_xrealloc(rh->nodes, sizeof(mir_node *) * (rh->nnode+1));
+ if (node) {
+ for (i = 0; i < rh->nnode; i++) {
+ if (rh->nodes[i] == node)
+ return NULL;
+ }
+
+ i = rh->nnode++;
+ size = sizeof(mir_node *) * (rh->nnode + 1);
+ rh->nodes = pa_xrealloc(rh->nodes, size);
+ }
}
else {
rset = pa_xnew0(rset_data, 1);
- rset->id = pa_xstrdup(rsetid);
- rset->policy = pa_xstrdup("unknown");
+ rset->hashkey = pa_xstrdup(rsetid);
+ rset->dead = false;
+ rset->id = pa_xstrdup("unknown");
+ rset->policy = pa_xstrdup("unknown");
+ rset->name = pa_xstrdup("unknown");
rh = pa_xnew0(rset_hash, 1);
- rh->nnode = 1;
- rh->nodes = pa_xnew0(mir_node *, 2);
+ rh->nnode = node ? 1 : 0;
+ rh->nodes = pa_xnew0(mir_node *, rh->nnode + 1);
rh->rset = rset;
- pa_hashmap_put(rif->nodes.rsetid, (void *)rh->rset->id, rh);
+ pa_hashmap_put(rif->nodes.rsetid, (void *)rh->rset->hashkey, rh);
+
+ rif->nodes.nres[type]++;
i = 0;
}
+ if (node) {
+ rh->nodes[i+0] = node;
+ rh->nodes[i+1] = NULL;
+ }
- rh->nodes[i+0] = node;
- rh->nodes[i+1] = NULL;
+ rh->type[type] = true;
return rh;
}
pa_murphyif *murphyif;
resource_interface *rif;
rset_hash *rh;
+ rset_data *rset;
+ int type;
size_t i,j;
pa_assert(u);
rif = &murphyif->resource;
if ((rh = pa_hashmap_get(rif->nodes.rsetid, rsetid))) {
+ pa_assert_se((rset = rh->rset));
for (i = 0; i < rh->nnode; i++) {
if (node == rh->nodes[i]) {
- if (rh->nnode <= 1) {
+ if (rh->nnode <= 1 && (rset->hashkey[0] != '#' || rset->dead)){
pa_hashmap_remove(rif->nodes.rsetid, rsetid);
rset_hashmap_free(rh, NULL);
- return 0;
+
+ type = (node->direction == mir_input) ?
+ RSET_INPUT : RSET_OUTPUT;
+
+ if (rif->nodes.nres[type] > 0)
+ rif->nodes.nres[type]--;
}
else {
for (j = i; j < rh->nnode; j++)
rh->nodes[j] = rh->nodes[j+1];
rh->nnode--;
-
- return 0;
}
+
+ return 0;
}
}
}