diff options
author | Dan Winship <danw@gnome.org> | 2012-05-27 11:42:45 -0400 |
---|---|---|
committer | Dan Winship <danw@gnome.org> | 2012-12-10 17:14:56 +0100 |
commit | 98d7c7372d276eacdcf86f82b7a8a5114a83fe60 (patch) | |
tree | c7adcf8c421d477d98baeb4353ca9b88e28586dd | |
parent | 596e64f858ebccb32b987bc76e3376f3bb5bcbe7 (diff) | |
download | libsoup-98d7c7372d276eacdcf86f82b7a8a5114a83fe60.tar.gz libsoup-98d7c7372d276eacdcf86f82b7a8a5114a83fe60.tar.bz2 libsoup-98d7c7372d276eacdcf86f82b7a8a5114a83fe60.zip |
SoupSession: allow creating a "plain" SoupSession for use with new APIs
In gio-based APIs, async vs sync is a function-level distinction, not
a class-level distinction. Merge most of the existing SoupSessionAsync
and SoupSessionSync code up into SoupSession, and make SoupSession
non-abstract, so that you can create a SoupSession and then use either
sync or async SoupRequest-based APIs on it. (The traditional APIs
still require one of the traditional subclasses, although the code
reorg does affect them in some ways, such as making SoupSessionAsync
more thread-safe.)
-rw-r--r-- | libsoup/soup-message-queue.h | 3 | ||||
-rw-r--r-- | libsoup/soup-session-async.c | 565 | ||||
-rw-r--r-- | libsoup/soup-session-private.h | 27 | ||||
-rw-r--r-- | libsoup/soup-session-sync.c | 388 | ||||
-rw-r--r-- | libsoup/soup-session.c | 1109 | ||||
-rw-r--r-- | tests/requester-test.c | 63 |
6 files changed, 1024 insertions, 1131 deletions
diff --git a/libsoup/soup-message-queue.h b/libsoup/soup-message-queue.h index d3341bd7..dd619244 100644 --- a/libsoup/soup-message-queue.h +++ b/libsoup/soup-message-queue.h @@ -46,7 +46,8 @@ struct _SoupMessageQueueItem { guint paused : 1; guint new_api : 1; guint io_started : 1; - guint redirection_count : 29; + guint async : 1; + guint redirection_count : 28; SoupMessageQueueItemState state; diff --git a/libsoup/soup-session-async.c b/libsoup/soup-session-async.c index 99edf32f..a24f4bab 100644 --- a/libsoup/soup-session-async.c +++ b/libsoup/soup-session-async.c @@ -27,48 +27,13 @@ * single-threaded programs. **/ -static void run_queue (SoupSessionAsync *sa); -static void do_idle_run_queue (SoupSession *session); - -static void send_request_running (SoupSession *session, SoupMessageQueueItem *item); -static void send_request_restarted (SoupSession *session, SoupMessageQueueItem *item); -static void send_request_finished (SoupSession *session, SoupMessageQueueItem *item); - G_DEFINE_TYPE (SoupSessionAsync, soup_session_async, SOUP_TYPE_SESSION) -typedef struct { - SoupSessionAsync *sa; - GSList *sources; - gboolean disposed; - -} SoupSessionAsyncPrivate; -#define SOUP_SESSION_ASYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_ASYNC, SoupSessionAsyncPrivate)) - static void soup_session_async_init (SoupSessionAsync *sa) { - SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (sa); - - priv->sa = sa; } -static void -soup_session_async_dispose (GObject *object) -{ - SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (object); - GSList *iter; - - priv->disposed = TRUE; - for (iter = priv->sources; iter; iter = iter->next) { - g_source_destroy (iter->data); - g_source_unref (iter->data); - } - g_clear_pointer (&priv->sources, g_slist_free); - - G_OBJECT_CLASS (soup_session_async_parent_class)->dispose (object); -} - - /** * soup_session_async_new: * @@ -106,274 +71,15 @@ soup_session_async_new_with_options (const char *optname1, ...) } static void -message_completed (SoupMessage *msg, gpointer user_data) -{ - SoupMessageQueueItem *item = user_data; - - do_idle_run_queue (item->session); - - if (item->state != SOUP_MESSAGE_RESTARTING) - item->state = SOUP_MESSAGE_FINISHING; -} - -static void -ssl_tunnel_completed (SoupConnection *conn, guint status, gpointer user_data) -{ - SoupMessageQueueItem *tunnel_item = user_data; - SoupMessageQueueItem *item = tunnel_item->related; - SoupSession *session = item->session; - - soup_message_finished (tunnel_item->msg); - soup_message_queue_item_unref (tunnel_item); - - if (!SOUP_STATUS_IS_SUCCESSFUL (status)) { - soup_session_set_item_connection (session, item, NULL); - soup_message_set_status (item->msg, status); - } - - item->state = SOUP_MESSAGE_READY; - do_idle_run_queue (session); - soup_message_queue_item_unref (item); -} - -static void -tunnel_message_completed (SoupMessage *tunnel_msg, gpointer user_data) -{ - SoupMessageQueueItem *tunnel_item = user_data; - SoupSession *session = tunnel_item->session; - SoupMessageQueueItem *item = tunnel_item->related; - - if (tunnel_item->state == SOUP_MESSAGE_RESTARTING) { - soup_message_restarted (tunnel_msg); - if (tunnel_item->conn) { - tunnel_item->state = SOUP_MESSAGE_RUNNING; - soup_session_send_queue_item (session, tunnel_item, - tunnel_message_completed); - return; - } - - soup_message_set_status (tunnel_msg, SOUP_STATUS_TRY_AGAIN); - } - - tunnel_item->state = SOUP_MESSAGE_FINISHED; - soup_session_unqueue_item (session, tunnel_item); - - if (SOUP_STATUS_IS_SUCCESSFUL (tunnel_msg->status_code)) { - soup_connection_start_ssl_async (item->conn, item->cancellable, - ssl_tunnel_completed, tunnel_item); - } else { - ssl_tunnel_completed (item->conn, tunnel_msg->status_code, - tunnel_item); - } -} - -static void -got_connection (SoupConnection *conn, guint status, gpointer user_data) -{ - SoupMessageQueueItem *item = user_data; - SoupSession *session = item->session; - - if (status != SOUP_STATUS_OK) { - if (item->state == SOUP_MESSAGE_CONNECTING) { - soup_session_set_item_status (session, item, status); - soup_session_set_item_connection (session, item, NULL); - item->state = SOUP_MESSAGE_READY; - } - } else - item->state = SOUP_MESSAGE_CONNECTED; - - run_queue ((SoupSessionAsync *)session); - soup_message_queue_item_unref (item); -} - -static void -process_queue_item (SoupMessageQueueItem *item, - gboolean *should_prune, - gboolean loop) -{ - SoupSession *session = item->session; - - if (item->async_context != soup_session_get_async_context (session)) - return; - - do { - if (item->paused) - return; - - switch (item->state) { - case SOUP_MESSAGE_STARTING: - if (!soup_session_get_connection (session, item, should_prune)) - return; - - if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) { - item->state = SOUP_MESSAGE_READY; - break; - } - - item->state = SOUP_MESSAGE_CONNECTING; - soup_message_queue_item_ref (item); - soup_connection_connect_async (item->conn, item->cancellable, - got_connection, item); - return; - - case SOUP_MESSAGE_CONNECTED: - if (soup_connection_is_tunnelled (item->conn)) { - SoupMessageQueueItem *tunnel_item; - - soup_message_queue_item_ref (item); - - item->state = SOUP_MESSAGE_TUNNELING; - - tunnel_item = soup_session_make_connect_message (session, item->conn); - tunnel_item->related = item; - soup_session_send_queue_item (session, tunnel_item, tunnel_message_completed); - return; - } - - item->state = SOUP_MESSAGE_READY; - break; - - case SOUP_MESSAGE_READY: - soup_message_set_https_status (item->msg, item->conn); - if (item->msg->status_code) { - if (item->msg->status_code == SOUP_STATUS_TRY_AGAIN) { - soup_message_cleanup_response (item->msg); - item->state = SOUP_MESSAGE_STARTING; - } else - item->state = SOUP_MESSAGE_FINISHING; - break; - } - - item->state = SOUP_MESSAGE_RUNNING; - soup_session_send_queue_item (session, item, message_completed); - if (item->new_api) - send_request_running (session, item); - break; - - case SOUP_MESSAGE_RESTARTING: - item->state = SOUP_MESSAGE_STARTING; - soup_message_restarted (item->msg); - if (item->new_api) - send_request_restarted (session, item); - break; - - case SOUP_MESSAGE_FINISHING: - item->state = SOUP_MESSAGE_FINISHED; - soup_message_finished (item->msg); - if (item->state != SOUP_MESSAGE_FINISHED) { - g_return_if_fail (!item->new_api); - break; - } - - soup_message_queue_item_ref (item); - soup_session_unqueue_item (session, item); - if (item->callback) - item->callback (session, item->msg, item->callback_data); - else if (item->new_api) - send_request_finished (session, item); - - soup_message_queue_item_unref (item); - return; - - default: - /* Nothing to do with this message in any - * other state. - */ - return; - } - } while (loop && item->state != SOUP_MESSAGE_FINISHED); -} - -static void -run_queue (SoupSessionAsync *sa) -{ - SoupSession *session = SOUP_SESSION (sa); - SoupMessageQueue *queue = soup_session_get_queue (session); - SoupMessageQueueItem *item; - SoupMessage *msg; - gboolean try_pruning = TRUE, should_prune = FALSE; - - g_object_ref (session); - soup_session_cleanup_connections (session, FALSE); - - try_again: - for (item = soup_message_queue_first (queue); - item; - item = soup_message_queue_next (queue, item)) { - msg = item->msg; - - /* CONNECT messages are handled specially */ - if (msg->method != SOUP_METHOD_CONNECT) - process_queue_item (item, &should_prune, TRUE); - } - - if (try_pruning && should_prune) { - /* There is at least one message in the queue that - * could be sent if we pruned an idle connection from - * some other server. - */ - if (soup_session_cleanup_connections (session, TRUE)) { - try_pruning = should_prune = FALSE; - goto try_again; - } - } - - g_object_unref (session); -} - -static gboolean -idle_run_queue (gpointer user_data) -{ - SoupSessionAsyncPrivate *priv = user_data; - GSource *source; - - if (priv->disposed) - return FALSE; - - source = g_main_current_source (); - priv->sources = g_slist_remove (priv->sources, source); - - /* Ensure that the source is destroyed before running the queue */ - g_source_destroy (source); - g_source_unref (source); - - run_queue (priv->sa); - return FALSE; -} - -static void -do_idle_run_queue (SoupSession *session) -{ - SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (session); - GMainContext *async_context = soup_session_get_async_context (session); - GSource *source; - - if (priv->disposed) - return; - - /* We use priv rather than session as the source data, because - * other parts of libsoup (or the calling app) may have sources - * using the session as the source data. - */ - - source = g_main_context_find_source_by_user_data (async_context, priv); - if (source) - return; - - source = soup_add_completion_reffed (async_context, idle_run_queue, priv); - priv->sources = g_slist_prepend (priv->sources, source); -} - -static void soup_session_async_queue_message (SoupSession *session, SoupMessage *req, SoupSessionCallback callback, gpointer user_data) { SoupMessageQueueItem *item; - item = soup_session_append_queue_item (session, req, callback, user_data); + item = soup_session_append_queue_item (session, req, TRUE, FALSE, + callback, user_data); + soup_session_kick_queue (session); soup_message_queue_item_unref (item); - - do_idle_run_queue (session); } static guint @@ -383,10 +89,9 @@ soup_session_async_send_message (SoupSession *session, SoupMessage *req) GMainContext *async_context = soup_session_get_async_context (session); - soup_session_async_queue_message (session, req, NULL, NULL); - - item = soup_message_queue_lookup (soup_session_get_queue (session), req); - g_return_val_if_fail (item != NULL, SOUP_STATUS_MALFORMED); + item = soup_session_append_queue_item (session, req, TRUE, FALSE, + NULL, NULL); + soup_session_kick_queue (session); while (item->state != SOUP_MESSAGE_FINISHED) g_main_context_iteration (async_context, TRUE); @@ -402,7 +107,6 @@ soup_session_async_cancel_message (SoupSession *session, SoupMessage *msg, { SoupMessageQueue *queue; SoupMessageQueueItem *item; - gboolean dummy; SOUP_SESSION_CLASS (soup_session_async_parent_class)-> cancel_message (session, msg, status_code); @@ -424,7 +128,7 @@ soup_session_async_cancel_message (SoupSession *session, SoupMessage *msg, item->state = SOUP_MESSAGE_FINISHING; if (item->state != SOUP_MESSAGE_FINISHED) - process_queue_item (item, &dummy, FALSE); + soup_session_process_queue_item (session, item, NULL, FALSE); soup_message_queue_item_unref (item); } @@ -463,268 +167,13 @@ soup_session_async_auth_required (SoupSession *session, SoupMessage *msg, } static void -soup_session_async_kick (SoupSession *session) -{ - do_idle_run_queue (session); -} - - -static void -send_request_return_result (SoupMessageQueueItem *item, - gpointer stream, GError *error) -{ - GTask *task; - - task = item->task; - item->task = NULL; - - if (item->io_source) { - g_source_destroy (item->io_source); - g_clear_pointer (&item->io_source, g_source_unref); - } - - if (error) - g_task_return_error (task, error); - else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) { - if (stream) - g_object_unref (stream); - g_task_return_new_error (task, SOUP_HTTP_ERROR, - item->msg->status_code, - "%s", - item->msg->reason_phrase); - } else - g_task_return_pointer (task, stream, g_object_unref); - g_object_unref (task); -} - -static void -send_request_restarted (SoupSession *session, SoupMessageQueueItem *item) -{ - /* We won't be needing this, then. */ - g_object_set_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream", NULL); - item->io_started = FALSE; -} - -static void -send_request_finished (SoupSession *session, SoupMessageQueueItem *item) -{ - GMemoryOutputStream *mostream; - GInputStream *istream = NULL; - GError *error = NULL; - - if (!item->task) { - /* Something else already took care of it. */ - return; - } - - mostream = g_object_get_data (G_OBJECT (item->task), "SoupSessionAsync:ostream"); - if (mostream) { - gpointer data; - gssize size; - - /* We thought it would be requeued, but it wasn't, so - * return the original body. - */ - size = g_memory_output_stream_get_data_size (mostream); - data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup (""); - istream = g_memory_input_stream_new_from_data (data, size, g_free); - } else if (item->io_started) { - /* The message finished before becoming readable. This - * will happen, eg, if it's cancelled from got-headers. - * Do nothing; the op will complete via read_ready_cb() - * after we return; - */ - return; - } else { - /* The message finished before even being started; - * probably a tunnel connect failure. - */ - istream = g_memory_input_stream_new (); - } - - send_request_return_result (item, istream, error); -} - -static void -send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data) -{ - SoupMessageQueueItem *item = user_data; - GInputStream *istream = g_object_get_data (source, "istream"); - GError *error = NULL; - - /* It should be safe to call the sync close() method here since - * the message body has already been written. - */ - g_input_stream_close (istream, NULL, NULL); - g_object_unref (istream); - - /* If the message was cancelled, it will be completed via other means */ - if (g_cancellable_is_cancelled (item->cancellable) || - !item->task) { - soup_message_queue_item_unref (item); - return; - } - - if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source), - result, &error) == -1) { - send_request_return_result (item, NULL, error); - soup_message_queue_item_unref (item); - return; - } - - /* Otherwise either restarted or finished will eventually be called. */ - do_idle_run_queue (item->session); - soup_message_queue_item_unref (item); -} - -static void -send_async_maybe_complete (SoupMessageQueueItem *item, - GInputStream *stream) -{ - if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED || - item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED || - soup_session_would_redirect (item->session, item->msg)) { - GOutputStream *ostream; - - /* Message may be requeued, so gather the current message body... */ - ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free); - g_object_set_data_full (G_OBJECT (item->task), "SoupSessionAsync:ostream", - ostream, g_object_unref); - - g_object_set_data (G_OBJECT (ostream), "istream", stream); - - /* Give the splice op its own ref on item */ - soup_message_queue_item_ref (item); - g_output_stream_splice_async (ostream, stream, - /* We can't use CLOSE_SOURCE because it - * might get closed in the wrong thread. - */ - G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET, - G_PRIORITY_DEFAULT, - item->cancellable, - send_async_spliced, item); - return; - } - - send_request_return_result (item, stream, NULL); -} - -static void try_run_until_read (SoupMessageQueueItem *item); - -static gboolean -read_ready_cb (SoupMessage *msg, gpointer user_data) -{ - SoupMessageQueueItem *item = user_data; - - g_clear_pointer (&item->io_source, g_source_unref); - try_run_until_read (item); - return FALSE; -} - -static void -try_run_until_read (SoupMessageQueueItem *item) -{ - GError *error = NULL; - GInputStream *stream = NULL; - - if (soup_message_io_run_until_read (item->msg, item->cancellable, &error)) - stream = soup_message_io_get_response_istream (item->msg, &error); - if (stream) { - send_async_maybe_complete (item, stream); - return; - } - - if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) { - item->state = SOUP_MESSAGE_RESTARTING; - soup_message_io_finished (item->msg); - g_error_free (error); - return; - } - - if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - if (item->state != SOUP_MESSAGE_FINISHED) { - gboolean dummy; - - if (soup_message_io_in_progress (item->msg)) - soup_message_io_finished (item->msg); - item->state = SOUP_MESSAGE_FINISHING; - process_queue_item (item, &dummy, FALSE); - } - send_request_return_result (item, NULL, error); - return; - } - - g_clear_error (&error); - item->io_source = soup_message_io_get_source (item->msg, item->cancellable, - read_ready_cb, item); - g_source_attach (item->io_source, soup_session_get_async_context (item->session)); -} - -static void -send_request_running (SoupSession *session, SoupMessageQueueItem *item) -{ - item->io_started = TRUE; - try_run_until_read (item); -} - -void -soup_session_send_request_async (SoupSession *session, - SoupMessage *msg, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - SoupMessageQueueItem *item; - gboolean use_thread_context; - - g_return_if_fail (SOUP_IS_SESSION_ASYNC (session)); - - g_object_get (G_OBJECT (session), - SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context, - NULL); - g_return_if_fail (use_thread_context); - - soup_session_async_queue_message (session, msg, NULL, NULL); - - item = soup_message_queue_lookup (soup_session_get_queue (session), msg); - g_return_if_fail (item != NULL); - - item->new_api = TRUE; - item->task = g_task_new (session, cancellable, callback, user_data); - g_task_set_task_data (item->task, item, (GDestroyNotify) soup_message_queue_item_unref); - - if (cancellable) { - g_object_unref (item->cancellable); - item->cancellable = g_object_ref (cancellable); - } -} - -GInputStream * -soup_session_send_request_finish (SoupSession *session, - GAsyncResult *result, - GError **error) -{ - g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session), NULL); - g_return_val_if_fail (g_task_is_valid (result, session), NULL); - - return g_task_propagate_pointer (G_TASK (result), error); -} - -static void soup_session_async_class_init (SoupSessionAsyncClass *soup_session_async_class) { SoupSessionClass *session_class = SOUP_SESSION_CLASS (soup_session_async_class); - GObjectClass *object_class = G_OBJECT_CLASS (session_class); - - g_type_class_add_private (soup_session_async_class, - sizeof (SoupSessionAsyncPrivate)); /* virtual method override */ session_class->queue_message = soup_session_async_queue_message; session_class->send_message = soup_session_async_send_message; session_class->cancel_message = soup_session_async_cancel_message; session_class->auth_required = soup_session_async_auth_required; - session_class->kick = soup_session_async_kick; - - object_class->dispose = soup_session_async_dispose; } diff --git a/libsoup/soup-session-private.h b/libsoup/soup-session-private.h index 297faf57..dc4d300b 100644 --- a/libsoup/soup-session-private.h +++ b/libsoup/soup-session-private.h @@ -17,26 +17,12 @@ SoupMessageQueue *soup_session_get_queue (SoupSession *s SoupMessageQueueItem *soup_session_append_queue_item (SoupSession *session, SoupMessage *msg, + gboolean async, + gboolean new_api, SoupSessionCallback callback, gpointer user_data); -SoupMessageQueueItem *soup_session_make_connect_message (SoupSession *session, - SoupConnection *conn); -gboolean soup_session_get_connection (SoupSession *session, - SoupMessageQueueItem *item, - gboolean *try_pruning); -gboolean soup_session_cleanup_connections (SoupSession *session, - gboolean prune_idle); -void soup_session_send_queue_item (SoupSession *session, - SoupMessageQueueItem *item, - SoupMessageCompletionFn completion_cb); -void soup_session_unqueue_item (SoupSession *session, - SoupMessageQueueItem *item); -void soup_session_set_item_connection (SoupSession *session, - SoupMessageQueueItem *item, - SoupConnection *conn); -void soup_session_set_item_status (SoupSession *session, - SoupMessageQueueItem *item, - guint status_code); + +void soup_session_kick_queue (SoupSession *session); GInputStream *soup_session_send_request (SoupSession *session, SoupMessage *msg, @@ -52,6 +38,11 @@ GInputStream *soup_session_send_request_finish (SoupSession *s GAsyncResult *result, GError **error); +void soup_session_process_queue_item (SoupSession *session, + SoupMessageQueueItem *item, + gboolean *should_prune, + gboolean loop); + G_END_DECLS #endif /* SOUP_SESSION_PRIVATE_H */ diff --git a/libsoup/soup-session-sync.c b/libsoup/soup-session-sync.c index 43d0a491..cbd24606 100644 --- a/libsoup/soup-session-sync.c +++ b/libsoup/soup-session-sync.c @@ -42,35 +42,13 @@ * handler callbacks, until I/O is complete. **/ -typedef struct { - GMutex lock; - GCond cond; -} SoupSessionSyncPrivate; -#define SOUP_SESSION_SYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_SYNC, SoupSessionSyncPrivate)) - G_DEFINE_TYPE (SoupSessionSync, soup_session_sync, SOUP_TYPE_SESSION) static void soup_session_sync_init (SoupSessionSync *ss) { - SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (ss); - - g_mutex_init (&priv->lock); - g_cond_init (&priv->cond); -} - -static void -soup_session_sync_finalize (GObject *object) -{ - SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (object); - - g_mutex_clear (&priv->lock); - g_cond_clear (&priv->cond); - - G_OBJECT_CLASS (soup_session_sync_parent_class)->finalize (object); } - /** * soup_session_sync_new: * @@ -107,181 +85,6 @@ soup_session_sync_new_with_options (const char *optname1, ...) return session; } -static guint -tunnel_connect (SoupSession *session, SoupMessageQueueItem *related) -{ - SoupConnection *conn = related->conn; - SoupMessageQueueItem *item; - guint status; - - g_object_ref (conn); - - item = soup_session_make_connect_message (session, conn); - do { - soup_session_send_queue_item (session, item, NULL); - status = item->msg->status_code; - if (item->state == SOUP_MESSAGE_RESTARTING && - soup_message_io_in_progress (item->msg)) { - soup_message_restarted (item->msg); - item->state = SOUP_MESSAGE_RUNNING; - } else { - if (item->state == SOUP_MESSAGE_RESTARTING) - status = SOUP_STATUS_TRY_AGAIN; - item->state = SOUP_MESSAGE_FINISHED; - soup_message_finished (item->msg); - } - } while (item->state == SOUP_MESSAGE_STARTING); - soup_session_unqueue_item (session, item); - soup_message_queue_item_unref (item); - - if (SOUP_STATUS_IS_SUCCESSFUL (status)) { - if (!soup_connection_start_ssl_sync (conn, related->cancellable)) - status = SOUP_STATUS_SSL_FAILED; - soup_message_set_https_status (related->msg, conn); - } - - g_object_unref (conn); - return status; -} - -static void -get_connection (SoupMessageQueueItem *item) -{ - SoupSession *session = item->session; - SoupMessage *msg = item->msg; - gboolean try_pruning = FALSE; - guint status; - -try_again: - soup_session_cleanup_connections (session, FALSE); - - if (!soup_session_get_connection (session, item, &try_pruning)) { - if (!try_pruning) - return; - soup_session_cleanup_connections (session, TRUE); - if (!soup_session_get_connection (session, item, &try_pruning)) - return; - try_pruning = FALSE; - } - - if (soup_connection_get_state (item->conn) == SOUP_CONNECTION_IDLE) { - item->state = SOUP_MESSAGE_READY; - return; - } - - if (soup_connection_get_state (item->conn) == SOUP_CONNECTION_NEW) { - status = soup_connection_connect_sync (item->conn, item->cancellable); - if (status == SOUP_STATUS_TRY_AGAIN) { - soup_session_set_item_connection (session, item, NULL); - goto try_again; - } - - soup_message_set_https_status (msg, item->conn); - - if (!SOUP_STATUS_IS_SUCCESSFUL (status)) { - if (!msg->status_code) - soup_session_set_item_status (session, item, status); - item->state = SOUP_MESSAGE_FINISHING; - soup_session_set_item_connection (session, item, NULL); - return; - } - } - - if (soup_connection_is_tunnelled (item->conn)) { - status = tunnel_connect (session, item); - if (!SOUP_STATUS_IS_SUCCESSFUL (status)) { - soup_session_set_item_connection (session, item, NULL); - if (status == SOUP_STATUS_TRY_AGAIN) - goto try_again; - soup_session_set_item_status (session, item, status); - item->state = SOUP_MESSAGE_FINISHING; - return; - } - } - - item->state = SOUP_MESSAGE_READY; -} - -static void process_queue_item (SoupMessageQueueItem *item); - -static void -new_api_message_completed (SoupMessage *msg, gpointer user_data) -{ - SoupMessageQueueItem *item = user_data; - - if (item->state != SOUP_MESSAGE_RESTARTING) { - item->state = SOUP_MESSAGE_FINISHING; - process_queue_item (item); - } -} - -static void -process_queue_item (SoupMessageQueueItem *item) -{ - SoupSession *session = item->session; - SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session); - - soup_message_queue_item_ref (item); - - do { - if (item->paused) { - g_mutex_lock (&priv->lock); - while (item->paused) - g_cond_wait (&priv->cond, &priv->lock); - g_mutex_unlock (&priv->lock); - } - - switch (item->state) { - case SOUP_MESSAGE_STARTING: - g_mutex_lock (&priv->lock); - do { - get_connection (item); - if (item->state == SOUP_MESSAGE_STARTING) - g_cond_wait (&priv->cond, &priv->lock); - } while (item->state == SOUP_MESSAGE_STARTING); - g_mutex_unlock (&priv->lock); - break; - - case SOUP_MESSAGE_READY: - item->state = SOUP_MESSAGE_RUNNING; - - if (item->new_api) { - soup_session_send_queue_item (item->session, item, new_api_message_completed); - goto out; - } - - soup_session_send_queue_item (item->session, item, NULL); - if (item->state != SOUP_MESSAGE_RESTARTING) - item->state = SOUP_MESSAGE_FINISHING; - break; - - case SOUP_MESSAGE_RUNNING: - g_warn_if_fail (item->new_api); - item->state = SOUP_MESSAGE_FINISHING; - break; - - case SOUP_MESSAGE_RESTARTING: - item->state = SOUP_MESSAGE_STARTING; - soup_message_restarted (item->msg); - break; - - case SOUP_MESSAGE_FINISHING: - item->state = SOUP_MESSAGE_FINISHED; - soup_message_finished (item->msg); - soup_session_unqueue_item (session, item); - break; - - default: - g_warn_if_reached (); - item->state = SOUP_MESSAGE_FINISHING; - break; - } - } while (item->state != SOUP_MESSAGE_FINISHED); - - out: - soup_message_queue_item_unref (item); -} - static gboolean queue_message_callback (gpointer data) { @@ -297,7 +100,7 @@ queue_message_thread (gpointer data) { SoupMessageQueueItem *item = data; - process_queue_item (item); + soup_session_process_queue_item (item->session, item, NULL, TRUE); if (item->callback) { soup_add_completion (soup_session_get_async_context (item->session), queue_message_callback, item); @@ -314,7 +117,8 @@ soup_session_sync_queue_message (SoupSession *session, SoupMessage *msg, SoupMessageQueueItem *item; GThread *thread; - item = soup_session_append_queue_item (session, msg, callback, user_data); + item = soup_session_append_queue_item (session, msg, FALSE, FALSE, + callback, user_data); thread = g_thread_new ("SoupSessionSync:queue_message", queue_message_thread, item); g_thread_unref (thread); @@ -326,25 +130,15 @@ soup_session_sync_send_message (SoupSession *session, SoupMessage *msg) SoupMessageQueueItem *item; guint status; - item = soup_session_append_queue_item (session, msg, NULL, NULL); - process_queue_item (item); + item = soup_session_append_queue_item (session, msg, FALSE, FALSE, + NULL, NULL); + soup_session_process_queue_item (session, item, NULL, TRUE); status = msg->status_code; soup_message_queue_item_unref (item); return status; } static void -soup_session_sync_cancel_message (SoupSession *session, SoupMessage *msg, guint status_code) -{ - SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session); - - g_mutex_lock (&priv->lock); - SOUP_SESSION_CLASS (soup_session_sync_parent_class)->cancel_message (session, msg, status_code); - g_cond_broadcast (&priv->cond); - g_mutex_unlock (&priv->lock); -} - -static void soup_session_sync_auth_required (SoupSession *session, SoupMessage *msg, SoupAuth *auth, gboolean retrying) { @@ -363,182 +157,12 @@ soup_session_sync_auth_required (SoupSession *session, SoupMessage *msg, } static void -soup_session_sync_flush_queue (SoupSession *session) -{ - SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session); - SoupMessageQueue *queue; - SoupMessageQueueItem *item; - GHashTable *current; - gboolean done = FALSE; - - /* Record the current contents of the queue */ - current = g_hash_table_new (NULL, NULL); - queue = soup_session_get_queue (session); - for (item = soup_message_queue_first (queue); - item; - item = soup_message_queue_next (queue, item)) - g_hash_table_insert (current, item, item); - - /* Cancel everything */ - SOUP_SESSION_CLASS (soup_session_sync_parent_class)->flush_queue (session); - - /* Wait until all of the items in @current have been removed - * from the queue. (This is not the same as "wait for the - * queue to be empty", because the app may queue new requests - * in response to the cancellation of the old ones. We don't - * try to cancel those requests as well, since we'd likely - * just end up looping forever.) - */ - g_mutex_lock (&priv->lock); - do { - done = TRUE; - for (item = soup_message_queue_first (queue); - item; - item = soup_message_queue_next (queue, item)) { - if (g_hash_table_lookup (current, item)) - done = FALSE; - } - - if (!done) - g_cond_wait (&priv->cond, &priv->lock); - } while (!done); - g_mutex_unlock (&priv->lock); - - g_hash_table_destroy (current); -} - -static void -soup_session_sync_kick (SoupSession *session) -{ - SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session); - - g_cond_broadcast (&priv->cond); -} - -static void soup_session_sync_class_init (SoupSessionSyncClass *session_sync_class) { - GObjectClass *object_class = G_OBJECT_CLASS (session_sync_class); SoupSessionClass *session_class = SOUP_SESSION_CLASS (session_sync_class); - g_type_class_add_private (session_sync_class, sizeof (SoupSessionSyncPrivate)); - /* virtual method override */ session_class->queue_message = soup_session_sync_queue_message; session_class->send_message = soup_session_sync_send_message; - session_class->cancel_message = soup_session_sync_cancel_message; session_class->auth_required = soup_session_sync_auth_required; - session_class->flush_queue = soup_session_sync_flush_queue; - session_class->kick = soup_session_sync_kick; - - object_class->finalize = soup_session_sync_finalize; -} - - -GInputStream * -soup_session_send_request (SoupSession *session, - SoupMessage *msg, - GCancellable *cancellable, - GError **error) -{ - SoupMessageQueueItem *item; - GInputStream *stream = NULL; - GOutputStream *ostream; - GMemoryOutputStream *mostream; - gssize size; - GError *my_error = NULL; - - g_return_val_if_fail (SOUP_IS_SESSION_SYNC (session), NULL); - - item = soup_session_append_queue_item (session, msg, NULL, NULL); - - item->new_api = TRUE; - if (cancellable) { - g_object_unref (item->cancellable); - item->cancellable = g_object_ref (cancellable); - } - - while (!stream) { - /* Get a connection, etc */ - process_queue_item (item); - if (item->state != SOUP_MESSAGE_RUNNING) - break; - - /* Send request, read headers */ - if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) { - if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) { - item->state = SOUP_MESSAGE_RESTARTING; - soup_message_io_finished (item->msg); - g_clear_error (&my_error); - continue; - } else - break; - } - - stream = soup_message_io_get_response_istream (msg, &my_error); - if (!stream) - break; - - /* Break if the message doesn't look likely-to-be-requeued */ - if (msg->status_code != SOUP_STATUS_UNAUTHORIZED && - msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED && - !soup_session_would_redirect (session, msg)) - break; - - /* Gather the current message body... */ - ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free); - if (g_output_stream_splice (ostream, stream, - G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | - G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET, - item->cancellable, &my_error) == -1) { - g_object_unref (stream); - g_object_unref (ostream); - stream = NULL; - break; - } - g_object_unref (stream); - stream = NULL; - - /* If the message was requeued, loop */ - if (item->state == SOUP_MESSAGE_RESTARTING) { - g_object_unref (ostream); - continue; - } - - /* Not requeued, so return the original body */ - mostream = G_MEMORY_OUTPUT_STREAM (ostream); - size = g_memory_output_stream_get_data_size (mostream); - stream = g_memory_input_stream_new (); - if (size) { - g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream), - g_memory_output_stream_steal_data (mostream), - size, g_free); - } - g_object_unref (ostream); - } - - if (my_error) - g_propagate_error (error, my_error); - else if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) { - if (stream) { - g_object_unref (stream); - stream = NULL; - } - g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code, - msg->reason_phrase); - } else if (!stream) - stream = g_memory_input_stream_new (); - - if (!stream) { - if (soup_message_io_in_progress (msg)) - soup_message_io_finished (msg); - else if (item->state != SOUP_MESSAGE_FINISHED) - item->state = SOUP_MESSAGE_FINISHING; - - if (item->state != SOUP_MESSAGE_FINISHED) - process_queue_item (item); - } - - soup_message_queue_item_unref (item); - return stream; } diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c index 3eecd06b..f5d93d4d 100644 --- a/libsoup/soup-session.c +++ b/libsoup/soup-session.c @@ -77,6 +77,8 @@ static guint soup_host_uri_hash (gconstpointer key); static gboolean soup_host_uri_equal (gconstpointer v1, gconstpointer v2); typedef struct { + SoupSession *session; + GTlsDatabase *tlsdb; char *ssl_ca_file; gboolean ssl_strict; @@ -100,12 +102,15 @@ typedef struct { * SoupSessionHost, adding/removing a connection, * disconnecting a connection, or moving a connection from * IDLE to IN_USE. Must not emit signals or destroy objects - * while holding it. + * while holding it. conn_cond is signaled when it may be + * possible for a previously-blocked message to continue. */ GMutex conn_lock; + GCond conn_cond; GMainContext *async_context; gboolean use_thread_context; + GSList *run_queue_sources; GResolver *resolver; @@ -126,6 +131,10 @@ static void auth_manager_authenticate (SoupAuthManager *manager, SoupMessage *msg, SoupAuth *auth, gboolean retrying, gpointer user_data); +static void async_run_queue (SoupSession *session); + +static void async_send_request_running (SoupSession *session, SoupMessageQueueItem *item); + #define SOUP_SESSION_MAX_CONNS_DEFAULT 10 #define SOUP_SESSION_MAX_CONNS_PER_HOST_DEFAULT 2 @@ -133,9 +142,9 @@ static void auth_manager_authenticate (SoupAuthManager *manager, #define SOUP_SESSION_USER_AGENT_BASE "libsoup/" PACKAGE_VERSION -G_DEFINE_ABSTRACT_TYPE_WITH_CODE (SoupSession, soup_session, G_TYPE_OBJECT, - soup_init (); - ) +G_DEFINE_TYPE_WITH_CODE (SoupSession, soup_session, G_TYPE_OBJECT, + soup_init (); + ) enum { REQUEST_QUEUED, @@ -182,9 +191,12 @@ soup_session_init (SoupSession *session) SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); SoupAuthManager *auth_manager; + priv->session = session; + priv->queue = soup_message_queue_new (session); g_mutex_init (&priv->conn_lock); + g_cond_init (&priv->conn_cond); priv->http_hosts = g_hash_table_new_full (soup_host_uri_hash, soup_host_uri_equal, NULL, (GDestroyNotify)free_host); @@ -225,6 +237,15 @@ soup_session_dispose (GObject *object) { SoupSession *session = SOUP_SESSION (object); SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); + GSList *iter; + + priv->disposed = TRUE; + + for (iter = priv->run_queue_sources; iter; iter = iter->next) { + g_source_destroy (iter->data); + g_source_unref (iter->data); + } + g_clear_pointer (&priv->run_queue_sources, g_slist_free); priv->disposed = TRUE; soup_session_abort (session); @@ -245,6 +266,7 @@ soup_session_finalize (GObject *object) soup_message_queue_destroy (priv->queue); g_mutex_clear (&priv->conn_lock); + g_cond_clear (&priv->conn_cond); g_hash_table_destroy (priv->http_hosts); g_hash_table_destroy (priv->https_hosts); g_hash_table_destroy (priv->conns); @@ -826,10 +848,7 @@ get_host_for_uri (SoupSession *session, SoupURI *uri) return host; } -/* Note: get_host_for_message doesn't lock the conn_lock. The caller - * must do it itself if there's a chance the host doesn't already - * exist. - */ +/* Requires conn_lock to be locked */ static SoupSessionHost * get_host_for_message (SoupSession *session, SoupMessage *msg) { @@ -1031,6 +1050,36 @@ redirect_handler (SoupMessage *msg, gpointer user_data) } static void +proxy_connection_event (SoupConnection *conn, + GSocketClientEvent event, + GIOStream *connection, + gpointer user_data) +{ + SoupMessageQueueItem *item = user_data; + + soup_message_network_event (item->msg, event, connection); +} + +static void +soup_session_set_item_connection (SoupSession *session, + SoupMessageQueueItem *item, + SoupConnection *conn) +{ + if (item->conn) { + g_signal_handlers_disconnect_by_func (item->conn, proxy_connection_event, item); + g_object_unref (item->conn); + } + + item->conn = conn; + + if (item->conn) { + g_object_ref (item->conn); + g_signal_connect (item->conn, "event", + G_CALLBACK (proxy_connection_event), item); + } +} + +static void message_restarted (SoupMessage *msg, gpointer user_data) { SoupMessageQueueItem *item = user_data; @@ -1048,6 +1097,7 @@ message_restarted (SoupMessage *msg, gpointer user_data) SoupMessageQueueItem * soup_session_append_queue_item (SoupSession *session, SoupMessage *msg, + gboolean async, gboolean new_api, SoupSessionCallback callback, gpointer user_data) { SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); @@ -1057,6 +1107,8 @@ soup_session_append_queue_item (SoupSession *session, SoupMessage *msg, soup_message_cleanup_response (msg); item = soup_message_queue_append (priv->queue, msg, callback, user_data); + item->async = async; + item->new_api = new_api; g_mutex_lock (&priv->conn_lock); host = get_host_for_message (session, item->msg); @@ -1077,7 +1129,7 @@ soup_session_append_queue_item (SoupSession *session, SoupMessage *msg, return item; } -void +static void soup_session_send_queue_item (SoupSession *session, SoupMessageQueueItem *item, SoupMessageCompletionFn completion_cb) @@ -1116,9 +1168,9 @@ soup_session_send_queue_item (SoupSession *session, soup_connection_send_request (item->conn, item, completion_cb, item); } -gboolean +static gboolean soup_session_cleanup_connections (SoupSession *session, - gboolean prune_idle) + gboolean cleanup_idle) { SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); GSList *conns = NULL, *c; @@ -1131,7 +1183,7 @@ soup_session_cleanup_connections (SoupSession *session, while (g_hash_table_iter_next (&iter, &conn, &host)) { state = soup_connection_get_state (conn); if (state == SOUP_CONNECTION_REMOTE_DISCONNECTED || - (prune_idle && state == SOUP_CONNECTION_IDLE)) { + (cleanup_idle && state == SOUP_CONNECTION_IDLE)) { conns = g_slist_prepend (conns, g_object_ref (conn)); g_hash_table_iter_remove (&iter); drop_connection (session, host, conn); @@ -1233,7 +1285,7 @@ connection_disconnected (SoupConnection *conn, gpointer user_data) g_mutex_unlock (&priv->conn_lock); - SOUP_SESSION_GET_CLASS (session)->kick (session); + soup_session_kick_queue (session); } static void @@ -1243,88 +1295,281 @@ connection_state_changed (GObject *object, GParamSpec *param, gpointer user_data SoupConnection *conn = SOUP_CONNECTION (object); if (soup_connection_get_state (conn) == SOUP_CONNECTION_IDLE) - SOUP_SESSION_GET_CLASS (session)->kick (session); + soup_session_kick_queue (session); } -SoupMessageQueueItem * -soup_session_make_connect_message (SoupSession *session, - SoupConnection *conn) +SoupMessageQueue * +soup_session_get_queue (SoupSession *session) +{ + SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); + + return priv->queue; +} + +static void +soup_session_unqueue_item (SoupSession *session, + SoupMessageQueueItem *item) +{ + SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); + SoupSessionHost *host; + + if (item->conn) { + if (item->msg->method != SOUP_METHOD_CONNECT || + !SOUP_STATUS_IS_SUCCESSFUL (item->msg->status_code)) + soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE); + soup_session_set_item_connection (session, item, NULL); + } + + if (item->state != SOUP_MESSAGE_FINISHED) { + g_warning ("finished an item with state %d", item->state); + return; + } + + soup_message_queue_remove (priv->queue, item); + + g_mutex_lock (&priv->conn_lock); + host = get_host_for_message (session, item->msg); + host->num_messages--; + g_mutex_unlock (&priv->conn_lock); + + /* g_signal_handlers_disconnect_by_func doesn't work if you + * have a metamarshal, meaning it doesn't work with + * soup_message_add_header_handler() + */ + g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA, + 0, 0, NULL, NULL, item); + g_signal_emit (session, signals[REQUEST_UNQUEUED], 0, item->msg); + soup_message_queue_item_unref (item); +} + +static void +soup_session_set_item_status (SoupSession *session, + SoupMessageQueueItem *item, + guint status_code) { SoupURI *uri; + char *msg; + + switch (status_code) { + case SOUP_STATUS_CANT_RESOLVE: + case SOUP_STATUS_CANT_CONNECT: + uri = soup_message_get_uri (item->msg); + msg = g_strdup_printf ("%s (%s)", + soup_status_get_phrase (status_code), + uri->host); + soup_message_set_status_full (item->msg, status_code, msg); + g_free (msg); + break; + + case SOUP_STATUS_CANT_RESOLVE_PROXY: + case SOUP_STATUS_CANT_CONNECT_PROXY: + if (item->proxy_uri && item->proxy_uri->host) { + msg = g_strdup_printf ("%s (%s)", + soup_status_get_phrase (status_code), + item->proxy_uri->host); + soup_message_set_status_full (item->msg, status_code, msg); + g_free (msg); + break; + } + soup_message_set_status (item->msg, status_code); + break; + + case SOUP_STATUS_SSL_FAILED: + if (!g_tls_backend_supports_tls (g_tls_backend_get_default ())) { + soup_message_set_status_full (item->msg, status_code, + "TLS/SSL support not available; install glib-networking"); + } else + soup_message_set_status (item->msg, status_code); + break; + + default: + soup_message_set_status (item->msg, status_code); + break; + } +} + + +static void +message_completed (SoupMessage *msg, gpointer user_data) +{ + SoupMessageQueueItem *item = user_data; + + if (item->async) + soup_session_kick_queue (item->session); + + if (item->state != SOUP_MESSAGE_RESTARTING) { + item->state = SOUP_MESSAGE_FINISHING; + + if (item->new_api && !item->async) + soup_session_process_queue_item (item->session, item, NULL, TRUE); + } +} + +static void +tunnel_complete (SoupConnection *conn, guint status, gpointer user_data) +{ + SoupMessageQueueItem *tunnel_item = user_data; + SoupMessageQueueItem *item = tunnel_item->related; + SoupSession *session = tunnel_item->session; + + soup_message_finished (tunnel_item->msg); + soup_message_queue_item_unref (tunnel_item); + + if (item->msg->status_code) + item->state = SOUP_MESSAGE_FINISHING; + else + soup_message_set_https_status (item->msg, item->conn); + + if (!SOUP_STATUS_IS_SUCCESSFUL (status)) { + soup_session_set_item_connection (session, item, NULL); + soup_session_set_item_status (session, item, status); + } + + item->state = SOUP_MESSAGE_READY; + if (item->async) + soup_session_kick_queue (session); + soup_message_queue_item_unref (item); +} + +static void +tunnel_message_completed (SoupMessage *msg, gpointer user_data) +{ + SoupMessageQueueItem *tunnel_item = user_data; + SoupMessageQueueItem *item = tunnel_item->related; + SoupSession *session = tunnel_item->session; + guint status; + + if (tunnel_item->state == SOUP_MESSAGE_RESTARTING) { + soup_message_restarted (msg); + if (tunnel_item->conn) { + tunnel_item->state = SOUP_MESSAGE_RUNNING; + soup_session_send_queue_item (session, tunnel_item, + tunnel_message_completed); + return; + } + + soup_message_set_status (msg, SOUP_STATUS_TRY_AGAIN); + } + + tunnel_item->state = SOUP_MESSAGE_FINISHED; + soup_session_unqueue_item (session, tunnel_item); + + status = tunnel_item->msg->status_code; + if (!SOUP_STATUS_IS_SUCCESSFUL (status)) { + tunnel_complete (item->conn, status, tunnel_item); + return; + } + + if (tunnel_item->async) { + soup_connection_start_ssl_async (item->conn, item->cancellable, + tunnel_complete, tunnel_item); + } else { + status = soup_connection_start_ssl_sync (item->conn, item->cancellable); + tunnel_complete (item->conn, status, tunnel_item); + } +} + +static void +tunnel_connect (SoupMessageQueueItem *item) +{ + SoupSession *session = item->session; + SoupMessageQueueItem *tunnel_item; + SoupURI *uri; SoupMessage *msg; - SoupMessageQueueItem *item; - uri = soup_connection_get_remote_uri (conn); + item->state = SOUP_MESSAGE_TUNNELING; + + uri = soup_connection_get_remote_uri (item->conn); msg = soup_message_new_from_uri (SOUP_METHOD_CONNECT, uri); soup_message_set_flags (msg, SOUP_MESSAGE_NO_REDIRECT); - item = soup_session_append_queue_item (session, msg, NULL, NULL); - soup_session_set_item_connection (session, item, conn); + tunnel_item = soup_session_append_queue_item (session, msg, + item->async, FALSE, + NULL, NULL); g_object_unref (msg); - item->state = SOUP_MESSAGE_RUNNING; + tunnel_item->related = item; + soup_message_queue_item_ref (item); + soup_session_set_item_connection (session, tunnel_item, item->conn); + tunnel_item->state = SOUP_MESSAGE_RUNNING; - g_signal_emit (session, signals[TUNNELING], 0, conn); - return item; + g_signal_emit (session, signals[TUNNELING], 0, tunnel_item->conn); + + soup_session_send_queue_item (session, tunnel_item, + tunnel_message_completed); } -gboolean -soup_session_get_connection (SoupSession *session, - SoupMessageQueueItem *item, - gboolean *try_pruning) +static void +got_connection (SoupConnection *conn, guint status, gpointer user_data) +{ + SoupMessageQueueItem *item = user_data; + SoupSession *session = item->session; + + if (status != SOUP_STATUS_OK) { + if (item->state == SOUP_MESSAGE_CONNECTING) { + soup_session_set_item_status (session, item, status); + soup_session_set_item_connection (session, item, NULL); + item->state = SOUP_MESSAGE_READY; + } + } else + item->state = SOUP_MESSAGE_CONNECTED; + + if (item->async) { + if (item->state == SOUP_MESSAGE_CONNECTED || + item->state == SOUP_MESSAGE_READY) + async_run_queue (item->session); + else + soup_session_kick_queue (item->session); + + soup_message_queue_item_unref (item); + } +} + +/* requires conn_lock */ +static SoupConnection * +get_connection_for_host (SoupSession *session, + SoupMessageQueueItem *item, + SoupSessionHost *host, + gboolean need_new_connection, + gboolean *try_cleanup) { SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); SoupConnection *conn; - SoupSessionHost *host; GSList *conns; int num_pending = 0; - gboolean need_new_connection; if (priv->disposed) return FALSE; if (item->conn) { g_return_val_if_fail (soup_connection_get_state (item->conn) != SOUP_CONNECTION_DISCONNECTED, FALSE); - return TRUE; + return item->conn; } - need_new_connection = - (soup_message_get_flags (item->msg) & SOUP_MESSAGE_NEW_CONNECTION) || - (!(soup_message_get_flags (item->msg) & SOUP_MESSAGE_IDEMPOTENT) && - !SOUP_METHOD_IS_IDEMPOTENT (item->msg->method)); - - g_mutex_lock (&priv->conn_lock); - - host = get_host_for_message (session, item->msg); for (conns = host->connections; conns; conns = conns->next) { - if (!need_new_connection && soup_connection_get_state (conns->data) == SOUP_CONNECTION_IDLE) { - soup_connection_set_state (conns->data, SOUP_CONNECTION_IN_USE); - g_mutex_unlock (&priv->conn_lock); - soup_session_set_item_connection (session, item, conns->data); - soup_message_set_https_status (item->msg, item->conn); - return TRUE; - } else if (soup_connection_get_state (conns->data) == SOUP_CONNECTION_CONNECTING) + conn = conns->data; + + if (!need_new_connection && soup_connection_get_state (conn) == SOUP_CONNECTION_IDLE) { + soup_connection_set_state (conn, SOUP_CONNECTION_IN_USE); + return conn; + } else if (soup_connection_get_state (conn) == SOUP_CONNECTION_CONNECTING) num_pending++; } /* Limit the number of pending connections; num_messages / 2 * is somewhat arbitrary... */ - if (num_pending > host->num_messages / 2) { - g_mutex_unlock (&priv->conn_lock); - return FALSE; - } + if (num_pending > host->num_messages / 2) + return NULL; if (host->num_conns >= priv->max_conns_per_host) { if (need_new_connection) - *try_pruning = TRUE; - g_mutex_unlock (&priv->conn_lock); - return FALSE; + *try_cleanup = TRUE; + return NULL; } if (priv->num_conns >= priv->max_conns) { - *try_pruning = TRUE; - g_mutex_unlock (&priv->conn_lock); - return FALSE; + *try_cleanup = TRUE; + return NULL; } conn = g_object_new ( @@ -1347,6 +1592,10 @@ soup_session_get_connection (SoupSession *session, G_CALLBACK (connection_state_changed), session); + /* This is a debugging-related signal, and so can ignore the + * usual rule about not emitting signals while holding + * conn_lock. + */ g_signal_emit (session, signals[CONNECTION_CREATED], 0, conn); g_hash_table_insert (priv->conns, conn, host); @@ -1361,129 +1610,220 @@ soup_session_get_connection (SoupSession *session, host->keep_alive_src = NULL; } - g_mutex_unlock (&priv->conn_lock); - soup_session_set_item_connection (session, item, conn); - return TRUE; + return conn; } -SoupMessageQueue * -soup_session_get_queue (SoupSession *session) +static gboolean +get_connection (SoupMessageQueueItem *item, gboolean *should_cleanup) { + SoupSession *session = item->session; SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); + SoupSessionHost *host; + SoupConnection *conn = NULL; + gboolean my_should_cleanup = FALSE; + gboolean need_new_connection; - return priv->queue; -} + need_new_connection = + (soup_message_get_flags (item->msg) & SOUP_MESSAGE_NEW_CONNECTION) || + (!(soup_message_get_flags (item->msg) & SOUP_MESSAGE_IDEMPOTENT) && + !SOUP_METHOD_IS_IDEMPOTENT (item->msg->method)); -void -soup_session_unqueue_item (SoupSession *session, - SoupMessageQueueItem *item) -{ - SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); - SoupSessionHost *host; + g_mutex_lock (&priv->conn_lock); + host = get_host_for_message (session, item->msg); + while (TRUE) { + conn = get_connection_for_host (session, item, host, + need_new_connection, + &my_should_cleanup); + if (conn || item->async) + break; - if (item->conn) { - if (item->msg->method != SOUP_METHOD_CONNECT || - !SOUP_STATUS_IS_SUCCESSFUL (item->msg->status_code)) - soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE); - soup_session_set_item_connection (session, item, NULL); + if (my_should_cleanup) { + g_mutex_unlock (&priv->conn_lock); + soup_session_cleanup_connections (session, TRUE); + g_mutex_lock (&priv->conn_lock); + + my_should_cleanup = FALSE; + continue; + } + + g_cond_wait (&priv->conn_cond, &priv->conn_lock); } + g_mutex_unlock (&priv->conn_lock); - if (item->state != SOUP_MESSAGE_FINISHED) { - g_warning ("finished an item with state %d", item->state); - return; + if (!conn) { + if (should_cleanup) + *should_cleanup = my_should_cleanup; + return FALSE; } - soup_message_queue_remove (priv->queue, item); + soup_session_set_item_connection (session, item, conn); + soup_message_set_https_status (item->msg, item->conn); - g_mutex_lock (&priv->conn_lock); - host = get_host_for_message (session, item->msg); - host->num_messages--; - g_mutex_unlock (&priv->conn_lock); + if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) { + item->state = SOUP_MESSAGE_READY; + return TRUE; + } - /* g_signal_handlers_disconnect_by_func doesn't work if you - * have a metamarshal, meaning it doesn't work with - * soup_message_add_header_handler() - */ - g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA, - 0, 0, NULL, NULL, item); - g_signal_emit (session, signals[REQUEST_UNQUEUED], 0, item->msg); - soup_message_queue_item_unref (item); + item->state = SOUP_MESSAGE_CONNECTING; + + if (item->async) { + soup_message_queue_item_ref (item); + soup_connection_connect_async (item->conn, item->cancellable, + got_connection, item); + return FALSE; + } else { + guint status; + + status = soup_connection_connect_sync (item->conn, item->cancellable); + got_connection (item->conn, status, item); + + return TRUE; + } } -static void -proxy_connection_event (SoupConnection *conn, - GSocketClientEvent event, - GIOStream *connection, - gpointer user_data) +void +soup_session_process_queue_item (SoupSession *session, + SoupMessageQueueItem *item, + gboolean *should_cleanup, + gboolean loop) { - SoupMessageQueueItem *item = user_data; + g_assert (item->session == session); - soup_message_network_event (item->msg, event, connection); + do { + if (item->paused) + return; + + switch (item->state) { + case SOUP_MESSAGE_STARTING: + if (!get_connection (item, should_cleanup)) + return; + break; + + case SOUP_MESSAGE_CONNECTED: + if (soup_connection_is_tunnelled (item->conn)) + tunnel_connect (item); + else + item->state = SOUP_MESSAGE_READY; + break; + + case SOUP_MESSAGE_READY: + soup_message_set_https_status (item->msg, item->conn); + if (item->msg->status_code) { + if (item->msg->status_code == SOUP_STATUS_TRY_AGAIN) { + soup_message_cleanup_response (item->msg); + item->state = SOUP_MESSAGE_STARTING; + } else + item->state = SOUP_MESSAGE_FINISHING; + break; + } + + item->state = SOUP_MESSAGE_RUNNING; + + soup_session_send_queue_item (session, item, message_completed); + + if (item->new_api) { + if (item->async) + async_send_request_running (session, item); + return; + } + break; + + case SOUP_MESSAGE_RUNNING: + if (item->async) + return; + + g_warn_if_fail (item->new_api); + item->state = SOUP_MESSAGE_FINISHING; + break; + + case SOUP_MESSAGE_RESTARTING: + item->state = SOUP_MESSAGE_STARTING; + soup_message_restarted (item->msg); + break; + + case SOUP_MESSAGE_FINISHING: + item->state = SOUP_MESSAGE_FINISHED; + soup_message_finished (item->msg); + if (item->state != SOUP_MESSAGE_FINISHED) { + g_return_if_fail (!item->new_api); + break; + } + + soup_session_unqueue_item (session, item); + if (item->async && item->callback) + item->callback (session, item->msg, item->callback_data); + return; + + default: + /* Nothing to do with this message in any + * other state. + */ + g_warn_if_fail (item->async); + return; + } + } while (loop && item->state != SOUP_MESSAGE_FINISHED); } -void -soup_session_set_item_connection (SoupSession *session, - SoupMessageQueueItem *item, - SoupConnection *conn) +static void +async_run_queue (SoupSession *session) { - if (item->conn) { - g_signal_handlers_disconnect_by_func (item->conn, proxy_connection_event, item); - g_object_unref (item->conn); - } + SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); + SoupMessageQueueItem *item; + SoupMessage *msg; + gboolean try_cleanup = TRUE, should_cleanup = FALSE; - item->conn = conn; + g_object_ref (session); + soup_session_cleanup_connections (session, FALSE); - if (item->conn) { - g_object_ref (item->conn); - g_signal_connect (item->conn, "event", - G_CALLBACK (proxy_connection_event), item); + try_again: + for (item = soup_message_queue_first (priv->queue); + item; + item = soup_message_queue_next (priv->queue, item)) { + msg = item->msg; + + /* CONNECT messages are handled specially */ + if (msg->method == SOUP_METHOD_CONNECT) + continue; + + if (item->async_context != soup_session_get_async_context (session)) + continue; + + soup_session_process_queue_item (session, item, &should_cleanup, TRUE); + } + + if (try_cleanup && should_cleanup) { + /* There is at least one message in the queue that + * could be sent if we cleanupd an idle connection from + * some other server. + */ + if (soup_session_cleanup_connections (session, TRUE)) { + try_cleanup = should_cleanup = FALSE; + goto try_again; + } } + + g_object_unref (session); } -void -soup_session_set_item_status (SoupSession *session, - SoupMessageQueueItem *item, - guint status_code) +static gboolean +idle_run_queue (gpointer user_data) { - SoupURI *uri; - char *msg; + SoupSessionPrivate *priv = user_data; + GSource *source; - switch (status_code) { - case SOUP_STATUS_CANT_RESOLVE: - case SOUP_STATUS_CANT_CONNECT: - uri = soup_message_get_uri (item->msg); - msg = g_strdup_printf ("%s (%s)", - soup_status_get_phrase (status_code), - uri->host); - soup_message_set_status_full (item->msg, status_code, msg); - g_free (msg); - break; + if (priv->disposed) + return FALSE; - case SOUP_STATUS_CANT_RESOLVE_PROXY: - case SOUP_STATUS_CANT_CONNECT_PROXY: - if (item->proxy_uri && item->proxy_uri->host) { - msg = g_strdup_printf ("%s (%s)", - soup_status_get_phrase (status_code), - item->proxy_uri->host); - soup_message_set_status_full (item->msg, status_code, msg); - g_free (msg); - break; - } - soup_message_set_status (item->msg, status_code); - break; + source = g_main_current_source (); + priv->run_queue_sources = g_slist_remove (priv->run_queue_sources, source); - case SOUP_STATUS_SSL_FAILED: - if (!g_tls_backend_supports_tls (g_tls_backend_get_default ())) { - soup_message_set_status_full (item->msg, status_code, - "TLS/SSL support not available; install glib-networking"); - } else - soup_message_set_status (item->msg, status_code); - break; + /* Ensure that the source is destroyed before running the queue */ + g_source_destroy (source); + g_source_unref (source); - default: - soup_message_set_status (item->msg, status_code); - break; - } + g_assert (priv->session); + async_run_queue (priv->session); + return FALSE; } /** @@ -1517,7 +1857,7 @@ void soup_session_queue_message (SoupSession *session, SoupMessage *msg, SoupSessionCallback callback, gpointer user_data) { - g_return_if_fail (SOUP_IS_SESSION (session)); + g_return_if_fail (SOUP_IS_SESSION_ASYNC (session) || SOUP_IS_SESSION_SYNC (session)); g_return_if_fail (SOUP_IS_MESSAGE (msg)); SOUP_SESSION_GET_CLASS (session)->queue_message (session, msg, @@ -1557,7 +1897,6 @@ soup_session_requeue_message (SoupSession *session, SoupMessage *msg) SOUP_SESSION_GET_CLASS (session)->requeue_message (session, msg); } - /** * soup_session_send_message: * @session: a #SoupSession @@ -1574,7 +1913,7 @@ soup_session_requeue_message (SoupSession *session, SoupMessage *msg) guint soup_session_send_message (SoupSession *session, SoupMessage *msg) { - g_return_val_if_fail (SOUP_IS_SESSION (session), SOUP_STATUS_MALFORMED); + g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session) || SOUP_IS_SESSION_SYNC (session), SOUP_STATUS_MALFORMED); g_return_val_if_fail (SOUP_IS_MESSAGE (msg), SOUP_STATUS_MALFORMED); return SOUP_SESSION_GET_CLASS (session)->send_message (session, msg); @@ -1609,6 +1948,48 @@ soup_session_pause_message (SoupSession *session, soup_message_queue_item_unref (item); } +static void +soup_session_real_kick_queue (SoupSession *session) +{ + SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); + SoupMessageQueueItem *item; + gboolean have_sync_items = FALSE; + + if (priv->disposed) + return; + + for (item = soup_message_queue_first (priv->queue); + item; + item = soup_message_queue_next (priv->queue, item)) { + if (item->async) { + GSource *source; + + /* We use priv rather than session as the + * source data, because other parts of libsoup + * (or the calling app) may have sources using + * the session as the source data. + */ + source = g_main_context_find_source_by_user_data (item->async_context, priv); + if (!source) { + source = soup_add_completion_reffed (item->async_context, + idle_run_queue, priv); + priv->run_queue_sources = g_slist_prepend (priv->run_queue_sources, + source); + } + } else + have_sync_items = TRUE; + } + + if (have_sync_items) + g_cond_broadcast (&priv->conn_cond); +} + +void +soup_session_kick_queue (SoupSession *session) +{ + SOUP_SESSION_GET_CLASS (session)->kick (session); +} + /** * soup_session_unpause_message: * @session: a #SoupSession @@ -1640,7 +2021,7 @@ soup_session_unpause_message (SoupSession *session, soup_message_io_unpause (msg); soup_message_queue_item_unref (item); - SOUP_SESSION_GET_CLASS (session)->kick (session); + soup_session_kick_queue (session); } @@ -1657,6 +2038,7 @@ soup_session_real_cancel_message (SoupSession *session, SoupMessage *msg, guint soup_message_set_status (msg, status_code); g_cancellable_cancel (item->cancellable); + soup_session_kick_queue (item->session); soup_message_queue_item_unref (item); } @@ -1716,13 +2098,52 @@ soup_session_real_flush_queue (SoupSession *session) { SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); SoupMessageQueueItem *item; + GHashTable *current = NULL; + gboolean done = FALSE; + + if (SOUP_IS_SESSION_SYNC (session)) { + /* Record the current contents of the queue */ + current = g_hash_table_new (NULL, NULL); + for (item = soup_message_queue_first (priv->queue); + item; + item = soup_message_queue_next (priv->queue, item)) + g_hash_table_insert (current, item, item); + } + /* Cancel everything */ for (item = soup_message_queue_first (priv->queue); item; item = soup_message_queue_next (priv->queue, item)) { soup_session_cancel_message (session, item->msg, SOUP_STATUS_CANCELLED); } + + if (SOUP_IS_SESSION_SYNC (session)) { + /* Wait until all of the items in @current have been + * removed from the queue. (This is not the same as + * "wait for the queue to be empty", because the app + * may queue new requests in response to the + * cancellation of the old ones. We don't try to + * cancel those requests as well, since we'd likely + * just end up looping forever.) + */ + g_mutex_lock (&priv->conn_lock); + do { + done = TRUE; + for (item = soup_message_queue_first (priv->queue); + item; + item = soup_message_queue_next (priv->queue, item)) { + if (g_hash_table_lookup (current, item)) + done = FALSE; + } + + if (!done) + g_cond_wait (&priv->conn_cond, &priv->conn_lock); + } while (!done); + g_mutex_unlock (&priv->conn_lock); + + g_hash_table_destroy (current); + } } /** @@ -2107,6 +2528,7 @@ soup_session_class_init (SoupSessionClass *session_class) session_class->cancel_message = soup_session_real_cancel_message; session_class->auth_required = soup_session_real_auth_required; session_class->flush_queue = soup_session_real_flush_queue; + session_class->kick = soup_session_real_kick_queue; /* virtual method override */ object_class->dispose = soup_session_dispose; @@ -2808,3 +3230,384 @@ soup_session_class_init (SoupSessionClass *session_class) G_TYPE_STRV, G_PARAM_READWRITE)); } + + +/* send_request_async */ + +static void +async_send_request_return_result (SoupMessageQueueItem *item, + gpointer stream, GError *error) +{ + GTask *task; + + g_return_if_fail (item->task != NULL); + + g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA, + 0, 0, NULL, NULL, item); + + task = item->task; + item->task = NULL; + + if (item->io_source) { + g_source_destroy (item->io_source); + g_clear_pointer (&item->io_source, g_source_unref); + } + + if (error) + g_task_return_error (task, error); + else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) { + if (stream) + g_object_unref (stream); + g_task_return_new_error (task, SOUP_HTTP_ERROR, + item->msg->status_code, + "%s", + item->msg->reason_phrase); + } else + g_task_return_pointer (task, stream, g_object_unref); + g_object_unref (task); +} + +static void +async_send_request_restarted (SoupMessage *msg, gpointer user_data) +{ + SoupMessageQueueItem *item = user_data; + + /* We won't be needing this, then. */ + g_object_set_data (G_OBJECT (item->msg), "SoupSession:ostream", NULL); + item->io_started = FALSE; +} + +static void +async_send_request_finished (SoupMessage *msg, gpointer user_data) +{ + SoupMessageQueueItem *item = user_data; + GMemoryOutputStream *mostream; + GInputStream *istream = NULL; + GError *error = NULL; + + if (!item->task) { + /* Something else already took care of it. */ + return; + } + + mostream = g_object_get_data (G_OBJECT (item->task), "SoupSession:ostream"); + if (mostream) { + gpointer data; + gssize size; + + /* We thought it would be requeued, but it wasn't, so + * return the original body. + */ + size = g_memory_output_stream_get_data_size (mostream); + data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup (""); + istream = g_memory_input_stream_new_from_data (data, size, g_free); + } else if (item->io_started) { + /* The message finished before becoming readable. This + * will happen, eg, if it's cancelled from got-headers. + * Do nothing; the op will complete via read_ready_cb() + * after we return; + */ + return; + } else { + /* The message finished before even being started; + * probably a tunnel connect failure. + */ + istream = g_memory_input_stream_new (); + } + + async_send_request_return_result (item, istream, error); +} + +static void +send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data) +{ + SoupMessageQueueItem *item = user_data; + GInputStream *istream = g_object_get_data (source, "istream"); + GError *error = NULL; + + /* It should be safe to call the sync close() method here since + * the message body has already been written. + */ + g_input_stream_close (istream, NULL, NULL); + g_object_unref (istream); + + /* If the message was cancelled, it will be completed via other means */ + if (g_cancellable_is_cancelled (item->cancellable) || + !item->task) { + soup_message_queue_item_unref (item); + return; + } + + if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source), + result, &error) == -1) { + async_send_request_return_result (item, NULL, error); + soup_message_queue_item_unref (item); + return; + } + + /* Otherwise either restarted or finished will eventually be called. */ + soup_session_kick_queue (item->session); + soup_message_queue_item_unref (item); +} + +static void +send_async_maybe_complete (SoupMessageQueueItem *item, + GInputStream *stream) +{ + if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED || + item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED || + soup_session_would_redirect (item->session, item->msg)) { + GOutputStream *ostream; + + /* Message may be requeued, so gather the current message body... */ + ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free); + g_object_set_data_full (G_OBJECT (item->task), "SoupSession:ostream", + ostream, g_object_unref); + + g_object_set_data (G_OBJECT (ostream), "istream", stream); + + /* Give the splice op its own ref on item */ + soup_message_queue_item_ref (item); + g_output_stream_splice_async (ostream, stream, + /* We can't use CLOSE_SOURCE because it + * might get closed in the wrong thread. + */ + G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET, + G_PRIORITY_DEFAULT, + item->cancellable, + send_async_spliced, item); + return; + } + + async_send_request_return_result (item, stream, NULL); +} + +static void try_run_until_read (SoupMessageQueueItem *item); + +static gboolean +read_ready_cb (SoupMessage *msg, gpointer user_data) +{ + SoupMessageQueueItem *item = user_data; + + g_clear_pointer (&item->io_source, g_source_unref); + try_run_until_read (item); + return FALSE; +} + +static void +try_run_until_read (SoupMessageQueueItem *item) +{ + GError *error = NULL; + GInputStream *stream = NULL; + + if (soup_message_io_run_until_read (item->msg, item->cancellable, &error)) + stream = soup_message_io_get_response_istream (item->msg, &error); + if (stream) { + send_async_maybe_complete (item, stream); + return; + } + + if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) { + item->state = SOUP_MESSAGE_RESTARTING; + soup_message_io_finished (item->msg); + g_error_free (error); + return; + } + + if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + if (item->state != SOUP_MESSAGE_FINISHED) { + if (soup_message_io_in_progress (item->msg)) + soup_message_io_finished (item->msg); + item->state = SOUP_MESSAGE_FINISHING; + soup_session_process_queue_item (item->session, item, NULL, FALSE); + } + async_send_request_return_result (item, NULL, error); + return; + } + + g_clear_error (&error); + item->io_source = soup_message_io_get_source (item->msg, item->cancellable, + read_ready_cb, item); + g_source_attach (item->io_source, soup_session_get_async_context (item->session)); +} + +static void +async_send_request_running (SoupSession *session, SoupMessageQueueItem *item) +{ + item->io_started = TRUE; + try_run_until_read (item); +} + +void +soup_session_send_request_async (SoupSession *session, + SoupMessage *msg, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + SoupMessageQueueItem *item; + gboolean use_thread_context; + + g_return_if_fail (SOUP_IS_SESSION (session)); + g_return_if_fail (!SOUP_IS_SESSION_SYNC (session)); + + g_object_get (G_OBJECT (session), + SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context, + NULL); + g_return_if_fail (use_thread_context); + + item = soup_session_append_queue_item (session, msg, TRUE, TRUE, + NULL, NULL); + g_signal_connect (msg, "restarted", + G_CALLBACK (async_send_request_restarted), item); + g_signal_connect (msg, "finished", + G_CALLBACK (async_send_request_finished), item); + + item->new_api = TRUE; + item->task = g_task_new (session, cancellable, callback, user_data); + g_task_set_task_data (item->task, item, (GDestroyNotify) soup_message_queue_item_unref); + + if (cancellable) { + g_object_unref (item->cancellable); + item->cancellable = g_object_ref (cancellable); + } + + soup_session_kick_queue (session); +} + +GInputStream * +soup_session_send_request_finish (SoupSession *session, + GAsyncResult *result, + GError **error) +{ + GTask *task; + + g_return_val_if_fail (SOUP_IS_SESSION (session), NULL); + g_return_val_if_fail (!SOUP_IS_SESSION_SYNC (session), NULL); + g_return_val_if_fail (g_task_is_valid (result, session), NULL); + + task = G_TASK (result); + if (g_task_had_error (task)) { + SoupMessageQueueItem *item = g_task_get_task_data (task); + + if (soup_message_io_in_progress (item->msg)) + soup_message_io_finished (item->msg); + else if (item->state != SOUP_MESSAGE_FINISHED) + item->state = SOUP_MESSAGE_FINISHING; + + if (item->state != SOUP_MESSAGE_FINISHED) + soup_session_process_queue_item (session, item, NULL, FALSE); + } + + return g_task_propagate_pointer (task, error); +} + +GInputStream * +soup_session_send_request (SoupSession *session, + SoupMessage *msg, + GCancellable *cancellable, + GError **error) +{ + SoupMessageQueueItem *item; + GInputStream *stream = NULL; + GOutputStream *ostream; + GMemoryOutputStream *mostream; + gssize size; + GError *my_error = NULL; + + g_return_val_if_fail (SOUP_IS_SESSION (session), NULL); + g_return_val_if_fail (!SOUP_IS_SESSION_ASYNC (session), NULL); + + item = soup_session_append_queue_item (session, msg, FALSE, TRUE, + NULL, NULL); + + item->new_api = TRUE; + if (cancellable) { + g_object_unref (item->cancellable); + item->cancellable = g_object_ref (cancellable); + } + + while (!stream) { + /* Get a connection, etc */ + soup_session_process_queue_item (session, item, NULL, TRUE); + if (item->state != SOUP_MESSAGE_RUNNING) + break; + + /* Send request, read headers */ + if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) { + if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) { + item->state = SOUP_MESSAGE_RESTARTING; + soup_message_io_finished (item->msg); + g_clear_error (&my_error); + continue; + } else + break; + } + + stream = soup_message_io_get_response_istream (msg, &my_error); + if (!stream) + break; + + /* Break if the message doesn't look likely-to-be-requeued */ + if (msg->status_code != SOUP_STATUS_UNAUTHORIZED && + msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED && + !soup_session_would_redirect (session, msg)) + break; + + /* Gather the current message body... */ + ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free); + if (g_output_stream_splice (ostream, stream, + G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | + G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET, + item->cancellable, &my_error) == -1) { + g_object_unref (stream); + g_object_unref (ostream); + stream = NULL; + break; + } + g_object_unref (stream); + stream = NULL; + + /* If the message was requeued, loop */ + if (item->state == SOUP_MESSAGE_RESTARTING) { + g_object_unref (ostream); + continue; + } + + /* Not requeued, so return the original body */ + mostream = G_MEMORY_OUTPUT_STREAM (ostream); + size = g_memory_output_stream_get_data_size (mostream); + stream = g_memory_input_stream_new (); + if (size) { + g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream), + g_memory_output_stream_steal_data (mostream), + size, g_free); + } + g_object_unref (ostream); + } + + if (my_error) + g_propagate_error (error, my_error); + else if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) { + if (stream) { + g_object_unref (stream); + stream = NULL; + } + g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code, + msg->reason_phrase); + } else if (!stream) + stream = g_memory_input_stream_new (); + + if (!stream) { + if (soup_message_io_in_progress (msg)) + soup_message_io_finished (msg); + else if (item->state != SOUP_MESSAGE_FINISHED) + item->state = SOUP_MESSAGE_FINISHING; + + if (item->state != SOUP_MESSAGE_FINISHED) + soup_session_process_queue_item (session, item, NULL, TRUE); + } + + soup_message_queue_item_unref (item); + return stream; +} diff --git a/tests/requester-test.c b/tests/requester-test.c index bc45e9f6..a4379377 100644 --- a/tests/requester-test.c +++ b/tests/requester-test.c @@ -372,21 +372,22 @@ do_test_for_thread_and_context (SoupSession *session, const char *base_uri) } static void -do_simple_test (const char *uri) +do_simple_test (const char *uri, gboolean plain_session) { SoupSession *session; - debug_printf (1, "Simple streaming test\n"); + debug_printf (1, "Simple streaming test with %s\n", + plain_session ? "SoupSession" : "SoupSessionAsync"); - session = soup_test_session_new (SOUP_TYPE_SESSION_ASYNC, + session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_ASYNC, SOUP_SESSION_USE_THREAD_CONTEXT, TRUE, NULL); do_test_for_thread_and_context (session, uri); soup_test_session_abort_unref (session); } -static gpointer -do_test_with_context (const char *uri) +static void +do_test_with_context_and_type (const char *uri, gboolean plain_session) { GMainContext *async_context; SoupSession *session; @@ -394,7 +395,7 @@ do_test_with_context (const char *uri) async_context = g_main_context_new (); g_main_context_push_thread_default (async_context); - session = soup_test_session_new (SOUP_TYPE_SESSION_ASYNC, + session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_ASYNC, SOUP_SESSION_ASYNC_CONTEXT, async_context, SOUP_SESSION_USE_THREAD_CONTEXT, TRUE, NULL); @@ -404,25 +405,43 @@ do_test_with_context (const char *uri) g_main_context_pop_thread_default (async_context); g_main_context_unref (async_context); +} + +static gpointer +do_test_with_context (gpointer uri) +{ + do_test_with_context_and_type (uri, FALSE); + return NULL; +} + +static gpointer +do_plain_test_with_context (gpointer uri) +{ + do_test_with_context_and_type (uri, TRUE); return NULL; } static void -do_context_test (const char *uri) +do_context_test (const char *uri, gboolean plain_session) { - debug_printf (1, "Streaming with a non-default-context\n"); - do_test_with_context (uri); + debug_printf (1, "Streaming with a non-default-context with %s\n", + plain_session ? "SoupSession" : "SoupSessionAsync"); + if (plain_session) + do_plain_test_with_context ((gpointer)uri); + else + do_test_with_context ((gpointer)uri); } static void -do_thread_test (const char *uri) +do_thread_test (const char *uri, gboolean plain_session) { GThread *thread; - debug_printf (1, "Streaming in another thread\n"); + debug_printf (1, "Streaming in another thread with %s\n", + plain_session ? "SoupSession" : "SoupSessionAsync"); thread = g_thread_new ("do_test_with_context", - (GThreadFunc)do_test_with_context, + plain_session ? do_plain_test_with_context : do_test_with_context, (gpointer)uri); g_thread_join (thread); } @@ -542,16 +561,17 @@ do_sync_request (SoupSession *session, SoupRequest *request, } static void -do_sync_test (const char *uri_string) +do_sync_test (const char *uri_string, gboolean plain_session) { SoupSession *session; SoupRequester *requester; SoupRequest *request; SoupURI *uri; - debug_printf (1, "Sync streaming\n"); + debug_printf (1, "Sync streaming with %s\n", + plain_session ? "SoupSession" : "SoupSessionSync"); - session = soup_test_session_new (SOUP_TYPE_SESSION_SYNC, NULL); + session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_SYNC, NULL); requester = soup_requester_new (); soup_session_add_feature (session, SOUP_SESSION_FEATURE (requester)); g_object_unref (requester); @@ -614,10 +634,15 @@ main (int argc, char **argv) uri = g_strdup_printf ("http://127.0.0.1:%u/foo", soup_server_get_port (server)); - do_simple_test (uri); - do_thread_test (uri); - do_context_test (uri); - do_sync_test (uri); + do_simple_test (uri, FALSE); + do_thread_test (uri, FALSE); + do_context_test (uri, FALSE); + do_sync_test (uri, FALSE); + + do_simple_test (uri, TRUE); + do_thread_test (uri, TRUE); + do_context_test (uri, TRUE); + do_sync_test (uri, TRUE); g_free (uri); soup_buffer_free (response); |