#else
#define MAX_MESSAGE_QUEUE 512
#endif
+
+#define MAX_MIRROR_INSTANCES 10
+
+struct lws_mirror_instance;
+
struct per_session_data__lws_mirror {
struct lws *wsi;
+ struct lws_mirror_instance *mi;
+ struct per_session_data__lws_mirror *same_mi_pss_list;
int ringbuffer_tail;
};
size_t len;
};
-struct per_vhost_data__lws_mirror {
+struct lws_mirror_instance {
+ struct lws_mirror_instance *next;
+ struct per_session_data__lws_mirror *same_mi_pss_list;
+ char name[30];
struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
int ringbuffer_head;
};
+struct per_vhost_data__lws_mirror {
+ struct lws_mirror_instance *mi_list;
+};
+
static int
callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
struct per_session_data__lws_mirror *pss =
- (struct per_session_data__lws_mirror *)user;
+ (struct per_session_data__lws_mirror *)user, **ppss,
+ *pss1;
struct per_vhost_data__lws_mirror *v =
(struct per_vhost_data__lws_mirror *)
lws_protocol_vh_priv_get(lws_get_vhost(wsi),
lws_get_protocol(wsi));
- int n, m;
+ struct lws_mirror_instance *mi, **pmi;
+ char name[30];
+ int n, m, count_mi = 0;
switch (reason) {
case LWS_CALLBACK_ESTABLISHED:
lwsl_info("%s: LWS_CALLBACK_ESTABLISHED\n", __func__);
- pss->ringbuffer_tail = v->ringbuffer_head;
+
+ /*
+ * mirror instance name... defaults to "", but if URL includes
+ * "?mirror=xxx", will be "xxx"
+ */
+
+ name[0] = '\0';
+ lws_get_urlarg_by_name(wsi, "mirror", name, sizeof(name) - 1);
+
+ lwsl_notice("mirror %s\n", name);
+
+ /* is there already a mirror instance of this name? */
+
+ mi = v->mi_list;
+ while (mi) {
+ if (!strcmp(name, mi->name)) {
+ lwsl_notice("Joining existing mi %p '%s'\n",
+ mi, name);
+ /* yes... we will join it */
+ break;
+ }
+ count_mi++;
+ mi = mi->next;
+ }
+
+ if (!mi) {
+
+ /* no existing mirror instance for name */
+
+ if (count_mi == MAX_MIRROR_INSTANCES)
+ return -1;
+
+ /* create one with this name, and join it */
+
+ mi = malloc(sizeof(*mi));
+ memset(mi, 0, sizeof(*mi));
+ mi->next = v->mi_list;
+ v->mi_list = mi;
+ strcpy(mi->name, name);
+ mi->ringbuffer_head = 0;
+
+
+ lwsl_notice("Created new mi %p '%s'\n", mi, name);
+ }
+
+ /* add our pss to list of guys bound to this mi */
+
+ pss->same_mi_pss_list = mi->same_mi_pss_list;
+ mi->same_mi_pss_list = pss;
+
+ /* init the pss */
+
+ pss->mi = mi;
+ pss->ringbuffer_tail = mi->ringbuffer_head;
pss->wsi = wsi;
+
+ break;
+
+ case LWS_CALLBACK_CLOSED:
+
+ /* detach our pss from the mirror instance */
+
+ mi = pss->mi;
+ if (!mi)
+ break;
+ ppss = &mi->same_mi_pss_list;
+
+ while (*ppss) {
+ if (*ppss == pss) {
+ *ppss = pss->same_mi_pss_list;
+ break;
+ }
+
+ ppss = &(*ppss)->same_mi_pss_list;
+ }
+
+ if (!mi->same_mi_pss_list) {
+
+ /* last pss unbound from mi... delete mi */
+
+ pmi = &v->mi_list;
+ while (*pmi) {
+ if (*pmi == mi) {
+ *pmi = (*pmi)->next;
+
+ if (!pss->mi)
+ break;
+ lwsl_info("%s: mirror protocol cleaning up %p\n", __func__, v);
+ for (n = 0; n < ARRAY_SIZE(pss->mi->ringbuffer); n++)
+ if (pss->mi->ringbuffer[n].payload) {
+ free(pss->mi->ringbuffer[n].payload);
+ pss->mi->ringbuffer[n].payload = NULL;
+ }
+
+ free(mi);
+ break;
+ }
+ count_mi++;
+ pmi = &(*pmi)->next;
+ }
+ }
+
break;
case LWS_CALLBACK_PROTOCOL_INIT: /* per vhost */
break;
case LWS_CALLBACK_PROTOCOL_DESTROY: /* per vhost */
- if (!v)
- break;
- lwsl_info("%s: mirror protocol cleaning up %p\n", __func__, v);
- for (n = 0; n < ARRAY_SIZE(v->ringbuffer); n++)
- if (v->ringbuffer[n].payload) {
- free(v->ringbuffer[n].payload);
- v->ringbuffer[n].payload = NULL;
- }
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
- while (pss->ringbuffer_tail != v->ringbuffer_head) {
- m = v->ringbuffer[pss->ringbuffer_tail].len;
+ while (pss->ringbuffer_tail != pss->mi->ringbuffer_head) {
+ m = pss->mi->ringbuffer[pss->ringbuffer_tail].len;
n = lws_write(wsi, (unsigned char *)
- v->ringbuffer[pss->ringbuffer_tail].payload +
+ pss->mi->ringbuffer[pss->ringbuffer_tail].payload +
LWS_PRE, m, LWS_WRITE_TEXT);
if (n < 0) {
lwsl_err("ERROR %d writing to mirror socket\n", n);
else
pss->ringbuffer_tail++;
- if (((v->ringbuffer_head - pss->ringbuffer_tail) &
+ if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
(MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15))
lws_rx_flow_allow_all_protocol(lws_get_context(wsi),
lws_get_protocol(wsi));
break;
case LWS_CALLBACK_RECEIVE:
- if (((v->ringbuffer_head - pss->ringbuffer_tail) &
+ if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
(MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) {
lwsl_err("dropping!\n");
goto choke;
}
- if (v->ringbuffer[v->ringbuffer_head].payload)
- free(v->ringbuffer[v->ringbuffer_head].payload);
+ if (pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload)
+ free(pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload);
- v->ringbuffer[v->ringbuffer_head].payload = malloc(LWS_PRE + len);
- v->ringbuffer[v->ringbuffer_head].len = len;
- memcpy((char *)v->ringbuffer[v->ringbuffer_head].payload +
+ pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload = malloc(LWS_PRE + len);
+ pss->mi->ringbuffer[pss->mi->ringbuffer_head].len = len;
+ memcpy((char *)pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload +
LWS_PRE, in, len);
- if (v->ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
- v->ringbuffer_head = 0;
+ if (pss->mi->ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
+ pss->mi->ringbuffer_head = 0;
else
- v->ringbuffer_head++;
+ pss->mi->ringbuffer_head++;
- if (((v->ringbuffer_head - pss->ringbuffer_tail) &
+ if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
(MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2))
goto done;
lws_rx_flow_control(wsi, 0);
done:
- lws_callback_on_writable_all_protocol(lws_get_context(wsi),
- lws_get_protocol(wsi));
+ /*
+ * ask for WRITABLE callback for every wsi bound to this
+ * mirror instance
+ */
+
+ pss1 = pss->mi->same_mi_pss_list;
+ while (pss1) {
+ lws_callback_on_writable(pss1->wsi);
+ pss1 = pss1->same_mi_pss_list;
+ }
break;
default: