struct lws *wsi;
struct lws_vhost *vh;
bool is_server;
+ int write_requested;
bool close_requested;
bool destroy_requested;
int curr_conn;
int _eventfd = eventfd(0, 0);
op->fd = _eventfd;
- g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP, _eventfd, LWS_POLLOUT, NULL);
+ g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP_ADD, _eventfd, LWS_POLLOUT, NULL);
if (write(_eventfd, &v, sizeof(v)) == -1)
VINE_LOGE("Write error(%d)", errno);
}
}
op_queue.push(op);
+ VINE_LOGI("op[%p] pushed. size[%zd]", op, op_queue.size());
return op;
- //return 0;
}
static void _del_websocket_op_request(websocket_op_s *op)
free(op);
}
-static void _process_pending_destroy(struct lws_vhost *vh)
+static void _terminate_server(struct lws_vhost *vh)
{
- VINE_LOGD("Process pending destroy");
+ VINE_LOGD("terminate server. vh[%p]", vh);
if (!vh)
return;
if (!protocol)
return;
- lws_callback_all_protocol_vhost_args(vh, protocol, LWS_CALLBACK_USER, NULL, 0);
listen_vh_list.erase(vh);
VINE_LOGI("vh[%p] will be destroyed.", vh);
lws_vhost_destroy(vh);
- while (!lws_service_adjust_timeout(g_context, 1, 0))
+ if (!lws_service_adjust_timeout(g_context, 1, 0))
lws_service_tsi(g_context, -1, 0);
+ VINE_LOGI("-");
}
+static void _terminate_client(websocket_s *ws)
+{
+ VINE_LOGD("terminate client. ws[%p]", ws);
+ if (!ws->wsi) {
+ VINE_LOGE("ws[%p] was already terminated.", ws);
+ return;
+ }
+
+ if (lws_callback_on_writable(ws->wsi) < 1)
+ VINE_LOGI("Failed to request a writable callback for close.");
+}
static void _destroy_websocket(websocket_s *ws)
{
_request_write(op->ws);
break;
case WEBSOCKET_OP_TERMINATE:
- _process_pending_destroy(op->vh);
+ if (op->ws->is_server)
+ _terminate_server(op->vh);
+ else
+ _terminate_client(op->ws);
break;
case WEBSOCKET_OP_DESTROY:
_destroy_websocket(op->ws);
// TODO: do write.
break;
case WEBSOCKET_OP_TERMINATE:
- VINE_LOGI("Terminate. Do not handle here. ws[%p]", op->ws);
- return;
case WEBSOCKET_OP_DESTROY:
- VINE_LOGI("Destroy. Do not handle here. ws[%p]", op->ws);
- return;
case WEBSOCKET_OP_DEINIT:
VINE_LOGE("This cannot be called here.");
return;
while (count-- > 0) {
op_queue.do_remove_if([&](websocket_op_s *op) {
- return (ws == NULL || (op && op->ws == ws));
+ if (!op || op->code == WEBSOCKET_OP_TERMINATE
+ || op->code == WEBSOCKET_OP_DESTROY
+ || op->code == WEBSOCKET_OP_DEINIT)
+ return false;
+ return (ws == NULL || (op && op->ws == ws));
}, _do_flush_op);
}
}
g_pollfds.erase(it);
VINE_LOGI("websocket poll fd[%d] is removed.", args->fd);
} else {
- VINE_LOGI("Cannot find fd[%d] but remove it from eventloop", args->fd);
+ VINE_LOGI("Cannot find fd[%d]", args->fd);
+ //return;
}
if (g_callbacks.pollfd_cb)
switch (reason) {
case LWS_CALLBACK_WSI_DESTROY:
VINE_LOGI("wsi[%p](ws: %p) is destroyed.", wsi, ws);
+ if (ws) {
+ VINE_LOGI("clear ws->wsi.");
+ ws->wsi = NULL;
+ }
break;
case LWS_CALLBACK_FILTER_NETWORK_CONNECTION:
return 0;
}
- if (ws->close_requested) {
+ if (ws->write_requested == 0 && ws->close_requested) {
VINE_LOGI("Close websocket.");
return -1;
}
if (ws->destroy_requested) {
- VINE_LOGI("ws[%p] will be destroyed.", ws);
- return -1;
+ VINE_LOGI("Destroy requested. Do not send data. ws[%p]", ws);
+ break;
}
- VINE_LOGI("Writeable to server.");
+ --ws->write_requested;
+ VINE_LOGI("Writeable to server. write_requested[%d]",
+ ws->write_requested);
+
n = _write_data(ws);
if (g_callbacks.written_cb)
g_callbacks.written_cb(n, ws->user);
RET_VAL_IF(len == 0, VINE_DATA_PATH_ERROR_INVALID_PARAMETER, "len is 0");
websocket_s *ws = (websocket_s *)handle;
+
+ if (ws->close_requested) {
+ VINE_LOGE("ws[%p] will be closed.", ws);
+ return VINE_DATA_PATH_ERROR_INVALID_OPERATION;
+ }
+
websocket_data_s *wd = (websocket_data_s *)calloc(1, sizeof(websocket_data_s));
unsigned char *data = (unsigned char *)calloc(len, sizeof(unsigned char));
if (!op)
return VINE_DATA_PATH_ERROR_OUT_OF_MEMORY;
+ ++(ws->write_requested);
+ VINE_LOGI("write_requested[%d]", ws->write_requested);
+
_notify_websocket_op_request(op);
return VINE_DATA_PATH_ERROR_NONE;
VINE_LOGD("ws[%p] will be closed.", ws);
ws->close_requested = true;
- if (ws->is_server) {
- websocket_op_s *op = _add_websocket_op_request(WEBSOCKET_OP_TERMINATE,
- ws, ws->vh, 0, NULL, 0, NULL, 0, NULL);
- if (!op)
- return VINE_DATA_PATH_ERROR_OUT_OF_MEMORY;
- _notify_websocket_op_request(op);
- return VINE_DATA_PATH_ERROR_NONE;
- }
-
- if (ws->wsi)
- lws_callback_on_writable(ws->wsi);
+ websocket_op_s *op = _add_websocket_op_request(WEBSOCKET_OP_TERMINATE,
+ ws, ws->vh, 0, NULL, 0, NULL, 0, NULL);
+ if (!op)
+ return VINE_DATA_PATH_ERROR_OUT_OF_MEMORY;
+ _notify_websocket_op_request(op);
return VINE_DATA_PATH_ERROR_NONE;
}
static void websocket_process_event(int fd, int events)
{
+ if (fd <= 0)
+ return;
+
auto it = g_pollfds.find(fd);
if (it == g_pollfds.end()) {
_process_websocket_op_request(fd);
+ g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP_DEL,
+ fd, LWS_POLLOUT, NULL);
close(fd);
return;
}
{
RET_VAL_IF(handle == NULL, VINE_DATA_PATH_ERROR_INVALID_PARAMETER, "handle is NULL");
-
websocket_s *ws = (websocket_s *)handle;
if (ws->destroy_requested) {
VINE_LOGD("Already requested.");
_flush_op_queue(ws);
ws->destroy_requested = true;
- websocket_op_s *op = _add_websocket_op_request(WEBSOCKET_OP_DESTROY,
- (websocket_s *)handle, NULL, 0, NULL, 0, NULL, 0, NULL);
- if (!op)
- return VINE_DATA_PATH_ERROR_OPERATION_FAILED;
- _notify_websocket_op_request(op);
-
return VINE_DATA_PATH_ERROR_NONE;
}