struct v250_settings v250; /* V.250 command setting */
GIOChannel *channel; /* Server IO */
guint read_watch; /* GSource read id, 0 if none */
+ guint write_watch; /* GSource write id, 0 if none */
guint read_so_far; /* Number of bytes processed */
GAtDisconnectFunc user_disconnect; /* User disconnect func */
gpointer user_disconnect_data; /* User disconnect data */
GAtDebugFunc debugf; /* Debugging output function */
gpointer debug_data; /* Data to pass to debug func */
struct ring_buffer *read_buf; /* Current read buffer */
+ GQueue *write_queue; /* Write buffer queue */
guint max_read_attempts; /* Max reads per select */
enum ParserState parser_state;
gboolean destroyed; /* Re-entrancy guard */
};
+static void g_at_server_wakeup_writer(GAtServer *server);
+
+static struct ring_buffer *allocate_next(GAtServer *server)
+{
+ struct ring_buffer *buf = ring_buffer_new(BUF_SIZE);
+
+ if (!buf)
+ return NULL;
+
+ g_queue_push_tail(server->write_queue, buf);
+
+ return buf;
+}
+
+static void send_common(GAtServer *server, const char *buf, unsigned int len)
+{
+ gsize towrite = len;
+ gsize bytes_written = 0;
+ struct ring_buffer *write_buf;
+
+ write_buf = g_queue_peek_tail(server->write_queue);
+
+ while (bytes_written < towrite) {
+ gsize wbytes = MIN((gsize)ring_buffer_avail(write_buf),
+ towrite - bytes_written);
+
+ bytes_written += ring_buffer_write(write_buf,
+ buf + bytes_written,
+ wbytes);
+
+ if (ring_buffer_avail(write_buf) == 0)
+ write_buf = allocate_next(server);
+ }
+
+ g_at_server_wakeup_writer(server);
+}
+
static void g_at_server_send_result(GAtServer *server, GAtServerResult result)
{
struct v250_settings v250 = server->v250;
char buf[1024];
char t = v250.s3;
char r = v250.s4;
- gsize wbuf;
+ unsigned int len;
if (v250.quiet)
return;
return;
if (v250.is_v1)
- snprintf(buf, sizeof(buf), "%c%c%s%c%c", t, r, result_str,
+ len = snprintf(buf, sizeof(buf), "%c%c%s%c%c", t, r, result_str,
t, r);
else
- snprintf(buf, sizeof(buf), "%u%c", (unsigned int) result, t);
-
- g_at_util_debug_chat(FALSE, buf, strlen(buf),
- server->debugf, server->debug_data);
+ len = snprintf(buf, sizeof(buf), "%u%c", (unsigned int) result,
+ t);
- g_io_channel_write(server->channel, (char *) buf, strlen(buf),
- &wbuf);
+ send_common(server, buf, MIN(len, sizeof(buf)-1));
}
static inline gboolean is_at_command_prefix(const char c)
return TRUE;
}
+static gboolean can_write_data(GIOChannel *channel, GIOCondition cond,
+ gpointer data)
+{
+ return FALSE;
+}
+
+static void write_queue_free(GQueue *write_queue)
+{
+ struct ring_buffer *write_buf;
+
+ while ((write_buf = g_queue_pop_head(write_queue)))
+ ring_buffer_free(write_buf);
+
+ g_queue_free(write_queue);
+}
+
static void g_at_server_cleanup(GAtServer *server)
{
/* Cleanup all received data */
ring_buffer_free(server->read_buf);
server->read_buf = NULL;
+ /* Cleanup pending data to write */
+ write_queue_free(server->write_queue);
+
server->channel = NULL;
}
g_free(server);
}
+static void write_watcher_destroy_notify(GAtServer *server)
+{
+ server->write_watch = 0;
+}
+
+static void g_at_server_wakeup_writer(GAtServer *server)
+{
+ if (server->write_watch != 0)
+ return;
+
+ server->write_watch = g_io_add_watch_full(server->channel,
+ G_PRIORITY_DEFAULT,
+ G_IO_OUT | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
+ can_write_data, server,
+ (GDestroyNotify)write_watcher_destroy_notify);
+}
+
static void v250_settings_create(struct v250_settings *v250)
{
v250->s3 = '\r';
v250_settings_create(&server->v250);
server->channel = io;
server->read_buf = ring_buffer_new(BUF_SIZE);
- server->max_read_attempts = 3;
-
if (!server->read_buf)
goto error;
+ server->write_queue = g_queue_new();
+ if (!server->write_queue)
+ goto error;
+
+ if (!allocate_next(server))
+ goto error;
+
+ server->max_read_attempts = 3;
+
if (!g_at_util_setup_io(server->channel, G_IO_FLAG_NONBLOCK))
goto error;
if (server->read_buf)
ring_buffer_free(server->read_buf);
+ if (server->write_queue)
+ write_queue_free(server->write_queue);
+
if (server)
g_free(server);
server->user_disconnect = NULL;
server->user_disconnect_data = NULL;
+ if (server->write_watch)
+ g_source_remove(server->write_watch);
+
if (server->read_watch)
g_source_remove(server->read_watch);