[PATCH 2/3] Add write buffer for non-blocking write
Denis Kenzior
denkenz at gmail.com
Thu Feb 25 14:02:30 PST 2010
Hi Zhenhua,
> Use two layers to cache server side response data to client.
> 1. A fixed-length ring buffer, that is write_buf.
> 2. A list of full ring buffer. The head is to write data out to the
> client and the tail is to cache response data in. At least one buffer
> is in the list.
>
> When the first layer buffer is full, a free buffer is allocated and
> appended at the tail of the full list. It replaces original write
> buffer as current write_buf.
> ---
> gatchat/gatserver.c | 130
> +++++++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 117
> insertions(+), 13 deletions(-)
>
> diff --git a/gatchat/gatserver.c b/gatchat/gatserver.c
> index bf7e847..7e11359 100644
> --- a/gatchat/gatserver.c
> +++ b/gatchat/gatserver.c
> @@ -32,6 +32,14 @@
> #include "ringbuffer.h"
> #include "gatserver.h"
>
> +#define READ_BUF_SIZE 4096
Separate patch please
> +/* #define WRITE_SCHEDULER_DEBUG 1 */
> +#ifdef WRITE_SCHEDULER_DEBUG
> +#define WRITE_BUF_SIZE 4
> +#else
> +#define WRITE_BUF_SIZE 4096
> +#endif
> +
Don't do this, you're confusing the intentions of what WRITE_SCHEDULER_DEBUG
did in atchat vs the way you're using it here.
> enum ParserState {
> PARSER_STATE_IDLE,
> PARSER_STATE_A,
> @@ -90,17 +98,75 @@ struct _GAtServer {
> struct v250_settings v250; /* V.250 command setting */
> GIOChannel *channel; /* Server IO */
> guint read_watch; /* GSource read id, 0 if none */
> + guint write_watch; /* GSource write id, 0 if none */
> guint read_so_far; /* Number of bytes processed */
> GAtDisconnectFunc user_disconnect; /* User disconnect func */
> gpointer user_disconnect_data; /* User disconnect data */
> GAtDebugFunc debugf; /* Debugging output function */
> gpointer debug_data; /* Data to pass to debug func */
> struct ring_buffer *read_buf; /* Current read buffer */
> + struct ring_buffer *write_buf; /* Current write buffer */
> + GSList *full_list; /* List of full ring buffer */
Use a GQueue here instead of write_buf and full_list. The head of the queue
is the stuff to be written, the tail of the queue is the free buffer to stuff
data into.
> guint max_read_attempts; /* Max reads per select */
> enum ParserState parser_state;
> gboolean destroyed; /* Re-entrancy guard */
> };
>
> +static void g_at_server_wakeup_writer(GAtServer *server);
> +
> +static gboolean replace_write_buf(GAtServer *server)
> +{
> + struct ring_buffer *free_buf = ring_buffer_new(WRITE_BUF_SIZE);
> +
> + /* Append this free buf into full list and replace current write
> + * buffer by this free buffer */
> + server->full_list = g_slist_append(server->full_list, free_buf);
> +
> + server->write_buf = free_buf;
> +
> + return TRUE;
> +}
> +
> +static void send_common(GAtServer *server, const char *buf, unsigned int
> len) +{
> + gsize avail = ring_buffer_avail(server->write_buf);
> + gsize towrite = len;
> + gsize bytes_written;
> + gsize offset = 0;
> +
> + if (avail > towrite) {
> + ring_buffer_write(server->write_buf, buf, towrite);
> +
> + g_at_server_wakeup_writer(server);
> +
> + return;
> + }
I suggest using only the while loop version, no need to short-circuit here.
> +
> + /* Write as much as we can */
> + bytes_written = ring_buffer_write(server->write_buf, buf, avail);
> + towrite -= bytes_written;
> + offset = bytes_written;
> +
> + /* If current write buf is full, replace it with next free buffer */
> + if (!replace_write_buf(server))
> + return;
This looks wrong, perhaps we should simply forcefully shutdown here in the
unlikely event that this happens?
> +
> + bytes_written = ring_buffer_write(server->write_buf, buf + offset,
> + towrite);
> + while (bytes_written < towrite) {
> + /* The next free buf is full, replace with next free one */
> + if (!replace_write_buf(server))
> + return;
> +
> + towrite -= bytes_written;
> + offset += bytes_written;
> + bytes_written = ring_buffer_write(server->write_buf,
> + buf + offset, towrite);
> + }
My brain hurts, you're using ring_buffer_write like three times. Please
restructure the loop to be more concise. Something like:
while (bytes_written < towrite) {
bytes_written += ring_buffer_write(buffer,
MIN(ring_buffer_avail(buffer), towrite-bytes_written);
if (ring_buffer_avail(buffer) == 0)
allocate_next();
}
> +
> + g_at_server_wakeup_writer(server);
> +}
> +
> static void g_at_server_send_result(GAtServer *server, GAtServerResult
> result) {
> struct v250_settings v250 = server->v250;
> @@ -108,7 +174,7 @@ static void g_at_server_send_result(GAtServer *server,
> GAtServerResult result) char buf[1024];
> char t = v250.s3;
> char r = v250.s4;
> - gsize wbuf;
> + unsigned int len;
>
> if (v250.quiet)
> return;
> @@ -117,16 +183,13 @@ static void g_at_server_send_result(GAtServer
> *server, GAtServerResult result) return;
>
> if (v250.is_v1)
> - snprintf(buf, sizeof(buf), "%c%c%s%c%c", t, r, result_str,
> + len = snprintf(buf, sizeof(buf), "%c%c%s%c%c", t, r, result_str,
> t, r);
> else
> - snprintf(buf, sizeof(buf), "%u%c", (unsigned int) result, t);
> -
> - g_at_util_debug_chat(FALSE, buf, strlen(buf),
> - server->debugf, server->debug_data);
> + len = snprintf(buf, sizeof(buf), "%u%c", (unsigned int) result,
> + t);
>
> - g_io_channel_write(server->channel, (char *) buf, strlen(buf),
> - &wbuf);
> + send_common(server, buf, len);
So strictly speaking this is wrong. man snprintf for the Return Value. If
you insist on using len, at least use MIN(len, sizeof(buf)-1) or something
like that.
> }
>
> static inline gboolean is_at_command_prefix(const char c)
> @@ -432,12 +495,23 @@ static gboolean received_data(GIOChannel *channel,
> GIOCondition cond, return TRUE;
> }
>
> +static gboolean can_write_data(GIOChannel *channel, GIOCondition cond,
> + gpointer data)
> +{
> + return FALSE;
> +}
> +
> static void g_at_server_cleanup(GAtServer *server)
> {
> /* Cleanup all received data */
> ring_buffer_free(server->read_buf);
> server->read_buf = NULL;
>
> + /* Cleanup pending data to write */
> + if (server->full_list)
Is server->full_list ever NULL?
> + g_slist_foreach(server->full_list, (GFunc)ring_buffer_free,
> + NULL);
> +
> server->channel = NULL;
> }
>
> @@ -446,8 +520,6 @@ static void read_watcher_destroy_notify(GAtServer
> *server) g_at_server_cleanup(server);
> server->read_watch = 0;
>
> - server->channel = NULL;
> -
> if (server->user_disconnect)
> server->user_disconnect(server->user_disconnect_data);
>
> @@ -455,6 +527,23 @@ static void read_watcher_destroy_notify(GAtServer
> *server) g_free(server);
> }
>
> +static void write_watcher_destroy_notify(GAtServer *server)
> +{
> + server->write_watch = 0;
> +}
> +
> +static void g_at_server_wakeup_writer(GAtServer *server)
> +{
> + if (server->write_watch != 0)
> + return;
> +
> + server->write_watch = g_io_add_watch_full(server->channel,
> + G_PRIORITY_DEFAULT,
> + G_IO_OUT | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
> + can_write_data, server,
> + (GDestroyNotify)write_watcher_destroy_notify);
> +}
> +
> static void v250_settings_create(struct v250_settings *v250)
> {
> v250->s3 = '\r';
> @@ -482,12 +571,20 @@ GAtServer *g_at_server_new(GIOChannel *io)
> server->ref_count = 1;
> v250_settings_create(&server->v250);
> server->channel = io;
> - server->read_buf = ring_buffer_new(4096);
> - server->max_read_attempts = 3;
> -
> + server->read_buf = ring_buffer_new(READ_BUF_SIZE);
> if (!server->read_buf)
> goto error;
>
> + server->write_buf = ring_buffer_new(WRITE_BUF_SIZE);
> + if (!server->write_buf)
> + goto error;
> +
> + /* Current write buf becomes the tail of full list */
> + server->full_list = g_slist_append(server->full_list,
> + server->write_buf);
> +
> + server->max_read_attempts = 3;
> +
> if (!g_at_util_setup_io(server->channel, G_IO_FLAG_NONBLOCK))
> goto error;
>
> @@ -502,6 +599,10 @@ error:
> if (server->read_buf)
> ring_buffer_free(server->read_buf);
>
> + if (server->full_list)
> + g_slist_foreach(server->full_list, (GFunc)ring_buffer_free,
> + NULL);
> +
Is server->full_list ever NULL?
> if (server)
> g_free(server);
>
> @@ -552,6 +653,9 @@ gboolean g_at_server_shutdown(GAtServer *server)
> server->user_disconnect = NULL;
> server->user_disconnect_data = NULL;
>
> + if (server->write_watch)
> + g_source_remove(server->write_watch);
> +
> if (server->read_watch)
> g_source_remove(server->read_watch);
>
Regards,
-Denis
More information about the ofono
mailing list