#include #include #include #include #include #include #include #include "thor.h" #include "thor_internal.h" #include "thor_transport.h" #define INVALID_SOCKFD (-1) static int thor_net_connect_device(struct net_device_handle *nh, struct net_device_id *dev_id, int wait) { struct sockaddr_in server; int s; int ret; reconnect: s = socket(AF_INET, SOCK_STREAM, 0); if (s < 0) return -EIO; memset(&server, 0, sizeof(server)); ret = inet_pton(AF_INET, dev_id->ip_addr, &server.sin_addr); if (ret <= 0) { fprintf(stderr, "IP addr is not valid:%s\n", dev_id->ip_addr); close(s); return -EINVAL; } server.sin_family = AF_INET; server.sin_port = htons(dev_id->port); if (connect(s, (struct sockaddr *)&server, sizeof(server)) < 0) { if (!wait) { close(s); return -ENODEV; } sleep(1); close(s); goto reconnect; } nh->sock_fd = s; return 0; } static int thor_net_open(thor_device_handle *th, struct thor_device_id *user_dev_id, int wait) { struct net_device_handle *nh; struct net_device_id *dev_id = &user_dev_id->net_dev; int ret; if (!dev_id->ip_addr || !dev_id->port) { fprintf(stderr, "net mode requires --ip-addr and --tcp-port\n"); return -EINVAL; } nh = calloc(sizeof(*nh), 1); if (!nh) return -ENOMEM; nh->sock_fd = INVALID_SOCKFD; ret = thor_net_connect_device(nh, dev_id, wait); if (ret) goto close_dev; th->dev_priv = nh; return 0; close_dev: th->ops->close(th); return ret; } static void thor_net_close(thor_device_handle *th) { struct net_device_handle *nh = th->dev_priv; if (!nh) return; if (nh->sock_fd >= 0) close(nh->sock_fd); free(nh); th->dev_priv = NULL; } static int thor_net_send(thor_device_handle *th, unsigned char *buf, off_t count, int timeout) { struct net_device_handle *nh = th->dev_priv; ssize_t transferred; if (!nh) return -ENODEV; transferred = send(nh->sock_fd, buf, count, 0); if (transferred < 0) return transferred; if (transferred < (ssize_t)count) return -EIO; return 0; } static int thor_net_recv(thor_device_handle *th, unsigned char *buf, off_t count, int timeout) { struct net_device_handle *nh = th->dev_priv; ssize_t transferred; if (!nh) return -ENODEV; transferred = recv(nh->sock_fd, buf, count, MSG_WAITALL); if (transferred < 0) return transferred; if (transferred < (ssize_t)count) return -EIO; return 0; } struct t_thor_net_chunk { struct net_device_handle *nh; struct aiocb data_transfer; struct aiocb resp_transfer; void *user_data; off_t useful_size; struct data_res_pkt resp; unsigned char *buf; off_t trans_unit_size; int chunk_number; int data_finished; int resp_finished; }; struct t_thor_net_transfer { struct thor_device_handle *th; struct thor_data_src *data; thor_progress_cb report_progress; void *user_data; off_t data_left; off_t data_sent; off_t data_in_progress; int chunk_number; int completed; int ret; }; static int t_thor_submit_chunk(struct t_thor_net_chunk *chunk) { struct t_thor_net_transfer *transfer = chunk->user_data; int ret; chunk->data_finished = chunk->resp_finished = 0; ret = aio_write(&chunk->data_transfer); if (ret) goto out; memset(&chunk->resp, 0, DATA_RES_PKT_SIZE); ret = aio_read(&chunk->resp_transfer); if (ret) goto cancel_data_transfer; return 0; cancel_data_transfer: aio_cancel(chunk->nh->sock_fd, &chunk->data_transfer); out: return ret; } static int t_thor_prep_next_chunk(struct t_thor_net_chunk *chunk, struct t_thor_net_transfer *transfer) { off_t to_read; int ret; to_read = transfer->data_left - transfer->data_in_progress; if (to_read <= 0) { printf("to big data in progress\n"); fflush(stdout); return -EINVAL; } chunk->useful_size = to_read > chunk->trans_unit_size ? chunk->trans_unit_size : to_read; ret = transfer->data->get_block(transfer->data, chunk->buf, chunk->useful_size); if (ret < 0 || ret != chunk->useful_size) return ret; memset(chunk->buf + chunk->useful_size, 0, chunk->trans_unit_size - chunk->useful_size); chunk->chunk_number = transfer->chunk_number++; ret = t_thor_submit_chunk(chunk); if (!ret) transfer->data_in_progress += chunk->useful_size; return ret; } static void check_next_chunk(struct t_thor_net_chunk *chunk, struct t_thor_net_transfer *transfer) { int ret; if (transfer->data_left - transfer->data_in_progress) { ret = t_thor_prep_next_chunk(chunk, transfer); if (ret) { transfer->ret = ret; transfer->completed = 1; } } else { if (transfer->data_in_progress == 0) transfer->completed = 1; } } static void data_finished(sigval_t sigval) { struct t_thor_net_chunk *chunk = sigval.sival_ptr; struct t_thor_net_transfer *transfer = chunk->user_data; int ret; chunk->data_finished = 1; ret = aio_error(&chunk->data_transfer); if (ret == ECANCELED || transfer->ret) { return; } else if (ret < 0) { transfer->ret = ret; transfer->completed = 1; return; } ret = aio_return(&chunk->data_transfer); if (ret < chunk->data_transfer.aio_nbytes) { transfer->ret = -EIO; transfer->completed = 1; return; } if (chunk->resp_finished) check_next_chunk(chunk, transfer); } static void resp_finished(sigval_t sigval) { struct t_thor_net_chunk *chunk = sigval.sival_ptr; struct t_thor_net_transfer *transfer = chunk->user_data; int ret; chunk->resp_finished = 1; transfer->data_in_progress -= chunk->useful_size; ret = aio_error(&chunk->resp_transfer); if (ret == ECANCELED || transfer->ret) { return; } else if (ret < 0) { transfer->ret = ret; transfer->completed = 1; return; } ret = aio_return(&chunk->resp_transfer); if (ret < chunk->resp_transfer.aio_nbytes) { transfer->ret = -EIO; transfer->completed = 1; return; } if (chunk->resp.cnt != chunk->chunk_number) { fprintf(stderr, "chunk number mismatch: %d != %d\n", chunk->resp.cnt, chunk->chunk_number); fflush(stderr); transfer->ret = -EINVAL; transfer->completed = 1; return; } transfer->data_sent += chunk->useful_size; transfer->data_left -= chunk->useful_size; if (transfer->report_progress) transfer->report_progress(transfer->th, transfer->data, transfer->data_sent, transfer->data_left, chunk->chunk_number, transfer->user_data); if (chunk->data_finished) check_next_chunk(chunk, transfer); } static int thor_net_send_raw_data(thor_device_handle *th, struct thor_data_src *data, off_t trans_unit_size, thor_progress_cb report_progress, void *user_data) { struct net_device_handle *nh = th->dev_priv; struct t_thor_net_transfer transfer; struct t_thor_net_chunk chunk; int ret; if (!nh) return -ENODEV; /* Init transfer */ transfer.th = th; transfer.data = data; transfer.report_progress = report_progress; transfer.user_data = user_data; transfer.data_left = data->get_file_length(data); transfer.data_sent = 0; transfer.chunk_number = 1; transfer.completed = 0; transfer.data_in_progress = 0; transfer.ret = 0; /* Init chunk */ memset(&chunk, 0, sizeof(struct t_thor_net_chunk)); chunk.nh = nh; chunk.user_data = &transfer; chunk.useful_size = 0; chunk.trans_unit_size = trans_unit_size; chunk.buf = malloc(trans_unit_size); if (!chunk.buf) return -ENOMEM; chunk.data_transfer.aio_fildes = nh->sock_fd; chunk.data_transfer.aio_buf = chunk.buf; chunk.data_transfer.aio_nbytes = chunk.trans_unit_size; chunk.data_transfer.aio_sigevent.sigev_notify = SIGEV_THREAD; chunk.data_transfer.aio_sigevent.sigev_notify_function = data_finished; chunk.data_transfer.aio_sigevent.sigev_notify_attributes = NULL; chunk.data_transfer.aio_sigevent.sigev_value.sival_ptr = &chunk; chunk.resp_transfer.aio_fildes = nh->sock_fd; chunk.resp_transfer.aio_buf = &chunk.resp; chunk.resp_transfer.aio_nbytes = DATA_RES_PKT_SIZE; chunk.resp_transfer.aio_sigevent.sigev_notify = SIGEV_THREAD; chunk.resp_transfer.aio_sigevent.sigev_notify_function = resp_finished; chunk.resp_transfer.aio_sigevent.sigev_notify_attributes = NULL; chunk.resp_transfer.aio_sigevent.sigev_value.sival_ptr = &chunk; ret = t_thor_prep_next_chunk(&chunk, &transfer); if (ret) goto cancel_chunks; while (!transfer.completed) { int err; err = aio_error(&chunk.data_transfer); if (err < 0) break; err = aio_error(&chunk.resp_transfer); if (err < 0) break; } if (transfer.data_in_progress) { ret = transfer.ret; goto cancel_chunks; } free(chunk.buf); return transfer.ret; cancel_chunks: aio_cancel(nh->sock_fd, NULL); free(chunk.buf); return ret; } static struct thor_transport_ops thor_net_ops = { .open = thor_net_open, .close = thor_net_close, .send = thor_net_send, .recv = thor_net_recv, .send_data = thor_net_send_raw_data, }; int thor_net_init(thor_device_handle *th) { th->ops = &thor_net_ops; return 0; } void thor_net_cleanup(thor_device_handle *th) { return; }