/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ /* * Copyright 2013 Red Hat, Inc. */ #include "test-utils.h" static void force_io_streams_init (void) { SoupServer *server; SoupSession *session; SoupURI *base_uri; SoupMessage *msg; /* Poke libsoup enough to cause SoupBodyInputStream and * SoupBodyOutputStream to get defined, so we can find them * via g_type_from_name() later. */ server = soup_test_server_new (TRUE); base_uri = soup_test_server_get_uri (server, "http", NULL); session = soup_test_session_new (SOUP_TYPE_SESSION, NULL); msg = soup_message_new_from_uri ("POST", base_uri); soup_session_send_message (session, msg); g_object_unref (msg); soup_test_session_abort_unref (session); soup_uri_free (base_uri); soup_test_server_quit_unref (server); } typedef struct { GFilterInputStream grandparent; gpointer *soup_filter_input_stream_private; gboolean is_readable; } SlowInputStream; typedef struct { GFilterInputStreamClass grandparent; } SlowInputStreamClass; GType slow_input_stream_get_type (void); static void slow_pollable_input_stream_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data); G_DEFINE_TYPE_WITH_CODE (SlowInputStream, slow_input_stream, g_type_from_name ("SoupFilterInputStream"), G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, slow_pollable_input_stream_init); ) static void slow_input_stream_init (SlowInputStream *sis) { } static gssize slow_input_stream_read (GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error) { return g_input_stream_read (G_FILTER_INPUT_STREAM (stream)->base_stream, buffer, 1, cancellable, error); } static void slow_input_stream_class_init (SlowInputStreamClass *sisclass) { GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (sisclass); input_stream_class->read_fn = slow_input_stream_read; } static gboolean slow_input_stream_is_readable (GPollableInputStream *stream) { return ((SlowInputStream *)stream)->is_readable; } static gssize slow_input_stream_read_nonblocking (GPollableInputStream *stream, void *buffer, gsize count, GError **error) { if (((SlowInputStream *)stream)->is_readable) { ((SlowInputStream *)stream)->is_readable = FALSE; return slow_input_stream_read (G_INPUT_STREAM (stream), buffer, count, NULL, error); } else { g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, "would block"); return -1; } } static GSource * slow_input_stream_create_source (GPollableInputStream *stream, GCancellable *cancellable) { GSource *base_source, *pollable_source; ((SlowInputStream *)stream)->is_readable = TRUE; base_source = g_timeout_source_new (0); g_source_set_dummy_callback (base_source); pollable_source = g_pollable_source_new (G_OBJECT (stream)); g_source_add_child_source (pollable_source, base_source); g_source_unref (base_source); return pollable_source; } static void slow_pollable_input_stream_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data) { pollable_interface->is_readable = slow_input_stream_is_readable; pollable_interface->read_nonblocking = slow_input_stream_read_nonblocking; pollable_interface->create_source = slow_input_stream_create_source; } typedef struct { GFilterOutputStream parent; gboolean is_writable; } SlowOutputStream; typedef struct { GFilterOutputStreamClass parent; } SlowOutputStreamClass; GType slow_output_stream_get_type (void); static void slow_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface, gpointer interface_data); G_DEFINE_TYPE_WITH_CODE (SlowOutputStream, slow_output_stream, g_type_from_name ("GFilterOutputStream"), G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, slow_pollable_output_stream_init); ) static void slow_output_stream_init (SlowOutputStream *sis) { } static gssize slow_output_stream_write (GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error) { return g_output_stream_write (G_FILTER_OUTPUT_STREAM (stream)->base_stream, buffer, 1, cancellable, error); } static void slow_output_stream_class_init (SlowOutputStreamClass *sisclass) { GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (sisclass); output_stream_class->write_fn = slow_output_stream_write; } static gboolean slow_output_stream_is_writable (GPollableOutputStream *stream) { return ((SlowOutputStream *)stream)->is_writable; } static gssize slow_output_stream_write_nonblocking (GPollableOutputStream *stream, const void *buffer, gsize count, GError **error) { if (((SlowOutputStream *)stream)->is_writable) { ((SlowOutputStream *)stream)->is_writable = FALSE; return slow_output_stream_write (G_OUTPUT_STREAM (stream), buffer, count, NULL, error); } else { g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, "would block"); return -1; } } static GSource * slow_output_stream_create_source (GPollableOutputStream *stream, GCancellable *cancellable) { GSource *base_source, *pollable_source; ((SlowOutputStream *)stream)->is_writable = TRUE; base_source = g_timeout_source_new (0); g_source_set_dummy_callback (base_source); pollable_source = g_pollable_source_new (G_OBJECT (stream)); g_source_add_child_source (pollable_source, base_source); g_source_unref (base_source); return pollable_source; } static void slow_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface, gpointer interface_data) { pollable_interface->is_writable = slow_output_stream_is_writable; pollable_interface->write_nonblocking = slow_output_stream_write_nonblocking; pollable_interface->create_source = slow_output_stream_create_source; } typedef struct { GFilterOutputStream parent; gboolean is_broken; } BreakingOutputStream; typedef struct { GFilterOutputStreamClass parent; } BreakingOutputStreamClass; GType breaking_output_stream_get_type (void); static void breaking_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface, gpointer interface_data); G_DEFINE_TYPE_WITH_CODE (BreakingOutputStream, breaking_output_stream, g_type_from_name ("GFilterOutputStream"), G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, breaking_pollable_output_stream_init); ) static void breaking_output_stream_init (BreakingOutputStream *sis) { } static gssize breaking_output_stream_write (GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error) { if (((BreakingOutputStream *)stream)->is_broken) { g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, "failed"); return -1; } if (count > 128) { ((BreakingOutputStream *)stream)->is_broken = TRUE; count /= 2; } return g_output_stream_write (G_FILTER_OUTPUT_STREAM (stream)->base_stream, buffer, count, cancellable, error); } static void breaking_output_stream_class_init (BreakingOutputStreamClass *sisclass) { GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (sisclass); output_stream_class->write_fn = breaking_output_stream_write; } static gboolean breaking_output_stream_is_writable (GPollableOutputStream *stream) { return TRUE; } static gssize breaking_output_stream_write_nonblocking (GPollableOutputStream *stream, const void *buffer, gsize count, GError **error) { if (((BreakingOutputStream *)stream)->is_broken) { g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, "failed"); return -1; } if (count > 128) { ((BreakingOutputStream *)stream)->is_broken = TRUE; count /= 2; } return g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (G_FILTER_OUTPUT_STREAM (stream)->base_stream), buffer, count, NULL, error); } static GSource * breaking_output_stream_create_source (GPollableOutputStream *stream, GCancellable *cancellable) { GSource *base_source, *pollable_source; base_source = g_timeout_source_new (0); g_source_set_dummy_callback (base_source); pollable_source = g_pollable_source_new (G_OBJECT (stream)); g_source_add_child_source (pollable_source, base_source); g_source_unref (base_source); return pollable_source; } static void breaking_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface, gpointer interface_data) { pollable_interface->is_writable = breaking_output_stream_is_writable; pollable_interface->write_nonblocking = breaking_output_stream_write_nonblocking; pollable_interface->create_source = breaking_output_stream_create_source; } #define CHUNK_SIZE 1024 static GString * chunkify (const char *str, gsize length) { GString *gstr; int i, size; gstr = g_string_new (NULL); for (i = 0; i < length; i += CHUNK_SIZE) { size = MIN (CHUNK_SIZE, length - i); g_string_append_printf (gstr, "%x\r\n", size); g_string_append_len (gstr, str + i, size); g_string_append (gstr, "\r\n"); } g_string_append (gstr, "0\r\n\r\n"); return gstr; } static void do_io_tests (void) { GInputStream *imem, *islow, *in; GOutputStream *omem, *oslow, *out; GMemoryOutputStream *mem; SoupBuffer *raw_contents; char *buf; GString *chunkified; GError *error = NULL; gssize nread, nwrote, total; gssize chunk_length, chunk_total; raw_contents = soup_test_get_index (); chunkified = chunkify (raw_contents->data, raw_contents->length); debug_printf (1, " sync read\n"); imem = g_memory_input_stream_new_from_data (chunkified->str, chunkified->len, NULL); islow = g_object_new (slow_input_stream_get_type (), "base-stream", imem, "close-base-stream", TRUE, NULL); in = g_object_new (g_type_from_name ("SoupBodyInputStream"), "base-stream", islow, "close-base-stream", TRUE, "encoding", SOUP_ENCODING_CHUNKED, NULL); g_object_unref (imem); g_object_unref (islow); buf = g_malloc (raw_contents->length); total = 0; while (TRUE) { nread = g_input_stream_read (in, buf + total, raw_contents->length - total, NULL, &error); g_assert_no_error (error); g_clear_error (&error); if (nread > 0) total += nread; else break; } g_input_stream_close (in, NULL, &error); g_assert_no_error (error); g_clear_error (&error); g_object_unref (in); soup_assert_cmpmem (buf, total, raw_contents->data, raw_contents->length); g_free (buf); debug_printf (1, " async read\n"); imem = g_memory_input_stream_new_from_data (chunkified->str, chunkified->len, NULL); islow = g_object_new (slow_input_stream_get_type (), "base-stream", imem, "close-base-stream", TRUE, NULL); in = g_object_new (g_type_from_name ("SoupBodyInputStream"), "base-stream", islow, "close-base-stream", TRUE, "encoding", SOUP_ENCODING_CHUNKED, NULL); g_object_unref (imem); g_object_unref (islow); buf = g_malloc (raw_contents->length); total = 0; while (TRUE) { nread = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (in), buf + total, raw_contents->length - total, NULL, &error); if (nread == -1 && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { GSource *source; g_clear_error (&error); source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (in), NULL); g_source_set_dummy_callback (source); g_source_attach (source, NULL); while (!g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (in))) g_main_context_iteration (NULL, TRUE); g_source_destroy (source); g_source_unref (source); continue; } else if (nread == -1) { g_assert_no_error (error); g_clear_error (&error); break; } else if (nread == 0) break; else total += nread; } g_input_stream_close (in, NULL, &error); g_assert_no_error (error); g_clear_error (&error); g_object_unref (in); soup_assert_cmpmem (buf, total, raw_contents->data, raw_contents->length); g_free (buf); debug_printf (1, " sync write\n"); buf = g_malloc (chunkified->len); omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL); oslow = g_object_new (slow_output_stream_get_type (), "base-stream", omem, "close-base-stream", TRUE, NULL); out = g_object_new (g_type_from_name ("SoupBodyOutputStream"), "base-stream", oslow, "close-base-stream", TRUE, "encoding", SOUP_ENCODING_CHUNKED, NULL); g_object_unref (omem); g_object_unref (oslow); total = chunk_length = chunk_total = 0; while (total < raw_contents->length) { if (chunk_total == chunk_length) { chunk_length = MIN (CHUNK_SIZE, raw_contents->length - total); chunk_total = 0; } nwrote = g_output_stream_write (out, raw_contents->data + total, chunk_length - chunk_total, NULL, &error); g_assert_no_error (error); g_clear_error (&error); if (nwrote > 0) { total += nwrote; chunk_total += nwrote; } else break; } g_output_stream_close (out, NULL, &error); g_assert_no_error (error); g_clear_error (&error); mem = G_MEMORY_OUTPUT_STREAM (omem); soup_assert_cmpmem (g_memory_output_stream_get_data (mem), g_memory_output_stream_get_data_size (mem), chunkified->str, chunkified->len); g_object_unref (out); g_free (buf); debug_printf (1, " async write\n"); buf = g_malloc (chunkified->len); omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL); oslow = g_object_new (slow_output_stream_get_type (), "base-stream", omem, "close-base-stream", TRUE, NULL); out = g_object_new (g_type_from_name ("SoupBodyOutputStream"), "base-stream", oslow, "close-base-stream", TRUE, "encoding", SOUP_ENCODING_CHUNKED, NULL); g_object_unref (omem); g_object_unref (oslow); total = chunk_length = chunk_total = 0; while (total < raw_contents->length) { if (chunk_total == chunk_length) { chunk_length = MIN (CHUNK_SIZE, raw_contents->length - total); chunk_total = 0; } nwrote = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (out), raw_contents->data + total, chunk_length - chunk_total, NULL, &error); if (nwrote == -1 && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { GSource *source; g_clear_error (&error); source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (out), NULL); g_source_set_dummy_callback (source); g_source_attach (source, NULL); while (!g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (out))) g_main_context_iteration (NULL, TRUE); g_source_destroy (source); g_source_unref (source); continue; } else if (nwrote == -1) { g_assert_no_error (error); g_clear_error (&error); break; } else { total += nwrote; chunk_total += nwrote; } } g_output_stream_close (out, NULL, &error); g_assert_no_error (error); g_clear_error (&error); mem = G_MEMORY_OUTPUT_STREAM (omem); soup_assert_cmpmem (g_memory_output_stream_get_data (mem), g_memory_output_stream_get_data_size (mem), chunkified->str, chunkified->len); g_object_unref (out); g_free (buf); debug_printf (1, " failed write\n"); /* this succeeds if it doesn't critical */ buf = g_malloc (chunkified->len); omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL); oslow = g_object_new (breaking_output_stream_get_type (), "base-stream", omem, "close-base-stream", TRUE, NULL); out = g_object_new (g_type_from_name ("SoupBodyOutputStream"), "base-stream", oslow, "close-base-stream", TRUE, "encoding", SOUP_ENCODING_CHUNKED, NULL); g_object_unref (omem); g_object_unref (oslow); total = 0; while (total < raw_contents->length) { nwrote = g_output_stream_write (out, raw_contents->data + total, raw_contents->length - total, NULL, NULL); if (nwrote == -1) break; else total += nwrote; } g_assert_cmpint (total, !=, raw_contents->length); g_output_stream_close (out, NULL, NULL); g_object_unref (out); g_free (buf); g_string_free (chunkified, TRUE); } int main (int argc, char **argv) { int ret; test_init (argc, argv, NULL); force_io_streams_init (); g_test_add_func ("/chunk-io", do_io_tests); ret = g_test_run (); test_cleanup (); return ret; }