summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergio Villar Senin <svillar@igalia.com>2012-07-30 12:40:32 +0200
committerSergio Villar Senin <svillar@igalia.com>2012-12-18 16:06:06 +0100
commitd12d25e3515b329f65da8f276965fe46c7c628ac (patch)
tree0824d6aa1588c3ff0a4d71590d5ae4876bf7ed19
parent38c13f1d28ee5e86dbdaf2e7a121fb0c148d41d1 (diff)
downloadlibsoup-d12d25e3515b329f65da8f276965fe46c7c628ac.tar.gz
libsoup-d12d25e3515b329f65da8f276965fe46c7c628ac.tar.bz2
libsoup-d12d25e3515b329f65da8f276965fe46c7c628ac.zip
SoupCacheInputStream: new input stream filter that writes to the cache
The SoupCacheInputStream will be added to the stream stack as any other GPollableInputStream. It will transparently read data from the underlying stream and will pass it unmodified to the upper level. Apart from that the stream writes everything it reads to a local file. Once the caching finishes a callback will be called. Caching may be cancelled at any point by providing a GCancellable. https://bugzilla.gnome.org/show_bug.cgi?id=682112
-rw-r--r--libsoup/Makefile.am2
-rw-r--r--libsoup/soup-cache-input-stream.c333
-rw-r--r--libsoup/soup-cache-input-stream.h52
-rw-r--r--po/POTFILES.in1
4 files changed, 388 insertions, 0 deletions
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index 87629ed1..59553f04 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -114,6 +114,8 @@ libsoup_2_4_la_SOURCES = \
soup-body-output-stream.h \
soup-body-output-stream.c \
soup-cache.c \
+ soup-cache-input-stream.h \
+ soup-cache-input-stream.c \
soup-cache-private.h \
soup-client-input-stream.h \
soup-client-input-stream.c \
diff --git a/libsoup/soup-cache-input-stream.c b/libsoup/soup-cache-input-stream.c
new file mode 100644
index 00000000..a44652a8
--- /dev/null
+++ b/libsoup/soup-cache-input-stream.c
@@ -0,0 +1,333 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2012 Igalia, S.L.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <glib/gi18n-lib.h>
+#include "soup-cache-input-stream.h"
+#include "soup-message-body.h"
+
+static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ soup_cache_input_stream_pollable_init))
+
+/* properties */
+enum {
+ PROP_0,
+
+ PROP_OUTPUT_STREAM,
+
+ LAST_PROP
+};
+
+struct _SoupCacheInputStreamPrivate
+{
+ GOutputStream *output_stream;
+ gsize bytes_written;
+
+ gboolean read_finished;
+ SoupBuffer *current_writing_buffer;
+ GQueue *buffer_queue;
+
+ GTask *task;
+};
+
+static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);
+
+static inline void
+notify_and_clear (SoupCacheInputStream *istream, GError *error)
+{
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+
+ if (error)
+ g_task_return_error (priv->task, error);
+ else
+ g_task_return_int (priv->task, priv->bytes_written);
+
+ g_clear_object (&priv->output_stream);
+ g_clear_object (&priv->task);
+}
+
+gsize
+soup_cache_input_stream_cache_finish (SoupCacheInputStream *istream,
+ GAsyncResult *result,
+ GError **error)
+{
+ return g_task_propagate_int (G_TASK (result), error);
+}
+
+static inline void
+try_write_next_buffer (SoupCacheInputStream *istream)
+{
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+
+ if (priv->current_writing_buffer == NULL && priv->buffer_queue->length)
+ soup_cache_input_stream_write_next_buffer (istream);
+ else if (priv->read_finished)
+ notify_and_clear (istream, NULL);
+ else if (g_input_stream_is_closed (G_INPUT_STREAM (istream))) {
+ GError *error = NULL;
+ g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+ _("Network stream unexpectedly closed"));
+ notify_and_clear (istream, error);
+ }
+}
+
+static void
+file_replaced_cb (GObject *source,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data);
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+ GError *error = NULL;
+
+ priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error);
+
+ if (error)
+ g_task_return_error (priv->task, error);
+ else
+ try_write_next_buffer (istream);
+}
+
+void
+soup_cache_input_stream_cache (SoupCacheInputStream *istream,
+ GFile *file,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+
+ priv->task = g_task_new (istream, cancellable, callback, user_data);
+
+ g_file_replace_async (file, NULL, FALSE,
+ G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
+ G_PRIORITY_LOW, cancellable, file_replaced_cb, istream);
+}
+
+static void
+soup_cache_input_stream_init (SoupCacheInputStream *self)
+{
+ SoupCacheInputStreamPrivate *priv =
+ G_TYPE_INSTANCE_GET_PRIVATE (self, SOUP_TYPE_CACHE_INPUT_STREAM,
+ SoupCacheInputStreamPrivate);
+
+ priv->buffer_queue = g_queue_new ();
+ self->priv = priv;
+}
+
+static void
+soup_cache_input_stream_get_property (GObject *object,
+ guint property_id, GValue *value, GParamSpec *pspec)
+{
+ SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
+ SoupCacheInputStreamPrivate *priv = self->priv;
+
+ switch (property_id) {
+ case PROP_OUTPUT_STREAM:
+ g_value_set_object (value, priv->output_stream);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+soup_cache_input_stream_set_property (GObject *object,
+ guint property_id, const GValue *value, GParamSpec *pspec)
+{
+ SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
+ SoupCacheInputStreamPrivate *priv = self->priv;
+
+ switch (property_id) {
+ case PROP_OUTPUT_STREAM:
+ priv->output_stream = g_value_dup_object (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+soup_cache_input_stream_dispose (GObject *object)
+{
+ SoupCacheInputStreamPrivate *priv = SOUP_CACHE_INPUT_STREAM (object)->priv;
+
+ g_clear_object (&priv->output_stream);
+ g_clear_object (&priv->task);
+
+ G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->dispose (object);
+}
+
+static void
+soup_cache_input_stream_finalize (GObject *object)
+{
+ SoupCacheInputStream *self = (SoupCacheInputStream *)object;
+ SoupCacheInputStreamPrivate *priv = self->priv;
+
+ g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+ g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free);
+
+ G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object);
+}
+
+static void
+write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream)
+{
+ GOutputStream *ostream = G_OUTPUT_STREAM (source);
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+ gssize write_size;
+ gsize pending;
+ GError *error = NULL;
+
+ write_size = g_output_stream_write_finish (ostream, result, &error);
+ if (error) {
+ notify_and_clear (istream, error);
+ g_object_unref (istream);
+ return;
+ }
+
+ /* Check that we have written everything */
+ pending = priv->current_writing_buffer->length - write_size;
+ if (pending) {
+ SoupBuffer *subbuffer = soup_buffer_new_subbuffer (priv->current_writing_buffer,
+ write_size, pending);
+ g_queue_push_head (priv->buffer_queue, subbuffer);
+ }
+
+ priv->bytes_written += write_size;
+ g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+
+ try_write_next_buffer (istream);
+ g_object_unref (istream);
+}
+
+static void
+soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream)
+{
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+ SoupBuffer *buffer = g_queue_pop_head (priv->buffer_queue);
+ int priority;
+
+ g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));
+ g_assert (priv->task);
+
+ g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+ priv->current_writing_buffer = buffer;
+
+ if (priv->buffer_queue->length > 10)
+ priority = G_PRIORITY_DEFAULT;
+ else
+ priority = G_PRIORITY_LOW;
+
+ g_output_stream_write_async (priv->output_stream, buffer->data, buffer->length,
+ priority, g_task_get_cancellable (priv->task),
+ (GAsyncReadyCallback) write_ready_cb,
+ g_object_ref (istream));
+}
+
+static gssize
+read_internal (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+ GInputStream *base_stream;
+ gssize nread;
+
+ base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream));
+ nread = g_pollable_stream_read (base_stream, buffer, count, blocking,
+ cancellable, error);
+
+ if (G_UNLIKELY (nread == -1 || priv->read_finished || !priv->task))
+ return nread;
+
+ if (nread == 0) {
+ priv->read_finished = TRUE;
+
+ if (priv->current_writing_buffer == NULL && priv->output_stream)
+ notify_and_clear (istream, NULL);
+ } else {
+ SoupBuffer *soup_buffer = soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread);
+ g_queue_push_tail (priv->buffer_queue, soup_buffer);
+
+ if (priv->current_writing_buffer == NULL && priv->output_stream)
+ soup_cache_input_stream_write_next_buffer (istream);
+ }
+
+ return nread;
+}
+
+static gssize
+soup_cache_input_stream_read_fn (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ return read_internal (stream, buffer, count, TRUE,
+ cancellable, error);
+}
+
+static gssize
+soup_cache_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize count,
+ GError **error)
+{
+ return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
+ NULL, error);
+}
+
+static void
+soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+ gpointer interface_data)
+{
+ pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
+}
+
+static void
+soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
+
+ g_type_class_add_private (klass, sizeof (SoupCacheInputStreamPrivate));
+
+ gobject_class->get_property = soup_cache_input_stream_get_property;
+ gobject_class->set_property = soup_cache_input_stream_set_property;
+ gobject_class->dispose = soup_cache_input_stream_dispose;
+ gobject_class->finalize = soup_cache_input_stream_finalize;
+
+ istream_class->read_fn = soup_cache_input_stream_read_fn;
+
+ g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
+ g_param_spec_object ("output-stream", "Output stream",
+ "the output stream where to write.",
+ G_TYPE_OUTPUT_STREAM,
+ G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT_ONLY |
+ G_PARAM_STATIC_STRINGS));
+}
+
+GInputStream *
+soup_cache_input_stream_new (GInputStream *base_stream)
+{
+ return g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
+ "base-stream", base_stream,
+ "close-base-stream", FALSE,
+ NULL);
+}
diff --git a/libsoup/soup-cache-input-stream.h b/libsoup/soup-cache-input-stream.h
new file mode 100644
index 00000000..c999d102
--- /dev/null
+++ b/libsoup/soup-cache-input-stream.h
@@ -0,0 +1,52 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-cache-input-stream.h - Header for SoupCacheInputStream
+ */
+
+#ifndef __SOUP_CACHE_INPUT_STREAM_H__
+#define __SOUP_CACHE_INPUT_STREAM_H__
+
+#include "soup-filter-input-stream.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_CACHE_INPUT_STREAM (soup_cache_input_stream_get_type())
+#define SOUP_CACHE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStream))
+#define SOUP_CACHE_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass))
+#define SOUP_IS_CACHE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_CACHE_INPUT_STREAM))
+#define SOUP_IS_CACHE_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SOUP_TYPE_CACHE_INPUT_STREAM))
+#define SOUP_CACHE_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass))
+
+typedef struct _SoupCacheInputStream SoupCacheInputStream;
+typedef struct _SoupCacheInputStreamClass SoupCacheInputStreamClass;
+typedef struct _SoupCacheInputStreamPrivate SoupCacheInputStreamPrivate;
+
+struct _SoupCacheInputStreamClass
+{
+ SoupFilterInputStreamClass parent_class;
+};
+
+struct _SoupCacheInputStream
+{
+ SoupFilterInputStream parent;
+
+ SoupCacheInputStreamPrivate *priv;
+};
+
+GType soup_cache_input_stream_get_type (void) G_GNUC_CONST;
+
+GInputStream *soup_cache_input_stream_new (GInputStream *base_stream);
+
+void soup_cache_input_stream_cache (SoupCacheInputStream *istream,
+ GFile *file,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+
+gsize soup_cache_input_stream_cache_finish (SoupCacheInputStream *istream,
+ GAsyncResult *result,
+ GError **error);
+
+G_END_DECLS
+
+#endif /* __SOUP_CACHE_INPUT_STREAM_H__ */
diff --git a/po/POTFILES.in b/po/POTFILES.in
index fff1f0ea..21c70d42 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -1,4 +1,5 @@
libsoup/soup-body-input-stream.c
+libsoup/soup-cache-input-stream.c
libsoup/soup-converter-wrapper.c
libsoup/soup-message-client-io.c
libsoup/soup-message-io.c