2 * libwebsockets - lib/core-net/sequencer.c
4 * Copyright (C) 2019 Andy Green <andy@warmcat.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation:
9 * version 2.1 of the License.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
22 #include "private-lib-core.h"
/*
 * One queued event belonging to a sequencer.  seq_event_list is the list node
 * that lws_seq_queue_event() appends to the sequencer's seq_event_owner.
 *
 * NOTE(review): other code in this file reads ->e, ->data and ->aux from this
 * struct; those members are declared on lines not visible in this listing.
 */
27 typedef struct lws_seq_event {
28 struct lws_dll2 seq_event_list;
/*
 * Per-sequencer state.  lws_seq_create() allocates this with the user area
 * placed directly behind it, so the user pointer is &seq[1].
 *
 * NOTE(review): ->cb and ->going_down are used elsewhere in this file but are
 * declared on lines not visible in this listing.
 */
38 typedef struct lws_sequencer {
39 struct lws_dll2 seq_list; /* node on the owning pt's seq_owner list */
41 lws_sorted_usec_list_t sul_timeout; /* armed by lws_seq_timeout_us() */
42 lws_sorted_usec_list_t sul_pending; /* armed each time an event is queued */
44 struct lws_dll2_owner seq_event_owner; /* queued lws_seq_event_t, added at tail */
45 struct lws_context_per_thread *pt; /* provides the pt lock and pt_sul_owner */
48 const lws_retry_bo_t *retry; /* copied from lws_seq_info_t at creation */
50 lws_usec_t time_created; /* lws_now_usecs() sampled in lws_seq_create() */
51 lws_usec_t timeout; /* 0 or time we timeout */
/*
 * Queue-depth threshold: lws_seq_queue_event() logs an error when a
 * sequencer's seq_event_owner.count is greater than this value.
 */
56 #define QUEUE_SANITY_LIMIT 10
/*
 * Heartbeat sul callback for one pt (per-thread context).
 *
 * Recovers the pt that embeds the sul as its sul_seq_heartbeat member, queues
 * an LWSSEQ_HEARTBEAT event to every sequencer on pt->seq_owner, and then
 * re-inserts the same sul on pt->pt_sul_owner to schedule the next heartbeat.
 *
 * sul: the pt's sul_seq_heartbeat
 *
 * NOTE(review): the delay argument of the re-insert is on a line that is not
 * visible in this listing.
 */
59 lws_sul_seq_heartbeat_cb(lws_sorted_usec_list_t *sul)
61 struct lws_context_per_thread *pt = lws_container_of(sul,
62 struct lws_context_per_thread, sul_seq_heartbeat);
64 /* send every sequencer a heartbeat message... it can ignore it */
/* the "safe" foreach form keeps a lookahead pointer (tp) while iterating */
66 lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp,
67 lws_dll2_get_head(&pt->seq_owner)) {
68 lws_seq_t *s = lws_container_of(p, lws_seq_t, seq_list);
70 /* queue the message to inform the sequencer */
/* the return value of lws_seq_queue_event() is not checked here */
71 lws_seq_queue_event(s, LWSSEQ_HEARTBEAT, NULL, NULL);
73 } lws_end_foreach_dll_safe(p, tp);
75 /* schedule the next one */
77 __lws_sul_insert(&pt->pt_sul_owner, &pt->sul_seq_heartbeat,
/*
 * Per-pt sequencer setup: installs lws_sul_seq_heartbeat_cb as the callback of
 * pt->sul_seq_heartbeat and inserts that sul on pt->pt_sul_owner so the first
 * heartbeat gets scheduled.
 *
 * pt: the per-thread context to initialise
 *
 * NOTE(review): the delay argument and the return statement are on lines not
 * visible in this listing.
 */
82 lws_seq_pt_init(struct lws_context_per_thread *pt)
84 pt->sul_seq_heartbeat.cb = lws_sul_seq_heartbeat_cb;
86 /* schedule the first heartbeat */
87 __lws_sul_insert(&pt->pt_sul_owner, &pt->sul_seq_heartbeat,
/*
 * Create a sequencer described by *i on the pt selected by i->tsi.
 *
 * One allocation holds the lws_seq_t followed by i->user_size bytes of user
 * area; *i->puser is set to point at that user area (&seq[1]).  The sequencer
 * is appended to pt->seq_owner under the pt lock, its creation time is
 * recorded, and an LWSSEQ_CREATED event is queued.  If queuing reports
 * failure (nonzero), the sequencer is unlinked from the pt list again.
 *
 * i: creation info; context, tsi, user_size, retry and puser are read here
 *
 * NOTE(review): the allocation-failure check, the remaining member
 * assignments, the free on the failure path and the return statements are on
 * lines not visible in this listing.
 * NOTE(review): the lws_dll2_remove() on the failure path runs after the pt
 * lock was released, unlike the matching add; confirm that is intended.
 */
94 lws_seq_create(lws_seq_info_t *i)
96 struct lws_context_per_thread *pt = &i->context->pt[i->tsi];
97 lws_seq_t *seq = lws_zalloc(sizeof(*seq) + i->user_size, __func__);
105 seq->retry = i->retry;
/* user area starts immediately after the lws_seq_t */
107 *i->puser = (void *)&seq[1];
109 /* add the sequencer to the pt */
111 lws_pt_lock(pt, __func__); /* ---------------------------------- pt { */
113 lws_dll2_add_tail(&seq->seq_list, &pt->seq_owner);
115 lws_pt_unlock(pt); /* } pt ------------------------------------------ */
117 seq->time_created = lws_now_usecs();
119 /* try to queue the creation cb */
121 if (lws_seq_queue_event(seq, LWSSEQ_CREATED, NULL, NULL)) {
122 lws_dll2_remove(&seq->seq_list);
/*
 * lws_dll2_foreach_safe() callback used by lws_seq_destroy(): recovers the
 * lws_seq_event_t that contains list node d and unlinks it from its list.
 *
 * d:    list node embedded in the event (its seq_event_list member)
 * user: opaque pointer from the foreach call; lws_seq_destroy() passes NULL
 *
 * NOTE(review): freeing the event and the return value are on lines not
 * visible in this listing.
 */
132 seq_ev_destroy(struct lws_dll2 *d, void *user)
134 lws_seq_event_t *seqe = lws_container_of(d, lws_seq_event_t,
137 lws_dll2_remove(&seqe->seq_event_list);
/*
 * Destroy the sequencer *pseq.
 *
 * Calls the sequencer callback with LWSSEQ_DESTROYED (NULL data and aux), then
 * under the pt lock unlinks the sequencer from the pt list, unlinks both of
 * its suls, and runs seq_ev_destroy over every still-queued event.  Finally
 * the sequencer allocation is released with lws_free_set_NULL().
 *
 * pseq: address of the sequencer pointer to destroy
 *
 * NOTE(review): lws_free_set_NULL() is applied to the local copy 'seq'; no
 * visible line clears *pseq, so confirm whether callers may be left holding a
 * stale pointer.
 * NOTE(review): the statement that goes with the "defeat another thread"
 * comment is on a line not visible in this listing (lws_seq_queue_event()
 * tests seq->going_down, so presumably that flag is set there).
 */
144 lws_seq_destroy(lws_seq_t **pseq)
146 lws_seq_t *seq = *pseq;
148 /* defeat another thread racing to add events while we are destroying */
/* the callback is invoked before the pt lock is taken */
151 seq->cb(seq, (void *)&seq[1], LWSSEQ_DESTROYED, NULL, NULL);
153 lws_pt_lock(seq->pt, __func__); /* -------------------------- pt { */
155 lws_dll2_remove(&seq->seq_list);
156 lws_dll2_remove(&seq->sul_timeout.list);
157 lws_dll2_remove(&seq->sul_pending.list);
158 /* remove and destroy any pending events */
159 lws_dll2_foreach_safe(&seq->seq_event_owner, NULL, seq_ev_destroy);
161 lws_pt_unlock(seq->pt); /* } pt ---------------------------------- */
164 lws_free_set_NULL(seq);
/*
 * Walks every sequencer on pt->seq_owner using the safe foreach form and
 * recovers each lws_seq_t from its seq_list node.
 *
 * pt: per-thread context whose sequencers are visited
 *
 * NOTE(review): the action performed on each sequencer is on a line not
 * visible in this listing; going by the function name it presumably destroys
 * each one - confirm against the full file.
 */
168 lws_seq_destroy_all_on_pt(struct lws_context_per_thread *pt)
170 lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp,
171 pt->seq_owner.head) {
172 lws_seq_t *s = lws_container_of(p, lws_seq_t,
177 } lws_end_foreach_dll_safe(p, tp);
/*
 * sul callback that delivers a queued event to its sequencer.
 *
 * Recovers the sequencer that embeds the sul as sul_pending.  If the event
 * queue is non-empty, takes the head event and passes its e / data / aux to
 * seq->cb together with the user area (&seq[1]), keeping the callback result
 * in n.  The event is then unlinked from the queue under the pt lock.  The
 * visible tail of the function logs "destroying seq ... by request" and calls
 * lws_seq_destroy().
 *
 * sul: the sequencer's sul_pending
 *
 * NOTE(review): the empty-queue early exit, the free of the event and the
 * condition guarding the destroy (presumably a nonzero n) are on lines not
 * visible in this listing.
 * NOTE(review): only the head event is handled in the visible code; confirm
 * how further queued events get serviced when several were queued before this
 * callback ran.
 */
181 lws_seq_sul_pending_cb(lws_sorted_usec_list_t *sul)
183 lws_seq_t *seq = lws_container_of(sul, lws_seq_t, sul_pending);
184 lws_seq_event_t *seqe;
188 if (!seq->seq_event_owner.count)
191 /* events are only added at tail, so no race possible yet... */
193 dh = lws_dll2_get_head(&seq->seq_event_owner);
194 seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list);
/* the user callback runs without the pt lock held */
196 n = seq->cb(seq, (void *)&seq[1], seqe->e, seqe->data, seqe->aux);
198 /* ... have to lock here though, because we will change the list */
200 lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */
202 /* detach event from sequencer event list and free it */
203 lws_dll2_remove(&seqe->seq_event_list);
205 lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */
208 lwsl_info("%s: destroying seq '%s' by request\n", __func__,
210 lws_seq_destroy(&seq);
/*
 * Queue event e, with payload pointers data and aux, for delivery to seq.
 *
 * The request is rejected up front when seq is NULL or seq->going_down is
 * set.  Otherwise an lws_seq_event_t is allocated and, under the pt lock,
 * appended to the tail of seq->seq_event_owner; sul_pending is then given
 * lws_seq_sul_pending_cb as its callback and inserted on the pt's sul owner
 * with a delay argument of 1.  An error is logged when the queue already
 * holds more than QUEUE_SANITY_LIMIT events.
 *
 * seq:  target sequencer (NULL is tolerated)
 * e:    event type
 * data: first opaque argument later handed to the sequencer callback
 * aux:  second opaque argument later handed to the sequencer callback
 *
 * lws_seq_create() treats a nonzero return from this function as failure.
 *
 * NOTE(review): the return statements, the allocation-failure check, the
 * stores of e / data / aux into the event, and what follows the sanity-limit
 * log are on lines not visible in this listing.
 */
215 lws_seq_queue_event(lws_seq_t *seq, lws_seq_events_t e, void *data, void *aux)
217 lws_seq_event_t *seqe;
219 if (!seq || seq->going_down)
222 seqe = lws_zalloc(sizeof(*seqe), __func__);
230 // lwsl_notice("%s: seq %s: event %d\n", __func__, seq->name, e);
232 lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */
234 if (seq->seq_event_owner.count > QUEUE_SANITY_LIMIT) {
235 lwsl_err("%s: more than %d events queued\n", __func__,
239 lws_dll2_add_tail(&seqe->seq_event_list, &seq->seq_event_owner);
/* (re)arm the pending-event sul so the callback runs for this sequencer */
241 seq->sul_pending.cb = lws_seq_sul_pending_cb;
242 __lws_sul_insert(&seq->pt->pt_sul_owner, &seq->sul_pending, 1);
244 lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */
250 * Check if wsi still extant, by peeking in the message queue for a
251 * LWSSEQ_WSI_CONN_CLOSE message about wsi. (Doesn't need to do the same for
252 * CONN_FAIL since that will never have produced any messages prior to that).
254 * Use this to avoid trying to perform operations on wsi that have already
255 * closed but we didn't get to that message yet.
257 * Returns 0 if not closed yet or 1 if it has closed but we didn't process the
/*
 * Looks in seq's queued events, under the pt lock, for an
 * LWSSEQ_WSI_CONN_CLOSE event whose data pointer equals wsi.  The search
 * starts from the head of seq->seq_event_owner.
 *
 * seq: sequencer whose event queue is inspected
 * wsi: connection to look for
 *
 * NOTE(review): the loop construct, the action taken on a match and the
 * return statement are on lines not visible in this listing.
 */
262 lws_seq_check_wsi(lws_seq_t *seq, struct lws *wsi)
264 lws_seq_event_t *seqe;
267 lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */
269 dh = lws_dll2_get_head(&seq->seq_event_owner);
271 seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list);
273 if (seqe->e == LWSSEQ_WSI_CONN_CLOSE && seqe->data == wsi)
279 lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */
/*
 * sul callback for a sequencer's timeout: recovers the sequencer that embeds
 * the sul as sul_timeout and queues an LWSSEQ_TIMED_OUT event (NULL data and
 * aux) to it.  The result of lws_seq_queue_event() is not checked.
 *
 * sul: the sequencer's sul_timeout
 */
286 lws_seq_sul_timeout_cb(lws_sorted_usec_list_t *sul)
288 lws_seq_t *s = lws_container_of(sul, lws_seq_t, sul_timeout);
290 lws_seq_queue_event(s, LWSSEQ_TIMED_OUT, NULL, NULL);
293 /* set us to LWS_SET_TIMER_USEC_CANCEL to remove timeout */
/*
 * Arms the sequencer's timeout: sets lws_seq_sul_timeout_cb as the callback
 * of seq->sul_timeout and inserts that sul on the pt's sul owner with us as
 * the delay argument.  Returns whatever __lws_sul_insert() returns.
 *
 * seq: sequencer whose timeout is being set
 * us:  value handed to __lws_sul_insert(); see the comment above for the
 *      cancel value
 */
296 lws_seq_timeout_us(lws_seq_t *seq, lws_usec_t us)
298 seq->sul_timeout.cb = lws_seq_sul_timeout_cb;
299 /* list is always at the very top of the sul */
/*
 * The cast below depends on 'list' being the first member of
 * lws_sorted_usec_list_t, as the comment above states.
 */
300 return __lws_sul_insert(&seq->pt->pt_sul_owner,
301 (lws_sorted_usec_list_t *)&seq->sul_timeout.list, us);
/*
 * Maps a user-area pointer back to its sequencer.  lws_seq_create() hands out
 * &seq[1] as the user pointer, so the lws_seq_t is the object immediately
 * before u.
 *
 * u: user pointer previously produced as &seq[1]; anything else gives an
 *    invalid result
 */
305 lws_seq_from_user(void *u)
307 return &((lws_seq_t *)u)[-1];
/*
 * Accessor taking a sequencer.
 *
 * NOTE(review): only the signature is visible in this listing; going by the
 * function name it presumably returns the sequencer's name - confirm against
 * the full file.
 */
311 lws_seq_name(lws_seq_t *seq)
/*
 * Returns the difference between lws_now_usecs() at the time of the call and
 * seq->time_created, which lws_seq_create() sampled from lws_now_usecs().
 *
 * seq: sequencer to query
 */
317 lws_seq_us_since_creation(lws_seq_t *seq)
319 return lws_now_usecs() - seq->time_created;
/*
 * Returns the context pointer held by the sequencer's pt (seq->pt->context).
 *
 * seq: sequencer to query
 */
323 lws_seq_get_context(lws_seq_t *seq)
325 return seq->pt->context;