summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Winship <danw@gnome.org>2010-12-08 15:56:37 +0100
committerDan Winship <danw@gnome.org>2012-04-17 21:26:17 -0400
commitc0414594616131e082e87b78b41542be6785158a (patch)
tree4f1c8d0956e59e7ee241519befd3bd82b85e5388
parent6b9cbd9736486821d189aeaed1e8d327aed2b2a7 (diff)
downloadlibsoup-c0414594616131e082e87b78b41542be6785158a.tar.gz
libsoup-c0414594616131e082e87b78b41542be6785158a.tar.bz2
libsoup-c0414594616131e082e87b78b41542be6785158a.zip
soup-message-io: use gio streams rather than SoupSocket
Use the socket's input/output streams for the base I/O, and add new SoupBodyInputStream and SoupBodyOutputStream that can be created from them to handle the body of a single message (including handling chunked encoding/decoding). Update chunk-test, which was assuming that the chunk_allocator callback would never be called if the message had a 0-length body; that's no longer true.
-rw-r--r--libsoup/Makefile.am4
-rw-r--r--libsoup/soup-body-input-stream.c362
-rw-r--r--libsoup/soup-body-input-stream.h48
-rw-r--r--libsoup/soup-body-output-stream.c322
-rw-r--r--libsoup/soup-body-output-stream.h47
-rw-r--r--libsoup/soup-message-io.c920
-rw-r--r--libsoup/soup-socket.c18
-rw-r--r--libsoup/soup-socket.h2
-rw-r--r--po/POTFILES.in1
-rw-r--r--tests/chunk-test.c10
-rw-r--r--tests/connection-test.c17
11 files changed, 1182 insertions, 569 deletions
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index 4526ca5b..5cfba046 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -95,6 +95,10 @@ libsoup_2_4_la_SOURCES = \
soup-auth-manager.c \
soup-auth-manager-ntlm.h \
soup-auth-manager-ntlm.c \
+ soup-body-input-stream.h \
+ soup-body-input-stream.c \
+ soup-body-output-stream.h \
+ soup-body-output-stream.c \
soup-cache.c \
soup-cache-private.h \
soup-connection.h \
diff --git a/libsoup/soup-body-input-stream.c b/libsoup/soup-body-input-stream.c
new file mode 100644
index 00000000..2c5d16ea
--- /dev/null
+++ b/libsoup/soup-body-input-stream.c
@@ -0,0 +1,362 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-body-input-stream.c
+ *
+ * Copyright 2012 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <string.h>
+#include <gio/gio.h>
+
+#include <glib/gi18n-lib.h>
+
+#include "soup-body-input-stream.h"
+#include "soup-enum-types.h"
+#include "soup-filter-input-stream.h"
+#include "soup-message-headers.h"
+
+typedef enum {
+ SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE,
+ SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END,
+ SOUP_BODY_INPUT_STREAM_STATE_CHUNK,
+ SOUP_BODY_INPUT_STREAM_STATE_TRAILERS,
+ SOUP_BODY_INPUT_STREAM_STATE_DONE
+} SoupBodyInputStreamState;
+
+struct _SoupBodyInputStreamPrivate {
+ GInputStream *base_stream;
+
+ SoupEncoding encoding;
+ goffset read_length;
+ SoupBodyInputStreamState chunked_state;
+ gboolean eof;
+};
+
+enum {
+ PROP_0,
+
+ PROP_ENCODING,
+ PROP_CONTENT_LENGTH
+};
+
+static void soup_body_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupBodyInputStream, soup_body_input_stream, G_TYPE_FILTER_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ soup_body_input_stream_pollable_init))
+
+static void
+soup_body_input_stream_init (SoupBodyInputStream *bistream)
+{
+ bistream->priv = G_TYPE_INSTANCE_GET_PRIVATE (bistream,
+ SOUP_TYPE_BODY_INPUT_STREAM,
+ SoupBodyInputStreamPrivate);
+ bistream->priv->encoding = SOUP_ENCODING_NONE;
+}
+
+static void
+constructed (GObject *object)
+{
+ SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
+
+ bistream->priv->base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (bistream));
+
+ if (bistream->priv->encoding == SOUP_ENCODING_NONE ||
+ (bistream->priv->encoding == SOUP_ENCODING_CONTENT_LENGTH &&
+ bistream->priv->read_length == 0))
+ bistream->priv->eof = TRUE;
+}
+
+static void
+set_property (GObject *object, guint prop_id,
+ const GValue *value, GParamSpec *pspec)
+{
+ SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
+
+ switch (prop_id) {
+ case PROP_ENCODING:
+ bistream->priv->encoding = g_value_get_enum (value);
+ if (bistream->priv->encoding == SOUP_ENCODING_CHUNKED)
+ bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE;
+ break;
+ case PROP_CONTENT_LENGTH:
+ bistream->priv->read_length = g_value_get_int64 (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+get_property (GObject *object, guint prop_id,
+ GValue *value, GParamSpec *pspec)
+{
+ SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
+
+ switch (prop_id) {
+ case PROP_ENCODING:
+ g_value_set_enum (value, bistream->priv->encoding);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static gssize
+soup_body_input_stream_read_raw (SoupBodyInputStream *bistream,
+ void *buffer,
+ gsize count,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
+{
+ gssize nread;
+
+ nread = g_pollable_stream_read (bistream->priv->base_stream,
+ buffer, count,
+ blocking,
+ cancellable, error);
+ if (nread == 0) {
+ bistream->priv->eof = TRUE;
+ if (bistream->priv->encoding != SOUP_ENCODING_EOF) {
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_PARTIAL_INPUT,
+ _("Connection terminated unexpectedly"));
+ return -1;
+ }
+ }
+ return nread;
+}
+
+static gssize
+soup_body_input_stream_read_chunked (SoupBodyInputStream *bistream,
+ void *buffer,
+ gsize count,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (bistream->priv->base_stream);
+ char metabuf[128];
+ gssize nread;
+ gboolean got_line;
+
+again:
+ switch (bistream->priv->chunked_state) {
+ case SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE:
+ nread = soup_filter_input_stream_read_line (
+ fstream, metabuf, sizeof (metabuf), blocking,
+ &got_line, cancellable, error);
+ if (nread <= 0)
+ return nread;
+ if (!got_line) {
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_PARTIAL_INPUT,
+ _("Connection terminated unexpectedly"));
+ return -1;
+ }
+
+ bistream->priv->read_length = strtoul (metabuf, NULL, 16);
+ if (bistream->priv->read_length > 0)
+ bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK;
+ else
+ bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_TRAILERS;
+ break;
+
+ case SOUP_BODY_INPUT_STREAM_STATE_CHUNK:
+ nread = soup_body_input_stream_read_raw (
+ bistream, buffer,
+ MIN (count, bistream->priv->read_length),
+ blocking, cancellable, error);
+ if (nread > 0) {
+ bistream->priv->read_length -= nread;
+ if (bistream->priv->read_length == 0)
+ bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END;
+ }
+ return nread;
+
+ case SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END:
+ nread = soup_filter_input_stream_read_line (
+ SOUP_FILTER_INPUT_STREAM (bistream->priv->base_stream),
+ metabuf, sizeof (metabuf), blocking,
+ &got_line, cancellable, error);
+ if (nread <= 0)
+ return nread;
+ if (!got_line) {
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_PARTIAL_INPUT,
+ _("Connection terminated unexpectedly"));
+ return -1;
+ }
+
+ bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE;
+ break;
+
+ case SOUP_BODY_INPUT_STREAM_STATE_TRAILERS:
+ nread = soup_filter_input_stream_read_line (
+ fstream, buffer, count, blocking,
+ &got_line, cancellable, error);
+ if (nread <= 0)
+ return nread;
+
+ if (strncmp (buffer, "\r\n", nread) || strncmp (buffer, "\n", nread))
+ bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_DONE;
+ break;
+
+ case SOUP_BODY_INPUT_STREAM_STATE_DONE:
+ return 0;
+ }
+
+ goto again;
+}
+
+static gssize
+read_internal (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
+ gssize nread;
+
+ if (bistream->priv->eof)
+ return 0;
+
+ switch (bistream->priv->encoding) {
+ case SOUP_ENCODING_NONE:
+ return 0;
+
+ case SOUP_ENCODING_CHUNKED:
+ return soup_body_input_stream_read_chunked (bistream, buffer, count,
+ blocking, cancellable, error);
+
+ case SOUP_ENCODING_CONTENT_LENGTH:
+ case SOUP_ENCODING_EOF:
+ if (bistream->priv->read_length != -1) {
+ count = MIN (count, bistream->priv->read_length);
+ if (count == 0)
+ return 0;
+ }
+
+ nread = soup_body_input_stream_read_raw (bistream, buffer, count,
+ blocking, cancellable, error);
+ if (bistream->priv->read_length != -1 && nread > 0)
+ bistream->priv->read_length -= nread;
+ return nread;
+
+ default:
+ g_return_val_if_reached (-1);
+ }
+}
+
+static gssize
+soup_body_input_stream_read_fn (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ return read_internal (stream, buffer, count, TRUE,
+ cancellable, error);
+}
+
+static gboolean
+soup_body_input_stream_is_readable (GPollableInputStream *stream)
+{
+ SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
+
+ return bistream->priv->eof ||
+ g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (bistream->priv->base_stream));
+}
+
+static gssize
+soup_body_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize count,
+ GError **error)
+{
+ return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
+ NULL, error);
+}
+
+static GSource *
+soup_body_input_stream_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable)
+{
+ SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
+ GSource *base_source, *pollable_source;
+
+ if (bistream->priv->eof)
+ base_source = g_timeout_source_new (0);
+ else
+ base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (bistream->priv->base_stream), cancellable);
+ g_source_set_dummy_callback (base_source);
+
+ pollable_source = g_pollable_source_new (G_OBJECT (stream));
+ g_source_add_child_source (pollable_source, base_source);
+ g_source_unref (base_source);
+
+ return pollable_source;
+}
+
+static void
+soup_body_input_stream_class_init (SoupBodyInputStreamClass *stream_class)
+{
+ GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
+ GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
+
+ g_type_class_add_private (stream_class, sizeof (SoupBodyInputStreamPrivate));
+
+ object_class->constructed = constructed;
+ object_class->set_property = set_property;
+ object_class->get_property = get_property;
+
+ input_stream_class->read_fn = soup_body_input_stream_read_fn;
+
+ g_object_class_install_property (
+ object_class, PROP_ENCODING,
+ g_param_spec_enum ("encoding",
+ "Encoding",
+ "Message body encoding",
+ SOUP_TYPE_ENCODING,
+ SOUP_ENCODING_NONE,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+ g_object_class_install_property (
+ object_class, PROP_CONTENT_LENGTH,
+ g_param_spec_int64 ("content-length",
+ "Content-Length",
+ "Message body Content-Length",
+ -1, G_MAXINT64, -1,
+ G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY));
+}
+
+static void
+soup_body_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+ gpointer interface_data)
+{
+ pollable_interface->is_readable = soup_body_input_stream_is_readable;
+ pollable_interface->read_nonblocking = soup_body_input_stream_read_nonblocking;
+ pollable_interface->create_source = soup_body_input_stream_create_source;
+}
+
+GInputStream *
+soup_body_input_stream_new (SoupFilterInputStream *base_stream,
+ SoupEncoding encoding,
+ goffset content_length)
+{
+ return g_object_new (SOUP_TYPE_BODY_INPUT_STREAM,
+ "base-stream", base_stream,
+ "close-base-stream", FALSE,
+ "encoding", encoding,
+ "content-length", content_length,
+ NULL);
+}
diff --git a/libsoup/soup-body-input-stream.h b/libsoup/soup-body-input-stream.h
new file mode 100644
index 00000000..9e0c08e3
--- /dev/null
+++ b/libsoup/soup-body-input-stream.h
@@ -0,0 +1,48 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright 2012 Red Hat, Inc.
+ */
+
+#ifndef SOUP_BODY_INPUT_STREAM_H
+#define SOUP_BODY_INPUT_STREAM_H 1
+
+#include "soup-types.h"
+#include "soup-filter-input-stream.h"
+#include "soup-message-headers.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_BODY_INPUT_STREAM (soup_body_input_stream_get_type ())
+#define SOUP_BODY_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_BODY_INPUT_STREAM, SoupBodyInputStream))
+#define SOUP_BODY_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_BODY_INPUT_STREAM, SoupBodyInputStreamClass))
+#define SOUP_IS_BODY_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_BODY_INPUT_STREAM))
+#define SOUP_IS_BODY_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_BODY_INPUT_STREAM))
+#define SOUP_BODY_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_BODY_INPUT_STREAM, SoupBodyInputStreamClass))
+
+typedef struct _SoupBodyInputStreamPrivate SoupBodyInputStreamPrivate;
+
+typedef struct {
+ GFilterInputStream parent;
+
+ SoupBodyInputStreamPrivate *priv;
+} SoupBodyInputStream;
+
+typedef struct {
+ GFilterInputStreamClass parent_class;
+
+ /* Padding for future expansion */
+ void (*_libsoup_reserved1) (void);
+ void (*_libsoup_reserved2) (void);
+ void (*_libsoup_reserved3) (void);
+ void (*_libsoup_reserved4) (void);
+} SoupBodyInputStreamClass;
+
+GType soup_body_input_stream_get_type (void);
+
+GInputStream *soup_body_input_stream_new (SoupFilterInputStream *base_stream,
+ SoupEncoding encoding,
+ goffset content_length);
+
+G_END_DECLS
+
+#endif /* SOUP_BODY_INPUT_STREAM_H */
diff --git a/libsoup/soup-body-output-stream.c b/libsoup/soup-body-output-stream.c
new file mode 100644
index 00000000..269ec711
--- /dev/null
+++ b/libsoup/soup-body-output-stream.c
@@ -0,0 +1,322 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-body-output-stream.c
+ *
+ * Copyright 2012 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <gio/gio.h>
+
+#include "soup-body-output-stream.h"
+#include "soup-enum-types.h"
+#include "soup-message-headers.h"
+
+typedef enum {
+ SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE,
+ SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_END,
+ SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK,
+ SOUP_BODY_OUTPUT_STREAM_STATE_TRAILERS,
+ SOUP_BODY_OUTPUT_STREAM_STATE_DONE
+} SoupBodyOutputStreamState;
+
+struct _SoupBodyOutputStreamPrivate {
+ GOutputStream *base_stream;
+ char buf[20];
+
+ SoupEncoding encoding;
+ goffset write_length;
+ goffset written;
+ SoupBodyOutputStreamState chunked_state;
+ gboolean eof;
+};
+
+enum {
+ PROP_0,
+
+ PROP_ENCODING,
+ PROP_CONTENT_LENGTH
+};
+
+static void soup_body_output_stream_pollable_init (GPollableOutputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupBodyOutputStream, soup_body_output_stream, G_TYPE_FILTER_OUTPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+ soup_body_output_stream_pollable_init))
+
+
+static void
+soup_body_output_stream_init (SoupBodyOutputStream *stream)
+{
+ stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
+ SOUP_TYPE_BODY_OUTPUT_STREAM,
+ SoupBodyOutputStreamPrivate);
+}
+
+static void
+constructed (GObject *object)
+{
+ SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (object);
+
+ bostream->priv->base_stream = g_filter_output_stream_get_base_stream (G_FILTER_OUTPUT_STREAM (bostream));
+}
+
+static void
+set_property (GObject *object, guint prop_id,
+ const GValue *value, GParamSpec *pspec)
+{
+ SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (object);
+
+ switch (prop_id) {
+ case PROP_ENCODING:
+ bostream->priv->encoding = g_value_get_enum (value);
+ if (bostream->priv->encoding == SOUP_ENCODING_CHUNKED)
+ bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE;
+ break;
+ case PROP_CONTENT_LENGTH:
+ bostream->priv->write_length = g_value_get_uint64 (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+get_property (GObject *object, guint prop_id,
+ GValue *value, GParamSpec *pspec)
+{
+ SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (object);
+
+ switch (prop_id) {
+ case PROP_ENCODING:
+ g_value_set_enum (value, bostream->priv->encoding);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static gssize
+soup_body_output_stream_write_raw (SoupBodyOutputStream *bostream,
+ const void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ gssize nwrote, my_count;
+
+ /* If the caller tries to write too much to a Content-Length
+ * encoded stream, we truncate at the right point, but keep
+ * accepting additional data until they stop.
+ */
+ if (bostream->priv->write_length) {
+ my_count = MIN (count, bostream->priv->write_length - bostream->priv->written);
+ if (my_count == 0) {
+ bostream->priv->eof = TRUE;
+ return count;
+ }
+ } else
+ my_count = count;
+
+ nwrote = g_output_stream_write (bostream->priv->base_stream,
+ buffer, my_count,
+ cancellable, error);
+
+ if (nwrote > 0 && bostream->priv->write_length)
+ bostream->priv->written += nwrote;
+
+ if (nwrote == my_count && my_count != count)
+ nwrote = count;
+
+ return nwrote;
+}
+
+static gssize
+soup_body_output_stream_write_chunked (SoupBodyOutputStream *bostream,
+ const void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ char *buf = bostream->priv->buf;
+ gssize nwrote, len;
+
+again:
+ len = strlen (buf);
+ if (len) {
+ nwrote = g_output_stream_write (bostream->priv->base_stream,
+ buf, len, cancellable, error);
+ if (nwrote < 0)
+ return nwrote;
+ memmove (buf, buf + nwrote, len + 1 - nwrote);
+ goto again;
+ }
+
+ switch (bostream->priv->chunked_state) {
+ case SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE:
+ snprintf (buf, sizeof (bostream->priv->buf),
+ "%lx\r\n", (gulong)count);
+ len = strlen (buf);
+
+ if (count > 0)
+ bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK;
+ else
+ bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_TRAILERS;
+ break;
+
+ case SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK:
+ nwrote = g_output_stream_write (bostream->priv->base_stream,
+ buffer, count, cancellable, error);
+ if (nwrote < (gssize)count)
+ return nwrote;
+
+ bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_END;
+ break;
+
+ case SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_END:
+ strncpy (buf, "\r\n", sizeof (bostream->priv->buf));
+ len = 2;
+ bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_DONE;
+ break;
+
+ case SOUP_BODY_OUTPUT_STREAM_STATE_TRAILERS:
+ strncpy (buf, "\r\n", sizeof (bostream->priv->buf));
+ len = 2;
+ bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_DONE;
+ break;
+
+ case SOUP_BODY_OUTPUT_STREAM_STATE_DONE:
+ bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE;
+ return count;
+ }
+
+ goto again;
+}
+
+static gssize
+soup_body_output_stream_write_fn (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream);
+
+ if (bostream->priv->eof)
+ return count;
+
+ switch (bostream->priv->encoding) {
+ case SOUP_ENCODING_CHUNKED:
+ return soup_body_output_stream_write_chunked (bostream, buffer, count,
+ cancellable, error);
+
+ default:
+ return soup_body_output_stream_write_raw (bostream, buffer, count,
+ cancellable, error);
+ }
+}
+
+static gboolean
+soup_body_output_stream_close_fn (GOutputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream);
+
+ if (bostream->priv->encoding == SOUP_ENCODING_CHUNKED) {
+ if (soup_body_output_stream_write_chunked (bostream, NULL, 0, cancellable, error) == -1)
+ return FALSE;
+ }
+
+ return G_OUTPUT_STREAM_CLASS (soup_body_output_stream_parent_class)->close_fn (stream, cancellable, error);
+}
+
+static gboolean
+soup_body_output_stream_is_writable (GPollableOutputStream *stream)
+{
+ SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream);
+
+ return bostream->priv->eof ||
+ g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (bostream->priv->base_stream));
+}
+
+static GSource *
+soup_body_output_stream_create_source (GPollableOutputStream *stream,
+ GCancellable *cancellable)
+{
+ SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream);
+ GSource *base_source, *pollable_source;
+
+ if (bostream->priv->eof)
+ base_source = g_timeout_source_new (0);
+ else
+ base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (bostream->priv->base_stream), cancellable);
+ g_source_set_dummy_callback (base_source);
+
+ pollable_source = g_pollable_source_new (G_OBJECT (stream));
+ g_source_add_child_source (pollable_source, base_source);
+ g_source_unref (base_source);
+
+ return pollable_source;
+}
+
+static void
+soup_body_output_stream_class_init (SoupBodyOutputStreamClass *stream_class)
+{
+ GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
+ GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (stream_class);
+
+ g_type_class_add_private (stream_class, sizeof (SoupBodyOutputStreamPrivate));
+
+ object_class->constructed = constructed;
+ object_class->set_property = set_property;
+ object_class->get_property = get_property;
+
+ output_stream_class->write_fn = soup_body_output_stream_write_fn;
+ output_stream_class->close_fn = soup_body_output_stream_close_fn;
+
+ g_object_class_install_property (
+ object_class, PROP_ENCODING,
+ g_param_spec_enum ("encoding",
+ "Encoding",
+ "Message body encoding",
+ SOUP_TYPE_ENCODING,
+ SOUP_ENCODING_NONE,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+ g_object_class_install_property (
+ object_class, PROP_CONTENT_LENGTH,
+ g_param_spec_uint64 ("content-length",
+ "Content-Length",
+ "Message body Content-Length",
+ 0, G_MAXUINT64, 0,
+ G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY));
+}
+
+static void
+soup_body_output_stream_pollable_init (GPollableOutputStreamInterface *pollable_interface,
+ gpointer interface_data)
+{
+ pollable_interface->is_writable = soup_body_output_stream_is_writable;
+ pollable_interface->create_source = soup_body_output_stream_create_source;
+}
+
+GOutputStream *
+soup_body_output_stream_new (GOutputStream *base_stream,
+ SoupEncoding encoding,
+ goffset content_length)
+{
+ return g_object_new (SOUP_TYPE_BODY_OUTPUT_STREAM,
+ "base-stream", base_stream,
+ "close-base-stream", FALSE,
+ "encoding", encoding,
+ "content-length", content_length,
+ NULL);
+}
diff --git a/libsoup/soup-body-output-stream.h b/libsoup/soup-body-output-stream.h
new file mode 100644
index 00000000..8bd8970d
--- /dev/null
+++ b/libsoup/soup-body-output-stream.h
@@ -0,0 +1,47 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright 2012 Red Hat, Inc.
+ */
+
+#ifndef SOUP_BODY_OUTPUT_STREAM_H
+#define SOUP_BODY_OUTPUT_STREAM_H 1
+
+#include "soup-types.h"
+#include "soup-message-headers.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_BODY_OUTPUT_STREAM (soup_body_output_stream_get_type ())
+#define SOUP_BODY_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM, SoupBodyOutputStream))
+#define SOUP_BODY_OUTPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_BODY_OUTPUT_STREAM, SoupBodyOutputStreamClass))
+#define SOUP_IS_BODY_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM))
+#define SOUP_IS_BODY_OUTPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM))
+#define SOUP_BODY_OUTPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM, SoupBodyOutputStreamClass))
+
+typedef struct _SoupBodyOutputStreamPrivate SoupBodyOutputStreamPrivate;
+
+typedef struct {
+ GFilterOutputStream parent;
+
+ SoupBodyOutputStreamPrivate *priv;
+} SoupBodyOutputStream;
+
+typedef struct {
+ GFilterOutputStreamClass parent_class;
+
+ /* Padding for future expansion */
+ void (*_libsoup_reserved1) (void);
+ void (*_libsoup_reserved2) (void);
+ void (*_libsoup_reserved3) (void);
+ void (*_libsoup_reserved4) (void);
+} SoupBodyOutputStreamClass;
+
+GType soup_body_output_stream_get_type (void);
+
+GOutputStream *soup_body_output_stream_new (GOutputStream *base_stream,
+ SoupEncoding encoding,
+ goffset content_length);
+
+G_END_DECLS
+
+#endif /* SOUP_BODY_OUTPUT_STREAM_H */
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index cf2a2e3b..aabb902b 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -12,7 +12,10 @@
#include <stdlib.h>
#include <string.h>
+#include "soup-body-input-stream.h"
+#include "soup-body-output-stream.h"
#include "soup-connection.h"
+#include "soup-filter-input-stream.h"
#include "soup-message.h"
#include "soup-message-private.h"
#include "soup-message-queue.h"
@@ -28,11 +31,10 @@ typedef enum {
SOUP_MESSAGE_IO_STATE_NOT_STARTED,
SOUP_MESSAGE_IO_STATE_HEADERS,
SOUP_MESSAGE_IO_STATE_BLOCKING,
+ SOUP_MESSAGE_IO_STATE_BODY_START,
SOUP_MESSAGE_IO_STATE_BODY,
- SOUP_MESSAGE_IO_STATE_CHUNK_SIZE,
- SOUP_MESSAGE_IO_STATE_CHUNK,
- SOUP_MESSAGE_IO_STATE_CHUNK_END,
- SOUP_MESSAGE_IO_STATE_TRAILERS,
+ SOUP_MESSAGE_IO_STATE_BODY_DATA,
+ SOUP_MESSAGE_IO_STATE_BODY_DONE,
SOUP_MESSAGE_IO_STATE_FINISHING,
SOUP_MESSAGE_IO_STATE_DONE
} SoupMessageIOState;
@@ -43,17 +45,23 @@ typedef enum {
state != SOUP_MESSAGE_IO_STATE_DONE)
typedef struct {
- SoupSocket *sock;
SoupMessageQueueItem *item;
SoupMessageIOMode mode;
GCancellable *cancellable;
+ SoupSocket *sock;
+ SoupFilterInputStream *istream;
+ GInputStream *body_istream;
+ GOutputStream *ostream;
+ GOutputStream *body_ostream;
+ GMainContext *async_context;
+ gboolean blocking;
+
SoupMessageIOState read_state;
SoupEncoding read_encoding;
- GByteArray *read_meta_buf;
+ GByteArray *read_header_buf;
SoupMessageBody *read_body;
goffset read_length;
- gboolean read_eof_ok;
gboolean need_content_sniffed, need_got_chunk;
SoupMessageBody *sniff_data;
@@ -67,8 +75,9 @@ typedef struct {
goffset write_length;
goffset written;
- guint read_tag, write_tag;
+ GSource *io_source;
GSource *unpause_source;
+ gboolean paused;
SoupMessageGetHeadersFn get_headers_cb;
SoupMessageParseHeadersFn parse_headers_cb;
@@ -83,8 +92,8 @@ typedef struct {
*/
#define dummy_to_make_emacs_happy {
#define SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK { gboolean cancelled; g_object_ref (msg);
-#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || (!io->read_tag && !io->write_tag)) return; }
-#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || (!io->read_tag && !io->write_tag)) return val; }
+#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return; }
+#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return val; }
#define RESPONSE_BLOCK_SIZE 8192
@@ -103,10 +112,20 @@ soup_message_io_cleanup (SoupMessage *msg)
if (io->sock)
g_object_unref (io->sock);
+ if (io->istream)
+ g_object_remove_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream);
+ if (io->ostream)
+ g_object_remove_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream);
+ if (io->body_istream)
+ g_object_unref (io->body_istream);
+ if (io->body_ostream)
+ g_object_unref (io->body_ostream);
+ if (io->async_context)
+ g_main_context_unref (io->async_context);
if (io->item)
soup_message_queue_item_unref (io->item);
- g_byte_array_free (io->read_meta_buf, TRUE);
+ g_byte_array_free (io->read_header_buf, TRUE);
g_string_free (io->write_buf, TRUE);
if (io->write_chunk)
@@ -127,13 +146,9 @@ soup_message_io_stop (SoupMessage *msg)
if (!io)
return;
- if (io->read_tag) {
- g_signal_handler_disconnect (io->sock, io->read_tag);
- io->read_tag = 0;
- }
- if (io->write_tag) {
- g_signal_handler_disconnect (io->sock, io->write_tag);
- io->write_tag = 0;
+ if (io->io_source) {
+ g_source_destroy (io->io_source);
+ io->io_source = NULL;
}
if (io->unpause_source) {
@@ -145,9 +160,6 @@ soup_message_io_stop (SoupMessage *msg)
soup_socket_disconnect (io->sock);
}
-#define SOUP_MESSAGE_IO_EOL "\r\n"
-#define SOUP_MESSAGE_IO_EOL_LEN 2
-
void
soup_message_io_finished (SoupMessage *msg)
{
@@ -163,8 +175,6 @@ soup_message_io_finished (SoupMessage *msg)
g_object_unref (msg);
}
-static void io_read (SoupSocket *sock, SoupMessage *msg);
-
static gboolean
request_is_idempotent (SoupMessage *msg)
{
@@ -184,7 +194,7 @@ io_error (SoupSocket *sock, SoupMessage *msg, GError *error)
error->message);
} else if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
- io->read_meta_buf->len == 0 &&
+ io->read_header_buf->len == 0 &&
soup_connection_get_ever_used (io->item->conn) &&
!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
request_is_idempotent (msg)) {
@@ -248,87 +258,54 @@ io_handle_sniffing (SoupMessage *msg, gboolean done_reading)
return TRUE;
}
-/* Reads data from io->sock into io->read_meta_buf. If @to_blank is
- * %TRUE, it reads up until a blank line ("CRLF CRLF" or "LF LF").
- * Otherwise, it reads up until a single CRLF or LF.
- *
- * This function is used to read metadata, and read_body_chunk() is
- * used to read the message body contents.
- *
- * read_metadata, read_body_chunk, and write_data all use the same
- * convention for return values: if they return %TRUE, it means
- * they've completely finished the requested read/write, and the
- * caller should move on to the next step. If they return %FALSE, it
- * means that either (a) the socket returned SOUP_SOCKET_WOULD_BLOCK,
- * so the caller should give up for now and wait for the socket to
- * emit a signal, or (b) the socket returned an error, and io_error()
- * was called to process it and cancel the I/O. So either way, if the
- * function returns %FALSE, the caller should return immediately.
- */
static gboolean
-read_metadata (SoupMessage *msg, gboolean to_blank)
+read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
- SoupSocketIOStatus status;
- guchar read_buf[RESPONSE_BLOCK_SIZE];
- gsize nread;
+ gssize nread, old_len;
gboolean got_lf;
- GError *error = NULL;
while (1) {
- status = soup_socket_read_until (io->sock, read_buf,
- sizeof (read_buf),
- "\n", 1, &nread, &got_lf,
- io->cancellable, &error);
- switch (status) {
- case SOUP_SOCKET_OK:
- g_byte_array_append (io->read_meta_buf, read_buf, nread);
- break;
-
- case SOUP_SOCKET_EOF:
- /* More lame server handling... deal with
- * servers that don't send the final chunk.
- */
- if (io->read_state == SOUP_MESSAGE_IO_STATE_CHUNK_SIZE &&
- io->read_meta_buf->len == 0) {
- g_byte_array_append (io->read_meta_buf,
- (guchar *)"0\r\n", 3);
- got_lf = TRUE;
- break;
- } else if (io->read_state == SOUP_MESSAGE_IO_STATE_TRAILERS &&
- io->read_meta_buf->len == 0) {
- g_byte_array_append (io->read_meta_buf,
- (guchar *)"\r\n", 2);
- got_lf = TRUE;
- break;
- }
- /* else fall through */
-
- case SOUP_SOCKET_ERROR:
- io_error (io->sock, msg, error);
- return FALSE;
-
- case SOUP_SOCKET_WOULD_BLOCK:
+ old_len = io->read_header_buf->len;
+ g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
+ nread = soup_filter_input_stream_read_line (io->istream,
+ io->read_header_buf->data + old_len,
+ RESPONSE_BLOCK_SIZE,
+ io->blocking,
+ &got_lf,
+ cancellable, error);
+ io->read_header_buf->len = old_len + MAX (nread, 0);
+ if (nread == 0)
+ io_error (io->sock, msg, NULL);
+ if (nread <= 0)
return FALSE;
- }
if (got_lf) {
- if (!to_blank)
- break;
- if (nread == 1 && io->read_meta_buf->len >= 2 &&
- !strncmp ((char *)io->read_meta_buf->data +
- io->read_meta_buf->len - 2,
+ if (nread == 1 && old_len >= 2 &&
+ !strncmp ((char *)io->read_header_buf->data +
+ io->read_header_buf->len - 2,
"\n\n", 2))
break;
- else if (nread == 2 && io->read_meta_buf->len >= 3 &&
- !strncmp ((char *)io->read_meta_buf->data +
- io->read_meta_buf->len - 3,
+ else if (nread == 2 && old_len >= 3 &&
+ !strncmp ((char *)io->read_header_buf->data +
+ io->read_header_buf->len - 3,
"\n\r\n", 3))
break;
}
}
+ /* We need to "rewind" io->read_header_buf back one line.
+ * That SHOULD be two characters (CR LF), but if the
+ * web server was stupid, it might only be one.
+ */
+ if (io->read_header_buf->len < 3 ||
+ io->read_header_buf->data[io->read_header_buf->len - 2] == '\n')
+ io->read_header_buf->len--;
+ else
+ io->read_header_buf->len -= 2;
+ io->read_header_buf->data[io->read_header_buf->len] = '\0';
+
return TRUE;
}
@@ -445,165 +422,6 @@ content_decode (SoupMessage *msg, SoupBuffer *buf)
return buf;
}
-/* Reads as much message body data as is available on io->sock (but no
- * further than the end of the current message body or chunk). On a
- * successful read, emits "got_chunk" (possibly multiple times), and
- * (unless told not to) appends the chunk to io->read_body.
- *
- * See the note at read_metadata() for an explanation of the return
- * value.
- */
-static gboolean
-read_body_chunk (SoupMessage *msg)
-{
- SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
- SoupMessageIOData *io = priv->io_data;
- SoupSocketIOStatus status;
- guchar *stack_buf = NULL;
- gsize len;
- gboolean read_to_eof = (io->read_encoding == SOUP_ENCODING_EOF);
- gsize nread;
- GError *error = NULL;
- SoupBuffer *buffer;
-
- if (!io_handle_sniffing (msg, FALSE))
- return FALSE;
-
- while (read_to_eof || io->read_length > 0) {
- if (priv->chunk_allocator) {
- buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
- if (!buffer) {
- soup_message_io_pause (msg);
- return FALSE;
- }
- } else {
- if (!stack_buf)
- stack_buf = alloca (RESPONSE_BLOCK_SIZE);
- buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
- stack_buf,
- RESPONSE_BLOCK_SIZE);
- }
-
- if (read_to_eof)
- len = buffer->length;
- else
- len = MIN (buffer->length, io->read_length);
-
- status = soup_socket_read (io->sock,
- (guchar *)buffer->data, len,
- &nread, io->cancellable, &error);
-
- if (status == SOUP_SOCKET_OK && nread) {
- buffer->length = nread;
- io->read_length -= nread;
-
- buffer = content_decode (msg, buffer);
- if (!buffer)
- continue;
-
- soup_message_body_got_chunk (io->read_body, buffer);
-
- if (io->need_content_sniffed) {
- soup_message_body_append_buffer (io->sniff_data, buffer);
- soup_buffer_free (buffer);
- io->need_got_chunk = TRUE;
- if (!io_handle_sniffing (msg, FALSE))
- return FALSE;
- continue;
- }
-
- SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
- soup_message_got_chunk (msg, buffer);
- soup_buffer_free (buffer);
- SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
- continue;
- }
-
- soup_buffer_free (buffer);
- switch (status) {
- case SOUP_SOCKET_OK:
- break;
-
- case SOUP_SOCKET_EOF:
- if (io->read_eof_ok) {
- io->read_length = 0;
- return TRUE;
- }
- /* else fall through */
-
- case SOUP_SOCKET_ERROR:
- io_error (io->sock, msg, error);
- return FALSE;
-
- case SOUP_SOCKET_WOULD_BLOCK:
- return FALSE;
- }
- }
-
- return TRUE;
-}
-
-/* Attempts to write @len bytes from @data. See the note at
- * read_metadata() for an explanation of the return value.
- */
-static gboolean
-write_data (SoupMessage *msg, const char *data, guint len, gboolean body)
-{
- SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
- SoupMessageIOData *io = priv->io_data;
- SoupSocketIOStatus status;
- gsize nwrote;
- GError *error = NULL;
- SoupBuffer *chunk;
- const char *start;
-
- while (len > io->written) {
- status = soup_socket_write (io->sock,
- data + io->written,
- len - io->written,
- &nwrote,
- io->cancellable, &error);
- switch (status) {
- case SOUP_SOCKET_EOF:
- case SOUP_SOCKET_ERROR:
- io_error (io->sock, msg, error);
- return FALSE;
-
- case SOUP_SOCKET_WOULD_BLOCK:
- return FALSE;
-
- case SOUP_SOCKET_OK:
- start = data + io->written;
- io->written += nwrote;
-
- if (body) {
- if (io->write_length)
- io->write_length -= nwrote;
-
- chunk = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
- start, nwrote);
- SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
- soup_message_wrote_body_data (msg, chunk);
- soup_buffer_free (chunk);
- SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
- }
- break;
- }
- }
-
- io->written = 0;
- return TRUE;
-}
-
-static inline SoupMessageIOState
-io_body_state (SoupEncoding encoding)
-{
- if (encoding == SOUP_ENCODING_CHUNKED)
- return SOUP_MESSAGE_IO_STATE_CHUNK_SIZE;
- else
- return SOUP_MESSAGE_IO_STATE_BODY;
-}
-
/*
* There are two request/response formats: the basic request/response,
* possibly with one or more unsolicited informational responses (such
@@ -630,16 +448,24 @@ io_body_state (SoupEncoding encoding)
* W:DONE / R:DONE R:DONE / W:DONE
*/
-static void
-io_write (SoupSocket *sock, SoupMessage *msg)
+/* Attempts to push forward the writing side of @msg's I/O. Returns
+ * %TRUE if it manages to make some progress, and it is likely that
+ * further progress can be made. Returns %FALSE if it has reached a
+ * stopping point of some sort (need input from the application,
+ * socket not writable, write is complete, etc).
+ */
+static gboolean
+io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
+ SoupBuffer *chunk;
+ gssize nwrote;
- write_more:
switch (io->write_state) {
case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- return;
+ case SOUP_MESSAGE_IO_STATE_BLOCKING:
+ return FALSE;
case SOUP_MESSAGE_IO_STATE_HEADERS:
@@ -647,32 +473,29 @@ io_write (SoupSocket *sock, SoupMessage *msg)
io->get_headers_cb (msg, io->write_buf,
&io->write_encoding,
io->header_data);
- if (!io->write_buf->len) {
- soup_message_io_pause (msg);
- return;
- }
}
- if (!write_data (msg, io->write_buf->str,
- io->write_buf->len, FALSE))
- return;
+ while (io->written < io->write_buf->len) {
+ nwrote = g_pollable_stream_write (io->ostream,
+ io->write_buf->str + io->written,
+ io->write_buf->len - io->written,
+ io->blocking,
+ cancellable, error);
+ if (nwrote == -1)
+ return FALSE;
+ io->written += nwrote;
+ }
+ io->written = 0;
g_string_truncate (io->write_buf, 0);
- if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
- SoupMessageHeaders *hdrs =
- (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
- msg->request_headers : msg->response_headers;
- io->write_length = soup_message_headers_get_content_length (hdrs);
- }
-
if (io->mode == SOUP_MESSAGE_IO_SERVER &&
SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
if (msg->status_code == SOUP_STATUS_CONTINUE) {
/* Stop and wait for the body now */
io->write_state =
SOUP_MESSAGE_IO_STATE_BLOCKING;
- io->read_state = io_body_state (io->read_encoding);
+ io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
} else {
/* We just wrote a 1xx response
* header, so stay in STATE_HEADERS.
@@ -682,13 +505,26 @@ io_write (SoupSocket *sock, SoupMessage *msg)
* response.)
*/
}
- } else if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
- soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
+
+ soup_message_wrote_informational (msg);
+ soup_message_cleanup_response (msg);
+ break;
+ }
+
+ if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
+ SoupMessageHeaders *hdrs =
+ (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
+ msg->request_headers : msg->response_headers;
+ io->write_length = soup_message_headers_get_content_length (hdrs);
+ }
+
+ if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
+ soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
/* Need to wait for the Continue response */
io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
} else {
- io->write_state = io_body_state (io->write_encoding);
+ io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
/* If the client was waiting for a Continue
* but we sent something else, then they're
@@ -696,39 +532,26 @@ io_write (SoupSocket *sock, SoupMessage *msg)
*/
if (io->mode == SOUP_MESSAGE_IO_SERVER &&
io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
- io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+ io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
}
- SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
- if (SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
- soup_message_wrote_informational (msg);
- soup_message_cleanup_response (msg);
- } else
- soup_message_wrote_headers (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+ soup_message_wrote_headers (msg);
break;
- case SOUP_MESSAGE_IO_STATE_BLOCKING:
- io_read (sock, msg);
-
- /* If io_read reached a point where we could write
- * again, it would have recursively called io_write.
- * So (a) we don't need to try to keep writing, and
- * (b) we can't anyway, because msg may have been
- * destroyed.
- */
- return;
+ case SOUP_MESSAGE_IO_STATE_BODY_START:
+ io->body_ostream = soup_body_output_stream_new (io->ostream,
+ io->write_encoding,
+ io->write_length);
+ io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
+ break;
case SOUP_MESSAGE_IO_STATE_BODY:
- if (!io->write_length && io->write_encoding != SOUP_ENCODING_EOF) {
- wrote_body:
- io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
-
- SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
- soup_message_wrote_body (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+ if (!io->write_length &&
+ io->write_encoding != SOUP_ENCODING_EOF &&
+ io->write_encoding != SOUP_ENCODING_CHUNKED) {
+ io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
break;
}
@@ -736,164 +559,114 @@ io_write (SoupSocket *sock, SoupMessage *msg)
io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
if (!io->write_chunk) {
soup_message_io_pause (msg);
- return;
+ return FALSE;
+ }
+ if (!io->write_chunk->length) {
+ io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+ break;
}
- if (io->write_chunk->length > io->write_length &&
- io->write_encoding != SOUP_ENCODING_EOF) {
- /* App is trying to write more than it
- * claimed it would; we have to truncate.
- */
- SoupBuffer *truncated =
- soup_buffer_new_subbuffer (io->write_chunk,
- 0, io->write_length);
- soup_buffer_free (io->write_chunk);
- io->write_chunk = truncated;
- } else if (io->write_encoding == SOUP_ENCODING_EOF &&
- !io->write_chunk->length)
- goto wrote_body;
}
- if (!write_data (msg, io->write_chunk->data,
- io->write_chunk->length, TRUE))
- return;
-
- if (io->mode == SOUP_MESSAGE_IO_SERVER ||
- priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD)
- soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
- io->write_body_offset += io->write_chunk->length;
- soup_buffer_free (io->write_chunk);
- io->write_chunk = NULL;
+ nwrote = g_pollable_stream_write (io->body_ostream,
+ io->write_chunk->data + io->written,
+ io->write_chunk->length - io->written,
+ io->blocking,
+ cancellable, error);
+ if (nwrote == -1)
+ return FALSE;
- SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
- soup_message_wrote_chunk (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
- break;
+ chunk = soup_buffer_new_subbuffer (io->write_chunk,
+ io->written, nwrote);
+ io->written += nwrote;
+ if (io->write_length)
+ io->write_length -= nwrote;
- case SOUP_MESSAGE_IO_STATE_CHUNK_SIZE:
- if (!io->write_chunk) {
- io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
- if (!io->write_chunk) {
- soup_message_io_pause (msg);
- return;
- }
- g_string_append_printf (io->write_buf, "%lx\r\n",
- (unsigned long) io->write_chunk->length);
- io->write_body_offset += io->write_chunk->length;
- }
+ if (io->written == io->write_chunk->length)
+ io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
- if (!write_data (msg, io->write_buf->str,
- io->write_buf->len, FALSE))
- return;
+ soup_message_wrote_body_data (msg, chunk);
+ soup_buffer_free (chunk);
+ break;
- g_string_truncate (io->write_buf, 0);
+ case SOUP_MESSAGE_IO_STATE_BODY_DATA:
+ io->written = 0;
if (io->write_chunk->length == 0) {
- /* The last chunk has no CHUNK_END... */
- io->write_state = SOUP_MESSAGE_IO_STATE_TRAILERS;
+ io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
break;
}
- io->write_state = SOUP_MESSAGE_IO_STATE_CHUNK;
- /* fall through */
-
-
- case SOUP_MESSAGE_IO_STATE_CHUNK:
- if (!write_data (msg, io->write_chunk->data,
- io->write_chunk->length, TRUE))
- return;
-
if (io->mode == SOUP_MESSAGE_IO_SERVER ||
priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD)
soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
+ io->write_body_offset += io->write_chunk->length;
soup_buffer_free (io->write_chunk);
io->write_chunk = NULL;
- io->write_state = SOUP_MESSAGE_IO_STATE_CHUNK_END;
-
- SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
+ io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
soup_message_wrote_chunk (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
-
- /* fall through */
-
-
- case SOUP_MESSAGE_IO_STATE_CHUNK_END:
- if (!write_data (msg, SOUP_MESSAGE_IO_EOL,
- SOUP_MESSAGE_IO_EOL_LEN, FALSE))
- return;
-
- io->write_state = SOUP_MESSAGE_IO_STATE_CHUNK_SIZE;
break;
- case SOUP_MESSAGE_IO_STATE_TRAILERS:
- if (!write_data (msg, SOUP_MESSAGE_IO_EOL,
- SOUP_MESSAGE_IO_EOL_LEN, FALSE))
- return;
+ case SOUP_MESSAGE_IO_STATE_BODY_DONE:
+ if (io->body_ostream) {
+ if (!g_output_stream_close (io->body_ostream, cancellable, error))
+ return FALSE;
+ g_clear_object (&io->body_ostream);
+ }
io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
-
- SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
soup_message_wrote_body (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
- /* fall through */
+ break;
case SOUP_MESSAGE_IO_STATE_FINISHING:
- if (io->write_tag) {
- g_signal_handler_disconnect (io->sock, io->write_tag);
- io->write_tag = 0;
- }
io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
- if (io->mode == SOUP_MESSAGE_IO_CLIENT) {
+ if (io->mode == SOUP_MESSAGE_IO_CLIENT)
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_read (sock, msg);
- } else
- soup_message_io_finished (msg);
- return;
+ break;
case SOUP_MESSAGE_IO_STATE_DONE:
default:
- g_return_if_reached ();
+ g_return_val_if_reached (FALSE);
}
- goto write_more;
+ return TRUE;
}
-static void
-io_read (SoupSocket *sock, SoupMessage *msg)
+/* Attempts to push forward the reading side of @msg's I/O. Returns
+ * %TRUE if it manages to make some progress, and it is likely that
+ * further progress can be made. Returns %FALSE if it has reached a
+ * stopping point of some sort (need input from the application,
+ * socket not readable, read is complete, etc).
+ */
+static gboolean
+io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
+ guchar *stack_buf = NULL;
+ gssize nread;
+ SoupBuffer *buffer;
guint status;
- read_more:
switch (io->read_state) {
case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- return;
+ case SOUP_MESSAGE_IO_STATE_BLOCKING:
+ return FALSE;
case SOUP_MESSAGE_IO_STATE_HEADERS:
- if (!read_metadata (msg, TRUE))
- return;
-
- /* We need to "rewind" io->read_meta_buf back one line.
- * That SHOULD be two characters (CR LF), but if the
- * web server was stupid, it might only be one.
- */
- if (io->read_meta_buf->len < 3 ||
- io->read_meta_buf->data[io->read_meta_buf->len - 2] == '\n')
- io->read_meta_buf->len--;
- else
- io->read_meta_buf->len -= 2;
- io->read_meta_buf->data[io->read_meta_buf->len] = '\0';
- status = io->parse_headers_cb (msg, (char *)io->read_meta_buf->data,
- io->read_meta_buf->len,
+ if (!read_headers (msg, cancellable, error))
+ return FALSE;
+
+ status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
+ io->read_header_buf->len,
&io->read_encoding,
io->header_data);
- g_byte_array_set_size (io->read_meta_buf, 0);
+ g_byte_array_set_size (io->read_header_buf, 0);
if (status != SOUP_STATUS_OK) {
/* Either we couldn't parse the headers, or they
@@ -910,26 +683,6 @@ io_read (SoupSocket *sock, SoupMessage *msg)
break;
}
- if (io->read_encoding == SOUP_ENCODING_EOF)
- io->read_eof_ok = TRUE;
-
- if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
- SoupMessageHeaders *hdrs =
- (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
- msg->response_headers : msg->request_headers;
- io->read_length = soup_message_headers_get_content_length (hdrs);
-
- if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
- !soup_message_is_keepalive (msg)) {
- /* Some servers suck and send
- * incorrect Content-Length values, so
- * allow EOF termination in this case
- * (iff the message is too short) too.
- */
- io->read_eof_ok = TRUE;
- }
- }
-
if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
if (msg->status_code == SOUP_STATUS_CONTINUE &&
@@ -938,11 +691,18 @@ io_read (SoupSocket *sock, SoupMessage *msg)
io->read_state =
SOUP_MESSAGE_IO_STATE_BLOCKING;
io->write_state =
- io_body_state (io->write_encoding);
+ SOUP_MESSAGE_IO_STATE_BODY_START;
} else {
/* Just stay in HEADERS */
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
}
+
+ /* Informational responses have no bodies, so
+ * bail out here rather than parsing encoding, etc
+ */
+ soup_message_got_informational (msg);
+ soup_message_cleanup_response (msg);
+ break;
} else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
/* The client requested a Continue response. The
@@ -953,7 +713,7 @@ io_read (SoupSocket *sock, SoupMessage *msg)
io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
} else {
- io->read_state = io_body_state (io->read_encoding);
+ io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
/* If the client was waiting for a Continue
* but got something else, then it's done
@@ -964,121 +724,187 @@ io_read (SoupSocket *sock, SoupMessage *msg)
io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
}
- if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
- SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
- SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
- soup_message_got_informational (msg);
- soup_message_cleanup_response (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
- } else {
- SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
- soup_message_got_headers (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
- }
- break;
-
+ if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
+ SoupMessageHeaders *hdrs =
+ (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
+ msg->response_headers : msg->request_headers;
+ io->read_length = soup_message_headers_get_content_length (hdrs);
- case SOUP_MESSAGE_IO_STATE_BLOCKING:
- io_write (sock, msg);
+ if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
+ !soup_message_is_keepalive (msg)) {
+ /* Some servers suck and send
+ * incorrect Content-Length values, so
+ * allow EOF termination in this case
+ * (iff the message is too short) too.
+ */
+ io->read_encoding = SOUP_ENCODING_EOF;
+ }
+ } else
+ io->read_length = -1;
- /* As in the io_write case, we *must* return here. */
- return;
+ io->body_istream = soup_body_input_stream_new (SOUP_FILTER_INPUT_STREAM (io->istream),
+ io->read_encoding,
+ io->read_length);
+ soup_message_got_headers (msg);
+ break;
case SOUP_MESSAGE_IO_STATE_BODY:
- if (!read_body_chunk (msg))
- return;
-
- got_body:
- if (!io_handle_sniffing (msg, TRUE)) {
- /* If the message was paused (as opposed to
- * cancelled), we need to make sure we wind up
- * back here when it's unpaused, even if it
- * was doing a chunked or EOF-terminated read
- * before.
- */
- if (io == priv->io_data) {
- io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
- io->read_encoding = SOUP_ENCODING_CONTENT_LENGTH;
- io->read_length = 0;
+ if (!io_handle_sniffing (msg, FALSE))
+ return FALSE;
+
+ if (priv->chunk_allocator) {
+ buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
+ if (!buffer) {
+ soup_message_io_pause (msg);
+ return FALSE;
}
- return;
+ } else {
+ if (!stack_buf)
+ stack_buf = alloca (RESPONSE_BLOCK_SIZE);
+ buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
+ stack_buf,
+ RESPONSE_BLOCK_SIZE);
}
- io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+ nread = g_pollable_stream_read (io->body_istream,
+ (guchar *)buffer->data,
+ buffer->length,
+ io->blocking,
+ cancellable, error);
+ if (nread > 0) {
+ buffer->length = nread;
+ buffer = content_decode (msg, buffer);
+ if (!buffer)
+ break;
- SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
- soup_message_got_body (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
- break;
+ soup_message_body_got_chunk (io->read_body, buffer);
+ if (io->need_content_sniffed) {
+ soup_message_body_append_buffer (io->sniff_data, buffer);
+ soup_buffer_free (buffer);
+ io->need_got_chunk = TRUE;
+ if (!io_handle_sniffing (msg, FALSE))
+ return FALSE;
+ break;
+ }
- case SOUP_MESSAGE_IO_STATE_CHUNK_SIZE:
- if (!read_metadata (msg, FALSE))
- return;
+ soup_message_got_chunk (msg, buffer);
+ soup_buffer_free (buffer);
+ break;
+ }
- io->read_length = strtoul ((char *)io->read_meta_buf->data, NULL, 16);
- g_byte_array_set_size (io->read_meta_buf, 0);
+ soup_buffer_free (buffer);
+ if (nread == -1)
+ return FALSE;
- if (io->read_length > 0)
- io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK;
- else
- io->read_state = SOUP_MESSAGE_IO_STATE_TRAILERS;
+ /* else nread == 0 */
+ io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
break;
- case SOUP_MESSAGE_IO_STATE_CHUNK:
- if (!read_body_chunk (msg))
- return;
+ case SOUP_MESSAGE_IO_STATE_BODY_DONE:
+ if (!io_handle_sniffing (msg, TRUE))
+ return FALSE;
- io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK_END;
+ io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+ soup_message_got_body (msg);
break;
- case SOUP_MESSAGE_IO_STATE_CHUNK_END:
- if (!read_metadata (msg, FALSE))
- return;
+ case SOUP_MESSAGE_IO_STATE_FINISHING:
+ io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
- g_byte_array_set_size (io->read_meta_buf, 0);
- io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK_SIZE;
+ if (io->mode == SOUP_MESSAGE_IO_SERVER)
+ io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
break;
- case SOUP_MESSAGE_IO_STATE_TRAILERS:
- if (!read_metadata (msg, FALSE))
- return;
+ case SOUP_MESSAGE_IO_STATE_DONE:
+ default:
+ g_return_val_if_reached (FALSE);
+ }
- if (io->read_meta_buf->len <= SOUP_MESSAGE_IO_EOL_LEN)
- goto got_body;
+ return TRUE;
+}
- /* FIXME: process trailers */
- g_byte_array_set_size (io->read_meta_buf, 0);
- break;
+static GSource *
+soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
+ GSourceFunc callback, gpointer user_data)
+{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+ GSource *source;
+
+ if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) {
+ source = g_pollable_input_stream_create_source (
+ G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
+ } else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) {
+ source = g_pollable_output_stream_create_source (
+ G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
+ } else
+ g_return_val_if_reached (NULL);
+ g_source_set_callback (source, callback, user_data, NULL);
+ return source;
+}
- case SOUP_MESSAGE_IO_STATE_FINISHING:
- if (io->read_tag) {
- g_signal_handler_disconnect (io->sock, io->read_tag);
- io->read_tag = 0;
- }
- io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
+static gboolean io_run (GObject *stream, SoupMessage *msg);
- if (io->mode == SOUP_MESSAGE_IO_SERVER) {
- io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_write (sock, msg);
- } else
- soup_message_io_finished (msg);
- return;
+static void
+setup_io_source (SoupMessage *msg)
+{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+ io->io_source = soup_message_io_get_source (msg, NULL,
+ (GSourceFunc)io_run, msg);
+ g_source_attach (io->io_source, io->async_context);
+ g_source_unref (io->io_source);
+}
- case SOUP_MESSAGE_IO_STATE_DONE:
- default:
- g_return_if_reached ();
+static gboolean
+io_run (GObject *stream, SoupMessage *msg)
+{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+ GError *error = NULL;
+
+ if (io->io_source) {
+ g_source_destroy (io->io_source);
+ io->io_source = NULL;
+ }
+
+ g_object_ref (msg);
+
+ while (priv->io_data == io && !io->paused) {
+ gboolean progress = FALSE;
+
+ if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
+ progress = io_read (msg, io->cancellable, &error);
+ else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
+ progress = io_write (msg, io->cancellable, &error);
+
+ if (!progress)
+ break;
}
- goto read_more;
+ if (error) {
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_clear_error (&error);
+ setup_io_source (msg);
+ } else
+ io_error (io->sock, msg, error);
+ } else if (priv->io_data == io &&
+ io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
+ io->write_state == SOUP_MESSAGE_IO_STATE_DONE)
+ soup_message_io_finished (msg);
+
+ g_object_unref (msg);
+ return FALSE;
}
+
static SoupMessageIOData *
new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
SoupMessageGetHeadersFn get_headers_cb,
@@ -1089,9 +915,9 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io;
+ gboolean non_blocking, use_thread_context;
io = g_slice_new0 (SoupMessageIOData);
- io->sock = g_object_ref (sock);
io->mode = mode;
io->get_headers_cb = get_headers_cb;
io->parse_headers_cb = parse_headers_cb;
@@ -1099,13 +925,32 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
io->completion_cb = completion_cb;
io->completion_data = completion_data;
- io->read_meta_buf = g_byte_array_new ();
- io->write_buf = g_string_new (NULL);
+ io->sock = g_object_ref (sock);
+ io->istream = SOUP_FILTER_INPUT_STREAM (soup_socket_get_input_stream (sock));
+ if (io->istream)
+ g_object_add_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream);
+ io->ostream = soup_socket_get_output_stream (sock);
+ if (io->ostream)
+ g_object_add_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream);
+
+ g_object_get (io->sock,
+ SOUP_SOCKET_FLAG_NONBLOCKING, &non_blocking,
+ SOUP_SOCKET_USE_THREAD_CONTEXT, &use_thread_context,
+ NULL);
+ io->blocking = !non_blocking;
+
+ if (use_thread_context) {
+ io->async_context = g_main_context_get_thread_default ();
+ if (io->async_context)
+ g_main_context_ref (io->async_context);
+ } else {
+ g_object_get (io->sock,
+ SOUP_SOCKET_ASYNC_CONTEXT, &io->async_context,
+ NULL);
+ }
- io->read_tag = g_signal_connect (io->sock, "readable",
- G_CALLBACK (io_read), msg);
- io->write_tag = g_signal_connect (io->sock, "writable",
- G_CALLBACK (io_write), msg);
+ io->read_header_buf = g_byte_array_new ();
+ io->write_buf = g_string_new (NULL);
io->read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
@@ -1139,7 +984,7 @@ soup_message_io_client (SoupMessageQueueItem *item,
io->write_body = item->msg->request_body;
io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_write (sock, item->msg);
+ io_run (NULL, item->msg);
}
void
@@ -1160,7 +1005,7 @@ soup_message_io_server (SoupMessage *msg, SoupSocket *sock,
io->write_body = msg->response_body;
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_read (sock, msg);
+ io_run (NULL, msg);
}
void
@@ -1171,19 +1016,17 @@ soup_message_io_pause (SoupMessage *msg)
g_return_if_fail (io != NULL);
- if (io->write_tag) {
- g_signal_handler_disconnect (io->sock, io->write_tag);
- io->write_tag = 0;
- }
- if (io->read_tag) {
- g_signal_handler_disconnect (io->sock, io->read_tag);
- io->read_tag = 0;
+ if (io->io_source) {
+ g_source_destroy (io->io_source);
+ io->io_source = NULL;
}
if (io->unpause_source) {
g_source_destroy (io->unpause_source);
io->unpause_source = NULL;
}
+
+ io->paused = TRUE;
}
static gboolean
@@ -1194,25 +1037,12 @@ io_unpause_internal (gpointer msg)
g_return_val_if_fail (io != NULL, FALSE);
io->unpause_source = NULL;
+ io->paused = FALSE;
- if (io->write_tag || io->read_tag)
+ if (io->io_source)
return FALSE;
- if (io->write_state != SOUP_MESSAGE_IO_STATE_DONE) {
- io->write_tag = g_signal_connect (io->sock, "writable",
- G_CALLBACK (io_write), msg);
- }
-
- if (io->read_state != SOUP_MESSAGE_IO_STATE_DONE) {
- io->read_tag = g_signal_connect (io->sock, "readable",
- G_CALLBACK (io_read), msg);
- }
-
- if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
- io_write (io->sock, msg);
- else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
- io_read (io->sock, msg);
-
+ io_run (NULL, msg);
return FALSE;
}
@@ -1221,32 +1051,16 @@ soup_message_io_unpause (SoupMessage *msg)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
- gboolean non_blocking, use_thread_context;
- GMainContext *async_context;
g_return_if_fail (io != NULL);
- g_object_get (io->sock,
- SOUP_SOCKET_FLAG_NONBLOCKING, &non_blocking,
- SOUP_SOCKET_USE_THREAD_CONTEXT, &use_thread_context,
- NULL);
- if (use_thread_context)
- async_context = g_main_context_ref_thread_default ();
- else {
- g_object_get (io->sock,
- SOUP_SOCKET_ASYNC_CONTEXT, &async_context,
- NULL);
- }
-
- if (non_blocking) {
+ if (!io->blocking) {
if (!io->unpause_source) {
io->unpause_source = soup_add_completion (
- async_context, io_unpause_internal, msg);
+ io->async_context, io_unpause_internal, msg);
}
} else
io_unpause_internal (msg);
- if (async_context)
- g_main_context_unref (async_context);
}
/**
diff --git a/libsoup/soup-socket.c b/libsoup/soup-socket.c
index 049be923..2d72b385 100644
--- a/libsoup/soup-socket.c
+++ b/libsoup/soup-socket.c
@@ -129,8 +129,6 @@ disconnect_internal (SoupSocket *sock, gboolean close)
if (G_IS_TLS_CONNECTION (priv->conn))
g_signal_handlers_disconnect_by_func (priv->conn, soup_socket_peer_certificate_changed, sock);
g_clear_object (&priv->conn);
- g_clear_object (&priv->istream);
- g_clear_object (&priv->ostream);
}
if (priv->read_src) {
@@ -1325,6 +1323,22 @@ soup_socket_get_remote_address (SoupSocket *sock)
return priv->remote_addr;
}
+GInputStream *
+soup_socket_get_input_stream (SoupSocket *sock)
+{
+ g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
+
+ return SOUP_SOCKET_GET_PRIVATE (sock)->istream;
+}
+
+GOutputStream *
+soup_socket_get_output_stream (SoupSocket *sock)
+{
+ g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
+
+ return SOUP_SOCKET_GET_PRIVATE (sock)->ostream;
+}
+
static gboolean
socket_read_watch (GObject *pollable, gpointer user_data)
diff --git a/libsoup/soup-socket.h b/libsoup/soup-socket.h
index dc6b59c4..5cbf14ac 100644
--- a/libsoup/soup-socket.h
+++ b/libsoup/soup-socket.h
@@ -85,6 +85,8 @@ gboolean soup_socket_is_connected (SoupSocket *sock);
SoupAddress *soup_socket_get_local_address (SoupSocket *sock);
SoupAddress *soup_socket_get_remote_address (SoupSocket *sock);
+GInputStream *soup_socket_get_input_stream (SoupSocket *sock);
+GOutputStream *soup_socket_get_output_stream (SoupSocket *sock);
typedef enum {
SOUP_SOCKET_OK,
diff --git a/po/POTFILES.in b/po/POTFILES.in
index c43b9434..4115bb0b 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -1,2 +1,3 @@
+libsoup/soup-body-input-stream.c
libsoup/soup-request.c
libsoup/soup-requester.c
diff --git a/tests/chunk-test.c b/tests/chunk-test.c
index 3805fb75..c3eecc42 100644
--- a/tests/chunk-test.c
+++ b/tests/chunk-test.c
@@ -21,15 +21,6 @@ typedef struct {
gboolean streaming;
} PutTestData;
-static SoupBuffer *
-error_chunk_allocator (SoupMessage *msg, gsize max_len, gpointer user_data)
-{
- /* This should never be called, because there is no response body. */
- debug_printf (1, " error_chunk_allocator called!\n");
- errors++;
- return soup_buffer_new (SOUP_MEMORY_TAKE, g_malloc (100), 100);
-}
-
static void
write_next_chunk (SoupMessage *msg, gpointer user_data)
{
@@ -191,7 +182,6 @@ do_request_test (SoupSession *session, SoupURI *base_uri, RequestTestFlags flags
msg = soup_message_new_from_uri ("PUT", uri);
soup_message_headers_set_encoding (msg->request_headers, SOUP_ENCODING_CHUNKED);
soup_message_body_set_accumulate (msg->request_body, FALSE);
- soup_message_set_chunk_allocator (msg, error_chunk_allocator, NULL, NULL);
if (flags & HACKY_STREAMING) {
g_signal_connect (msg, "wrote_chunk",
G_CALLBACK (write_next_chunk_streaming_hack), &ptd);
diff --git a/tests/connection-test.c b/tests/connection-test.c
index 545bf101..7c6fb5a0 100644
--- a/tests/connection-test.c
+++ b/tests/connection-test.c
@@ -23,11 +23,20 @@ static void
close_socket (SoupMessage *msg, gpointer user_data)
{
SoupSocket *sock = user_data;
+ int sockfd;
- soup_socket_disconnect (sock);
-
- /* But also add the missing data to the message now, so
- * SoupServer can clean up after itself properly.
+ /* Actually calling soup_socket_disconnect() here would cause
+ * us to leak memory, so just shutdown the socket instead.
+ */
+ sockfd = soup_socket_get_fd (sock);
+#ifdef G_OS_WIN32
+ shutdown (sockfd, SD_SEND);
+#else
+ shutdown (sockfd, SHUT_WR);
+#endif
+
+ /* Then add the missing data to the message now, so SoupServer
+ * can clean up after itself properly.
*/
soup_message_body_append (msg->response_body, SOUP_MEMORY_STATIC,
"foo", 3);