summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xCMakeLists.txt2
-rw-r--r--libthor/thor_net.c267
2 files changed, 225 insertions, 44 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4d67887..e4f315c 100755
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -66,7 +66,7 @@ ADD_LIBRARY(libthor ${LIBTHOR_SRCS})
ADD_EXECUTABLE(${PROJECT_NAME} ${SRCS})
-TARGET_LINK_LIBRARIES(${PROJECT_NAME} libthor ${pkgs_LDFLAGS})
+TARGET_LINK_LIBRARIES(${PROJECT_NAME} libthor rt ${pkgs_LDFLAGS})
INSTALL(TARGETS ${PROJECT_NAME} DESTINATION ${BINDIR})
diff --git a/libthor/thor_net.c b/libthor/thor_net.c
index 08d479f..a3e8a8f 100644
--- a/libthor/thor_net.c
+++ b/libthor/thor_net.c
@@ -4,6 +4,7 @@
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
+#include <aio.h>
#include "thor.h"
#include "thor_internal.h"
@@ -135,26 +136,180 @@ static int thor_net_recv(thor_device_handle *th, unsigned char *buf,
return 0;
}
-static int t_thor_net_send_chunk(thor_device_handle *th, unsigned char *chunk,
- off_t size, int chunk_number)
-{
+struct t_thor_net_chunk {
+ struct net_device_handle *nh;
+ struct aiocb data_transfer;
+ struct aiocb resp_transfer;
+ void *user_data;
+ off_t useful_size;
struct data_res_pkt resp;
+ unsigned char *buf;
+ off_t trans_unit_size;
+ int chunk_number;
+ int data_finished;
+ int resp_finished;
+};
+
+struct t_thor_net_transfer {
+ struct thor_device_handle *th;
+ struct thor_data_src *data;
+ thor_progress_cb report_progress;
+ void *user_data;
+ off_t data_left;
+ off_t data_sent;
+ off_t data_in_progress;
+ int chunk_number;
+ int completed;
int ret;
+};
- ret = thor_net_send(th, chunk, size, 0);
- if (ret < 0)
- return ret;
+static int t_thor_submit_chunk(struct t_thor_net_chunk *chunk)
+{
+ struct t_thor_net_transfer *transfer = chunk->user_data;
+ int ret;
+
+ chunk->data_finished = chunk->resp_finished = 0;
+
+ ret = aio_write(&chunk->data_transfer);
+ if (ret)
+ goto out;
+
+ memset(&chunk->resp, 0, DATA_RES_PKT_SIZE);
+ ret = aio_read(&chunk->resp_transfer);
+ if (ret)
+ goto cancel_data_transfer;
+
+ return 0;
+cancel_data_transfer:
+ aio_cancel(chunk->nh->sock_fd, &chunk->data_transfer);
+out:
+ return ret;
+}
- memset(&resp, 0, DATA_RES_PKT_SIZE);
+static int t_thor_prep_next_chunk(struct t_thor_net_chunk *chunk,
+ struct t_thor_net_transfer *transfer)
+{
+ off_t to_read;
+ int ret;
+
+ to_read = transfer->data_left - transfer->data_in_progress;
+ if (to_read <= 0) {
+ printf("to big data in progress\n");
+ fflush(stdout);
+ return -EINVAL;
+ }
- ret = thor_net_recv(th, (unsigned char *)&resp, DATA_RES_PKT_SIZE, 0);
- if (ret < 0)
+ chunk->useful_size = to_read > chunk->trans_unit_size ?
+ chunk->trans_unit_size : to_read;
+
+ ret = transfer->data->get_block(transfer->data,
+ chunk->buf, chunk->useful_size);
+ if (ret < 0 || ret != chunk->useful_size)
return ret;
- if (resp.cnt != chunk_number)
- return -EIO;
+ memset(chunk->buf + chunk->useful_size, 0,
+ chunk->trans_unit_size - chunk->useful_size);
+ chunk->chunk_number = transfer->chunk_number++;
+
+ ret = t_thor_submit_chunk(chunk);
+ if (!ret)
+ transfer->data_in_progress += chunk->useful_size;
+
+ return ret;
+}
+
+static void check_next_chunk(struct t_thor_net_chunk *chunk,
+ struct t_thor_net_transfer *transfer)
+{
+ int ret;
- return resp.ack;
+ if (transfer->data_left - transfer->data_in_progress) {
+ ret = t_thor_prep_next_chunk(chunk, transfer);
+ if (ret) {
+ transfer->ret = ret;
+ transfer->completed = 1;
+ }
+ } else {
+ if (transfer->data_in_progress == 0)
+ transfer->completed = 1;
+ }
+}
+
+static void data_finished(sigval_t sigval)
+{
+ struct t_thor_net_chunk *chunk = sigval.sival_ptr;
+ struct t_thor_net_transfer *transfer = chunk->user_data;
+ int ret;
+
+ chunk->data_finished = 1;
+
+ ret = aio_error(&chunk->data_transfer);
+ if (ret == ECANCELED || transfer->ret) {
+ return;
+ } else if (ret < 0) {
+ transfer->ret = ret;
+ transfer->completed = 1;
+ return;
+ }
+
+ ret = aio_return(&chunk->data_transfer);
+ if (ret < chunk->data_transfer.aio_nbytes) {
+ transfer->ret = -EIO;
+ transfer->completed = 1;
+ return;
+ }
+
+ if (chunk->resp_finished)
+ check_next_chunk(chunk, transfer);
+}
+
+static void resp_finished(sigval_t sigval)
+{
+ struct t_thor_net_chunk *chunk = sigval.sival_ptr;
+ struct t_thor_net_transfer *transfer = chunk->user_data;
+ int ret;
+
+ chunk->resp_finished = 1;
+ transfer->data_in_progress -= chunk->useful_size;
+
+ ret = aio_error(&chunk->resp_transfer);
+ if (ret == ECANCELED || transfer->ret) {
+ return;
+ } else if (ret < 0) {
+ transfer->ret = ret;
+ transfer->completed = 1;
+ return;
+ }
+
+ ret = aio_return(&chunk->resp_transfer);
+ if (ret < chunk->resp_transfer.aio_nbytes) {
+ transfer->ret = -EIO;
+ transfer->completed = 1;
+ return;
+ }
+
+ if (chunk->resp.cnt != chunk->chunk_number) {
+ fprintf(stderr, "chunk number mismatch: %d != %d\n",
+ chunk->resp.cnt, chunk->chunk_number);
+ fflush(stderr);
+ transfer->ret = -EINVAL;
+ transfer->completed = 1;
+ return;
+ }
+
+ transfer->data_sent += chunk->useful_size;
+ transfer->data_left -= chunk->useful_size;
+
+ if (transfer->report_progress)
+ transfer->report_progress(transfer->th,
+ transfer->data,
+ transfer->data_sent,
+ transfer->data_left,
+ chunk->chunk_number,
+ transfer->user_data);
+
+ if (chunk->data_finished)
+ check_next_chunk(chunk, transfer);
}
static int thor_net_send_raw_data(thor_device_handle *th,
@@ -164,54 +319,80 @@ static int thor_net_send_raw_data(thor_device_handle *th,
void *user_data)
{
struct net_device_handle *nh = th->dev_priv;
- unsigned char *chunk;
- off_t data_left;
- off_t size;
- off_t data_sent = 0;
- int chunk_number = 1;
+ struct t_thor_net_transfer transfer;
+ struct t_thor_net_chunk chunk;
int ret;
if (!nh)
return -ENODEV;
- chunk = malloc(trans_unit_size);
- if (!chunk)
+ /* Init transfer */
+ transfer.th = th;
+ transfer.data = data;
+ transfer.report_progress = report_progress;
+ transfer.user_data = user_data;
+ transfer.data_left = data->get_file_length(data);
+ transfer.data_sent = 0;
+ transfer.chunk_number = 1;
+ transfer.completed = 0;
+ transfer.data_in_progress = 0;
+ transfer.ret = 0;
+
+ /* Init chunk */
+ chunk.user_data = &transfer;
+ chunk.useful_size = 0;
+ chunk.trans_unit_size = trans_unit_size;
+ chunk.buf = malloc(trans_unit_size);
+ if (!chunk.buf)
return -ENOMEM;
- data_left = data->get_file_length(data);
+ chunk.data_transfer.aio_fildes = nh->sock_fd;
+ chunk.data_transfer.aio_buf = chunk.buf;
+ chunk.data_transfer.aio_nbytes = chunk.trans_unit_size;
+ chunk.data_transfer.aio_sigevent.sigev_notify = SIGEV_THREAD;
+ chunk.data_transfer.aio_sigevent.sigev_notify_function = data_finished;
+ chunk.data_transfer.aio_sigevent.sigev_notify_attributes = NULL;
+ chunk.data_transfer.aio_sigevent.sigev_value.sival_ptr = &chunk;
+
+ chunk.resp_transfer.aio_fildes = nh->sock_fd;
+ chunk.resp_transfer.aio_buf = &chunk.resp;
+ chunk.resp_transfer.aio_nbytes = DATA_RES_PKT_SIZE;
+ chunk.resp_transfer.aio_sigevent.sigev_notify = SIGEV_THREAD;
+ chunk.resp_transfer.aio_sigevent.sigev_notify_function = resp_finished;
+ chunk.resp_transfer.aio_sigevent.sigev_notify_attributes = NULL;
+ chunk.resp_transfer.aio_sigevent.sigev_value.sival_ptr = &chunk;
+
+ ret = t_thor_prep_next_chunk(&chunk, &transfer);
+ if (ret)
+ goto cancel_chunks;
- while (data_left) {
- size = data_left > trans_unit_size ?
- trans_unit_size : data_left;
+ while (!transfer.completed) {
+ int err;
- ret = data->get_block(data, chunk, size);
- if (ret < 0 || ret != size)
- goto cleanup;
+ err = aio_error(&chunk.data_transfer);
+ if (err < 0)
+ break;
- memset(chunk + size, 0, trans_unit_size - size);
- if (th) {
- ret = t_thor_net_send_chunk(th, chunk, trans_unit_size,
- chunk_number);
- if (ret)
- goto cleanup;
- }
+ err = aio_error(&chunk.resp_transfer);
+ if (err < 0)
+ break;
+ }
- data_sent += size;
- data_left -= size;
- ++chunk_number;
- if (report_progress)
- report_progress(NULL, data, data_sent, data_left,
- chunk_number, user_data);
+ if (transfer.data_in_progress) {
+ ret = transfer.ret;
+ goto cancel_chunks;
}
- ret = 0;
+ free(chunk.buf);
+ return transfer.ret;
-cleanup:
- free(chunk);
+cancel_chunks:
+ aio_cancel(nh->sock_fd, NULL);
+
+ free(chunk.buf);
return ret;
}
-
static struct thor_transport_ops thor_net_ops = {
.open = thor_net_open,
.close = thor_net_close,