diff options
author | Sergio Villar Senin <svillar@igalia.com> | 2012-07-30 12:40:32 +0200 |
---|---|---|
committer | Sergio Villar Senin <svillar@igalia.com> | 2012-12-18 16:06:06 +0100 |
commit | d12d25e3515b329f65da8f276965fe46c7c628ac (patch) | |
tree | 0824d6aa1588c3ff0a4d71590d5ae4876bf7ed19 | |
parent | 38c13f1d28ee5e86dbdaf2e7a121fb0c148d41d1 (diff) | |
download | libsoup-d12d25e3515b329f65da8f276965fe46c7c628ac.tar.gz libsoup-d12d25e3515b329f65da8f276965fe46c7c628ac.tar.bz2 libsoup-d12d25e3515b329f65da8f276965fe46c7c628ac.zip |
SoupCacheInputStream: new input stream filter that writes to the cache
The SoupCacheInputStream will be added to the stream stack as any other
GPollableInputStream. It will transparently read data from the underlying
stream and will pass it unmodified to the upper level.
Apart from that the stream writes everything it reads to a local file. Once
the caching finishes a callback will be called. Caching may be cancelled at
any point by providing a GCancellable.
https://bugzilla.gnome.org/show_bug.cgi?id=682112
-rw-r--r-- | libsoup/Makefile.am | 2 | ||||
-rw-r--r-- | libsoup/soup-cache-input-stream.c | 333 | ||||
-rw-r--r-- | libsoup/soup-cache-input-stream.h | 52 | ||||
-rw-r--r-- | po/POTFILES.in | 1 |
4 files changed, 388 insertions, 0 deletions
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am index 87629ed1..59553f04 100644 --- a/libsoup/Makefile.am +++ b/libsoup/Makefile.am @@ -114,6 +114,8 @@ libsoup_2_4_la_SOURCES = \ soup-body-output-stream.h \ soup-body-output-stream.c \ soup-cache.c \ + soup-cache-input-stream.h \ + soup-cache-input-stream.c \ soup-cache-private.h \ soup-client-input-stream.h \ soup-client-input-stream.c \ diff --git a/libsoup/soup-cache-input-stream.c b/libsoup/soup-cache-input-stream.c new file mode 100644 index 00000000..a44652a8 --- /dev/null +++ b/libsoup/soup-cache-input-stream.c @@ -0,0 +1,333 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * Copyright (C) 2012 Igalia, S.L. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <glib/gi18n-lib.h> +#include "soup-cache-input-stream.h" +#include "soup-message-body.h" + +static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data); + +G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, + soup_cache_input_stream_pollable_init)) + +/* properties */ +enum { + PROP_0, + + PROP_OUTPUT_STREAM, + + LAST_PROP +}; + +struct _SoupCacheInputStreamPrivate +{ + GOutputStream *output_stream; + gsize bytes_written; + + gboolean read_finished; + SoupBuffer *current_writing_buffer; + GQueue *buffer_queue; + + GTask *task; +}; + +static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream); + +static inline void +notify_and_clear (SoupCacheInputStream *istream, GError *error) +{ + SoupCacheInputStreamPrivate *priv = istream->priv; + + if (error) + g_task_return_error (priv->task, error); + else + g_task_return_int (priv->task, priv->bytes_written); + + g_clear_object (&priv->output_stream); + g_clear_object (&priv->task); +} + +gsize +soup_cache_input_stream_cache_finish (SoupCacheInputStream *istream, + GAsyncResult *result, + GError **error) +{ + return g_task_propagate_int (G_TASK (result), error); +} + +static inline void +try_write_next_buffer (SoupCacheInputStream *istream) +{ + SoupCacheInputStreamPrivate *priv = istream->priv; + + if (priv->current_writing_buffer == NULL && priv->buffer_queue->length) + soup_cache_input_stream_write_next_buffer (istream); + else if (priv->read_finished) + notify_and_clear (istream, NULL); + else if (g_input_stream_is_closed (G_INPUT_STREAM (istream))) { + GError *error = NULL; + g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_CLOSED, + _("Network stream unexpectedly closed")); + notify_and_clear (istream, error); + } +} + +static void +file_replaced_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data); + SoupCacheInputStreamPrivate *priv = istream->priv; + GError *error = NULL; + + priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error); + + if (error) + g_task_return_error (priv->task, error); + else + try_write_next_buffer (istream); +} + +void +soup_cache_input_stream_cache (SoupCacheInputStream *istream, + GFile *file, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + SoupCacheInputStreamPrivate *priv = istream->priv; + + priv->task = g_task_new (istream, cancellable, callback, user_data); + + g_file_replace_async (file, NULL, FALSE, + G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION, + G_PRIORITY_LOW, cancellable, file_replaced_cb, istream); +} + +static void +soup_cache_input_stream_init (SoupCacheInputStream *self) +{ + SoupCacheInputStreamPrivate *priv = + G_TYPE_INSTANCE_GET_PRIVATE (self, SOUP_TYPE_CACHE_INPUT_STREAM, + SoupCacheInputStreamPrivate); + + priv->buffer_queue = g_queue_new (); + self->priv = priv; +} + +static void +soup_cache_input_stream_get_property (GObject *object, + guint property_id, GValue *value, GParamSpec *pspec) +{ + SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object); + SoupCacheInputStreamPrivate *priv = self->priv; + + switch (property_id) { + case PROP_OUTPUT_STREAM: + g_value_set_object (value, priv->output_stream); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +soup_cache_input_stream_set_property (GObject *object, + guint property_id, const GValue *value, GParamSpec *pspec) +{ + SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object); + SoupCacheInputStreamPrivate *priv = self->priv; + + switch (property_id) { + case PROP_OUTPUT_STREAM: + priv->output_stream = g_value_dup_object (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +soup_cache_input_stream_dispose (GObject *object) +{ + SoupCacheInputStreamPrivate *priv = SOUP_CACHE_INPUT_STREAM (object)->priv; + + g_clear_object (&priv->output_stream); + g_clear_object (&priv->task); + + G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->dispose (object); +} + +static void +soup_cache_input_stream_finalize (GObject *object) +{ + SoupCacheInputStream *self = (SoupCacheInputStream *)object; + SoupCacheInputStreamPrivate *priv = self->priv; + + g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free); + g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free); + + G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object); +} + +static void +write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream) +{ + GOutputStream *ostream = G_OUTPUT_STREAM (source); + SoupCacheInputStreamPrivate *priv = istream->priv; + gssize write_size; + gsize pending; + GError *error = NULL; + + write_size = g_output_stream_write_finish (ostream, result, &error); + if (error) { + notify_and_clear (istream, error); + g_object_unref (istream); + return; + } + + /* Check that we have written everything */ + pending = priv->current_writing_buffer->length - write_size; + if (pending) { + SoupBuffer *subbuffer = soup_buffer_new_subbuffer (priv->current_writing_buffer, + write_size, pending); + g_queue_push_head (priv->buffer_queue, subbuffer); + } + + priv->bytes_written += write_size; + g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free); + + try_write_next_buffer (istream); + g_object_unref (istream); +} + +static void +soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream) +{ + SoupCacheInputStreamPrivate *priv = istream->priv; + SoupBuffer *buffer = g_queue_pop_head (priv->buffer_queue); + int priority; + + g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream)); + g_assert (priv->task); + + g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free); + priv->current_writing_buffer = buffer; + + if (priv->buffer_queue->length > 10) + priority = G_PRIORITY_DEFAULT; + else + priority = G_PRIORITY_LOW; + + g_output_stream_write_async (priv->output_stream, buffer->data, buffer->length, + priority, g_task_get_cancellable (priv->task), + (GAsyncReadyCallback) write_ready_cb, + g_object_ref (istream)); +} + +static gssize +read_internal (GInputStream *stream, + void *buffer, + gsize count, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream); + SoupCacheInputStreamPrivate *priv = istream->priv; + GInputStream *base_stream; + gssize nread; + + base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream)); + nread = g_pollable_stream_read (base_stream, buffer, count, blocking, + cancellable, error); + + if (G_UNLIKELY (nread == -1 || priv->read_finished || !priv->task)) + return nread; + + if (nread == 0) { + priv->read_finished = TRUE; + + if (priv->current_writing_buffer == NULL && priv->output_stream) + notify_and_clear (istream, NULL); + } else { + SoupBuffer *soup_buffer = soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread); + g_queue_push_tail (priv->buffer_queue, soup_buffer); + + if (priv->current_writing_buffer == NULL && priv->output_stream) + soup_cache_input_stream_write_next_buffer (istream); + } + + return nread; +} + +static gssize +soup_cache_input_stream_read_fn (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + return read_internal (stream, buffer, count, TRUE, + cancellable, error); +} + +static gssize +soup_cache_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize count, + GError **error) +{ + return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE, + NULL, error); +} + +static void +soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, + gpointer interface_data) +{ + pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking; +} + +static void +soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass); + + g_type_class_add_private (klass, sizeof (SoupCacheInputStreamPrivate)); + + gobject_class->get_property = soup_cache_input_stream_get_property; + gobject_class->set_property = soup_cache_input_stream_set_property; + gobject_class->dispose = soup_cache_input_stream_dispose; + gobject_class->finalize = soup_cache_input_stream_finalize; + + istream_class->read_fn = soup_cache_input_stream_read_fn; + + g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM, + g_param_spec_object ("output-stream", "Output stream", + "the output stream where to write.", + G_TYPE_OUTPUT_STREAM, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); +} + +GInputStream * +soup_cache_input_stream_new (GInputStream *base_stream) +{ + return g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM, + "base-stream", base_stream, + "close-base-stream", FALSE, + NULL); +} diff --git a/libsoup/soup-cache-input-stream.h b/libsoup/soup-cache-input-stream.h new file mode 100644 index 00000000..c999d102 --- /dev/null +++ b/libsoup/soup-cache-input-stream.h @@ -0,0 +1,52 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * soup-cache-input-stream.h - Header for SoupCacheInputStream + */ + +#ifndef __SOUP_CACHE_INPUT_STREAM_H__ +#define __SOUP_CACHE_INPUT_STREAM_H__ + +#include "soup-filter-input-stream.h" + +G_BEGIN_DECLS + +#define SOUP_TYPE_CACHE_INPUT_STREAM (soup_cache_input_stream_get_type()) +#define SOUP_CACHE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStream)) +#define SOUP_CACHE_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass)) +#define SOUP_IS_CACHE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_CACHE_INPUT_STREAM)) +#define SOUP_IS_CACHE_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SOUP_TYPE_CACHE_INPUT_STREAM)) +#define SOUP_CACHE_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass)) + +typedef struct _SoupCacheInputStream SoupCacheInputStream; +typedef struct _SoupCacheInputStreamClass SoupCacheInputStreamClass; +typedef struct _SoupCacheInputStreamPrivate SoupCacheInputStreamPrivate; + +struct _SoupCacheInputStreamClass +{ + SoupFilterInputStreamClass parent_class; +}; + +struct _SoupCacheInputStream +{ + SoupFilterInputStream parent; + + SoupCacheInputStreamPrivate *priv; +}; + +GType soup_cache_input_stream_get_type (void) G_GNUC_CONST; + +GInputStream *soup_cache_input_stream_new (GInputStream *base_stream); + +void soup_cache_input_stream_cache (SoupCacheInputStream *istream, + GFile *file, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + +gsize soup_cache_input_stream_cache_finish (SoupCacheInputStream *istream, + GAsyncResult *result, + GError **error); + +G_END_DECLS + +#endif /* __SOUP_CACHE_INPUT_STREAM_H__ */ diff --git a/po/POTFILES.in b/po/POTFILES.in index fff1f0ea..21c70d42 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -1,4 +1,5 @@ libsoup/soup-body-input-stream.c +libsoup/soup-cache-input-stream.c libsoup/soup-converter-wrapper.c libsoup/soup-message-client-io.c libsoup/soup-message-io.c |