summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libsoup/soup-message-queue.h3
-rw-r--r--libsoup/soup-session-async.c565
-rw-r--r--libsoup/soup-session-private.h27
-rw-r--r--libsoup/soup-session-sync.c388
-rw-r--r--libsoup/soup-session.c1109
-rw-r--r--tests/requester-test.c63
6 files changed, 1024 insertions, 1131 deletions
diff --git a/libsoup/soup-message-queue.h b/libsoup/soup-message-queue.h
index d3341bd7..dd619244 100644
--- a/libsoup/soup-message-queue.h
+++ b/libsoup/soup-message-queue.h
@@ -46,7 +46,8 @@ struct _SoupMessageQueueItem {
guint paused : 1;
guint new_api : 1;
guint io_started : 1;
- guint redirection_count : 29;
+ guint async : 1;
+ guint redirection_count : 28;
SoupMessageQueueItemState state;
diff --git a/libsoup/soup-session-async.c b/libsoup/soup-session-async.c
index 99edf32f..a24f4bab 100644
--- a/libsoup/soup-session-async.c
+++ b/libsoup/soup-session-async.c
@@ -27,48 +27,13 @@
* single-threaded programs.
**/
-static void run_queue (SoupSessionAsync *sa);
-static void do_idle_run_queue (SoupSession *session);
-
-static void send_request_running (SoupSession *session, SoupMessageQueueItem *item);
-static void send_request_restarted (SoupSession *session, SoupMessageQueueItem *item);
-static void send_request_finished (SoupSession *session, SoupMessageQueueItem *item);
-
G_DEFINE_TYPE (SoupSessionAsync, soup_session_async, SOUP_TYPE_SESSION)
-typedef struct {
- SoupSessionAsync *sa;
- GSList *sources;
- gboolean disposed;
-
-} SoupSessionAsyncPrivate;
-#define SOUP_SESSION_ASYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_ASYNC, SoupSessionAsyncPrivate))
-
static void
soup_session_async_init (SoupSessionAsync *sa)
{
- SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (sa);
-
- priv->sa = sa;
}
-static void
-soup_session_async_dispose (GObject *object)
-{
- SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (object);
- GSList *iter;
-
- priv->disposed = TRUE;
- for (iter = priv->sources; iter; iter = iter->next) {
- g_source_destroy (iter->data);
- g_source_unref (iter->data);
- }
- g_clear_pointer (&priv->sources, g_slist_free);
-
- G_OBJECT_CLASS (soup_session_async_parent_class)->dispose (object);
-}
-
-
/**
* soup_session_async_new:
*
@@ -106,274 +71,15 @@ soup_session_async_new_with_options (const char *optname1, ...)
}
static void
-message_completed (SoupMessage *msg, gpointer user_data)
-{
- SoupMessageQueueItem *item = user_data;
-
- do_idle_run_queue (item->session);
-
- if (item->state != SOUP_MESSAGE_RESTARTING)
- item->state = SOUP_MESSAGE_FINISHING;
-}
-
-static void
-ssl_tunnel_completed (SoupConnection *conn, guint status, gpointer user_data)
-{
- SoupMessageQueueItem *tunnel_item = user_data;
- SoupMessageQueueItem *item = tunnel_item->related;
- SoupSession *session = item->session;
-
- soup_message_finished (tunnel_item->msg);
- soup_message_queue_item_unref (tunnel_item);
-
- if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
- soup_session_set_item_connection (session, item, NULL);
- soup_message_set_status (item->msg, status);
- }
-
- item->state = SOUP_MESSAGE_READY;
- do_idle_run_queue (session);
- soup_message_queue_item_unref (item);
-}
-
-static void
-tunnel_message_completed (SoupMessage *tunnel_msg, gpointer user_data)
-{
- SoupMessageQueueItem *tunnel_item = user_data;
- SoupSession *session = tunnel_item->session;
- SoupMessageQueueItem *item = tunnel_item->related;
-
- if (tunnel_item->state == SOUP_MESSAGE_RESTARTING) {
- soup_message_restarted (tunnel_msg);
- if (tunnel_item->conn) {
- tunnel_item->state = SOUP_MESSAGE_RUNNING;
- soup_session_send_queue_item (session, tunnel_item,
- tunnel_message_completed);
- return;
- }
-
- soup_message_set_status (tunnel_msg, SOUP_STATUS_TRY_AGAIN);
- }
-
- tunnel_item->state = SOUP_MESSAGE_FINISHED;
- soup_session_unqueue_item (session, tunnel_item);
-
- if (SOUP_STATUS_IS_SUCCESSFUL (tunnel_msg->status_code)) {
- soup_connection_start_ssl_async (item->conn, item->cancellable,
- ssl_tunnel_completed, tunnel_item);
- } else {
- ssl_tunnel_completed (item->conn, tunnel_msg->status_code,
- tunnel_item);
- }
-}
-
-static void
-got_connection (SoupConnection *conn, guint status, gpointer user_data)
-{
- SoupMessageQueueItem *item = user_data;
- SoupSession *session = item->session;
-
- if (status != SOUP_STATUS_OK) {
- if (item->state == SOUP_MESSAGE_CONNECTING) {
- soup_session_set_item_status (session, item, status);
- soup_session_set_item_connection (session, item, NULL);
- item->state = SOUP_MESSAGE_READY;
- }
- } else
- item->state = SOUP_MESSAGE_CONNECTED;
-
- run_queue ((SoupSessionAsync *)session);
- soup_message_queue_item_unref (item);
-}
-
-static void
-process_queue_item (SoupMessageQueueItem *item,
- gboolean *should_prune,
- gboolean loop)
-{
- SoupSession *session = item->session;
-
- if (item->async_context != soup_session_get_async_context (session))
- return;
-
- do {
- if (item->paused)
- return;
-
- switch (item->state) {
- case SOUP_MESSAGE_STARTING:
- if (!soup_session_get_connection (session, item, should_prune))
- return;
-
- if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) {
- item->state = SOUP_MESSAGE_READY;
- break;
- }
-
- item->state = SOUP_MESSAGE_CONNECTING;
- soup_message_queue_item_ref (item);
- soup_connection_connect_async (item->conn, item->cancellable,
- got_connection, item);
- return;
-
- case SOUP_MESSAGE_CONNECTED:
- if (soup_connection_is_tunnelled (item->conn)) {
- SoupMessageQueueItem *tunnel_item;
-
- soup_message_queue_item_ref (item);
-
- item->state = SOUP_MESSAGE_TUNNELING;
-
- tunnel_item = soup_session_make_connect_message (session, item->conn);
- tunnel_item->related = item;
- soup_session_send_queue_item (session, tunnel_item, tunnel_message_completed);
- return;
- }
-
- item->state = SOUP_MESSAGE_READY;
- break;
-
- case SOUP_MESSAGE_READY:
- soup_message_set_https_status (item->msg, item->conn);
- if (item->msg->status_code) {
- if (item->msg->status_code == SOUP_STATUS_TRY_AGAIN) {
- soup_message_cleanup_response (item->msg);
- item->state = SOUP_MESSAGE_STARTING;
- } else
- item->state = SOUP_MESSAGE_FINISHING;
- break;
- }
-
- item->state = SOUP_MESSAGE_RUNNING;
- soup_session_send_queue_item (session, item, message_completed);
- if (item->new_api)
- send_request_running (session, item);
- break;
-
- case SOUP_MESSAGE_RESTARTING:
- item->state = SOUP_MESSAGE_STARTING;
- soup_message_restarted (item->msg);
- if (item->new_api)
- send_request_restarted (session, item);
- break;
-
- case SOUP_MESSAGE_FINISHING:
- item->state = SOUP_MESSAGE_FINISHED;
- soup_message_finished (item->msg);
- if (item->state != SOUP_MESSAGE_FINISHED) {
- g_return_if_fail (!item->new_api);
- break;
- }
-
- soup_message_queue_item_ref (item);
- soup_session_unqueue_item (session, item);
- if (item->callback)
- item->callback (session, item->msg, item->callback_data);
- else if (item->new_api)
- send_request_finished (session, item);
-
- soup_message_queue_item_unref (item);
- return;
-
- default:
- /* Nothing to do with this message in any
- * other state.
- */
- return;
- }
- } while (loop && item->state != SOUP_MESSAGE_FINISHED);
-}
-
-static void
-run_queue (SoupSessionAsync *sa)
-{
- SoupSession *session = SOUP_SESSION (sa);
- SoupMessageQueue *queue = soup_session_get_queue (session);
- SoupMessageQueueItem *item;
- SoupMessage *msg;
- gboolean try_pruning = TRUE, should_prune = FALSE;
-
- g_object_ref (session);
- soup_session_cleanup_connections (session, FALSE);
-
- try_again:
- for (item = soup_message_queue_first (queue);
- item;
- item = soup_message_queue_next (queue, item)) {
- msg = item->msg;
-
- /* CONNECT messages are handled specially */
- if (msg->method != SOUP_METHOD_CONNECT)
- process_queue_item (item, &should_prune, TRUE);
- }
-
- if (try_pruning && should_prune) {
- /* There is at least one message in the queue that
- * could be sent if we pruned an idle connection from
- * some other server.
- */
- if (soup_session_cleanup_connections (session, TRUE)) {
- try_pruning = should_prune = FALSE;
- goto try_again;
- }
- }
-
- g_object_unref (session);
-}
-
-static gboolean
-idle_run_queue (gpointer user_data)
-{
- SoupSessionAsyncPrivate *priv = user_data;
- GSource *source;
-
- if (priv->disposed)
- return FALSE;
-
- source = g_main_current_source ();
- priv->sources = g_slist_remove (priv->sources, source);
-
- /* Ensure that the source is destroyed before running the queue */
- g_source_destroy (source);
- g_source_unref (source);
-
- run_queue (priv->sa);
- return FALSE;
-}
-
-static void
-do_idle_run_queue (SoupSession *session)
-{
- SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (session);
- GMainContext *async_context = soup_session_get_async_context (session);
- GSource *source;
-
- if (priv->disposed)
- return;
-
- /* We use priv rather than session as the source data, because
- * other parts of libsoup (or the calling app) may have sources
- * using the session as the source data.
- */
-
- source = g_main_context_find_source_by_user_data (async_context, priv);
- if (source)
- return;
-
- source = soup_add_completion_reffed (async_context, idle_run_queue, priv);
- priv->sources = g_slist_prepend (priv->sources, source);
-}
-
-static void
soup_session_async_queue_message (SoupSession *session, SoupMessage *req,
SoupSessionCallback callback, gpointer user_data)
{
SoupMessageQueueItem *item;
- item = soup_session_append_queue_item (session, req, callback, user_data);
+ item = soup_session_append_queue_item (session, req, TRUE, FALSE,
+ callback, user_data);
+ soup_session_kick_queue (session);
soup_message_queue_item_unref (item);
-
- do_idle_run_queue (session);
}
static guint
@@ -383,10 +89,9 @@ soup_session_async_send_message (SoupSession *session, SoupMessage *req)
GMainContext *async_context =
soup_session_get_async_context (session);
- soup_session_async_queue_message (session, req, NULL, NULL);
-
- item = soup_message_queue_lookup (soup_session_get_queue (session), req);
- g_return_val_if_fail (item != NULL, SOUP_STATUS_MALFORMED);
+ item = soup_session_append_queue_item (session, req, TRUE, FALSE,
+ NULL, NULL);
+ soup_session_kick_queue (session);
while (item->state != SOUP_MESSAGE_FINISHED)
g_main_context_iteration (async_context, TRUE);
@@ -402,7 +107,6 @@ soup_session_async_cancel_message (SoupSession *session, SoupMessage *msg,
{
SoupMessageQueue *queue;
SoupMessageQueueItem *item;
- gboolean dummy;
SOUP_SESSION_CLASS (soup_session_async_parent_class)->
cancel_message (session, msg, status_code);
@@ -424,7 +128,7 @@ soup_session_async_cancel_message (SoupSession *session, SoupMessage *msg,
item->state = SOUP_MESSAGE_FINISHING;
if (item->state != SOUP_MESSAGE_FINISHED)
- process_queue_item (item, &dummy, FALSE);
+ soup_session_process_queue_item (session, item, NULL, FALSE);
soup_message_queue_item_unref (item);
}
@@ -463,268 +167,13 @@ soup_session_async_auth_required (SoupSession *session, SoupMessage *msg,
}
static void
-soup_session_async_kick (SoupSession *session)
-{
- do_idle_run_queue (session);
-}
-
-
-static void
-send_request_return_result (SoupMessageQueueItem *item,
- gpointer stream, GError *error)
-{
- GTask *task;
-
- task = item->task;
- item->task = NULL;
-
- if (item->io_source) {
- g_source_destroy (item->io_source);
- g_clear_pointer (&item->io_source, g_source_unref);
- }
-
- if (error)
- g_task_return_error (task, error);
- else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
- if (stream)
- g_object_unref (stream);
- g_task_return_new_error (task, SOUP_HTTP_ERROR,
- item->msg->status_code,
- "%s",
- item->msg->reason_phrase);
- } else
- g_task_return_pointer (task, stream, g_object_unref);
- g_object_unref (task);
-}
-
-static void
-send_request_restarted (SoupSession *session, SoupMessageQueueItem *item)
-{
- /* We won't be needing this, then. */
- g_object_set_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream", NULL);
- item->io_started = FALSE;
-}
-
-static void
-send_request_finished (SoupSession *session, SoupMessageQueueItem *item)
-{
- GMemoryOutputStream *mostream;
- GInputStream *istream = NULL;
- GError *error = NULL;
-
- if (!item->task) {
- /* Something else already took care of it. */
- return;
- }
-
- mostream = g_object_get_data (G_OBJECT (item->task), "SoupSessionAsync:ostream");
- if (mostream) {
- gpointer data;
- gssize size;
-
- /* We thought it would be requeued, but it wasn't, so
- * return the original body.
- */
- size = g_memory_output_stream_get_data_size (mostream);
- data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
- istream = g_memory_input_stream_new_from_data (data, size, g_free);
- } else if (item->io_started) {
- /* The message finished before becoming readable. This
- * will happen, eg, if it's cancelled from got-headers.
- * Do nothing; the op will complete via read_ready_cb()
- * after we return;
- */
- return;
- } else {
- /* The message finished before even being started;
- * probably a tunnel connect failure.
- */
- istream = g_memory_input_stream_new ();
- }
-
- send_request_return_result (item, istream, error);
-}
-
-static void
-send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
-{
- SoupMessageQueueItem *item = user_data;
- GInputStream *istream = g_object_get_data (source, "istream");
- GError *error = NULL;
-
- /* It should be safe to call the sync close() method here since
- * the message body has already been written.
- */
- g_input_stream_close (istream, NULL, NULL);
- g_object_unref (istream);
-
- /* If the message was cancelled, it will be completed via other means */
- if (g_cancellable_is_cancelled (item->cancellable) ||
- !item->task) {
- soup_message_queue_item_unref (item);
- return;
- }
-
- if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
- result, &error) == -1) {
- send_request_return_result (item, NULL, error);
- soup_message_queue_item_unref (item);
- return;
- }
-
- /* Otherwise either restarted or finished will eventually be called. */
- do_idle_run_queue (item->session);
- soup_message_queue_item_unref (item);
-}
-
-static void
-send_async_maybe_complete (SoupMessageQueueItem *item,
- GInputStream *stream)
-{
- if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
- item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
- soup_session_would_redirect (item->session, item->msg)) {
- GOutputStream *ostream;
-
- /* Message may be requeued, so gather the current message body... */
- ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
- g_object_set_data_full (G_OBJECT (item->task), "SoupSessionAsync:ostream",
- ostream, g_object_unref);
-
- g_object_set_data (G_OBJECT (ostream), "istream", stream);
-
- /* Give the splice op its own ref on item */
- soup_message_queue_item_ref (item);
- g_output_stream_splice_async (ostream, stream,
- /* We can't use CLOSE_SOURCE because it
- * might get closed in the wrong thread.
- */
- G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
- G_PRIORITY_DEFAULT,
- item->cancellable,
- send_async_spliced, item);
- return;
- }
-
- send_request_return_result (item, stream, NULL);
-}
-
-static void try_run_until_read (SoupMessageQueueItem *item);
-
-static gboolean
-read_ready_cb (SoupMessage *msg, gpointer user_data)
-{
- SoupMessageQueueItem *item = user_data;
-
- g_clear_pointer (&item->io_source, g_source_unref);
- try_run_until_read (item);
- return FALSE;
-}
-
-static void
-try_run_until_read (SoupMessageQueueItem *item)
-{
- GError *error = NULL;
- GInputStream *stream = NULL;
-
- if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
- stream = soup_message_io_get_response_istream (item->msg, &error);
- if (stream) {
- send_async_maybe_complete (item, stream);
- return;
- }
-
- if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
- item->state = SOUP_MESSAGE_RESTARTING;
- soup_message_io_finished (item->msg);
- g_error_free (error);
- return;
- }
-
- if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- if (item->state != SOUP_MESSAGE_FINISHED) {
- gboolean dummy;
-
- if (soup_message_io_in_progress (item->msg))
- soup_message_io_finished (item->msg);
- item->state = SOUP_MESSAGE_FINISHING;
- process_queue_item (item, &dummy, FALSE);
- }
- send_request_return_result (item, NULL, error);
- return;
- }
-
- g_clear_error (&error);
- item->io_source = soup_message_io_get_source (item->msg, item->cancellable,
- read_ready_cb, item);
- g_source_attach (item->io_source, soup_session_get_async_context (item->session));
-}
-
-static void
-send_request_running (SoupSession *session, SoupMessageQueueItem *item)
-{
- item->io_started = TRUE;
- try_run_until_read (item);
-}
-
-void
-soup_session_send_request_async (SoupSession *session,
- SoupMessage *msg,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- SoupMessageQueueItem *item;
- gboolean use_thread_context;
-
- g_return_if_fail (SOUP_IS_SESSION_ASYNC (session));
-
- g_object_get (G_OBJECT (session),
- SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
- NULL);
- g_return_if_fail (use_thread_context);
-
- soup_session_async_queue_message (session, msg, NULL, NULL);
-
- item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
- g_return_if_fail (item != NULL);
-
- item->new_api = TRUE;
- item->task = g_task_new (session, cancellable, callback, user_data);
- g_task_set_task_data (item->task, item, (GDestroyNotify) soup_message_queue_item_unref);
-
- if (cancellable) {
- g_object_unref (item->cancellable);
- item->cancellable = g_object_ref (cancellable);
- }
-}
-
-GInputStream *
-soup_session_send_request_finish (SoupSession *session,
- GAsyncResult *result,
- GError **error)
-{
- g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session), NULL);
- g_return_val_if_fail (g_task_is_valid (result, session), NULL);
-
- return g_task_propagate_pointer (G_TASK (result), error);
-}
-
-static void
soup_session_async_class_init (SoupSessionAsyncClass *soup_session_async_class)
{
SoupSessionClass *session_class = SOUP_SESSION_CLASS (soup_session_async_class);
- GObjectClass *object_class = G_OBJECT_CLASS (session_class);
-
- g_type_class_add_private (soup_session_async_class,
- sizeof (SoupSessionAsyncPrivate));
/* virtual method override */
session_class->queue_message = soup_session_async_queue_message;
session_class->send_message = soup_session_async_send_message;
session_class->cancel_message = soup_session_async_cancel_message;
session_class->auth_required = soup_session_async_auth_required;
- session_class->kick = soup_session_async_kick;
-
- object_class->dispose = soup_session_async_dispose;
}
diff --git a/libsoup/soup-session-private.h b/libsoup/soup-session-private.h
index 297faf57..dc4d300b 100644
--- a/libsoup/soup-session-private.h
+++ b/libsoup/soup-session-private.h
@@ -17,26 +17,12 @@ SoupMessageQueue *soup_session_get_queue (SoupSession *s
SoupMessageQueueItem *soup_session_append_queue_item (SoupSession *session,
SoupMessage *msg,
+ gboolean async,
+ gboolean new_api,
SoupSessionCallback callback,
gpointer user_data);
-SoupMessageQueueItem *soup_session_make_connect_message (SoupSession *session,
- SoupConnection *conn);
-gboolean soup_session_get_connection (SoupSession *session,
- SoupMessageQueueItem *item,
- gboolean *try_pruning);
-gboolean soup_session_cleanup_connections (SoupSession *session,
- gboolean prune_idle);
-void soup_session_send_queue_item (SoupSession *session,
- SoupMessageQueueItem *item,
- SoupMessageCompletionFn completion_cb);
-void soup_session_unqueue_item (SoupSession *session,
- SoupMessageQueueItem *item);
-void soup_session_set_item_connection (SoupSession *session,
- SoupMessageQueueItem *item,
- SoupConnection *conn);
-void soup_session_set_item_status (SoupSession *session,
- SoupMessageQueueItem *item,
- guint status_code);
+
+void soup_session_kick_queue (SoupSession *session);
GInputStream *soup_session_send_request (SoupSession *session,
SoupMessage *msg,
@@ -52,6 +38,11 @@ GInputStream *soup_session_send_request_finish (SoupSession *s
GAsyncResult *result,
GError **error);
+void soup_session_process_queue_item (SoupSession *session,
+ SoupMessageQueueItem *item,
+ gboolean *should_prune,
+ gboolean loop);
+
G_END_DECLS
#endif /* SOUP_SESSION_PRIVATE_H */
diff --git a/libsoup/soup-session-sync.c b/libsoup/soup-session-sync.c
index 43d0a491..cbd24606 100644
--- a/libsoup/soup-session-sync.c
+++ b/libsoup/soup-session-sync.c
@@ -42,35 +42,13 @@
* handler callbacks, until I/O is complete.
**/
-typedef struct {
- GMutex lock;
- GCond cond;
-} SoupSessionSyncPrivate;
-#define SOUP_SESSION_SYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_SYNC, SoupSessionSyncPrivate))
-
G_DEFINE_TYPE (SoupSessionSync, soup_session_sync, SOUP_TYPE_SESSION)
static void
soup_session_sync_init (SoupSessionSync *ss)
{
- SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (ss);
-
- g_mutex_init (&priv->lock);
- g_cond_init (&priv->cond);
-}
-
-static void
-soup_session_sync_finalize (GObject *object)
-{
- SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (object);
-
- g_mutex_clear (&priv->lock);
- g_cond_clear (&priv->cond);
-
- G_OBJECT_CLASS (soup_session_sync_parent_class)->finalize (object);
}
-
/**
* soup_session_sync_new:
*
@@ -107,181 +85,6 @@ soup_session_sync_new_with_options (const char *optname1, ...)
return session;
}
-static guint
-tunnel_connect (SoupSession *session, SoupMessageQueueItem *related)
-{
- SoupConnection *conn = related->conn;
- SoupMessageQueueItem *item;
- guint status;
-
- g_object_ref (conn);
-
- item = soup_session_make_connect_message (session, conn);
- do {
- soup_session_send_queue_item (session, item, NULL);
- status = item->msg->status_code;
- if (item->state == SOUP_MESSAGE_RESTARTING &&
- soup_message_io_in_progress (item->msg)) {
- soup_message_restarted (item->msg);
- item->state = SOUP_MESSAGE_RUNNING;
- } else {
- if (item->state == SOUP_MESSAGE_RESTARTING)
- status = SOUP_STATUS_TRY_AGAIN;
- item->state = SOUP_MESSAGE_FINISHED;
- soup_message_finished (item->msg);
- }
- } while (item->state == SOUP_MESSAGE_STARTING);
- soup_session_unqueue_item (session, item);
- soup_message_queue_item_unref (item);
-
- if (SOUP_STATUS_IS_SUCCESSFUL (status)) {
- if (!soup_connection_start_ssl_sync (conn, related->cancellable))
- status = SOUP_STATUS_SSL_FAILED;
- soup_message_set_https_status (related->msg, conn);
- }
-
- g_object_unref (conn);
- return status;
-}
-
-static void
-get_connection (SoupMessageQueueItem *item)
-{
- SoupSession *session = item->session;
- SoupMessage *msg = item->msg;
- gboolean try_pruning = FALSE;
- guint status;
-
-try_again:
- soup_session_cleanup_connections (session, FALSE);
-
- if (!soup_session_get_connection (session, item, &try_pruning)) {
- if (!try_pruning)
- return;
- soup_session_cleanup_connections (session, TRUE);
- if (!soup_session_get_connection (session, item, &try_pruning))
- return;
- try_pruning = FALSE;
- }
-
- if (soup_connection_get_state (item->conn) == SOUP_CONNECTION_IDLE) {
- item->state = SOUP_MESSAGE_READY;
- return;
- }
-
- if (soup_connection_get_state (item->conn) == SOUP_CONNECTION_NEW) {
- status = soup_connection_connect_sync (item->conn, item->cancellable);
- if (status == SOUP_STATUS_TRY_AGAIN) {
- soup_session_set_item_connection (session, item, NULL);
- goto try_again;
- }
-
- soup_message_set_https_status (msg, item->conn);
-
- if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
- if (!msg->status_code)
- soup_session_set_item_status (session, item, status);
- item->state = SOUP_MESSAGE_FINISHING;
- soup_session_set_item_connection (session, item, NULL);
- return;
- }
- }
-
- if (soup_connection_is_tunnelled (item->conn)) {
- status = tunnel_connect (session, item);
- if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
- soup_session_set_item_connection (session, item, NULL);
- if (status == SOUP_STATUS_TRY_AGAIN)
- goto try_again;
- soup_session_set_item_status (session, item, status);
- item->state = SOUP_MESSAGE_FINISHING;
- return;
- }
- }
-
- item->state = SOUP_MESSAGE_READY;
-}
-
-static void process_queue_item (SoupMessageQueueItem *item);
-
-static void
-new_api_message_completed (SoupMessage *msg, gpointer user_data)
-{
- SoupMessageQueueItem *item = user_data;
-
- if (item->state != SOUP_MESSAGE_RESTARTING) {
- item->state = SOUP_MESSAGE_FINISHING;
- process_queue_item (item);
- }
-}
-
-static void
-process_queue_item (SoupMessageQueueItem *item)
-{
- SoupSession *session = item->session;
- SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
-
- soup_message_queue_item_ref (item);
-
- do {
- if (item->paused) {
- g_mutex_lock (&priv->lock);
- while (item->paused)
- g_cond_wait (&priv->cond, &priv->lock);
- g_mutex_unlock (&priv->lock);
- }
-
- switch (item->state) {
- case SOUP_MESSAGE_STARTING:
- g_mutex_lock (&priv->lock);
- do {
- get_connection (item);
- if (item->state == SOUP_MESSAGE_STARTING)
- g_cond_wait (&priv->cond, &priv->lock);
- } while (item->state == SOUP_MESSAGE_STARTING);
- g_mutex_unlock (&priv->lock);
- break;
-
- case SOUP_MESSAGE_READY:
- item->state = SOUP_MESSAGE_RUNNING;
-
- if (item->new_api) {
- soup_session_send_queue_item (item->session, item, new_api_message_completed);
- goto out;
- }
-
- soup_session_send_queue_item (item->session, item, NULL);
- if (item->state != SOUP_MESSAGE_RESTARTING)
- item->state = SOUP_MESSAGE_FINISHING;
- break;
-
- case SOUP_MESSAGE_RUNNING:
- g_warn_if_fail (item->new_api);
- item->state = SOUP_MESSAGE_FINISHING;
- break;
-
- case SOUP_MESSAGE_RESTARTING:
- item->state = SOUP_MESSAGE_STARTING;
- soup_message_restarted (item->msg);
- break;
-
- case SOUP_MESSAGE_FINISHING:
- item->state = SOUP_MESSAGE_FINISHED;
- soup_message_finished (item->msg);
- soup_session_unqueue_item (session, item);
- break;
-
- default:
- g_warn_if_reached ();
- item->state = SOUP_MESSAGE_FINISHING;
- break;
- }
- } while (item->state != SOUP_MESSAGE_FINISHED);
-
- out:
- soup_message_queue_item_unref (item);
-}
-
static gboolean
queue_message_callback (gpointer data)
{
@@ -297,7 +100,7 @@ queue_message_thread (gpointer data)
{
SoupMessageQueueItem *item = data;
- process_queue_item (item);
+ soup_session_process_queue_item (item->session, item, NULL, TRUE);
if (item->callback) {
soup_add_completion (soup_session_get_async_context (item->session),
queue_message_callback, item);
@@ -314,7 +117,8 @@ soup_session_sync_queue_message (SoupSession *session, SoupMessage *msg,
SoupMessageQueueItem *item;
GThread *thread;
- item = soup_session_append_queue_item (session, msg, callback, user_data);
+ item = soup_session_append_queue_item (session, msg, FALSE, FALSE,
+ callback, user_data);
thread = g_thread_new ("SoupSessionSync:queue_message",
queue_message_thread, item);
g_thread_unref (thread);
@@ -326,25 +130,15 @@ soup_session_sync_send_message (SoupSession *session, SoupMessage *msg)
SoupMessageQueueItem *item;
guint status;
- item = soup_session_append_queue_item (session, msg, NULL, NULL);
- process_queue_item (item);
+ item = soup_session_append_queue_item (session, msg, FALSE, FALSE,
+ NULL, NULL);
+ soup_session_process_queue_item (session, item, NULL, TRUE);
status = msg->status_code;
soup_message_queue_item_unref (item);
return status;
}
static void
-soup_session_sync_cancel_message (SoupSession *session, SoupMessage *msg, guint status_code)
-{
- SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
-
- g_mutex_lock (&priv->lock);
- SOUP_SESSION_CLASS (soup_session_sync_parent_class)->cancel_message (session, msg, status_code);
- g_cond_broadcast (&priv->cond);
- g_mutex_unlock (&priv->lock);
-}
-
-static void
soup_session_sync_auth_required (SoupSession *session, SoupMessage *msg,
SoupAuth *auth, gboolean retrying)
{
@@ -363,182 +157,12 @@ soup_session_sync_auth_required (SoupSession *session, SoupMessage *msg,
}
static void
-soup_session_sync_flush_queue (SoupSession *session)
-{
- SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
- SoupMessageQueue *queue;
- SoupMessageQueueItem *item;
- GHashTable *current;
- gboolean done = FALSE;
-
- /* Record the current contents of the queue */
- current = g_hash_table_new (NULL, NULL);
- queue = soup_session_get_queue (session);
- for (item = soup_message_queue_first (queue);
- item;
- item = soup_message_queue_next (queue, item))
- g_hash_table_insert (current, item, item);
-
- /* Cancel everything */
- SOUP_SESSION_CLASS (soup_session_sync_parent_class)->flush_queue (session);
-
- /* Wait until all of the items in @current have been removed
- * from the queue. (This is not the same as "wait for the
- * queue to be empty", because the app may queue new requests
- * in response to the cancellation of the old ones. We don't
- * try to cancel those requests as well, since we'd likely
- * just end up looping forever.)
- */
- g_mutex_lock (&priv->lock);
- do {
- done = TRUE;
- for (item = soup_message_queue_first (queue);
- item;
- item = soup_message_queue_next (queue, item)) {
- if (g_hash_table_lookup (current, item))
- done = FALSE;
- }
-
- if (!done)
- g_cond_wait (&priv->cond, &priv->lock);
- } while (!done);
- g_mutex_unlock (&priv->lock);
-
- g_hash_table_destroy (current);
-}
-
-static void
-soup_session_sync_kick (SoupSession *session)
-{
- SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
-
- g_cond_broadcast (&priv->cond);
-}
-
-static void
soup_session_sync_class_init (SoupSessionSyncClass *session_sync_class)
{
- GObjectClass *object_class = G_OBJECT_CLASS (session_sync_class);
SoupSessionClass *session_class = SOUP_SESSION_CLASS (session_sync_class);
- g_type_class_add_private (session_sync_class, sizeof (SoupSessionSyncPrivate));
-
/* virtual method override */
session_class->queue_message = soup_session_sync_queue_message;
session_class->send_message = soup_session_sync_send_message;
- session_class->cancel_message = soup_session_sync_cancel_message;
session_class->auth_required = soup_session_sync_auth_required;
- session_class->flush_queue = soup_session_sync_flush_queue;
- session_class->kick = soup_session_sync_kick;
-
- object_class->finalize = soup_session_sync_finalize;
-}
-
-
-GInputStream *
-soup_session_send_request (SoupSession *session,
- SoupMessage *msg,
- GCancellable *cancellable,
- GError **error)
-{
- SoupMessageQueueItem *item;
- GInputStream *stream = NULL;
- GOutputStream *ostream;
- GMemoryOutputStream *mostream;
- gssize size;
- GError *my_error = NULL;
-
- g_return_val_if_fail (SOUP_IS_SESSION_SYNC (session), NULL);
-
- item = soup_session_append_queue_item (session, msg, NULL, NULL);
-
- item->new_api = TRUE;
- if (cancellable) {
- g_object_unref (item->cancellable);
- item->cancellable = g_object_ref (cancellable);
- }
-
- while (!stream) {
- /* Get a connection, etc */
- process_queue_item (item);
- if (item->state != SOUP_MESSAGE_RUNNING)
- break;
-
- /* Send request, read headers */
- if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) {
- if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
- item->state = SOUP_MESSAGE_RESTARTING;
- soup_message_io_finished (item->msg);
- g_clear_error (&my_error);
- continue;
- } else
- break;
- }
-
- stream = soup_message_io_get_response_istream (msg, &my_error);
- if (!stream)
- break;
-
- /* Break if the message doesn't look likely-to-be-requeued */
- if (msg->status_code != SOUP_STATUS_UNAUTHORIZED &&
- msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED &&
- !soup_session_would_redirect (session, msg))
- break;
-
- /* Gather the current message body... */
- ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
- if (g_output_stream_splice (ostream, stream,
- G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
- G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
- item->cancellable, &my_error) == -1) {
- g_object_unref (stream);
- g_object_unref (ostream);
- stream = NULL;
- break;
- }
- g_object_unref (stream);
- stream = NULL;
-
- /* If the message was requeued, loop */
- if (item->state == SOUP_MESSAGE_RESTARTING) {
- g_object_unref (ostream);
- continue;
- }
-
- /* Not requeued, so return the original body */
- mostream = G_MEMORY_OUTPUT_STREAM (ostream);
- size = g_memory_output_stream_get_data_size (mostream);
- stream = g_memory_input_stream_new ();
- if (size) {
- g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream),
- g_memory_output_stream_steal_data (mostream),
- size, g_free);
- }
- g_object_unref (ostream);
- }
-
- if (my_error)
- g_propagate_error (error, my_error);
- else if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
- if (stream) {
- g_object_unref (stream);
- stream = NULL;
- }
- g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code,
- msg->reason_phrase);
- } else if (!stream)
- stream = g_memory_input_stream_new ();
-
- if (!stream) {
- if (soup_message_io_in_progress (msg))
- soup_message_io_finished (msg);
- else if (item->state != SOUP_MESSAGE_FINISHED)
- item->state = SOUP_MESSAGE_FINISHING;
-
- if (item->state != SOUP_MESSAGE_FINISHED)
- process_queue_item (item);
- }
-
- soup_message_queue_item_unref (item);
- return stream;
}
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 3eecd06b..f5d93d4d 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -77,6 +77,8 @@ static guint soup_host_uri_hash (gconstpointer key);
static gboolean soup_host_uri_equal (gconstpointer v1, gconstpointer v2);
typedef struct {
+ SoupSession *session;
+
GTlsDatabase *tlsdb;
char *ssl_ca_file;
gboolean ssl_strict;
@@ -100,12 +102,15 @@ typedef struct {
* SoupSessionHost, adding/removing a connection,
* disconnecting a connection, or moving a connection from
* IDLE to IN_USE. Must not emit signals or destroy objects
- * while holding it.
+ * while holding it. conn_cond is signaled when it may be
+ * possible for a previously-blocked message to continue.
*/
GMutex conn_lock;
+ GCond conn_cond;
GMainContext *async_context;
gboolean use_thread_context;
+ GSList *run_queue_sources;
GResolver *resolver;
@@ -126,6 +131,10 @@ static void auth_manager_authenticate (SoupAuthManager *manager,
SoupMessage *msg, SoupAuth *auth,
gboolean retrying, gpointer user_data);
+static void async_run_queue (SoupSession *session);
+
+static void async_send_request_running (SoupSession *session, SoupMessageQueueItem *item);
+
#define SOUP_SESSION_MAX_CONNS_DEFAULT 10
#define SOUP_SESSION_MAX_CONNS_PER_HOST_DEFAULT 2
@@ -133,9 +142,9 @@ static void auth_manager_authenticate (SoupAuthManager *manager,
#define SOUP_SESSION_USER_AGENT_BASE "libsoup/" PACKAGE_VERSION
-G_DEFINE_ABSTRACT_TYPE_WITH_CODE (SoupSession, soup_session, G_TYPE_OBJECT,
- soup_init ();
- )
+G_DEFINE_TYPE_WITH_CODE (SoupSession, soup_session, G_TYPE_OBJECT,
+ soup_init ();
+ )
enum {
REQUEST_QUEUED,
@@ -182,9 +191,12 @@ soup_session_init (SoupSession *session)
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
SoupAuthManager *auth_manager;
+ priv->session = session;
+
priv->queue = soup_message_queue_new (session);
g_mutex_init (&priv->conn_lock);
+ g_cond_init (&priv->conn_cond);
priv->http_hosts = g_hash_table_new_full (soup_host_uri_hash,
soup_host_uri_equal,
NULL, (GDestroyNotify)free_host);
@@ -225,6 +237,15 @@ soup_session_dispose (GObject *object)
{
SoupSession *session = SOUP_SESSION (object);
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+ GSList *iter;
+
+ priv->disposed = TRUE;
+
+ for (iter = priv->run_queue_sources; iter; iter = iter->next) {
+ g_source_destroy (iter->data);
+ g_source_unref (iter->data);
+ }
+ g_clear_pointer (&priv->run_queue_sources, g_slist_free);
priv->disposed = TRUE;
soup_session_abort (session);
@@ -245,6 +266,7 @@ soup_session_finalize (GObject *object)
soup_message_queue_destroy (priv->queue);
g_mutex_clear (&priv->conn_lock);
+ g_cond_clear (&priv->conn_cond);
g_hash_table_destroy (priv->http_hosts);
g_hash_table_destroy (priv->https_hosts);
g_hash_table_destroy (priv->conns);
@@ -826,10 +848,7 @@ get_host_for_uri (SoupSession *session, SoupURI *uri)
return host;
}
-/* Note: get_host_for_message doesn't lock the conn_lock. The caller
- * must do it itself if there's a chance the host doesn't already
- * exist.
- */
+/* Requires conn_lock to be locked */
static SoupSessionHost *
get_host_for_message (SoupSession *session, SoupMessage *msg)
{
@@ -1031,6 +1050,36 @@ redirect_handler (SoupMessage *msg, gpointer user_data)
}
static void
+proxy_connection_event (SoupConnection *conn,
+ GSocketClientEvent event,
+ GIOStream *connection,
+ gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+
+ soup_message_network_event (item->msg, event, connection);
+}
+
+static void
+soup_session_set_item_connection (SoupSession *session,
+ SoupMessageQueueItem *item,
+ SoupConnection *conn)
+{
+ if (item->conn) {
+ g_signal_handlers_disconnect_by_func (item->conn, proxy_connection_event, item);
+ g_object_unref (item->conn);
+ }
+
+ item->conn = conn;
+
+ if (item->conn) {
+ g_object_ref (item->conn);
+ g_signal_connect (item->conn, "event",
+ G_CALLBACK (proxy_connection_event), item);
+ }
+}
+
+static void
message_restarted (SoupMessage *msg, gpointer user_data)
{
SoupMessageQueueItem *item = user_data;
@@ -1048,6 +1097,7 @@ message_restarted (SoupMessage *msg, gpointer user_data)
SoupMessageQueueItem *
soup_session_append_queue_item (SoupSession *session, SoupMessage *msg,
+ gboolean async, gboolean new_api,
SoupSessionCallback callback, gpointer user_data)
{
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
@@ -1057,6 +1107,8 @@ soup_session_append_queue_item (SoupSession *session, SoupMessage *msg,
soup_message_cleanup_response (msg);
item = soup_message_queue_append (priv->queue, msg, callback, user_data);
+ item->async = async;
+ item->new_api = new_api;
g_mutex_lock (&priv->conn_lock);
host = get_host_for_message (session, item->msg);
@@ -1077,7 +1129,7 @@ soup_session_append_queue_item (SoupSession *session, SoupMessage *msg,
return item;
}
-void
+static void
soup_session_send_queue_item (SoupSession *session,
SoupMessageQueueItem *item,
SoupMessageCompletionFn completion_cb)
@@ -1116,9 +1168,9 @@ soup_session_send_queue_item (SoupSession *session,
soup_connection_send_request (item->conn, item, completion_cb, item);
}
-gboolean
+static gboolean
soup_session_cleanup_connections (SoupSession *session,
- gboolean prune_idle)
+ gboolean cleanup_idle)
{
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
GSList *conns = NULL, *c;
@@ -1131,7 +1183,7 @@ soup_session_cleanup_connections (SoupSession *session,
while (g_hash_table_iter_next (&iter, &conn, &host)) {
state = soup_connection_get_state (conn);
if (state == SOUP_CONNECTION_REMOTE_DISCONNECTED ||
- (prune_idle && state == SOUP_CONNECTION_IDLE)) {
+ (cleanup_idle && state == SOUP_CONNECTION_IDLE)) {
conns = g_slist_prepend (conns, g_object_ref (conn));
g_hash_table_iter_remove (&iter);
drop_connection (session, host, conn);
@@ -1233,7 +1285,7 @@ connection_disconnected (SoupConnection *conn, gpointer user_data)
g_mutex_unlock (&priv->conn_lock);
- SOUP_SESSION_GET_CLASS (session)->kick (session);
+ soup_session_kick_queue (session);
}
static void
@@ -1243,88 +1295,281 @@ connection_state_changed (GObject *object, GParamSpec *param, gpointer user_data
SoupConnection *conn = SOUP_CONNECTION (object);
if (soup_connection_get_state (conn) == SOUP_CONNECTION_IDLE)
- SOUP_SESSION_GET_CLASS (session)->kick (session);
+ soup_session_kick_queue (session);
}
-SoupMessageQueueItem *
-soup_session_make_connect_message (SoupSession *session,
- SoupConnection *conn)
+SoupMessageQueue *
+soup_session_get_queue (SoupSession *session)
+{
+ SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+
+ return priv->queue;
+}
+
+static void
+soup_session_unqueue_item (SoupSession *session,
+ SoupMessageQueueItem *item)
+{
+ SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+ SoupSessionHost *host;
+
+ if (item->conn) {
+ if (item->msg->method != SOUP_METHOD_CONNECT ||
+ !SOUP_STATUS_IS_SUCCESSFUL (item->msg->status_code))
+ soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE);
+ soup_session_set_item_connection (session, item, NULL);
+ }
+
+ if (item->state != SOUP_MESSAGE_FINISHED) {
+ g_warning ("finished an item with state %d", item->state);
+ return;
+ }
+
+ soup_message_queue_remove (priv->queue, item);
+
+ g_mutex_lock (&priv->conn_lock);
+ host = get_host_for_message (session, item->msg);
+ host->num_messages--;
+ g_mutex_unlock (&priv->conn_lock);
+
+ /* g_signal_handlers_disconnect_by_func doesn't work if you
+ * have a metamarshal, meaning it doesn't work with
+ * soup_message_add_header_handler()
+ */
+ g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA,
+ 0, 0, NULL, NULL, item);
+ g_signal_emit (session, signals[REQUEST_UNQUEUED], 0, item->msg);
+ soup_message_queue_item_unref (item);
+}
+
+static void
+soup_session_set_item_status (SoupSession *session,
+ SoupMessageQueueItem *item,
+ guint status_code)
{
SoupURI *uri;
+ char *msg;
+
+ switch (status_code) {
+ case SOUP_STATUS_CANT_RESOLVE:
+ case SOUP_STATUS_CANT_CONNECT:
+ uri = soup_message_get_uri (item->msg);
+ msg = g_strdup_printf ("%s (%s)",
+ soup_status_get_phrase (status_code),
+ uri->host);
+ soup_message_set_status_full (item->msg, status_code, msg);
+ g_free (msg);
+ break;
+
+ case SOUP_STATUS_CANT_RESOLVE_PROXY:
+ case SOUP_STATUS_CANT_CONNECT_PROXY:
+ if (item->proxy_uri && item->proxy_uri->host) {
+ msg = g_strdup_printf ("%s (%s)",
+ soup_status_get_phrase (status_code),
+ item->proxy_uri->host);
+ soup_message_set_status_full (item->msg, status_code, msg);
+ g_free (msg);
+ break;
+ }
+ soup_message_set_status (item->msg, status_code);
+ break;
+
+ case SOUP_STATUS_SSL_FAILED:
+ if (!g_tls_backend_supports_tls (g_tls_backend_get_default ())) {
+ soup_message_set_status_full (item->msg, status_code,
+ "TLS/SSL support not available; install glib-networking");
+ } else
+ soup_message_set_status (item->msg, status_code);
+ break;
+
+ default:
+ soup_message_set_status (item->msg, status_code);
+ break;
+ }
+}
+
+
+static void
+message_completed (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+
+ if (item->async)
+ soup_session_kick_queue (item->session);
+
+ if (item->state != SOUP_MESSAGE_RESTARTING) {
+ item->state = SOUP_MESSAGE_FINISHING;
+
+ if (item->new_api && !item->async)
+ soup_session_process_queue_item (item->session, item, NULL, TRUE);
+ }
+}
+
+static void
+tunnel_complete (SoupConnection *conn, guint status, gpointer user_data)
+{
+ SoupMessageQueueItem *tunnel_item = user_data;
+ SoupMessageQueueItem *item = tunnel_item->related;
+ SoupSession *session = tunnel_item->session;
+
+ soup_message_finished (tunnel_item->msg);
+ soup_message_queue_item_unref (tunnel_item);
+
+ if (item->msg->status_code)
+ item->state = SOUP_MESSAGE_FINISHING;
+ else
+ soup_message_set_https_status (item->msg, item->conn);
+
+ if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
+ soup_session_set_item_connection (session, item, NULL);
+ soup_session_set_item_status (session, item, status);
+ }
+
+ item->state = SOUP_MESSAGE_READY;
+ if (item->async)
+ soup_session_kick_queue (session);
+ soup_message_queue_item_unref (item);
+}
+
+static void
+tunnel_message_completed (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *tunnel_item = user_data;
+ SoupMessageQueueItem *item = tunnel_item->related;
+ SoupSession *session = tunnel_item->session;
+ guint status;
+
+ if (tunnel_item->state == SOUP_MESSAGE_RESTARTING) {
+ soup_message_restarted (msg);
+ if (tunnel_item->conn) {
+ tunnel_item->state = SOUP_MESSAGE_RUNNING;
+ soup_session_send_queue_item (session, tunnel_item,
+ tunnel_message_completed);
+ return;
+ }
+
+ soup_message_set_status (msg, SOUP_STATUS_TRY_AGAIN);
+ }
+
+ tunnel_item->state = SOUP_MESSAGE_FINISHED;
+ soup_session_unqueue_item (session, tunnel_item);
+
+ status = tunnel_item->msg->status_code;
+ if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
+ tunnel_complete (item->conn, status, tunnel_item);
+ return;
+ }
+
+ if (tunnel_item->async) {
+ soup_connection_start_ssl_async (item->conn, item->cancellable,
+ tunnel_complete, tunnel_item);
+ } else {
+ status = soup_connection_start_ssl_sync (item->conn, item->cancellable);
+ tunnel_complete (item->conn, status, tunnel_item);
+ }
+}
+
+static void
+tunnel_connect (SoupMessageQueueItem *item)
+{
+ SoupSession *session = item->session;
+ SoupMessageQueueItem *tunnel_item;
+ SoupURI *uri;
SoupMessage *msg;
- SoupMessageQueueItem *item;
- uri = soup_connection_get_remote_uri (conn);
+ item->state = SOUP_MESSAGE_TUNNELING;
+
+ uri = soup_connection_get_remote_uri (item->conn);
msg = soup_message_new_from_uri (SOUP_METHOD_CONNECT, uri);
soup_message_set_flags (msg, SOUP_MESSAGE_NO_REDIRECT);
- item = soup_session_append_queue_item (session, msg, NULL, NULL);
- soup_session_set_item_connection (session, item, conn);
+ tunnel_item = soup_session_append_queue_item (session, msg,
+ item->async, FALSE,
+ NULL, NULL);
g_object_unref (msg);
- item->state = SOUP_MESSAGE_RUNNING;
+ tunnel_item->related = item;
+ soup_message_queue_item_ref (item);
+ soup_session_set_item_connection (session, tunnel_item, item->conn);
+ tunnel_item->state = SOUP_MESSAGE_RUNNING;
- g_signal_emit (session, signals[TUNNELING], 0, conn);
- return item;
+ g_signal_emit (session, signals[TUNNELING], 0, tunnel_item->conn);
+
+ soup_session_send_queue_item (session, tunnel_item,
+ tunnel_message_completed);
}
-gboolean
-soup_session_get_connection (SoupSession *session,
- SoupMessageQueueItem *item,
- gboolean *try_pruning)
+static void
+got_connection (SoupConnection *conn, guint status, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+ SoupSession *session = item->session;
+
+ if (status != SOUP_STATUS_OK) {
+ if (item->state == SOUP_MESSAGE_CONNECTING) {
+ soup_session_set_item_status (session, item, status);
+ soup_session_set_item_connection (session, item, NULL);
+ item->state = SOUP_MESSAGE_READY;
+ }
+ } else
+ item->state = SOUP_MESSAGE_CONNECTED;
+
+ if (item->async) {
+ if (item->state == SOUP_MESSAGE_CONNECTED ||
+ item->state == SOUP_MESSAGE_READY)
+ async_run_queue (item->session);
+ else
+ soup_session_kick_queue (item->session);
+
+ soup_message_queue_item_unref (item);
+ }
+}
+
+/* requires conn_lock */
+static SoupConnection *
+get_connection_for_host (SoupSession *session,
+ SoupMessageQueueItem *item,
+ SoupSessionHost *host,
+ gboolean need_new_connection,
+ gboolean *try_cleanup)
{
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
SoupConnection *conn;
- SoupSessionHost *host;
GSList *conns;
int num_pending = 0;
- gboolean need_new_connection;
if (priv->disposed)
return FALSE;
if (item->conn) {
g_return_val_if_fail (soup_connection_get_state (item->conn) != SOUP_CONNECTION_DISCONNECTED, FALSE);
- return TRUE;
+ return item->conn;
}
- need_new_connection =
- (soup_message_get_flags (item->msg) & SOUP_MESSAGE_NEW_CONNECTION) ||
- (!(soup_message_get_flags (item->msg) & SOUP_MESSAGE_IDEMPOTENT) &&
- !SOUP_METHOD_IS_IDEMPOTENT (item->msg->method));
-
- g_mutex_lock (&priv->conn_lock);
-
- host = get_host_for_message (session, item->msg);
for (conns = host->connections; conns; conns = conns->next) {
- if (!need_new_connection && soup_connection_get_state (conns->data) == SOUP_CONNECTION_IDLE) {
- soup_connection_set_state (conns->data, SOUP_CONNECTION_IN_USE);
- g_mutex_unlock (&priv->conn_lock);
- soup_session_set_item_connection (session, item, conns->data);
- soup_message_set_https_status (item->msg, item->conn);
- return TRUE;
- } else if (soup_connection_get_state (conns->data) == SOUP_CONNECTION_CONNECTING)
+ conn = conns->data;
+
+ if (!need_new_connection && soup_connection_get_state (conn) == SOUP_CONNECTION_IDLE) {
+ soup_connection_set_state (conn, SOUP_CONNECTION_IN_USE);
+ return conn;
+ } else if (soup_connection_get_state (conn) == SOUP_CONNECTION_CONNECTING)
num_pending++;
}
/* Limit the number of pending connections; num_messages / 2
* is somewhat arbitrary...
*/
- if (num_pending > host->num_messages / 2) {
- g_mutex_unlock (&priv->conn_lock);
- return FALSE;
- }
+ if (num_pending > host->num_messages / 2)
+ return NULL;
if (host->num_conns >= priv->max_conns_per_host) {
if (need_new_connection)
- *try_pruning = TRUE;
- g_mutex_unlock (&priv->conn_lock);
- return FALSE;
+ *try_cleanup = TRUE;
+ return NULL;
}
if (priv->num_conns >= priv->max_conns) {
- *try_pruning = TRUE;
- g_mutex_unlock (&priv->conn_lock);
- return FALSE;
+ *try_cleanup = TRUE;
+ return NULL;
}
conn = g_object_new (
@@ -1347,6 +1592,10 @@ soup_session_get_connection (SoupSession *session,
G_CALLBACK (connection_state_changed),
session);
+ /* This is a debugging-related signal, and so can ignore the
+ * usual rule about not emitting signals while holding
+ * conn_lock.
+ */
g_signal_emit (session, signals[CONNECTION_CREATED], 0, conn);
g_hash_table_insert (priv->conns, conn, host);
@@ -1361,129 +1610,220 @@ soup_session_get_connection (SoupSession *session,
host->keep_alive_src = NULL;
}
- g_mutex_unlock (&priv->conn_lock);
- soup_session_set_item_connection (session, item, conn);
- return TRUE;
+ return conn;
}
-SoupMessageQueue *
-soup_session_get_queue (SoupSession *session)
+static gboolean
+get_connection (SoupMessageQueueItem *item, gboolean *should_cleanup)
{
+ SoupSession *session = item->session;
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+ SoupSessionHost *host;
+ SoupConnection *conn = NULL;
+ gboolean my_should_cleanup = FALSE;
+ gboolean need_new_connection;
- return priv->queue;
-}
+ need_new_connection =
+ (soup_message_get_flags (item->msg) & SOUP_MESSAGE_NEW_CONNECTION) ||
+ (!(soup_message_get_flags (item->msg) & SOUP_MESSAGE_IDEMPOTENT) &&
+ !SOUP_METHOD_IS_IDEMPOTENT (item->msg->method));
-void
-soup_session_unqueue_item (SoupSession *session,
- SoupMessageQueueItem *item)
-{
- SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
- SoupSessionHost *host;
+ g_mutex_lock (&priv->conn_lock);
+ host = get_host_for_message (session, item->msg);
+ while (TRUE) {
+ conn = get_connection_for_host (session, item, host,
+ need_new_connection,
+ &my_should_cleanup);
+ if (conn || item->async)
+ break;
- if (item->conn) {
- if (item->msg->method != SOUP_METHOD_CONNECT ||
- !SOUP_STATUS_IS_SUCCESSFUL (item->msg->status_code))
- soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE);
- soup_session_set_item_connection (session, item, NULL);
+ if (my_should_cleanup) {
+ g_mutex_unlock (&priv->conn_lock);
+ soup_session_cleanup_connections (session, TRUE);
+ g_mutex_lock (&priv->conn_lock);
+
+ my_should_cleanup = FALSE;
+ continue;
+ }
+
+ g_cond_wait (&priv->conn_cond, &priv->conn_lock);
}
+ g_mutex_unlock (&priv->conn_lock);
- if (item->state != SOUP_MESSAGE_FINISHED) {
- g_warning ("finished an item with state %d", item->state);
- return;
+ if (!conn) {
+ if (should_cleanup)
+ *should_cleanup = my_should_cleanup;
+ return FALSE;
}
- soup_message_queue_remove (priv->queue, item);
+ soup_session_set_item_connection (session, item, conn);
+ soup_message_set_https_status (item->msg, item->conn);
- g_mutex_lock (&priv->conn_lock);
- host = get_host_for_message (session, item->msg);
- host->num_messages--;
- g_mutex_unlock (&priv->conn_lock);
+ if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) {
+ item->state = SOUP_MESSAGE_READY;
+ return TRUE;
+ }
- /* g_signal_handlers_disconnect_by_func doesn't work if you
- * have a metamarshal, meaning it doesn't work with
- * soup_message_add_header_handler()
- */
- g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA,
- 0, 0, NULL, NULL, item);
- g_signal_emit (session, signals[REQUEST_UNQUEUED], 0, item->msg);
- soup_message_queue_item_unref (item);
+ item->state = SOUP_MESSAGE_CONNECTING;
+
+ if (item->async) {
+ soup_message_queue_item_ref (item);
+ soup_connection_connect_async (item->conn, item->cancellable,
+ got_connection, item);
+ return FALSE;
+ } else {
+ guint status;
+
+ status = soup_connection_connect_sync (item->conn, item->cancellable);
+ got_connection (item->conn, status, item);
+
+ return TRUE;
+ }
}
-static void
-proxy_connection_event (SoupConnection *conn,
- GSocketClientEvent event,
- GIOStream *connection,
- gpointer user_data)
+void
+soup_session_process_queue_item (SoupSession *session,
+ SoupMessageQueueItem *item,
+ gboolean *should_cleanup,
+ gboolean loop)
{
- SoupMessageQueueItem *item = user_data;
+ g_assert (item->session == session);
- soup_message_network_event (item->msg, event, connection);
+ do {
+ if (item->paused)
+ return;
+
+ switch (item->state) {
+ case SOUP_MESSAGE_STARTING:
+ if (!get_connection (item, should_cleanup))
+ return;
+ break;
+
+ case SOUP_MESSAGE_CONNECTED:
+ if (soup_connection_is_tunnelled (item->conn))
+ tunnel_connect (item);
+ else
+ item->state = SOUP_MESSAGE_READY;
+ break;
+
+ case SOUP_MESSAGE_READY:
+ soup_message_set_https_status (item->msg, item->conn);
+ if (item->msg->status_code) {
+ if (item->msg->status_code == SOUP_STATUS_TRY_AGAIN) {
+ soup_message_cleanup_response (item->msg);
+ item->state = SOUP_MESSAGE_STARTING;
+ } else
+ item->state = SOUP_MESSAGE_FINISHING;
+ break;
+ }
+
+ item->state = SOUP_MESSAGE_RUNNING;
+
+ soup_session_send_queue_item (session, item, message_completed);
+
+ if (item->new_api) {
+ if (item->async)
+ async_send_request_running (session, item);
+ return;
+ }
+ break;
+
+ case SOUP_MESSAGE_RUNNING:
+ if (item->async)
+ return;
+
+ g_warn_if_fail (item->new_api);
+ item->state = SOUP_MESSAGE_FINISHING;
+ break;
+
+ case SOUP_MESSAGE_RESTARTING:
+ item->state = SOUP_MESSAGE_STARTING;
+ soup_message_restarted (item->msg);
+ break;
+
+ case SOUP_MESSAGE_FINISHING:
+ item->state = SOUP_MESSAGE_FINISHED;
+ soup_message_finished (item->msg);
+ if (item->state != SOUP_MESSAGE_FINISHED) {
+ g_return_if_fail (!item->new_api);
+ break;
+ }
+
+ soup_session_unqueue_item (session, item);
+ if (item->async && item->callback)
+ item->callback (session, item->msg, item->callback_data);
+ return;
+
+ default:
+ /* Nothing to do with this message in any
+ * other state.
+ */
+ g_warn_if_fail (item->async);
+ return;
+ }
+ } while (loop && item->state != SOUP_MESSAGE_FINISHED);
}
-void
-soup_session_set_item_connection (SoupSession *session,
- SoupMessageQueueItem *item,
- SoupConnection *conn)
+static void
+async_run_queue (SoupSession *session)
{
- if (item->conn) {
- g_signal_handlers_disconnect_by_func (item->conn, proxy_connection_event, item);
- g_object_unref (item->conn);
- }
+ SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+ SoupMessageQueueItem *item;
+ SoupMessage *msg;
+ gboolean try_cleanup = TRUE, should_cleanup = FALSE;
- item->conn = conn;
+ g_object_ref (session);
+ soup_session_cleanup_connections (session, FALSE);
- if (item->conn) {
- g_object_ref (item->conn);
- g_signal_connect (item->conn, "event",
- G_CALLBACK (proxy_connection_event), item);
+ try_again:
+ for (item = soup_message_queue_first (priv->queue);
+ item;
+ item = soup_message_queue_next (priv->queue, item)) {
+ msg = item->msg;
+
+ /* CONNECT messages are handled specially */
+ if (msg->method == SOUP_METHOD_CONNECT)
+ continue;
+
+ if (item->async_context != soup_session_get_async_context (session))
+ continue;
+
+ soup_session_process_queue_item (session, item, &should_cleanup, TRUE);
+ }
+
+ if (try_cleanup && should_cleanup) {
+ /* There is at least one message in the queue that
+ * could be sent if we cleanupd an idle connection from
+ * some other server.
+ */
+ if (soup_session_cleanup_connections (session, TRUE)) {
+ try_cleanup = should_cleanup = FALSE;
+ goto try_again;
+ }
}
+
+ g_object_unref (session);
}
-void
-soup_session_set_item_status (SoupSession *session,
- SoupMessageQueueItem *item,
- guint status_code)
+static gboolean
+idle_run_queue (gpointer user_data)
{
- SoupURI *uri;
- char *msg;
+ SoupSessionPrivate *priv = user_data;
+ GSource *source;
- switch (status_code) {
- case SOUP_STATUS_CANT_RESOLVE:
- case SOUP_STATUS_CANT_CONNECT:
- uri = soup_message_get_uri (item->msg);
- msg = g_strdup_printf ("%s (%s)",
- soup_status_get_phrase (status_code),
- uri->host);
- soup_message_set_status_full (item->msg, status_code, msg);
- g_free (msg);
- break;
+ if (priv->disposed)
+ return FALSE;
- case SOUP_STATUS_CANT_RESOLVE_PROXY:
- case SOUP_STATUS_CANT_CONNECT_PROXY:
- if (item->proxy_uri && item->proxy_uri->host) {
- msg = g_strdup_printf ("%s (%s)",
- soup_status_get_phrase (status_code),
- item->proxy_uri->host);
- soup_message_set_status_full (item->msg, status_code, msg);
- g_free (msg);
- break;
- }
- soup_message_set_status (item->msg, status_code);
- break;
+ source = g_main_current_source ();
+ priv->run_queue_sources = g_slist_remove (priv->run_queue_sources, source);
- case SOUP_STATUS_SSL_FAILED:
- if (!g_tls_backend_supports_tls (g_tls_backend_get_default ())) {
- soup_message_set_status_full (item->msg, status_code,
- "TLS/SSL support not available; install glib-networking");
- } else
- soup_message_set_status (item->msg, status_code);
- break;
+ /* Ensure that the source is destroyed before running the queue */
+ g_source_destroy (source);
+ g_source_unref (source);
- default:
- soup_message_set_status (item->msg, status_code);
- break;
- }
+ g_assert (priv->session);
+ async_run_queue (priv->session);
+ return FALSE;
}
/**
@@ -1517,7 +1857,7 @@ void
soup_session_queue_message (SoupSession *session, SoupMessage *msg,
SoupSessionCallback callback, gpointer user_data)
{
- g_return_if_fail (SOUP_IS_SESSION (session));
+ g_return_if_fail (SOUP_IS_SESSION_ASYNC (session) || SOUP_IS_SESSION_SYNC (session));
g_return_if_fail (SOUP_IS_MESSAGE (msg));
SOUP_SESSION_GET_CLASS (session)->queue_message (session, msg,
@@ -1557,7 +1897,6 @@ soup_session_requeue_message (SoupSession *session, SoupMessage *msg)
SOUP_SESSION_GET_CLASS (session)->requeue_message (session, msg);
}
-
/**
* soup_session_send_message:
* @session: a #SoupSession
@@ -1574,7 +1913,7 @@ soup_session_requeue_message (SoupSession *session, SoupMessage *msg)
guint
soup_session_send_message (SoupSession *session, SoupMessage *msg)
{
- g_return_val_if_fail (SOUP_IS_SESSION (session), SOUP_STATUS_MALFORMED);
+ g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session) || SOUP_IS_SESSION_SYNC (session), SOUP_STATUS_MALFORMED);
g_return_val_if_fail (SOUP_IS_MESSAGE (msg), SOUP_STATUS_MALFORMED);
return SOUP_SESSION_GET_CLASS (session)->send_message (session, msg);
@@ -1609,6 +1948,48 @@ soup_session_pause_message (SoupSession *session,
soup_message_queue_item_unref (item);
}
+static void
+soup_session_real_kick_queue (SoupSession *session)
+{
+ SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+ SoupMessageQueueItem *item;
+ gboolean have_sync_items = FALSE;
+
+ if (priv->disposed)
+ return;
+
+ for (item = soup_message_queue_first (priv->queue);
+ item;
+ item = soup_message_queue_next (priv->queue, item)) {
+ if (item->async) {
+ GSource *source;
+
+ /* We use priv rather than session as the
+ * source data, because other parts of libsoup
+ * (or the calling app) may have sources using
+ * the session as the source data.
+ */
+ source = g_main_context_find_source_by_user_data (item->async_context, priv);
+ if (!source) {
+ source = soup_add_completion_reffed (item->async_context,
+ idle_run_queue, priv);
+ priv->run_queue_sources = g_slist_prepend (priv->run_queue_sources,
+ source);
+ }
+ } else
+ have_sync_items = TRUE;
+ }
+
+ if (have_sync_items)
+ g_cond_broadcast (&priv->conn_cond);
+}
+
+void
+soup_session_kick_queue (SoupSession *session)
+{
+ SOUP_SESSION_GET_CLASS (session)->kick (session);
+}
+
/**
* soup_session_unpause_message:
* @session: a #SoupSession
@@ -1640,7 +2021,7 @@ soup_session_unpause_message (SoupSession *session,
soup_message_io_unpause (msg);
soup_message_queue_item_unref (item);
- SOUP_SESSION_GET_CLASS (session)->kick (session);
+ soup_session_kick_queue (session);
}
@@ -1657,6 +2038,7 @@ soup_session_real_cancel_message (SoupSession *session, SoupMessage *msg, guint
soup_message_set_status (msg, status_code);
g_cancellable_cancel (item->cancellable);
+ soup_session_kick_queue (item->session);
soup_message_queue_item_unref (item);
}
@@ -1716,13 +2098,52 @@ soup_session_real_flush_queue (SoupSession *session)
{
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
SoupMessageQueueItem *item;
+ GHashTable *current = NULL;
+ gboolean done = FALSE;
+
+ if (SOUP_IS_SESSION_SYNC (session)) {
+ /* Record the current contents of the queue */
+ current = g_hash_table_new (NULL, NULL);
+ for (item = soup_message_queue_first (priv->queue);
+ item;
+ item = soup_message_queue_next (priv->queue, item))
+ g_hash_table_insert (current, item, item);
+ }
+ /* Cancel everything */
for (item = soup_message_queue_first (priv->queue);
item;
item = soup_message_queue_next (priv->queue, item)) {
soup_session_cancel_message (session, item->msg,
SOUP_STATUS_CANCELLED);
}
+
+ if (SOUP_IS_SESSION_SYNC (session)) {
+ /* Wait until all of the items in @current have been
+ * removed from the queue. (This is not the same as
+ * "wait for the queue to be empty", because the app
+ * may queue new requests in response to the
+ * cancellation of the old ones. We don't try to
+ * cancel those requests as well, since we'd likely
+ * just end up looping forever.)
+ */
+ g_mutex_lock (&priv->conn_lock);
+ do {
+ done = TRUE;
+ for (item = soup_message_queue_first (priv->queue);
+ item;
+ item = soup_message_queue_next (priv->queue, item)) {
+ if (g_hash_table_lookup (current, item))
+ done = FALSE;
+ }
+
+ if (!done)
+ g_cond_wait (&priv->conn_cond, &priv->conn_lock);
+ } while (!done);
+ g_mutex_unlock (&priv->conn_lock);
+
+ g_hash_table_destroy (current);
+ }
}
/**
@@ -2107,6 +2528,7 @@ soup_session_class_init (SoupSessionClass *session_class)
session_class->cancel_message = soup_session_real_cancel_message;
session_class->auth_required = soup_session_real_auth_required;
session_class->flush_queue = soup_session_real_flush_queue;
+ session_class->kick = soup_session_real_kick_queue;
/* virtual method override */
object_class->dispose = soup_session_dispose;
@@ -2808,3 +3230,384 @@ soup_session_class_init (SoupSessionClass *session_class)
G_TYPE_STRV,
G_PARAM_READWRITE));
}
+
+
+/* send_request_async */
+
+static void
+async_send_request_return_result (SoupMessageQueueItem *item,
+ gpointer stream, GError *error)
+{
+ GTask *task;
+
+ g_return_if_fail (item->task != NULL);
+
+ g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA,
+ 0, 0, NULL, NULL, item);
+
+ task = item->task;
+ item->task = NULL;
+
+ if (item->io_source) {
+ g_source_destroy (item->io_source);
+ g_clear_pointer (&item->io_source, g_source_unref);
+ }
+
+ if (error)
+ g_task_return_error (task, error);
+ else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+ if (stream)
+ g_object_unref (stream);
+ g_task_return_new_error (task, SOUP_HTTP_ERROR,
+ item->msg->status_code,
+ "%s",
+ item->msg->reason_phrase);
+ } else
+ g_task_return_pointer (task, stream, g_object_unref);
+ g_object_unref (task);
+}
+
+static void
+async_send_request_restarted (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+
+ /* We won't be needing this, then. */
+ g_object_set_data (G_OBJECT (item->msg), "SoupSession:ostream", NULL);
+ item->io_started = FALSE;
+}
+
+static void
+async_send_request_finished (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+ GMemoryOutputStream *mostream;
+ GInputStream *istream = NULL;
+ GError *error = NULL;
+
+ if (!item->task) {
+ /* Something else already took care of it. */
+ return;
+ }
+
+ mostream = g_object_get_data (G_OBJECT (item->task), "SoupSession:ostream");
+ if (mostream) {
+ gpointer data;
+ gssize size;
+
+ /* We thought it would be requeued, but it wasn't, so
+ * return the original body.
+ */
+ size = g_memory_output_stream_get_data_size (mostream);
+ data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
+ istream = g_memory_input_stream_new_from_data (data, size, g_free);
+ } else if (item->io_started) {
+ /* The message finished before becoming readable. This
+ * will happen, eg, if it's cancelled from got-headers.
+ * Do nothing; the op will complete via read_ready_cb()
+ * after we return;
+ */
+ return;
+ } else {
+ /* The message finished before even being started;
+ * probably a tunnel connect failure.
+ */
+ istream = g_memory_input_stream_new ();
+ }
+
+ async_send_request_return_result (item, istream, error);
+}
+
+static void
+send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+ GInputStream *istream = g_object_get_data (source, "istream");
+ GError *error = NULL;
+
+ /* It should be safe to call the sync close() method here since
+ * the message body has already been written.
+ */
+ g_input_stream_close (istream, NULL, NULL);
+ g_object_unref (istream);
+
+ /* If the message was cancelled, it will be completed via other means */
+ if (g_cancellable_is_cancelled (item->cancellable) ||
+ !item->task) {
+ soup_message_queue_item_unref (item);
+ return;
+ }
+
+ if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
+ result, &error) == -1) {
+ async_send_request_return_result (item, NULL, error);
+ soup_message_queue_item_unref (item);
+ return;
+ }
+
+ /* Otherwise either restarted or finished will eventually be called. */
+ soup_session_kick_queue (item->session);
+ soup_message_queue_item_unref (item);
+}
+
+static void
+send_async_maybe_complete (SoupMessageQueueItem *item,
+ GInputStream *stream)
+{
+ if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
+ item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
+ soup_session_would_redirect (item->session, item->msg)) {
+ GOutputStream *ostream;
+
+ /* Message may be requeued, so gather the current message body... */
+ ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+ g_object_set_data_full (G_OBJECT (item->task), "SoupSession:ostream",
+ ostream, g_object_unref);
+
+ g_object_set_data (G_OBJECT (ostream), "istream", stream);
+
+ /* Give the splice op its own ref on item */
+ soup_message_queue_item_ref (item);
+ g_output_stream_splice_async (ostream, stream,
+ /* We can't use CLOSE_SOURCE because it
+ * might get closed in the wrong thread.
+ */
+ G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ G_PRIORITY_DEFAULT,
+ item->cancellable,
+ send_async_spliced, item);
+ return;
+ }
+
+ async_send_request_return_result (item, stream, NULL);
+}
+
+static void try_run_until_read (SoupMessageQueueItem *item);
+
+static gboolean
+read_ready_cb (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+
+ g_clear_pointer (&item->io_source, g_source_unref);
+ try_run_until_read (item);
+ return FALSE;
+}
+
+static void
+try_run_until_read (SoupMessageQueueItem *item)
+{
+ GError *error = NULL;
+ GInputStream *stream = NULL;
+
+ if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
+ stream = soup_message_io_get_response_istream (item->msg, &error);
+ if (stream) {
+ send_async_maybe_complete (item, stream);
+ return;
+ }
+
+ if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
+ item->state = SOUP_MESSAGE_RESTARTING;
+ soup_message_io_finished (item->msg);
+ g_error_free (error);
+ return;
+ }
+
+ if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ if (item->state != SOUP_MESSAGE_FINISHED) {
+ if (soup_message_io_in_progress (item->msg))
+ soup_message_io_finished (item->msg);
+ item->state = SOUP_MESSAGE_FINISHING;
+ soup_session_process_queue_item (item->session, item, NULL, FALSE);
+ }
+ async_send_request_return_result (item, NULL, error);
+ return;
+ }
+
+ g_clear_error (&error);
+ item->io_source = soup_message_io_get_source (item->msg, item->cancellable,
+ read_ready_cb, item);
+ g_source_attach (item->io_source, soup_session_get_async_context (item->session));
+}
+
+static void
+async_send_request_running (SoupSession *session, SoupMessageQueueItem *item)
+{
+ item->io_started = TRUE;
+ try_run_until_read (item);
+}
+
+void
+soup_session_send_request_async (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ SoupMessageQueueItem *item;
+ gboolean use_thread_context;
+
+ g_return_if_fail (SOUP_IS_SESSION (session));
+ g_return_if_fail (!SOUP_IS_SESSION_SYNC (session));
+
+ g_object_get (G_OBJECT (session),
+ SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
+ NULL);
+ g_return_if_fail (use_thread_context);
+
+ item = soup_session_append_queue_item (session, msg, TRUE, TRUE,
+ NULL, NULL);
+ g_signal_connect (msg, "restarted",
+ G_CALLBACK (async_send_request_restarted), item);
+ g_signal_connect (msg, "finished",
+ G_CALLBACK (async_send_request_finished), item);
+
+ item->new_api = TRUE;
+ item->task = g_task_new (session, cancellable, callback, user_data);
+ g_task_set_task_data (item->task, item, (GDestroyNotify) soup_message_queue_item_unref);
+
+ if (cancellable) {
+ g_object_unref (item->cancellable);
+ item->cancellable = g_object_ref (cancellable);
+ }
+
+ soup_session_kick_queue (session);
+}
+
+GInputStream *
+soup_session_send_request_finish (SoupSession *session,
+ GAsyncResult *result,
+ GError **error)
+{
+ GTask *task;
+
+ g_return_val_if_fail (SOUP_IS_SESSION (session), NULL);
+ g_return_val_if_fail (!SOUP_IS_SESSION_SYNC (session), NULL);
+ g_return_val_if_fail (g_task_is_valid (result, session), NULL);
+
+ task = G_TASK (result);
+ if (g_task_had_error (task)) {
+ SoupMessageQueueItem *item = g_task_get_task_data (task);
+
+ if (soup_message_io_in_progress (item->msg))
+ soup_message_io_finished (item->msg);
+ else if (item->state != SOUP_MESSAGE_FINISHED)
+ item->state = SOUP_MESSAGE_FINISHING;
+
+ if (item->state != SOUP_MESSAGE_FINISHED)
+ soup_session_process_queue_item (session, item, NULL, FALSE);
+ }
+
+ return g_task_propagate_pointer (task, error);
+}
+
+GInputStream *
+soup_session_send_request (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupMessageQueueItem *item;
+ GInputStream *stream = NULL;
+ GOutputStream *ostream;
+ GMemoryOutputStream *mostream;
+ gssize size;
+ GError *my_error = NULL;
+
+ g_return_val_if_fail (SOUP_IS_SESSION (session), NULL);
+ g_return_val_if_fail (!SOUP_IS_SESSION_ASYNC (session), NULL);
+
+ item = soup_session_append_queue_item (session, msg, FALSE, TRUE,
+ NULL, NULL);
+
+ item->new_api = TRUE;
+ if (cancellable) {
+ g_object_unref (item->cancellable);
+ item->cancellable = g_object_ref (cancellable);
+ }
+
+ while (!stream) {
+ /* Get a connection, etc */
+ soup_session_process_queue_item (session, item, NULL, TRUE);
+ if (item->state != SOUP_MESSAGE_RUNNING)
+ break;
+
+ /* Send request, read headers */
+ if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) {
+ if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
+ item->state = SOUP_MESSAGE_RESTARTING;
+ soup_message_io_finished (item->msg);
+ g_clear_error (&my_error);
+ continue;
+ } else
+ break;
+ }
+
+ stream = soup_message_io_get_response_istream (msg, &my_error);
+ if (!stream)
+ break;
+
+ /* Break if the message doesn't look likely-to-be-requeued */
+ if (msg->status_code != SOUP_STATUS_UNAUTHORIZED &&
+ msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED &&
+ !soup_session_would_redirect (session, msg))
+ break;
+
+ /* Gather the current message body... */
+ ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+ if (g_output_stream_splice (ostream, stream,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+ G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ item->cancellable, &my_error) == -1) {
+ g_object_unref (stream);
+ g_object_unref (ostream);
+ stream = NULL;
+ break;
+ }
+ g_object_unref (stream);
+ stream = NULL;
+
+ /* If the message was requeued, loop */
+ if (item->state == SOUP_MESSAGE_RESTARTING) {
+ g_object_unref (ostream);
+ continue;
+ }
+
+ /* Not requeued, so return the original body */
+ mostream = G_MEMORY_OUTPUT_STREAM (ostream);
+ size = g_memory_output_stream_get_data_size (mostream);
+ stream = g_memory_input_stream_new ();
+ if (size) {
+ g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream),
+ g_memory_output_stream_steal_data (mostream),
+ size, g_free);
+ }
+ g_object_unref (ostream);
+ }
+
+ if (my_error)
+ g_propagate_error (error, my_error);
+ else if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+ if (stream) {
+ g_object_unref (stream);
+ stream = NULL;
+ }
+ g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code,
+ msg->reason_phrase);
+ } else if (!stream)
+ stream = g_memory_input_stream_new ();
+
+ if (!stream) {
+ if (soup_message_io_in_progress (msg))
+ soup_message_io_finished (msg);
+ else if (item->state != SOUP_MESSAGE_FINISHED)
+ item->state = SOUP_MESSAGE_FINISHING;
+
+ if (item->state != SOUP_MESSAGE_FINISHED)
+ soup_session_process_queue_item (session, item, NULL, TRUE);
+ }
+
+ soup_message_queue_item_unref (item);
+ return stream;
+}
diff --git a/tests/requester-test.c b/tests/requester-test.c
index bc45e9f6..a4379377 100644
--- a/tests/requester-test.c
+++ b/tests/requester-test.c
@@ -372,21 +372,22 @@ do_test_for_thread_and_context (SoupSession *session, const char *base_uri)
}
static void
-do_simple_test (const char *uri)
+do_simple_test (const char *uri, gboolean plain_session)
{
SoupSession *session;
- debug_printf (1, "Simple streaming test\n");
+ debug_printf (1, "Simple streaming test with %s\n",
+ plain_session ? "SoupSession" : "SoupSessionAsync");
- session = soup_test_session_new (SOUP_TYPE_SESSION_ASYNC,
+ session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_ASYNC,
SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
NULL);
do_test_for_thread_and_context (session, uri);
soup_test_session_abort_unref (session);
}
-static gpointer
-do_test_with_context (const char *uri)
+static void
+do_test_with_context_and_type (const char *uri, gboolean plain_session)
{
GMainContext *async_context;
SoupSession *session;
@@ -394,7 +395,7 @@ do_test_with_context (const char *uri)
async_context = g_main_context_new ();
g_main_context_push_thread_default (async_context);
- session = soup_test_session_new (SOUP_TYPE_SESSION_ASYNC,
+ session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_ASYNC,
SOUP_SESSION_ASYNC_CONTEXT, async_context,
SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
NULL);
@@ -404,25 +405,43 @@ do_test_with_context (const char *uri)
g_main_context_pop_thread_default (async_context);
g_main_context_unref (async_context);
+}
+
+static gpointer
+do_test_with_context (gpointer uri)
+{
+ do_test_with_context_and_type (uri, FALSE);
+ return NULL;
+}
+
+static gpointer
+do_plain_test_with_context (gpointer uri)
+{
+ do_test_with_context_and_type (uri, TRUE);
return NULL;
}
static void
-do_context_test (const char *uri)
+do_context_test (const char *uri, gboolean plain_session)
{
- debug_printf (1, "Streaming with a non-default-context\n");
- do_test_with_context (uri);
+ debug_printf (1, "Streaming with a non-default-context with %s\n",
+ plain_session ? "SoupSession" : "SoupSessionAsync");
+ if (plain_session)
+ do_plain_test_with_context ((gpointer)uri);
+ else
+ do_test_with_context ((gpointer)uri);
}
static void
-do_thread_test (const char *uri)
+do_thread_test (const char *uri, gboolean plain_session)
{
GThread *thread;
- debug_printf (1, "Streaming in another thread\n");
+ debug_printf (1, "Streaming in another thread with %s\n",
+ plain_session ? "SoupSession" : "SoupSessionAsync");
thread = g_thread_new ("do_test_with_context",
- (GThreadFunc)do_test_with_context,
+ plain_session ? do_plain_test_with_context : do_test_with_context,
(gpointer)uri);
g_thread_join (thread);
}
@@ -542,16 +561,17 @@ do_sync_request (SoupSession *session, SoupRequest *request,
}
static void
-do_sync_test (const char *uri_string)
+do_sync_test (const char *uri_string, gboolean plain_session)
{
SoupSession *session;
SoupRequester *requester;
SoupRequest *request;
SoupURI *uri;
- debug_printf (1, "Sync streaming\n");
+ debug_printf (1, "Sync streaming with %s\n",
+ plain_session ? "SoupSession" : "SoupSessionSync");
- session = soup_test_session_new (SOUP_TYPE_SESSION_SYNC, NULL);
+ session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_SYNC, NULL);
requester = soup_requester_new ();
soup_session_add_feature (session, SOUP_SESSION_FEATURE (requester));
g_object_unref (requester);
@@ -614,10 +634,15 @@ main (int argc, char **argv)
uri = g_strdup_printf ("http://127.0.0.1:%u/foo", soup_server_get_port (server));
- do_simple_test (uri);
- do_thread_test (uri);
- do_context_test (uri);
- do_sync_test (uri);
+ do_simple_test (uri, FALSE);
+ do_thread_test (uri, FALSE);
+ do_context_test (uri, FALSE);
+ do_sync_test (uri, FALSE);
+
+ do_simple_test (uri, TRUE);
+ do_thread_test (uri, TRUE);
+ do_context_test (uri, TRUE);
+ do_sync_test (uri, TRUE);
g_free (uri);
soup_buffer_free (response);