diff options
Diffstat (limited to 'libthor/thor_net.c')
-rw-r--r-- | libthor/thor_net.c | 267 |
1 files changed, 224 insertions, 43 deletions
diff --git a/libthor/thor_net.c b/libthor/thor_net.c index 08d479f..a3e8a8f 100644 --- a/libthor/thor_net.c +++ b/libthor/thor_net.c @@ -4,6 +4,7 @@ #include <stdlib.h> #include <unistd.h> #include <sys/socket.h> +#include <aio.h> #include "thor.h" #include "thor_internal.h" @@ -135,26 +136,180 @@ static int thor_net_recv(thor_device_handle *th, unsigned char *buf, return 0; } -static int t_thor_net_send_chunk(thor_device_handle *th, unsigned char *chunk, - off_t size, int chunk_number) -{ +struct t_thor_net_chunk { + struct net_device_handle *nh; + struct aiocb data_transfer; + struct aiocb resp_transfer; + void *user_data; + off_t useful_size; struct data_res_pkt resp; + unsigned char *buf; + off_t trans_unit_size; + int chunk_number; + int data_finished; + int resp_finished; +}; + +struct t_thor_net_transfer { + struct thor_device_handle *th; + struct thor_data_src *data; + thor_progress_cb report_progress; + void *user_data; + off_t data_left; + off_t data_sent; + off_t data_in_progress; + int chunk_number; + int completed; int ret; +}; - ret = thor_net_send(th, chunk, size, 0); - if (ret < 0) - return ret; +static int t_thor_submit_chunk(struct t_thor_net_chunk *chunk) +{ + struct t_thor_net_transfer *transfer = chunk->user_data; + int ret; + + chunk->data_finished = chunk->resp_finished = 0; + + ret = aio_write(&chunk->data_transfer); + if (ret) + goto out; + + memset(&chunk->resp, 0, DATA_RES_PKT_SIZE); + ret = aio_read(&chunk->resp_transfer); + if (ret) + goto cancel_data_transfer; + + return 0; +cancel_data_transfer: + aio_cancel(chunk->nh->sock_fd, &chunk->data_transfer); +out: + return ret; +} - memset(&resp, 0, DATA_RES_PKT_SIZE); +static int t_thor_prep_next_chunk(struct t_thor_net_chunk *chunk, + struct t_thor_net_transfer *transfer) +{ + off_t to_read; + int ret; + + to_read = transfer->data_left - transfer->data_in_progress; + if (to_read <= 0) { + printf("to big data in progress\n"); + fflush(stdout); + return -EINVAL; + } - ret = thor_net_recv(th, (unsigned char *)&resp, DATA_RES_PKT_SIZE, 0); - if (ret < 0) + chunk->useful_size = to_read > chunk->trans_unit_size ? + chunk->trans_unit_size : to_read; + + ret = transfer->data->get_block(transfer->data, + chunk->buf, chunk->useful_size); + if (ret < 0 || ret != chunk->useful_size) return ret; - if (resp.cnt != chunk_number) - return -EIO; + memset(chunk->buf + chunk->useful_size, 0, + chunk->trans_unit_size - chunk->useful_size); + chunk->chunk_number = transfer->chunk_number++; + + ret = t_thor_submit_chunk(chunk); + if (!ret) + transfer->data_in_progress += chunk->useful_size; + + return ret; +} + +static void check_next_chunk(struct t_thor_net_chunk *chunk, + struct t_thor_net_transfer *transfer) +{ + int ret; - return resp.ack; + if (transfer->data_left - transfer->data_in_progress) { + ret = t_thor_prep_next_chunk(chunk, transfer); + if (ret) { + transfer->ret = ret; + transfer->completed = 1; + } + } else { + if (transfer->data_in_progress == 0) + transfer->completed = 1; + } +} + +static void data_finished(sigval_t sigval) +{ + struct t_thor_net_chunk *chunk = sigval.sival_ptr; + struct t_thor_net_transfer *transfer = chunk->user_data; + int ret; + + chunk->data_finished = 1; + + ret = aio_error(&chunk->data_transfer); + if (ret == ECANCELED || transfer->ret) { + return; + } else if (ret < 0) { + transfer->ret = ret; + transfer->completed = 1; + return; + } + + ret = aio_return(&chunk->data_transfer); + if (ret < chunk->data_transfer.aio_nbytes) { + transfer->ret = -EIO; + transfer->completed = 1; + return; + } + + if (chunk->resp_finished) + check_next_chunk(chunk, transfer); +} + +static void resp_finished(sigval_t sigval) +{ + struct t_thor_net_chunk *chunk = sigval.sival_ptr; + struct t_thor_net_transfer *transfer = chunk->user_data; + int ret; + + chunk->resp_finished = 1; + transfer->data_in_progress -= chunk->useful_size; + + ret = aio_error(&chunk->resp_transfer); + if (ret == ECANCELED || transfer->ret) { + return; + } else if (ret < 0) { + transfer->ret = ret; + transfer->completed = 1; + return; + } + + ret = aio_return(&chunk->resp_transfer); + if (ret < chunk->resp_transfer.aio_nbytes) { + transfer->ret = -EIO; + transfer->completed = 1; + return; + } + + if (chunk->resp.cnt != chunk->chunk_number) { + fprintf(stderr, "chunk number mismatch: %d != %d\n", + chunk->resp.cnt, chunk->chunk_number); + fflush(stderr); + transfer->ret = -EINVAL; + transfer->completed = 1; + return; + } + + transfer->data_sent += chunk->useful_size; + transfer->data_left -= chunk->useful_size; + + if (transfer->report_progress) + transfer->report_progress(transfer->th, + transfer->data, + transfer->data_sent, + transfer->data_left, + chunk->chunk_number, + transfer->user_data); + + if (chunk->data_finished) + check_next_chunk(chunk, transfer); } static int thor_net_send_raw_data(thor_device_handle *th, @@ -164,54 +319,80 @@ static int thor_net_send_raw_data(thor_device_handle *th, void *user_data) { struct net_device_handle *nh = th->dev_priv; - unsigned char *chunk; - off_t data_left; - off_t size; - off_t data_sent = 0; - int chunk_number = 1; + struct t_thor_net_transfer transfer; + struct t_thor_net_chunk chunk; int ret; if (!nh) return -ENODEV; - chunk = malloc(trans_unit_size); - if (!chunk) + /* Init transfer */ + transfer.th = th; + transfer.data = data; + transfer.report_progress = report_progress; + transfer.user_data = user_data; + transfer.data_left = data->get_file_length(data); + transfer.data_sent = 0; + transfer.chunk_number = 1; + transfer.completed = 0; + transfer.data_in_progress = 0; + transfer.ret = 0; + + /* Init chunk */ + chunk.user_data = &transfer; + chunk.useful_size = 0; + chunk.trans_unit_size = trans_unit_size; + chunk.buf = malloc(trans_unit_size); + if (!chunk.buf) return -ENOMEM; - data_left = data->get_file_length(data); + chunk.data_transfer.aio_fildes = nh->sock_fd; + chunk.data_transfer.aio_buf = chunk.buf; + chunk.data_transfer.aio_nbytes = chunk.trans_unit_size; + chunk.data_transfer.aio_sigevent.sigev_notify = SIGEV_THREAD; + chunk.data_transfer.aio_sigevent.sigev_notify_function = data_finished; + chunk.data_transfer.aio_sigevent.sigev_notify_attributes = NULL; + chunk.data_transfer.aio_sigevent.sigev_value.sival_ptr = &chunk; + + chunk.resp_transfer.aio_fildes = nh->sock_fd; + chunk.resp_transfer.aio_buf = &chunk.resp; + chunk.resp_transfer.aio_nbytes = DATA_RES_PKT_SIZE; + chunk.resp_transfer.aio_sigevent.sigev_notify = SIGEV_THREAD; + chunk.resp_transfer.aio_sigevent.sigev_notify_function = resp_finished; + chunk.resp_transfer.aio_sigevent.sigev_notify_attributes = NULL; + chunk.resp_transfer.aio_sigevent.sigev_value.sival_ptr = &chunk; + + ret = t_thor_prep_next_chunk(&chunk, &transfer); + if (ret) + goto cancel_chunks; - while (data_left) { - size = data_left > trans_unit_size ? - trans_unit_size : data_left; + while (!transfer.completed) { + int err; - ret = data->get_block(data, chunk, size); - if (ret < 0 || ret != size) - goto cleanup; + err = aio_error(&chunk.data_transfer); + if (err < 0) + break; - memset(chunk + size, 0, trans_unit_size - size); - if (th) { - ret = t_thor_net_send_chunk(th, chunk, trans_unit_size, - chunk_number); - if (ret) - goto cleanup; - } + err = aio_error(&chunk.resp_transfer); + if (err < 0) + break; + } - data_sent += size; - data_left -= size; - ++chunk_number; - if (report_progress) - report_progress(NULL, data, data_sent, data_left, - chunk_number, user_data); + if (transfer.data_in_progress) { + ret = transfer.ret; + goto cancel_chunks; } - ret = 0; + free(chunk.buf); + return transfer.ret; -cleanup: - free(chunk); +cancel_chunks: + aio_cancel(nh->sock_fd, NULL); + + free(chunk.buf); return ret; } - static struct thor_transport_ops thor_net_ops = { .open = thor_net_open, .close = thor_net_close, |