diff options
Diffstat (limited to 'net/rxrpc/sendmsg.c')
-rw-r--r-- | net/rxrpc/sendmsg.c | 195 |
1 files changed, 114 insertions, 81 deletions
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c index cde1e65f16b4..da49fcf1c456 100644 --- a/net/rxrpc/sendmsg.c +++ b/net/rxrpc/sendmsg.c @@ -18,6 +18,81 @@ #include "ar-internal.h" /* + * Propose an abort to be made in the I/O thread. + */ +bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error, + enum rxrpc_abort_reason why) +{ + _enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why); + + if (!call->send_abort && !rxrpc_call_is_complete(call)) { + call->send_abort_why = why; + call->send_abort_err = error; + call->send_abort_seq = 0; + /* Request abort locklessly vs rxrpc_input_call_event(). */ + smp_store_release(&call->send_abort, abort_code); + rxrpc_poke_call(call, rxrpc_call_poke_abort); + return true; + } + + return false; +} + +/* + * Wait for a call to become connected. Interruption here doesn't cause the + * call to be aborted. + */ +static int rxrpc_wait_to_be_connected(struct rxrpc_call *call, long *timeo) +{ + DECLARE_WAITQUEUE(myself, current); + int ret = 0; + + _enter("%d", call->debug_id); + + if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN) + return call->error; + + add_wait_queue_exclusive(&call->waitq, &myself); + + for (;;) { + ret = call->error; + if (ret < 0) + break; + + switch (call->interruptibility) { + case RXRPC_INTERRUPTIBLE: + case RXRPC_PREINTERRUPTIBLE: + set_current_state(TASK_INTERRUPTIBLE); + break; + case RXRPC_UNINTERRUPTIBLE: + default: + set_current_state(TASK_UNINTERRUPTIBLE); + break; + } + if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN) { + ret = call->error; + break; + } + if ((call->interruptibility == RXRPC_INTERRUPTIBLE || + call->interruptibility == RXRPC_PREINTERRUPTIBLE) && + signal_pending(current)) { + ret = sock_intr_errno(*timeo); + break; + } + *timeo = schedule_timeout(*timeo); + } + + remove_wait_queue(&call->waitq, &myself); + __set_current_state(TASK_RUNNING); + + if (ret == 0 && rxrpc_call_is_complete(call)) + ret = call->error; + + _leave(" = %d", ret); + return ret; +} + +/* * Return true if there's sufficient Tx queue space. */ static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win) @@ -39,7 +114,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx, if (rxrpc_check_tx_space(call, NULL)) return 0; - if (call->state >= RXRPC_CALL_COMPLETE) + if (rxrpc_call_is_complete(call)) return call->error; if (signal_pending(current)) @@ -74,7 +149,7 @@ static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx, if (rxrpc_check_tx_space(call, &tx_win)) return 0; - if (call->state >= RXRPC_CALL_COMPLETE) + if (rxrpc_call_is_complete(call)) return call->error; if (timeout == 0 && @@ -103,7 +178,7 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx, if (rxrpc_check_tx_space(call, NULL)) return 0; - if (call->state >= RXRPC_CALL_COMPLETE) + if (rxrpc_call_is_complete(call)) return call->error; trace_rxrpc_txqueue(call, rxrpc_txqueue_wait); @@ -168,7 +243,6 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, struct rxrpc_txbuf *txb, rxrpc_notify_end_tx_t notify_end_tx) { - unsigned long now; rxrpc_seq_t seq = txb->seq; bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke; @@ -191,36 +265,10 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, poke = list_empty(&call->tx_sendmsg); list_add_tail(&txb->call_link, &call->tx_sendmsg); call->tx_prepared = seq; + if (last) + rxrpc_notify_end_tx(rx, call, notify_end_tx); spin_unlock(&call->tx_lock); - if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) { - _debug("________awaiting reply/ACK__________"); - write_lock(&call->state_lock); - switch (call->state) { - case RXRPC_CALL_CLIENT_SEND_REQUEST: - call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY; - rxrpc_notify_end_tx(rx, call, notify_end_tx); - break; - case RXRPC_CALL_SERVER_ACK_REQUEST: - call->state = RXRPC_CALL_SERVER_SEND_REPLY; - now = jiffies; - WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET); - if (call->ackr_reason == RXRPC_ACK_DELAY) - call->ackr_reason = 0; - trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now); - if (!last) - break; - fallthrough; - case RXRPC_CALL_SERVER_SEND_REPLY: - call->state = RXRPC_CALL_SERVER_AWAIT_ACK; - rxrpc_notify_end_tx(rx, call, notify_end_tx); - break; - default: - break; - } - write_unlock(&call->state_lock); - } - if (poke) rxrpc_poke_call(call, rxrpc_call_poke_start); } @@ -245,6 +293,16 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT); + ret = rxrpc_wait_to_be_connected(call, &timeo); + if (ret < 0) + return ret; + + if (call->conn->state == RXRPC_CONN_CLIENT_UNSECURED) { + ret = rxrpc_init_client_conn_security(call->conn); + if (ret < 0) + return ret; + } + /* this should be in poll */ sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk); @@ -252,15 +310,20 @@ reload: ret = -EPIPE; if (sk->sk_shutdown & SEND_SHUTDOWN) goto maybe_error; - state = READ_ONCE(call->state); + state = rxrpc_call_state(call); ret = -ESHUTDOWN; if (state >= RXRPC_CALL_COMPLETE) goto maybe_error; ret = -EPROTO; if (state != RXRPC_CALL_CLIENT_SEND_REQUEST && state != RXRPC_CALL_SERVER_ACK_REQUEST && - state != RXRPC_CALL_SERVER_SEND_REPLY) + state != RXRPC_CALL_SERVER_SEND_REPLY) { + /* Request phase complete for this client call */ + trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send, + call->cid, call->call_id, call->rx_consumed, + 0, -EPROTO); goto maybe_error; + } ret = -EMSGSIZE; if (call->tx_total_len != -1) { @@ -329,7 +392,7 @@ reload: /* check for the far side aborting the call or a network error * occurring */ - if (call->state == RXRPC_CALL_COMPLETE) + if (rxrpc_call_is_complete(call)) goto call_terminated; /* add the packet to the send queue if it's now full */ @@ -354,12 +417,9 @@ reload: success: ret = copied; - if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) { - read_lock(&call->state_lock); - if (call->error < 0) - ret = call->error; - read_unlock(&call->state_lock); - } + if (rxrpc_call_is_complete(call) && + call->error < 0) + ret = call->error; out: call->tx_pending = txb; _leave(" = %d", ret); @@ -543,7 +603,6 @@ rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, atomic_inc_return(&rxrpc_debug_id)); /* The socket is now unlocked */ - rxrpc_put_peer(cp.peer, rxrpc_peer_put_discard_tmp); _leave(" = %p\n", call); return call; } @@ -556,7 +615,6 @@ rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) __releases(&rx->sk.sk_lock.slock) { - enum rxrpc_call_state state; struct rxrpc_call *call; unsigned long now, j; bool dropped_lock = false; @@ -598,10 +656,10 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) return PTR_ERR(call); /* ... and we have the call lock. */ ret = 0; - if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) + if (rxrpc_call_is_complete(call)) goto out_put_unlock; } else { - switch (READ_ONCE(call->state)) { + switch (rxrpc_call_state(call)) { case RXRPC_CALL_UNINITIALISED: case RXRPC_CALL_CLIENT_AWAIT_CONN: case RXRPC_CALL_SERVER_PREALLOC: @@ -655,17 +713,13 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) break; } - state = READ_ONCE(call->state); - _debug("CALL %d USR %lx ST %d on CONN %p", - call->debug_id, call->user_call_ID, state, call->conn); - - if (state >= RXRPC_CALL_COMPLETE) { + if (rxrpc_call_is_complete(call)) { /* it's too late for this call */ ret = -ESHUTDOWN; } else if (p.command == RXRPC_CMD_SEND_ABORT) { + rxrpc_propose_abort(call, p.abort_code, -ECONNABORTED, + rxrpc_abort_call_sendmsg); ret = 0; - if (rxrpc_abort_call("CMD", call, 0, p.abort_code, -ECONNABORTED)) - ret = rxrpc_send_abort_packet(call); } else if (p.command != RXRPC_CMD_SEND_DATA) { ret = -EINVAL; } else { @@ -705,34 +759,17 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call, bool dropped_lock = false; int ret; - _enter("{%d,%s},", call->debug_id, rxrpc_call_states[call->state]); + _enter("{%d},", call->debug_id); ASSERTCMP(msg->msg_name, ==, NULL); ASSERTCMP(msg->msg_control, ==, NULL); mutex_lock(&call->user_mutex); - _debug("CALL %d USR %lx ST %d on CONN %p", - call->debug_id, call->user_call_ID, call->state, call->conn); - - switch (READ_ONCE(call->state)) { - case RXRPC_CALL_CLIENT_SEND_REQUEST: - case RXRPC_CALL_SERVER_ACK_REQUEST: - case RXRPC_CALL_SERVER_SEND_REPLY: - ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len, - notify_end_tx, &dropped_lock); - break; - case RXRPC_CALL_COMPLETE: - read_lock(&call->state_lock); + ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len, + notify_end_tx, &dropped_lock); + if (ret == -ESHUTDOWN) ret = call->error; - read_unlock(&call->state_lock); - break; - default: - /* Request phase complete for this client call */ - trace_rxrpc_rx_eproto(call, 0, tracepoint_string("late_send")); - ret = -EPROTO; - break; - } if (!dropped_lock) mutex_unlock(&call->user_mutex); @@ -747,24 +784,20 @@ EXPORT_SYMBOL(rxrpc_kernel_send_data); * @call: The call to be aborted * @abort_code: The abort code to stick into the ABORT packet * @error: Local error value - * @why: 3-char string indicating why. + * @why: Indication as to why. * * Allow a kernel service to abort a call, if it's still in an abortable state * and return true if the call was aborted, false if it was already complete. */ bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call, - u32 abort_code, int error, const char *why) + u32 abort_code, int error, enum rxrpc_abort_reason why) { bool aborted; - _enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why); + _enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why); mutex_lock(&call->user_mutex); - - aborted = rxrpc_abort_call(why, call, 0, abort_code, error); - if (aborted) - rxrpc_send_abort_packet(call); - + aborted = rxrpc_propose_abort(call, abort_code, error, why); mutex_unlock(&call->user_mutex); return aborted; } |