diff options
Diffstat (limited to 'tests/chunk-io-test.c')
-rw-r--r-- | tests/chunk-io-test.c | 610 |
1 files changed, 610 insertions, 0 deletions
diff --git a/tests/chunk-io-test.c b/tests/chunk-io-test.c new file mode 100644 index 00000000..1e53eef1 --- /dev/null +++ b/tests/chunk-io-test.c @@ -0,0 +1,610 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * Copyright 2013 Red Hat, Inc. + */ + +#include "test-utils.h" + +static void +force_io_streams_init (void) +{ + SoupServer *server; + SoupSession *session; + guint port; + SoupURI *base_uri; + SoupMessage *msg; + + /* Poke libsoup enough to cause SoupBodyInputStream and + * SoupBodyOutputStream to get defined, so we can find them + * via g_type_from_name() later. + */ + + server = soup_test_server_new (TRUE); + port = soup_server_get_port (server); + + base_uri = soup_uri_new ("http://127.0.0.1"); + soup_uri_set_port (base_uri, port); + + session = soup_test_session_new (SOUP_TYPE_SESSION, NULL); + msg = soup_message_new_from_uri ("POST", base_uri); + soup_session_send_message (session, msg); + g_object_unref (msg); + soup_test_session_abort_unref (session); + + soup_uri_free (base_uri); + soup_test_server_quit_unref (server); +} + +typedef struct { + GFilterInputStream grandparent; + + gpointer *soup_filter_input_stream_private; + + gboolean is_readable; +} SlowInputStream; + +typedef struct { + GFilterInputStreamClass grandparent; +} SlowInputStreamClass; + +GType slow_input_stream_get_type (void); +static void slow_pollable_input_stream_init (GPollableInputStreamInterface *pollable_interface, + gpointer interface_data); + +G_DEFINE_TYPE_WITH_CODE (SlowInputStream, slow_input_stream, + g_type_from_name ("SoupFilterInputStream"), + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, slow_pollable_input_stream_init); + ) + +static void +slow_input_stream_init (SlowInputStream *sis) +{ +} + +static gssize +slow_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + return g_input_stream_read (G_FILTER_INPUT_STREAM (stream)->base_stream, + buffer, 1, cancellable, error); +} + +static void +slow_input_stream_class_init (SlowInputStreamClass *sisclass) +{ + GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (sisclass); + + input_stream_class->read_fn = slow_input_stream_read; +} + +static gboolean +slow_input_stream_is_readable (GPollableInputStream *stream) +{ + return ((SlowInputStream *)stream)->is_readable; +} + +static gssize +slow_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize count, + GError **error) +{ + if (((SlowInputStream *)stream)->is_readable) { + ((SlowInputStream *)stream)->is_readable = FALSE; + return slow_input_stream_read (G_INPUT_STREAM (stream), buffer, count, + NULL, error); + } else { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, + "would block"); + return -1; + } +} + +static GSource * +slow_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable) +{ + GSource *base_source, *pollable_source; + + ((SlowInputStream *)stream)->is_readable = TRUE; + base_source = g_timeout_source_new (0); + g_source_set_dummy_callback (base_source); + + pollable_source = g_pollable_source_new (G_OBJECT (stream)); + g_source_add_child_source (pollable_source, base_source); + g_source_unref (base_source); + + return pollable_source; +} + +static void +slow_pollable_input_stream_init (GPollableInputStreamInterface *pollable_interface, + gpointer interface_data) +{ + pollable_interface->is_readable = slow_input_stream_is_readable; + pollable_interface->read_nonblocking = slow_input_stream_read_nonblocking; + pollable_interface->create_source = slow_input_stream_create_source; +} + +typedef struct { + GFilterOutputStream parent; + + gboolean is_writable; +} SlowOutputStream; + +typedef struct { + GFilterOutputStreamClass parent; +} SlowOutputStreamClass; + +GType slow_output_stream_get_type (void); + +static void slow_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface, + gpointer interface_data); + +G_DEFINE_TYPE_WITH_CODE (SlowOutputStream, slow_output_stream, + g_type_from_name ("GFilterOutputStream"), + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, slow_pollable_output_stream_init); + ) + +static void +slow_output_stream_init (SlowOutputStream *sis) +{ +} + +static gssize +slow_output_stream_write (GOutputStream *stream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + return g_output_stream_write (G_FILTER_OUTPUT_STREAM (stream)->base_stream, + buffer, 1, cancellable, error); +} + +static void +slow_output_stream_class_init (SlowOutputStreamClass *sisclass) +{ + GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (sisclass); + + output_stream_class->write_fn = slow_output_stream_write; +} + +static gboolean +slow_output_stream_is_writable (GPollableOutputStream *stream) +{ + return ((SlowOutputStream *)stream)->is_writable; +} + +static gssize +slow_output_stream_write_nonblocking (GPollableOutputStream *stream, + const void *buffer, + gsize count, + GError **error) +{ + if (((SlowOutputStream *)stream)->is_writable) { + ((SlowOutputStream *)stream)->is_writable = FALSE; + return slow_output_stream_write (G_OUTPUT_STREAM (stream), buffer, count, + NULL, error); + } else { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, + "would block"); + return -1; + } +} + +static GSource * +slow_output_stream_create_source (GPollableOutputStream *stream, + GCancellable *cancellable) +{ + GSource *base_source, *pollable_source; + + ((SlowOutputStream *)stream)->is_writable = TRUE; + base_source = g_timeout_source_new (0); + g_source_set_dummy_callback (base_source); + + pollable_source = g_pollable_source_new (G_OBJECT (stream)); + g_source_add_child_source (pollable_source, base_source); + g_source_unref (base_source); + + return pollable_source; +} + +static void +slow_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface, + gpointer interface_data) +{ + pollable_interface->is_writable = slow_output_stream_is_writable; + pollable_interface->write_nonblocking = slow_output_stream_write_nonblocking; + pollable_interface->create_source = slow_output_stream_create_source; +} + +typedef struct { + GFilterOutputStream parent; + + gboolean is_broken; +} BreakingOutputStream; + +typedef struct { + GFilterOutputStreamClass parent; +} BreakingOutputStreamClass; + +GType breaking_output_stream_get_type (void); + +static void breaking_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface, + gpointer interface_data); + +G_DEFINE_TYPE_WITH_CODE (BreakingOutputStream, breaking_output_stream, + g_type_from_name ("GFilterOutputStream"), + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, breaking_pollable_output_stream_init); + ) + +static void +breaking_output_stream_init (BreakingOutputStream *sis) +{ +} + +static gssize +breaking_output_stream_write (GOutputStream *stream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + if (((BreakingOutputStream *)stream)->is_broken) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, "failed"); + return -1; + } + + if (count > 128) { + ((BreakingOutputStream *)stream)->is_broken = TRUE; + count /= 2; + } + return g_output_stream_write (G_FILTER_OUTPUT_STREAM (stream)->base_stream, + buffer, count, cancellable, error); +} + +static void +breaking_output_stream_class_init (BreakingOutputStreamClass *sisclass) +{ + GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (sisclass); + + output_stream_class->write_fn = breaking_output_stream_write; +} + +static gboolean +breaking_output_stream_is_writable (GPollableOutputStream *stream) +{ + return TRUE; +} + +static gssize +breaking_output_stream_write_nonblocking (GPollableOutputStream *stream, + const void *buffer, + gsize count, + GError **error) +{ + if (((BreakingOutputStream *)stream)->is_broken) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, "failed"); + return -1; + } + + if (count > 128) { + ((BreakingOutputStream *)stream)->is_broken = TRUE; + count /= 2; + } + return g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (G_FILTER_OUTPUT_STREAM (stream)->base_stream), + buffer, count, NULL, error); +} + +static GSource * +breaking_output_stream_create_source (GPollableOutputStream *stream, + GCancellable *cancellable) +{ + GSource *base_source, *pollable_source; + + base_source = g_timeout_source_new (0); + g_source_set_dummy_callback (base_source); + + pollable_source = g_pollable_source_new (G_OBJECT (stream)); + g_source_add_child_source (pollable_source, base_source); + g_source_unref (base_source); + + return pollable_source; +} + +static void +breaking_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface, + gpointer interface_data) +{ + pollable_interface->is_writable = breaking_output_stream_is_writable; + pollable_interface->write_nonblocking = breaking_output_stream_write_nonblocking; + pollable_interface->create_source = breaking_output_stream_create_source; +} + +#define CHUNK_SIZE 1024 + +static GString * +chunkify (const char *str, gsize length) +{ + GString *gstr; + int i, size; + + gstr = g_string_new (NULL); + for (i = 0; i < length; i += CHUNK_SIZE) { + size = MIN (CHUNK_SIZE, length - i); + g_string_append_printf (gstr, "%x\r\n", size); + g_string_append_len (gstr, str + i, size); + g_string_append (gstr, "\r\n"); + } + g_string_append (gstr, "0\r\n\r\n"); + + return gstr; +} + +static void +do_io_tests (void) +{ + GInputStream *imem, *islow, *in; + GOutputStream *omem, *oslow, *out; + GMemoryOutputStream *mem; + SoupBuffer *raw_contents; + char *buf; + GString *chunkified; + GError *error = NULL; + gssize nread, nwrote, total; + gssize chunk_length, chunk_total; + + raw_contents = soup_test_get_index (); + chunkified = chunkify (raw_contents->data, raw_contents->length); + + debug_printf (1, " sync read\n"); + + imem = g_memory_input_stream_new_from_data (chunkified->str, chunkified->len, NULL); + islow = g_object_new (slow_input_stream_get_type (), + "base-stream", imem, + "close-base-stream", TRUE, + NULL); + in = g_object_new (g_type_from_name ("SoupBodyInputStream"), + "base-stream", islow, + "close-base-stream", TRUE, + "encoding", SOUP_ENCODING_CHUNKED, + NULL); + g_object_unref (imem); + g_object_unref (islow); + + buf = g_malloc (raw_contents->length); + total = 0; + while (TRUE) { + nread = g_input_stream_read (in, buf + total, + raw_contents->length - total, + NULL, &error); + g_assert_no_error (error); + g_clear_error (&error); + if (nread > 0) + total += nread; + else + break; + } + + g_input_stream_close (in, NULL, &error); + g_assert_no_error (error); + g_clear_error (&error); + g_object_unref (in); + + soup_assert_cmpmem (buf, total, raw_contents->data, raw_contents->length); + g_free (buf); + + debug_printf (1, " async read\n"); + + imem = g_memory_input_stream_new_from_data (chunkified->str, chunkified->len, NULL); + islow = g_object_new (slow_input_stream_get_type (), + "base-stream", imem, + "close-base-stream", TRUE, + NULL); + in = g_object_new (g_type_from_name ("SoupBodyInputStream"), + "base-stream", islow, + "close-base-stream", TRUE, + "encoding", SOUP_ENCODING_CHUNKED, + NULL); + g_object_unref (imem); + g_object_unref (islow); + + buf = g_malloc (raw_contents->length); + total = 0; + while (TRUE) { + nread = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (in), + buf + total, + raw_contents->length - total, + NULL, &error); + if (nread == -1 && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + GSource *source; + + g_clear_error (&error); + source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (in), NULL); + g_source_set_dummy_callback (source); + g_source_attach (source, NULL); + while (!g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (in))) + g_main_context_iteration (NULL, TRUE); + g_source_destroy (source); + g_source_unref (source); + continue; + } else if (nread == -1) { + g_assert_no_error (error); + g_clear_error (&error); + break; + } else if (nread == 0) + break; + else + total += nread; + } + + g_input_stream_close (in, NULL, &error); + g_assert_no_error (error); + g_clear_error (&error); + g_object_unref (in); + + soup_assert_cmpmem (buf, total, raw_contents->data, raw_contents->length); + g_free (buf); + + debug_printf (1, " sync write\n"); + + buf = g_malloc (chunkified->len); + omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL); + oslow = g_object_new (slow_output_stream_get_type (), + "base-stream", omem, + "close-base-stream", TRUE, + NULL); + out = g_object_new (g_type_from_name ("SoupBodyOutputStream"), + "base-stream", oslow, + "close-base-stream", TRUE, + "encoding", SOUP_ENCODING_CHUNKED, + NULL); + g_object_unref (omem); + g_object_unref (oslow); + + total = chunk_length = chunk_total = 0; + while (total < raw_contents->length) { + if (chunk_total == chunk_length) { + chunk_length = MIN (CHUNK_SIZE, raw_contents->length - total); + chunk_total = 0; + } + nwrote = g_output_stream_write (out, raw_contents->data + total, + chunk_length - chunk_total, NULL, &error); + g_assert_no_error (error); + g_clear_error (&error); + if (nwrote > 0) { + total += nwrote; + chunk_total += nwrote; + } else + break; + } + + g_output_stream_close (out, NULL, &error); + g_assert_no_error (error); + g_clear_error (&error); + + mem = G_MEMORY_OUTPUT_STREAM (omem); + soup_assert_cmpmem (g_memory_output_stream_get_data (mem), + g_memory_output_stream_get_data_size (mem), + chunkified->str, chunkified->len); + + g_object_unref (out); + g_free (buf); + + debug_printf (1, " async write\n"); + + buf = g_malloc (chunkified->len); + omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL); + oslow = g_object_new (slow_output_stream_get_type (), + "base-stream", omem, + "close-base-stream", TRUE, + NULL); + out = g_object_new (g_type_from_name ("SoupBodyOutputStream"), + "base-stream", oslow, + "close-base-stream", TRUE, + "encoding", SOUP_ENCODING_CHUNKED, + NULL); + g_object_unref (omem); + g_object_unref (oslow); + + total = chunk_length = chunk_total = 0; + while (total < raw_contents->length) { + if (chunk_total == chunk_length) { + chunk_length = MIN (CHUNK_SIZE, raw_contents->length - total); + chunk_total = 0; + } + nwrote = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (out), + raw_contents->data + total, + chunk_length - chunk_total, + NULL, &error); + if (nwrote == -1 && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + GSource *source; + + g_clear_error (&error); + source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (out), NULL); + g_source_set_dummy_callback (source); + g_source_attach (source, NULL); + while (!g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (out))) + g_main_context_iteration (NULL, TRUE); + g_source_destroy (source); + g_source_unref (source); + continue; + } else if (nwrote == -1) { + g_assert_no_error (error); + g_clear_error (&error); + break; + } else { + total += nwrote; + chunk_total += nwrote; + } + } + + g_output_stream_close (out, NULL, &error); + g_assert_no_error (error); + g_clear_error (&error); + + mem = G_MEMORY_OUTPUT_STREAM (omem); + soup_assert_cmpmem (g_memory_output_stream_get_data (mem), + g_memory_output_stream_get_data_size (mem), + chunkified->str, chunkified->len); + + g_object_unref (out); + g_free (buf); + + debug_printf (1, " failed write\n"); + /* this succeeds if it doesn't critical */ + + buf = g_malloc (chunkified->len); + omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL); + oslow = g_object_new (breaking_output_stream_get_type (), + "base-stream", omem, + "close-base-stream", TRUE, + NULL); + out = g_object_new (g_type_from_name ("SoupBodyOutputStream"), + "base-stream", oslow, + "close-base-stream", TRUE, + "encoding", SOUP_ENCODING_CHUNKED, + NULL); + g_object_unref (omem); + g_object_unref (oslow); + + total = 0; + while (total < raw_contents->length) { + nwrote = g_output_stream_write (out, raw_contents->data + total, + raw_contents->length - total, NULL, NULL); + if (nwrote == -1) + break; + else + total += nwrote; + } + + g_assert_cmpint (total, !=, raw_contents->length); + + g_output_stream_close (out, NULL, NULL); + g_object_unref (out); + + g_free (buf); + + g_string_free (chunkified, TRUE); +} + +int +main (int argc, char **argv) +{ + int ret; + + test_init (argc, argv, NULL); + + force_io_streams_init (); + + g_test_add_func ("/chunk-io", do_io_tests); + + ret = g_test_run (); + + test_cleanup (); + return ret; +} |