summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/evcom/evcom.c1233
-rw-r--r--deps/evcom/evcom.h127
-rw-r--r--deps/evcom/recv_states.dot37
-rw-r--r--deps/evcom/send_states.dot65
-rw-r--r--deps/evcom/test/echo.c6
-rw-r--r--deps/evcom/test/test.c478
-rw-r--r--src/http.js2
-rw-r--r--src/net.cc32
-rw-r--r--src/net.h7
-rw-r--r--test/mjsunit/test-tcp-many-clients.js3
-rw-r--r--test/mjsunit/test-tcp-reconnect.js2
-rw-r--r--test/mjsunit/test-tcp-throttle-kernel-buffer.js2
-rw-r--r--test/mjsunit/test-tcp-throttle.js2
13 files changed, 1456 insertions, 540 deletions
diff --git a/deps/evcom/evcom.c b/deps/evcom/evcom.c
index 6fe862e73..febed43a3 100644
--- a/deps/evcom/evcom.c
+++ b/deps/evcom/evcom.c
@@ -1,6 +1,6 @@
/* Copyright (c) 2008,2009 Ryan Dahl
*
- * evcom_queue comes from ngx_queue.h
+ * evcom_queue comes from ngx_queue.h
* Copyright (C) 2002-2009 Igor Sysoev
*
* Redistribution and use in source and binary forms, with or without
@@ -36,20 +36,45 @@
#include <sys/types.h>
#include <sys/socket.h> /* shutdown */
#include <sys/un.h>
+#include <netinet/in.h> /* sockaddr_in, sockaddr_in6 */
#include <ev.h>
#include <evcom.h>
#if EV_MULTIPLICITY
# define D_LOOP_(d) (d)->loop,
+# define D_LOOP_SET(d, _loop) do { (d)->loop = (_loop); } while (0)
#else
# define D_LOOP_(d)
+# define D_LOOP_SET(d, _loop)
#endif // EV_MULTIPLICITY
+
+/* SEND STATES */
+static int stream_send__wait_for_connection (evcom_stream*);
+static int stream_send__data (evcom_stream*);
+static int stream_send__drain (evcom_stream*);
+static int stream_send__wait_for_eof (evcom_stream*);
+static int stream_send__wait_for_buf (evcom_stream*);
+static int stream_send__shutdown (evcom_stream*);
+#if EVCOM_HAVE_GNUTLS
+static int stream_send__gnutls_bye (evcom_stream*);
+#endif
+static int stream_send__close_one (evcom_stream*);
+static int stream_send__close (evcom_stream*);
+
+/* RECV STATES */
+static int stream_recv__data (evcom_stream*);
+static int stream_recv__wait_for_resume (evcom_stream*);
+static int stream_recv__wait_for_close (evcom_stream*);
+static int stream_recv__close_one (evcom_stream*);
+static int stream_recv__close (evcom_stream*);
+
+/* COMMON STATES */
#if EVCOM_HAVE_GNUTLS
-static int secure_hangup (evcom_descriptor *);
+static int stream__handshake (evcom_stream*);
#endif
-static int recv_send (evcom_descriptor *);
+static int stream__close_both (evcom_stream*);
#undef TRUE
#define TRUE 1
@@ -60,18 +85,16 @@ static int recv_send (evcom_descriptor *);
#define OKAY 0
#define AGAIN 1
-#define ERROR 2
#define ATTACHED(s) ((s)->flags & EVCOM_ATTACHED)
#define LISTENING(s) ((s)->flags & EVCOM_LISTENING)
#define CONNECTED(s) ((s)->flags & EVCOM_CONNECTED)
#define SECURE(s) ((s)->flags & EVCOM_SECURE)
-#define GOT_HALF_CLOSE(s) ((s)->flags & EVCOM_GOT_HALF_CLOSE)
-#define GOT_FULL_CLOSE(s) ((s)->flags & EVCOM_GOT_FULL_CLOSE)
+#define DUPLEX(s) ((s)->flags & EVCOM_DUPLEX)
+#define GOT_CLOSE(s) ((s)->flags & EVCOM_GOT_CLOSE)
#define PAUSED(s) ((s)->flags & EVCOM_PAUSED)
#define READABLE(s) ((s)->flags & EVCOM_READABLE)
#define WRITABLE(s) ((s)->flags & EVCOM_WRITABLE)
-#define GOT_WRITE_EVENT(s) ((s)->flags & EVCOM_GOT_WRITE_EVENT)
static int too_many_connections = 0;
@@ -87,33 +110,26 @@ set_nonblock (int fd)
return 0;
}
-void
-evcom_buf_destroy (evcom_buf *buf)
-{
- free(buf->base);
- free(buf);
-}
-
evcom_buf *
evcom_buf_new2 (size_t len)
{
- evcom_buf *buf = malloc(sizeof(evcom_buf));
- if (!buf) return NULL;
- buf->base = malloc(len);
- if (!buf->base) {
- free(buf);
- return NULL;
- }
+ void *data = malloc(sizeof(evcom_buf) + len);
+ if (!data) return NULL;
+
+ evcom_buf *buf = data;
buf->len = len;
- buf->release = evcom_buf_destroy;
+ buf->release = (void (*)(evcom_buf*))free;
+ buf->base = data + sizeof(evcom_buf);
+
return buf;
}
evcom_buf *
evcom_buf_new (const char *base, size_t len)
{
- evcom_buf *buf = evcom_buf_new2(len);
+ evcom_buf* buf = evcom_buf_new2(len);
if (!buf) return NULL;
+
memcpy(buf->base, base, len);
return buf;
@@ -124,12 +140,6 @@ close_asap (evcom_descriptor *d)
{
if (d->fd < 0) return OKAY;
- /* In any case we need to feed an event in order
- * to get the on_close callback. In the case of EINTR
- * we need an event so that we can call close() again.
- */
- ev_feed_fd_event(D_LOOP_(d) d->fd, EV_READ);
-
int r = close(d->fd);
if (r < 0) {
@@ -146,102 +156,171 @@ close_asap (evcom_descriptor *d)
return OKAY;
}
-
+
+#define release_write_buffer(writer) \
+do { \
+ while (!evcom_queue_empty(&(writer)->out)) { \
+ evcom_queue *q = evcom_queue_last(&(writer)->out); \
+ evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue); \
+ evcom_queue_remove(q); \
+ if (buf->release) buf->release(buf); \
+ } \
+} while (0)
+
+static int
+close_writer_asap (evcom_writer *writer)
+{
+ release_write_buffer(writer);
+ ev_feed_event(D_LOOP_(writer) &writer->write_watcher, EV_WRITE);
+ return close_asap((evcom_descriptor*)writer);
+}
+
static inline void
-release_write_buffer(evcom_stream *stream)
+evcom_perror (const char *msg, int errorno)
{
- while (!evcom_queue_empty(&stream->out)) {
- evcom_queue *q = evcom_queue_last(&stream->out);
- evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue);
- evcom_queue_remove(q);
- if (buf->release) buf->release(buf);
- }
+ fprintf(stderr, "(evcom) %s %s\n", msg, strerror(errorno));
}
static int
-close_stream_asap (evcom_stream *stream)
+stream_send__wait_for_buf (evcom_stream *stream)
{
- release_write_buffer(stream); // needed?
-
- if (too_many_connections && stream->server) {
-#if EV_MULTIPLICITY
- struct ev_loop *loop = stream->server->loop;
-#endif
- evcom_server_attach(EV_A_ stream->server);
+ if (evcom_queue_empty(&stream->out)) {
+ if (GOT_CLOSE(stream)) {
+ stream->send_action = stream_send__drain;
+ return OKAY;
+ }
+ ev_io_stop(D_LOOP_(stream) &stream->write_watcher);
+ return AGAIN;
}
- too_many_connections = 0;
-
- int r = close_asap((evcom_descriptor*)stream);
- if (r == AGAIN) return AGAIN;
- evcom_stream_detach(stream);
+ stream->send_action = stream_send__data;
return OKAY;
}
static inline void
-evcom_perror (const char *msg, int errorno)
+stream__set_recv_closed (evcom_stream *stream)
{
- fprintf(stderr, "(evcom) %s %s\n", msg, strerror(errorno));
+ stream->flags &= ~EVCOM_READABLE;
+ stream->recvfd = -1;
+ stream->recv_action = NULL;
+ ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
}
-// This is to be called when ever the out is empty
-// and we need to change state.
static inline void
-change_state_for_empty_out (evcom_stream *stream)
+stream__set_send_closed (evcom_stream *stream)
{
- if (GOT_FULL_CLOSE(stream)) {
-#if EVCOM_HAVE_GNUTLS
- if (SECURE(stream) && READABLE(stream) && WRITABLE(stream)) {
- secure_hangup((evcom_descriptor*)stream);
- } else
-#endif
- {
- close_stream_asap(stream);
- }
- return;
- }
+ release_write_buffer(stream);
+ stream->flags &= ~EVCOM_WRITABLE;
+ stream->sendfd = -1;
+ stream->send_action = NULL;
+ ev_io_stop(D_LOOP_(stream) &stream->write_watcher);
+}
- if (GOT_HALF_CLOSE(stream)) {
- if (WRITABLE(stream)) {
- stream->action = recv_send;
- recv_send((evcom_descriptor*)stream);
- } else {
- close_stream_asap(stream);
- }
- return;
- }
+static int
+stream_send__close_one (evcom_stream *stream)
+{
+ assert(stream->sendfd >= 0);
- if (ATTACHED(stream)) {
- ev_io_stop(D_LOOP_(stream) &stream->write_watcher);
- }
+ close(stream->sendfd);
+
+ /* TODO recover from EINTR */
+
+ stream__set_send_closed(stream);
+ if (DUPLEX(stream)) stream__set_recv_closed(stream);
+
+ return OKAY;
}
-static inline void
-update_write_buffer_after_send (evcom_stream *stream, ssize_t sent)
+static int
+stream__close_both (evcom_stream *stream)
{
- evcom_queue *q = evcom_queue_last(&stream->out);
- evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue);
- buf->written += sent;
+ assert(stream->sendfd != stream->recvfd);
- if (buf->written == buf->len) {
- evcom_queue_remove(q);
+ assert(stream->sendfd >= 0);
+ assert(stream->recvfd >= 0);
- if (buf->release) buf->release(buf);
+ close(stream->recvfd);
+ close(stream->sendfd);
- if (evcom_queue_empty(&stream->out)) {
- change_state_for_empty_out(stream);
- }
- }
+ /* TODO recover from EINTR */
+
+ stream__set_send_closed(stream);
+ stream__set_recv_closed(stream);
+
+ return OKAY;
+}
+
+static int
+stream_send__close (evcom_stream *stream)
+{
+ stream->send_action = DUPLEX(stream) ?
+ stream_send__close_one : stream__close_both;
+ return OKAY;
+}
+
+static int
+stream_recv__close_one (evcom_stream *stream)
+{
+ assert(stream->recvfd >= 0);
+
+ close(stream->recvfd);
+
+ /* TODO recover from EINTR */
+
+ stream__set_recv_closed(stream);
+ if (DUPLEX(stream)) stream__set_send_closed(stream);
+
+ return OKAY;
}
+static int
+stream_recv__close (evcom_stream *stream)
+{
+ stream->recv_action = DUPLEX(stream) ?
+ stream_recv__close_one : stream__close_both;
+ return OKAY;
+}
+
+static int
+stream_send__drain (evcom_stream *stream)
+{
+ if (!GOT_CLOSE(stream)) {
+ stream->send_action = stream_send__wait_for_buf;
+ return OKAY;
+ }
+
#if EVCOM_HAVE_GNUTLS
-/* TODO can this be done without ignoring SIGPIPE? */
-static ssize_t
-nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len)
+ if (SECURE(stream)) {
+ stream->send_action = stream_send__gnutls_bye;
+ return OKAY;
+ }
+#endif
+
+ if (DUPLEX(stream)) {
+ stream->send_action = stream_send__shutdown;
+ return OKAY;
+ }
+
+ stream->send_action = stream_send__close_one;
+ return OKAY;
+}
+
+static int
+stream_send__wait_for_eof (evcom_stream *stream)
{
- evcom_stream *stream = (evcom_stream*)data;
- assert(SECURE(stream));
+ if (READABLE(stream)) {
+ ev_io_stop(D_LOOP_(stream) &stream->write_watcher);
+ assert(stream->send_action == stream_send__wait_for_eof);
+ return AGAIN;
+ }
+
+ stream->send_action = stream_send__close_one;
+ return OKAY;
+}
+static inline ssize_t
+nosigpipe_send (int fd, const void *buf, size_t len)
+{
int flags = 0;
#ifdef MSG_NOSIGNAL
flags |= MSG_NOSIGNAL;
@@ -249,73 +328,99 @@ nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len)
#ifdef MSG_DONTWAIT
flags |= MSG_DONTWAIT;
#endif
- ssize_t r = send(stream->fd, buf, len, flags);
+ return send(fd, buf, len, flags);
+}
+
+static inline ssize_t
+nosigpipe_stream_send (evcom_stream *stream, const void *buf, size_t len)
+{
+ return write(stream->sendfd, buf, len);
+}
- return r;
+#if EVCOM_HAVE_GNUTLS
+static ssize_t
+nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len)
+{
+ evcom_stream *stream = (evcom_stream*)data;
+ assert(SECURE(stream));
+
+ return nosigpipe_stream_send(stream, buf, len);
}
-#define SET_DIRECTION(stream) \
-do { \
- if (0 == gnutls_record_get_direction((stream)->session)) { \
- ev_io_stop(D_LOOP_(stream) &(stream)->write_watcher); \
- ev_io_start(D_LOOP_(stream) &(stream)->read_watcher); \
- } else { \
- ev_io_stop(D_LOOP_(stream) &(stream)->read_watcher); \
- ev_io_start(D_LOOP_(stream) &(stream)->write_watcher); \
- } \
-} while (0)
+static ssize_t
+pull (gnutls_transport_ptr_t data, void* buf, size_t len)
+{
+ evcom_stream *stream = (evcom_stream*)data;
+ assert(SECURE(stream));
+
+ return read(stream->recvfd, buf, len);
+}
static int
-secure_handshake (evcom_descriptor *d)
+stream__handshake (evcom_stream *stream)
{
- evcom_stream *stream = (evcom_stream*) d;
-
assert(SECURE(stream));
int r = gnutls_handshake(stream->session);
if (gnutls_error_is_fatal(r)) {
stream->gnutls_errorno = r;
- return close_stream_asap(stream);
+ stream->send_action = stream_send__close;
+ stream->recv_action = stream_recv__close;
+ return OKAY;
}
+ evcom_stream_reset_timeout(stream);
+
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) {
- SET_DIRECTION(stream);
- stream->action = secure_handshake;
+ if (0 == gnutls_record_get_direction((stream)->session)) {
+ ev_io_start(D_LOOP_(stream) &(stream)->read_watcher);
+ ev_io_stop(D_LOOP_(stream) &(stream)->write_watcher);
+ } else {
+ ev_io_stop(D_LOOP_(stream) &(stream)->read_watcher);
+ ev_io_start(D_LOOP_(stream) &(stream)->write_watcher);
+ }
+ assert(stream->recv_action == stream__handshake);
+ assert(stream->send_action == stream__handshake);
return AGAIN;
}
- stream->action = recv_send;
-
assert(!CONNECTED(stream));
stream->flags |= EVCOM_CONNECTED;
if (stream->on_connect) stream->on_connect(stream);
- evcom_stream_reset_timeout(stream);
+ ev_io_start(D_LOOP_(stream) &stream->read_watcher);
+ ev_io_start(D_LOOP_(stream) &stream->write_watcher);
- return recv_send((evcom_descriptor*)stream);
+ stream->send_action = stream_send__data;
+ stream->recv_action = stream_recv__data;
+
+ return OKAY;
}
static int
-secure_hangup (evcom_descriptor *d)
+stream_send__gnutls_bye (evcom_stream *stream)
{
- evcom_stream *stream = (evcom_stream*)d;
-
assert(SECURE(stream));
- int r = gnutls_bye(stream->session, GNUTLS_SHUT_RDWR);
+ int r = gnutls_bye(stream->session, GNUTLS_SHUT_WR);
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) {
- SET_DIRECTION(stream);
- stream->action = secure_hangup;
+ assert(1 == gnutls_record_get_direction((stream)->session));
+ assert(stream->send_action == stream_send__gnutls_bye);
return AGAIN;
}
if (gnutls_error_is_fatal(r)) {
stream->gnutls_errorno = r;
+ stream->send_action = stream_send__close;
+ return OKAY;
}
- return close_stream_asap(stream);
+ stream->flags &= ~EVCOM_WRITABLE;
+
+ stream->send_action = stream_send__wait_for_eof;
+ return OKAY;
}
void
@@ -326,58 +431,92 @@ evcom_stream_set_secure_session (evcom_stream *stream, gnutls_session_t session)
}
#endif /* HAVE GNUTLS */
-static inline int
-recv_data (evcom_stream *stream)
+static int
+stream_recv__wait_for_close (evcom_stream *stream)
+{
+ assert(!READABLE(stream));
+
+ if (!WRITABLE(stream)) {
+ stream->recv_action = stream_recv__close;
+ return OKAY;
+ }
+
+ ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
+ return AGAIN;
+}
+
+static int
+stream_recv__wait_for_resume (evcom_stream *stream)
+{
+ stream->flags |= EVCOM_PAUSED;
+ ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
+ assert(stream->recv_action == stream_recv__wait_for_resume);
+ return AGAIN;
+}
+
+static int
+stream_recv__data (evcom_stream *stream)
{
char buf[EVCOM_CHUNKSIZE];
size_t buf_size = EVCOM_CHUNKSIZE;
ssize_t recved;
- while (stream->fd >= 0) {
- assert(READABLE(stream));
+ while (READABLE(stream)) {
+ assert(CONNECTED(stream));
if (PAUSED(stream)) {
- ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
- return AGAIN;
+ stream->recv_action = stream_recv__wait_for_resume;
+ return OKAY;
}
- if (!SECURE(stream)) {
- recved = recv(stream->fd, buf, buf_size, 0);
- }
#if EVCOM_HAVE_GNUTLS
- else {
+ if (SECURE(stream)) {
recved = gnutls_record_recv(stream->session, buf, buf_size);
if (gnutls_error_is_fatal(recved)) {
stream->gnutls_errorno = recved;
- return close_stream_asap(stream);
+ stream->recv_action = stream_recv__close;
+ return OKAY;
}
if (recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN) {
- SET_DIRECTION(stream);
+ if (1 == gnutls_record_get_direction((stream)->session)) {
+ fprintf(stderr, "(evcom) gnutls recv: unexpected switch direction!\n");
+ ev_io_stop(D_LOOP_(stream) &(stream)->read_watcher);
+ ev_io_start(D_LOOP_(stream) &(stream)->write_watcher);
+ }
return AGAIN;
}
/* A server may also receive GNUTLS_E_REHANDSHAKE when a client has
- * initiated a handshake. In that case the server can only initiate a
+ * initiated a andshake. In that case the server can only initiate a
* handshake or terminate the connection. */
if (recved == GNUTLS_E_REHANDSHAKE) {
- if (READABLE(stream) && WRITABLE(stream)) {
- stream->action = secure_handshake;
- return OKAY;
- } else {
- stream->gnutls_errorno = GNUTLS_E_REHANDSHAKE;
- return close_stream_asap(stream);
- }
+ assert(WRITABLE(stream));
+ stream->recv_action = stream__handshake;
+ stream->send_action = stream__handshake;
+ return OKAY;
}
- }
+ } else
#endif /* EVCOM_HAVE_GNUTLS */
+ {
+ recved = read(stream->recvfd, buf, buf_size);
+ }
if (recved < 0) {
- if (errno == EAGAIN || errno == EINTR) return AGAIN;
+ if (errno == EAGAIN || errno == EINTR) {
+ assert(stream->recv_action == stream_recv__data);
+ return AGAIN;
+ }
+
+ if (errno != ECONNRESET) {
+ evcom_perror("recv()", stream->errorno);
+ }
+
stream->errorno = errno;
- return close_stream_asap(stream);
+ stream->recv_action = stream_recv__close;
+ return OKAY;
}
evcom_stream_reset_timeout(stream);
@@ -393,7 +532,7 @@ recv_data (evcom_stream *stream)
if (stream->on_read) stream->on_read(stream, buf, recved);
if (recved == 0) {
- if (!WRITABLE(stream)) return close_stream_asap(stream);
+ stream->recv_action = stream_recv__wait_for_close;
return OKAY;
}
}
@@ -401,11 +540,11 @@ recv_data (evcom_stream *stream)
}
static int
-send_data (evcom_stream *stream)
+stream_send__data (evcom_stream *stream)
{
ssize_t sent;
- while (stream->fd >= 0 && !evcom_queue_empty(&stream->out)) {
+ while (!evcom_queue_empty(&stream->out)) {
assert(WRITABLE(stream));
evcom_queue *q = evcom_queue_last(&stream->out);
@@ -414,202 +553,176 @@ send_data (evcom_stream *stream)
#if EVCOM_HAVE_GNUTLS
if (SECURE(stream)) {
sent = gnutls_record_send(stream->session,
- buf->base + buf->written,
- buf->len - buf->written);
+ buf->base + buf->written,
+ buf->len - buf->written);
+
+ if (sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) {
+ if (0 == gnutls_record_get_direction((stream)->session)) {
+ fprintf(stderr, "(evcom) gnutls send: unexpected switch direction!\n");
+ ev_io_start(D_LOOP_(stream) &(stream)->read_watcher);
+ ev_io_stop(D_LOOP_(stream) &(stream)->write_watcher);
+ }
+ return AGAIN;
+ }
if (gnutls_error_is_fatal(sent)) {
stream->gnutls_errorno = sent;
- return close_stream_asap(stream);
+ stream->send_action = stream_send__close;
+ return OKAY;
}
} else
#endif // EVCOM_HAVE_GNUTLS
{
-
- int flags = 0;
-#ifdef MSG_NOSIGNAL
- flags |= MSG_NOSIGNAL;
-#endif
-#ifdef MSG_DONTWAIT
- flags |= MSG_DONTWAIT;
-#endif
-
/* TODO use writev() here? */
- sent = send(stream->fd,
- buf->base + buf->written,
- buf->len - buf->written,
- flags);
+ sent = nosigpipe_stream_send(stream,
+ buf->base + buf->written,
+ buf->len - buf->written);
}
if (sent <= 0) {
- switch (errno) {
- case EAGAIN:
- case EINTR:
- return AGAIN;
+ if (errno == EAGAIN || errno == EINTR) {
+ assert(stream->send_action == stream_send__data);
+ return AGAIN;
+ }
- case EPIPE:
- stream->flags &= ~EVCOM_WRITABLE;
- if (!READABLE(stream)) return close_stream_asap(stream);
- return OKAY;
+ stream->errorno = errno;
+ evcom_perror("send()", errno);
- default:
- stream->errorno = errno;
- return close_stream_asap(stream);
- }
+ stream->send_action = stream_send__close;
+ return OKAY;
}
evcom_stream_reset_timeout(stream);
- update_write_buffer_after_send(stream, sent);
+ assert(sent >= 0);
+
+ buf->written += sent;
+
+ if (buf->written == buf->len) {
+ evcom_queue_remove(q);
+ if (buf->release) buf->release(buf);
+ }
}
assert(evcom_queue_empty(&stream->out));
- ev_io_stop(D_LOOP_(stream) &stream->write_watcher);
- return AGAIN;
+ stream->send_action = stream_send__drain;
+ return OKAY;
}
static int
-shutdown_write (evcom_stream *stream)
+stream_send__shutdown (evcom_stream *stream)
{
- int r;
-
-#if EVCOM_HAVE_GNUTLS
- if (SECURE(stream)) {
- r = gnutls_bye(stream->session, GNUTLS_SHUT_WR);
-
- if (gnutls_error_is_fatal(r)) {
- stream->gnutls_errorno = r;
- return close_stream_asap(stream);
- }
-
- if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) {
- SET_DIRECTION(stream);
- }
- }
-#endif
-
- r = shutdown(stream->fd, SHUT_WR);
+ int r = shutdown(stream->sendfd, SHUT_WR);
if (r < 0) {
stream->errorno = errno;
evcom_perror("shutdown()", errno);
- return close_stream_asap(stream);
+ stream->send_action = stream_send__close;
+ return OKAY;
}
stream->flags &= ~EVCOM_WRITABLE;
+ stream->send_action = stream_send__wait_for_eof;
return OKAY;
}
-static int
-recv_send (evcom_descriptor *d)
-{
- evcom_stream *stream = (evcom_stream*) d;
-
- int r = AGAIN;
-
- if (READABLE(stream) && !PAUSED(stream)) {
- r = recv_data(stream);
- }
-
- if (stream->fd < 0) return AGAIN;
-
- if (WRITABLE(stream)) {
- if (GOT_HALF_CLOSE(stream) && evcom_queue_empty(&stream->out)) {
-
- if (READABLE(stream)) {
- return shutdown_write(stream);
- } else {
- return close_stream_asap(stream);
- }
-
- } else {
- return send_data(stream);
- }
- }
-
- return r;
-}
-
-static inline int
-connection_established (evcom_stream *stream)
+static int
+stream__connection_established (evcom_stream *stream)
{
- ev_io_start(D_LOOP_(stream) &stream->read_watcher);
assert(!CONNECTED(stream));
#if EVCOM_HAVE_GNUTLS
if (SECURE(stream)) {
- stream->action = secure_handshake;
- return secure_handshake((evcom_descriptor*)stream);
- } else
-#endif /* EVCOM_HAVE_GNUTLS */
+ stream->send_action = stream__handshake;
+ stream->recv_action = stream__handshake;
+ } else
+#endif
{
stream->flags |= EVCOM_CONNECTED;
if (stream->on_connect) stream->on_connect(stream);
- stream->action = recv_send;
- return recv_send((evcom_descriptor*)stream);
+ stream->send_action = stream_send__data;
+ stream->recv_action = stream_recv__data;
}
+
+ ev_io_start(D_LOOP_(stream) &stream->write_watcher);
+ ev_io_start(D_LOOP_(stream) &stream->read_watcher);
+
+ return OKAY;
}
static int
-wait_for_connection (evcom_descriptor *d)
+stream_send__wait_for_connection (evcom_stream *stream)
{
- evcom_stream *stream = (evcom_stream*)d;
-
- if (!GOT_WRITE_EVENT(d)) {
- ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
- return AGAIN;
- }
+ assert(DUPLEX(stream));
int connect_error;
socklen_t len = sizeof(int);
- int r = getsockopt(d->fd, SOL_SOCKET, SO_ERROR, &connect_error, &len);
+ int r = getsockopt(stream->sendfd, SOL_SOCKET, SO_ERROR, &connect_error, &len);
+
if (r < 0) {
- d->errorno = r;
- return close_asap(d);
+ stream->errorno = r;
+ stream->send_action = stream_send__close;
+ return OKAY;
}
- switch (connect_error) {
- case 0:
- return connection_established((evcom_stream*)d);
-
- case EINTR:
- case EINPROGRESS:
- return AGAIN;
+ if (connect_error == 0) {
+ stream->send_action = stream__connection_established;
+ return OKAY;
- default:
- d->errorno = connect_error;
- return close_asap(d);
+ } else if (connect_error == EINPROGRESS || connect_error == EINTR) {
+ assert(stream->send_action == stream_send__wait_for_connection);
+ return AGAIN;
}
+
+ stream->errorno = connect_error;
+ stream->send_action = stream_send__close;
+ return OKAY;
}
static void
-assign_file_descriptor (evcom_stream *stream, int fd)
+evcom_stream_assign_fds (evcom_stream *stream, int recvfd, int sendfd)
{
- stream->fd = fd;
+ assert(recvfd >= 0);
+ assert(sendfd >= 0);
+
+ if (recvfd == sendfd) stream->flags |= EVCOM_DUPLEX;
+
+#ifdef SO_NOSIGPIPE
+ if (DUPLEX(stream)) {
+ int flags = 1;
+ int r = setsockopt(sendfd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
+ if (r < 0) {
+ evcom_perror("setsockopt(SO_NOSIGPIPE)", errno);
+ }
+ }
+#endif
+
+ ev_io_set(&stream->read_watcher, recvfd, EV_READ);
+ ev_io_set(&stream->write_watcher, sendfd, EV_WRITE);
+
+ stream->recvfd = recvfd;
+ stream->sendfd = sendfd;
- ev_io_set (&stream->read_watcher, fd, EV_READ);
- ev_io_set (&stream->write_watcher, fd, EV_WRITE);
+ stream->send_action = stream__connection_established;
+ stream->recv_action = stream__connection_established;
stream->flags |= EVCOM_READABLE;
stream->flags |= EVCOM_WRITABLE;
#if EVCOM_HAVE_GNUTLS
if (SECURE(stream)) {
- gnutls_transport_set_lowat(stream->session, 0);
+ gnutls_transport_set_lowat(stream->session, 0);
gnutls_transport_set_push_function(stream->session, nosigpipe_push);
- gnutls_transport_set_ptr2(stream->session,
- (gnutls_transport_ptr_t)(intptr_t)fd, /* recv */
- stream); /* send */
+ gnutls_transport_set_pull_function(stream->session, pull);
+ gnutls_transport_set_ptr2(stream->session, stream, stream);
}
-#endif
-
- stream->action = wait_for_connection;
+#endif
}
-
-/* Retruns evcom_stream if a connection could be accepted.
+/* Retruns evcom_stream if a connection could be accepted.
* The returned stream is not yet attached to the event loop.
* Otherwise NULL
*/
@@ -618,7 +731,7 @@ accept_connection (evcom_server *server)
{
struct sockaddr address; /* connector's address information */
socklen_t addr_len = sizeof(address);
-
+
int fd = accept(server->fd, &address, &addr_len);
if (fd < 0) {
switch (errno) {
@@ -648,31 +761,22 @@ accept_connection (evcom_server *server)
close(fd);
return NULL;
}
-
+
if (set_nonblock(fd) != 0) {
evcom_perror("set_nonblock()", errno);
return NULL;
}
-
-#ifdef SO_NOSIGPIPE
- int flags = 1;
- int r = setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
- if (r < 0) {
- evcom_perror("setsockopt()", errno);
- return NULL;
- }
-#endif
stream->server = server;
- assign_file_descriptor(stream, fd);
+ evcom_stream_assign_fds(stream, fd, fd);
return stream;
}
-/* Internal callback
+/* Internal callback
* Called by server->watcher.
*/
-static int
+static int
accept_connections (evcom_descriptor *d)
{
evcom_server *server = (evcom_server *)d;
@@ -683,7 +787,6 @@ accept_connections (evcom_descriptor *d)
evcom_stream *stream;
while (server->fd >= 0 && (stream = accept_connection(server))) {
evcom_stream_attach(D_LOOP_(server) stream);
- connection_established(stream);
}
return AGAIN;
@@ -747,17 +850,17 @@ evcom_server_listen (evcom_server *server, struct sockaddr *address, int backlog
close(fd);
return -1;
}
-
+
if (listen(fd, backlog) < 0) {
server->errorno = errno;
evcom_perror("listen()", errno);
close(fd);
return -1;
}
-
+
server->flags |= EVCOM_LISTENING;
server->action = accept_connections;
-
+
return 0;
}
@@ -765,9 +868,12 @@ evcom_server_listen (evcom_server *server, struct sockaddr *address, int backlog
* Stops the server. Will not accept new connections. Does not drop
* existing connections.
*/
-void
+void
evcom_server_close (evcom_server *server)
{
+ ev_io_start(D_LOOP_(server) &server->watcher);
+ ev_feed_event(D_LOOP_(server) &server->watcher, EV_READ);
+
close_asap((evcom_descriptor*)server);
}
@@ -775,9 +881,7 @@ void
evcom_server_attach (EV_P_ evcom_server *server)
{
ev_io_start (EV_A_ &server->watcher);
-#if EV_MULTIPLICITY
- server->loop = EV_A;
-#endif
+ D_LOOP_SET(server, EV_A);
server->flags |= EVCOM_ATTACHED;
}
@@ -785,33 +889,26 @@ void
evcom_server_detach (evcom_server *server)
{
ev_io_stop (D_LOOP_(server) &server->watcher);
-#if EV_MULTIPLICITY
- server->loop = NULL;
-#endif
+ D_LOOP_SET(server, NULL);
server->flags &= ~EVCOM_ATTACHED;
}
-static void
+static void
io_event(EV_P_ ev_io *watcher, int revents)
{
evcom_descriptor *d = watcher->data;
#if EV_MULTIPLICITY
assert(d->loop == loop);
-#endif
+#endif
+ int r = OKAY;
if (revents & EV_ERROR) {
d->errorno = 1;
- close_asap(d);
- }
-
- if (revents & EV_WRITE) {
- d->flags |= EVCOM_GOT_WRITE_EVENT;
+ r = close_asap(d);
}
- int r = OKAY;
-
while (r == OKAY && d->action && d->fd >= 0) {
r = d->action(d);
}
@@ -824,20 +921,27 @@ io_event(EV_P_ ev_io *watcher, int revents)
}
}
-void
+static void
+evcom_descriptor_init (evcom_descriptor *d)
+{
+ d->fd = -1;
+ D_LOOP_SET(d, NULL);
+ d->flags = 0;
+ d->errorno = 0;
+ d->action = NULL;
+}
+
+void
evcom_server_init (evcom_server *server)
{
- server->flags = 0;
- server->fd = -1;
- server->watcher.data = server;
- server->action = NULL;
+ evcom_descriptor_init((evcom_descriptor*)server);
ev_init (&server->watcher, io_event);
+ server->watcher.data = server;
server->on_connection = NULL;
- server->on_close = NULL;
}
/* Internal callback. called by stream->timeout_watcher */
-static void
+static void
on_timeout (EV_P_ ev_timer *watcher, int revents)
{
evcom_stream *stream = watcher->data;
@@ -854,9 +958,51 @@ on_timeout (EV_P_ ev_timer *watcher, int revents)
}
if (stream->on_timeout) stream->on_timeout(stream);
- // timeout does not automatically kill your connection. you must!
+
+ evcom_stream_force_close(stream);
}
+static void
+stream_event (EV_P_ ev_io *w, int revents)
+{
+ evcom_stream *stream = w->data;
+
+ if (revents & EV_READ) {
+ while (stream->recv_action) {
+ int r = stream->recv_action(stream);
+ if (r == AGAIN) break;
+ }
+ }
+
+ if (revents & EV_WRITE) {
+ while (stream->send_action) {
+ int r = stream->send_action(stream);
+ if (r == AGAIN) break;
+ }
+ }
+
+ if (stream->send_action == NULL) {
+ ev_io_stop(EV_A_ &stream->write_watcher);
+ }
+
+ if (stream->recv_action == NULL) {
+ ev_io_stop(EV_A_ &stream->read_watcher);
+ }
+
+ if (stream->sendfd < 0 && stream->recvfd < 0) {
+ ev_timer_stop(EV_A_ &stream->timeout_watcher);
+
+ if (too_many_connections && stream->server) {
+#if EV_MULTIPLICITY
+ struct ev_loop *loop = stream->server->loop;
+#endif
+ evcom_server_attach(EV_A_ stream->server);
+ }
+ too_many_connections = 0;
+
+ if (stream->on_close) stream->on_close(stream);
+ }
+}
/**
* If using SSL do consider setting
@@ -865,139 +1011,151 @@ on_timeout (EV_P_ ev_timer *watcher, int revents)
* gnutls_db_set_store_function (stream->session, _);
* gnutls_db_set_ptr (stream->session, _);
*/
-void
+void
evcom_stream_init (evcom_stream *stream, float timeout)
{
- stream->fd = -1;
- stream->server = NULL;
-#if EV_MULTIPLICITY
- stream->loop = NULL;
-#endif
stream->flags = 0;
+ stream->errorno = 0;
+ stream->recvfd = -1;
+ stream->sendfd = -1;
- evcom_queue_init(&stream->out);
+ // reader things
+ ev_init(&stream->read_watcher, stream_event);
+ stream->read_watcher.data = stream;
+ stream->recv_action = NULL;
- ev_init(&stream->write_watcher, io_event);
- ev_init(&stream->read_watcher, io_event);
+ // writer things
+ ev_init(&stream->write_watcher, stream_event);
stream->write_watcher.data = stream;
- stream->read_watcher.data = stream;
-
- stream->errorno = 0;
+ evcom_queue_init(&stream->out);
+ stream->send_action = NULL;
+ // stream things
+ stream->server = NULL;
#if EVCOM_HAVE_GNUTLS
stream->gnutls_errorno = 0;
stream->session = NULL;
-#endif
-
+#endif
ev_timer_init(&stream->timeout_watcher, on_timeout, 0., timeout);
- stream->timeout_watcher.data = stream;
-
- stream->action = NULL;
+ stream->timeout_watcher.data = stream;
stream->on_connect = NULL;
- stream->on_read = NULL;
- stream->on_drain = NULL;
stream->on_timeout = NULL;
+ stream->on_read = NULL;
+ stream->on_close = NULL;
}
-void
+void
evcom_stream_close (evcom_stream *stream)
{
- stream->flags |= EVCOM_GOT_HALF_CLOSE;
- if (evcom_queue_empty(&stream->out)) {
- change_state_for_empty_out(stream);
- }
-}
-
-void
-evcom_stream_full_close (evcom_stream *stream)
-{
- stream->flags |= EVCOM_GOT_FULL_CLOSE;
- if (evcom_queue_empty(&stream->out)) {
- change_state_for_empty_out(stream);
+ stream->flags |= EVCOM_GOT_CLOSE;
+ if (WRITABLE(stream)) {
+ ev_io_start(D_LOOP_(stream) &stream->write_watcher);
}
}
void evcom_stream_force_close (evcom_stream *stream)
{
- close_stream_asap(stream);
-
- // Even if close returned EINTR
- stream->action = NULL;
- stream->fd = -1;
+ close(stream->recvfd);
+ /* XXX What to do on EINTR? */
+ stream__set_recv_closed(stream);
+
+ if (!DUPLEX(stream)) close(stream->sendfd);
+ stream__set_send_closed(stream);
evcom_stream_detach(stream);
}
-void
-evcom_stream_write (evcom_stream *stream, evcom_buf *buf)
+void
+evcom_stream_write (evcom_stream *stream, const char *str, size_t len)
{
- if (!WRITABLE(stream) || GOT_FULL_CLOSE(stream) || GOT_HALF_CLOSE(stream)) {
- assert(0 && "Do not write to a closed stream");
- if (buf->release) buf->release(buf);
+ if (!WRITABLE(stream) || GOT_CLOSE(stream)) {
+ assert(0 && "Do not write to a closed stream");
return;
}
- evcom_queue_insert_head(&stream->out, &buf->queue);
- buf->written = 0;
+ ssize_t sent = 0;
+
+ if ( stream->send_action == stream_send__wait_for_buf
+ && evcom_queue_empty(&stream->out)
+ )
+ {
+ assert(CONNECTED(stream));
+#if EVCOM_HAVE_GNUTLS
+ if (SECURE(stream)) {
+ sent = gnutls_record_send(stream->session, str, len);
+
+ if (gnutls_error_is_fatal(sent)) {
+ stream->gnutls_errorno = sent;
+ goto close;
+ }
+ } else
+#endif // EVCOM_HAVE_GNUTLS
+ {
+ /* TODO use writev() here? */
+ sent = nosigpipe_stream_send(stream, str, len);
+ }
+
+ if (sent < 0) {
+ switch (errno) {
+ case EINTR:
+ case EAGAIN:
+ sent = 0;
+ break;
+
+ default:
+ stream->errorno = errno;
+ evcom_perror("send()", stream->errorno);
+ goto close;
+ }
+ }
+ } /* TODO else { memcpy to last buffer on head } */
+
+ assert(sent >= 0);
+ if ((size_t)sent == len) return; /* sent the whole buffer */
+
+ len -= sent;
+ str += sent;
+
+ evcom_buf *b = evcom_buf_new(str, len);
+ evcom_queue_insert_head(&stream->out, &b->queue);
+ b->written = 0;
+
+ assert(stream->sendfd >= 0);
if (ATTACHED(stream)) {
ev_io_start(D_LOOP_(stream) &stream->write_watcher);
- if (stream->action == recv_send) {
- send_data(stream);
- }
}
-}
+ return;
-void
-evcom_stream_reset_timeout (evcom_stream *stream)
-{
- ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
-}
-
-static void
-free_simple_buf (evcom_buf *buf)
-{
- free(buf->base);
- free(buf);
+close:
+ stream->send_action = stream_send__close;
+ stream->recv_action = stream_recv__close;
+ if (ATTACHED(stream)) {
+ ev_io_start(D_LOOP_(stream) &stream->write_watcher);
+ }
}
-/* Writes a string to the stream.
- * NOTE: Allocates memory. Avoid for performance applications.
- */
void
-evcom_stream_write_simple (evcom_stream *stream, const char *str, size_t len)
+evcom_stream_reset_timeout (evcom_stream *stream)
{
- evcom_buf *buf = malloc(sizeof(evcom_buf));
- buf->release = free_simple_buf;
- buf->base = strdup(str);
- buf->len = len;
-
- evcom_stream_write(stream, buf);
+ ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
}
void
evcom_stream_attach (EV_P_ evcom_stream *stream)
{
-#if EV_MULTIPLICITY
- stream->loop = EV_A;
-#endif
+ D_LOOP_SET(stream, EV_A);
stream->flags |= EVCOM_ATTACHED;
ev_timer_again(EV_A_ &stream->timeout_watcher);
- if (!CONNECTED(stream)) {
- ev_io_start(EV_A_ &stream->write_watcher);
- } else {
- if (READABLE(stream) && !PAUSED(stream)) {
- ev_io_start(EV_A_ &stream->read_watcher);
- }
-
- if (WRITABLE(stream)) {
- ev_io_start(EV_A_ &stream->write_watcher);
- }
+ if (READABLE(stream)) {
+ ev_io_start(EV_A_ &stream->read_watcher);
+ }
- ev_feed_fd_event(D_LOOP_(stream) stream->fd, EV_WRITE);
+ if (WRITABLE(stream)) {
+ ev_io_start(EV_A_ &stream->write_watcher);
}
}
@@ -1007,28 +1165,29 @@ evcom_stream_detach (evcom_stream *stream)
ev_io_stop(D_LOOP_(stream) &stream->write_watcher);
ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
ev_timer_stop(D_LOOP_(stream) &stream->timeout_watcher);
-#if EV_MULTIPLICITY
- stream->loop = NULL;
-#endif
+ D_LOOP_SET(stream, NULL);
stream->flags &= ~EVCOM_ATTACHED;
}
void
evcom_stream_read_pause (evcom_stream *stream)
{
- ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
- ev_clear_pending(D_LOOP_(stream) &stream->read_watcher);
stream->flags |= EVCOM_PAUSED;
+ if (stream->recv_action == stream_recv__data) {
+ ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
+ stream->recv_action = stream_recv__wait_for_resume;
+ }
}
void
evcom_stream_read_resume (evcom_stream *stream)
{
- evcom_stream_reset_timeout(stream);
-
stream->flags &= ~EVCOM_PAUSED;
-
- if (READABLE(stream)) {
+ evcom_stream_reset_timeout(stream);
+ if (stream->recv_action == stream_recv__wait_for_resume) {
+ stream->recv_action = stream_recv__data;
+ }
+ if (ATTACHED(stream) && READABLE(stream)) {
ev_io_start(D_LOOP_(stream) &stream->read_watcher);
}
}
@@ -1050,11 +1209,6 @@ evcom_stream_connect (evcom_stream *stream, struct sockaddr *address)
close(fd);
return -1;
}
-
-#ifdef SO_NOSIGPIPE
- int flags = 1;
- setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
-#endif
r = connect(fd, address, address_length(address));
@@ -1065,23 +1219,53 @@ evcom_stream_connect (evcom_stream *stream, struct sockaddr *address)
return -1;
}
- assign_file_descriptor(stream, fd);
+ evcom_stream_assign_fds(stream, fd, fd);
+
+ stream->send_action = stream_send__wait_for_connection;
+ stream->recv_action = NULL;
+
+ return 0;
+}
+
+int evcom_stream_pair (evcom_stream *a, evcom_stream *b)
+{
+ int sv[2];
+ int old_errno;
+
+ int r = socketpair(PF_LOCAL, SOCK_STREAM, 0, sv);
+ if (r < 0) return -1;
+
+ r = set_nonblock(sv[0]);
+ if (r < 0) goto set_nonblock_error;
+ r = set_nonblock(sv[1]);
+ if (r < 0) goto set_nonblock_error;
+
+ evcom_stream_assign_fds(a, sv[0], sv[0]);
+ evcom_stream_assign_fds(b, sv[1], sv[1]);
return 0;
+
+set_nonblock_error:
+ old_errno = errno;
+ evcom_perror("set_nonblock()", errno);
+ close(sv[0]);
+ close(sv[1]);
+ errno = old_errno;
+ return -1;
}
enum evcom_stream_state
evcom_stream_state (evcom_stream *stream)
{
- if (stream->fd < 0 && stream->flags == 0) return EVCOM_INITIALIZED;
+ if (stream->recvfd < 0 && stream->sendfd && stream->flags == 0) {
+ return EVCOM_INITIALIZED;
+ }
- if (stream->fd < 0) return EVCOM_CLOSED;
+ if (stream->recvfd < 0 && stream->sendfd < 0) return EVCOM_CLOSED;
if (!CONNECTED(stream)) return EVCOM_CONNECTING;
- if (GOT_FULL_CLOSE(stream)) return EVCOM_CLOSING;
-
- if (GOT_HALF_CLOSE(stream)) {
+ if (GOT_CLOSE(stream)) {
if (READABLE(stream)) {
return EVCOM_CONNECTED_RO;
} else {
@@ -1098,3 +1282,216 @@ evcom_stream_state (evcom_stream *stream)
return EVCOM_CLOSING;
}
+static int
+reader_recv (evcom_descriptor *d)
+{
+ evcom_reader* reader = (evcom_reader*) d;
+
+ char buf[EVCOM_CHUNKSIZE];
+ size_t buf_size = EVCOM_CHUNKSIZE;
+ ssize_t recved;
+
+ while (reader->fd >= 0) {
+ recved = read(reader->fd, buf, buf_size);
+
+ if (recved < 0) {
+ if (errno == EAGAIN || errno == EINTR) return AGAIN;
+ reader->errorno = errno;
+ evcom_perror("read()", reader->errorno);
+ return close_asap(d);
+ }
+
+ /* NOTE: EOF is signaled with recved == 0 on callback */
+ if (reader->on_read) reader->on_read(reader, buf, recved);
+
+ if (recved == 0) return close_asap(d);
+ }
+ return AGAIN;
+}
+
+void
+evcom_reader_init (evcom_reader *reader)
+{
+ evcom_descriptor_init((evcom_descriptor*)reader);
+
+ reader->on_close = NULL;
+ reader->on_read = NULL;
+
+ ev_init(&reader->read_watcher, io_event);
+ reader->read_watcher.data = reader;
+}
+
+void
+evcom_reader_set (evcom_reader *reader, int fd)
+{
+ assert(fd >= 0);
+ reader->fd = fd;
+
+ ev_io_set(&reader->read_watcher, fd, EV_READ);
+ reader->action = reader_recv;
+}
+
+void
+evcom_reader_attach (EV_P_ evcom_reader *reader)
+{
+ ev_io_start(EV_A_ &reader->read_watcher);
+ D_LOOP_SET(reader, EV_A);
+}
+
+void
+evcom_reader_detach (evcom_reader *reader)
+{
+ ev_io_stop(D_LOOP_(reader) &reader->read_watcher);
+ D_LOOP_SET(reader, NULL);
+}
+
+void
+evcom_reader_close (evcom_reader *reader)
+{
+ ev_io_start(D_LOOP_(reader) &reader->read_watcher);
+ ev_feed_event(D_LOOP_(reader) &reader->read_watcher, EV_READ);
+
+ close_asap((evcom_descriptor*)reader);
+}
+
+static int
+writer_send (evcom_descriptor *d)
+{
+ evcom_writer* writer = (evcom_writer*) d;
+ assert(writer->fd >= 0);
+
+ while (!evcom_queue_empty(&writer->out)) {
+ evcom_queue *q = evcom_queue_last(&writer->out);
+ evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue);
+
+ ssize_t sent = write(writer->fd, buf->base + buf->written,
+ buf->len - buf->written);
+
+ if (sent < 0) {
+ switch (errno) {
+ case ECONNRESET:
+ case EPIPE:
+ return close_writer_asap(writer);
+
+ case EINTR:
+ case EAGAIN:
+ sent = 0;
+ return AGAIN;
+
+ default:
+ writer->errorno = errno;
+ evcom_perror("send()", writer->errorno);
+ return close_writer_asap(writer);
+ }
+ }
+ assert(sent >= 0);
+
+ buf->written += sent;
+
+ if (buf->written == buf->len) {
+ evcom_queue_remove(q);
+ if (buf->release) buf->release(buf);
+ }
+ }
+
+ if (GOT_CLOSE(writer)) {
+ assert(evcom_queue_empty(&writer->out));
+ return close_writer_asap(writer);
+ } else {
+ ev_io_stop(D_LOOP_(writer) &writer->write_watcher);
+ return AGAIN;
+ }
+}
+
+void
+evcom_writer_init (evcom_writer* writer)
+{
+ evcom_descriptor_init((evcom_descriptor*)writer);
+
+ writer->on_close = NULL;
+
+ ev_init(&writer->write_watcher, io_event);
+ writer->write_watcher.data = writer;
+
+ evcom_queue_init(&writer->out);
+}
+
+void
+evcom_writer_set (evcom_writer* writer, int fd)
+{
+ assert(fd >= 0);
+ writer->fd = fd;
+
+ ev_io_set(&writer->write_watcher, fd, EV_WRITE);
+ writer->action = writer_send;
+}
+
+void
+evcom_writer_attach (EV_P_ evcom_writer* writer)
+{
+ if (!evcom_queue_empty(&writer->out)) {
+ ev_io_start (EV_A_ &writer->write_watcher);
+ }
+ D_LOOP_SET(writer, EV_A);
+}
+
+void
+evcom_writer_detach (evcom_writer* writer)
+{
+ ev_io_stop(D_LOOP_(writer) &writer->write_watcher);
+ D_LOOP_SET(writer, NULL);
+}
+
+void
+evcom_writer_write (evcom_writer* writer, const char* buf, size_t len)
+{
+ assert(writer->fd >= 0);
+
+ ssize_t sent = 0;
+
+ if (evcom_queue_empty(&writer->out)) {
+ sent = write(writer->fd, buf, len);
+
+ if (sent < 0) {
+ switch (errno) {
+ case ECONNRESET:
+ case EPIPE:
+ goto close;
+
+ case EINTR:
+ case EAGAIN:
+ sent = 0;
+ break;
+
+ default:
+ writer->errorno = errno;
+ evcom_perror("send()", writer->errorno);
+ goto close;
+ }
+ }
+ } /* TODO else { memcpy to last buffer on head } */
+
+ assert(sent >= 0);
+ if ((size_t)sent == len) return; /* sent the whole buffer */
+
+ len -= sent;
+ buf += sent;
+
+ evcom_buf *b = evcom_buf_new(buf, len);
+ evcom_queue_insert_head(&writer->out, &b->queue);
+ b->written = 0;
+
+ assert(writer->fd >= 0);
+ ev_io_start(D_LOOP_(writer) &writer->write_watcher);
+ return;
+
+close:
+ close_writer_asap(writer);
+}
+
+void
+evcom_writer_close (evcom_writer* writer)
+{
+ writer->flags |= EVCOM_GOT_CLOSE;
+ if (evcom_queue_empty(&writer->out)) close_writer_asap(writer);
+}
diff --git a/deps/evcom/evcom.h b/deps/evcom/evcom.h
index f34dc3c73..1bbfd03f1 100644
--- a/deps/evcom/evcom.h
+++ b/deps/evcom/evcom.h
@@ -33,7 +33,7 @@
#ifdef __cplusplus
extern "C" {
-#endif
+#endif
#ifndef EVCOM_HAVE_GNUTLS
# define EVCOM_HAVE_GNUTLS 0
@@ -52,12 +52,11 @@ extern "C" {
#define EVCOM_LISTENING 0x0002
#define EVCOM_CONNECTED 0x0004
#define EVCOM_SECURE 0x0008
-#define EVCOM_GOT_HALF_CLOSE 0x0010
-#define EVCOM_GOT_FULL_CLOSE 0x0020
+#define EVCOM_DUPLEX 0x0010
+#define EVCOM_GOT_CLOSE 0x0020
#define EVCOM_PAUSED 0x0040
#define EVCOM_READABLE 0x0080
#define EVCOM_WRITABLE 0x0100
-#define EVCOM_GOT_WRITE_EVENT 0x0200
enum evcom_stream_state { EVCOM_INITIALIZED
, EVCOM_CONNECTING
@@ -91,95 +90,112 @@ typedef struct evcom_buf {
# define EVCOM_LOOP
#endif
-#define EVCOM_DESCRIPTOR(type) \
- unsigned int flags; /* private */ \
- int (*action) (struct evcom_descriptor*); /* private */ \
- int errorno; /* read-only */ \
- int fd; /* read-only */ \
- EVCOM_LOOP /* read-only */ \
- void *data; /* public */ \
- void (*on_close) (struct type*); /* public */
+#define EVCOM_DESCRIPTOR(type) \
+ /* private */ unsigned int flags; \
+ /* private */ int (*action) (struct evcom_descriptor*); \
+ /* read-only */ int errorno; \
+ /* read-only */ int fd; \
+ /* read-only */ EVCOM_LOOP \
+ /* public */ void *data; \
+ /* public */ void (*on_close) (struct type*);
+/* abstract base class */
typedef struct evcom_descriptor {
EVCOM_DESCRIPTOR(evcom_descriptor)
} evcom_descriptor;
-typedef struct evcom_server {
- EVCOM_DESCRIPTOR(evcom_server)
-
- /* PRIVATE */
- ev_io watcher;
+typedef struct evcom_reader {
+ EVCOM_DESCRIPTOR(evcom_reader)
+ ev_io read_watcher; /* private */
+ void (*on_read) (struct evcom_reader*, const void* buf, size_t len); /* public */
+} evcom_reader;
- /* PUBLIC */
- struct evcom_stream*
- (*on_connection)(struct evcom_server *, struct sockaddr *remote_addr);
-} evcom_server;
+typedef struct evcom_writer {
+ EVCOM_DESCRIPTOR(evcom_writer)
+ ev_io write_watcher; /* private */
+ evcom_queue out; /* private */
+} evcom_writer;
typedef struct evcom_stream {
- EVCOM_DESCRIPTOR(evcom_stream)
-
- /* PRIVATE */
- ev_io write_watcher;
+ /* PRIVATE */
+ EVCOM_LOOP
+ int errorno;
+ unsigned int flags;
+ evcom_queue out;
ev_io read_watcher;
+ ev_io write_watcher;
+ int (*send_action) (struct evcom_stream*);
+ int (*recv_action) (struct evcom_stream*);
ev_timer timeout_watcher;
#if EVCOM_HAVE_GNUTLS
gnutls_session_t session;
#endif
- /* READ-ONLY */
+ /* READ-ONLY */
+ int recvfd;
+ int sendfd;
struct evcom_server *server;
- evcom_queue out;
#if EVCOM_HAVE_GNUTLS
int gnutls_errorno;
#endif
/* PUBLIC */
void (*on_connect) (struct evcom_stream *);
- void (*on_read) (struct evcom_stream *, const void *buf, size_t count);
- void (*on_drain) (struct evcom_stream *);
void (*on_timeout) (struct evcom_stream *);
+ void (*on_read) (struct evcom_stream *, const void* buf, size_t len);
+ void (*on_close) (struct evcom_stream *);
+ void *data;
} evcom_stream;
+typedef struct evcom_server {
+ EVCOM_DESCRIPTOR(evcom_server)
+
+ /* PRIVATE */
+ ev_io watcher;
+
+ /* PUBLIC */
+ struct evcom_stream*
+ (*on_connection)(struct evcom_server *, struct sockaddr *remote_addr);
+} evcom_server;
+
+void evcom_reader_init (evcom_reader*);
+void evcom_reader_set (evcom_reader*, int fd);
+void evcom_reader_attach (EV_P_ evcom_reader*);
+void evcom_reader_detach (evcom_reader*);
+void evcom_reader_close (evcom_reader*);
+
+void evcom_writer_init (evcom_writer*);
+void evcom_writer_set (evcom_writer*, int fd);
+void evcom_writer_attach (EV_P_ evcom_writer*);
+void evcom_writer_detach (evcom_writer*);
+void evcom_writer_write (evcom_writer*, const char *str, size_t len);
+void evcom_writer_close (evcom_writer*);
+
void evcom_server_init (evcom_server *);
int evcom_server_listen (evcom_server *, struct sockaddr *address, int backlog);
void evcom_server_attach (EV_P_ evcom_server *);
void evcom_server_detach (evcom_server *);
-void evcom_server_close (evcom_server *); // synchronous
+void evcom_server_close (evcom_server *);
void evcom_stream_init (evcom_stream *, float timeout);
+
+ int evcom_stream_pair (evcom_stream *a, evcom_stream *b);
int evcom_stream_connect (evcom_stream *, struct sockaddr *address);
+
void evcom_stream_attach (EV_P_ evcom_stream *);
void evcom_stream_detach (evcom_stream *);
void evcom_stream_read_resume (evcom_stream *);
void evcom_stream_read_pause (evcom_stream *);
-
-/* Resets the timeout to stay alive for another stream->timeout seconds
- */
+/* Resets the timeout to stay alive for another stream->timeout seconds */
void evcom_stream_reset_timeout (evcom_stream *);
-
-/* Writes a buffer to the stream.
- */
-void evcom_stream_write (evcom_stream *, evcom_buf *);
-
-void evcom_stream_write_simple (evcom_stream *, const char *str, size_t len);
-
+void evcom_stream_write (evcom_stream *, const char *str, size_t len);
/* Once the write buffer is drained, evcom_stream_close will shutdown the
* writing end of the stream and will close the read end once the server
- * replies with an EOF.
+ * replies with an EOF.
*/
void evcom_stream_close (evcom_stream *);
-/* Do not wait for the server to reply with EOF. This will only be called
- * once the write buffer is drained.
- * Warning: For TCP stream, the OS kernel may (should) reply with RST
- * packets if this is called when data is still being received from the
- * server.
- */
-void evcom_stream_full_close (evcom_stream *);
-
-/* The most extreme measure.
- * Will not wait for the write queue to complete.
- */
+/* Will not wait for the write queue to complete. Closes both directions */
void evcom_stream_force_close (evcom_stream *);
@@ -195,9 +211,8 @@ void evcom_stream_set_secure_session (evcom_stream *, gnutls_session_t);
enum evcom_stream_state evcom_stream_state (evcom_stream *stream);
-evcom_buf * evcom_buf_new (const char* base, size_t len);
-evcom_buf * evcom_buf_new2 (size_t len);
-void evcom_buf_destroy (evcom_buf *);
+evcom_buf* evcom_buf_new (const char* base, size_t len);
+evcom_buf* evcom_buf_new2 (size_t len);
EV_INLINE void
evcom_queue_init (evcom_queue *q)
@@ -235,5 +250,5 @@ evcom_queue_remove (evcom_queue *x)
#ifdef __cplusplus
}
-#endif
+#endif
#endif /* evcom_h */
diff --git a/deps/evcom/recv_states.dot b/deps/evcom/recv_states.dot
new file mode 100644
index 000000000..52f962423
--- /dev/null
+++ b/deps/evcom/recv_states.dot
@@ -0,0 +1,37 @@
+strict digraph recv_states {
+ start [peripheries=2];
+ end [peripheries=2];
+ handshake;
+ recv_data;
+ wait_for_resume;
+ wait_for_close;
+ close_one;
+ close_both;
+
+ node [label="", shape="box", height=0.1, width=0.1];
+ close;
+
+
+
+ start -> handshake [label="tls"];
+ start -> recv_data;
+
+ handshake -> close [label="error"];
+ handshake -> recv_data;
+
+ recv_data -> handshake [label="rehandshake"];
+ recv_data -> wait_for_resume [label="pause"];
+ recv_data -> wait_for_close [label="eof"];
+ recv_data -> close [label="error"];
+
+ wait_for_resume -> recv_data;
+
+ wait_for_close -> close;
+
+ close -> close_one [label="duplex"];
+ close -> close_both;
+
+ close_one -> end;
+ close_both -> end;
+
+}
diff --git a/deps/evcom/send_states.dot b/deps/evcom/send_states.dot
new file mode 100644
index 000000000..bb913736c
--- /dev/null
+++ b/deps/evcom/send_states.dot
@@ -0,0 +1,65 @@
+strict digraph send_states {
+ start [peripheries=2];
+ end [peripheries=2];
+ connection_established;
+ handshake;
+ send_data;
+ shutdown;
+ gnutls_bye;
+ close_one;
+ close_both;
+
+ wait_for_connect;
+ wait_for_buf;
+ wait_for_eof;
+
+ node [label="", shape="box", height=0.1, width=0.1];
+ close;
+ drain;
+ hangup;
+ hangup_unsecure;
+
+
+
+ start -> wait_for_connect [label="duplex"];
+ start -> connection_established;
+
+ wait_for_connect -> connection_established;
+ wait_for_connect -> close [label="error"];
+
+ connection_established -> handshake [label="tls"];
+ connection_established -> send_data;
+
+ handshake -> close [label="error"];
+ handshake -> send_data;
+
+ send_data -> close [label="error"];
+ send_data -> drain [label="drain"];
+
+ drain -> wait_for_buf;
+ drain -> hangup [label="got_close"];
+
+ wait_for_buf -> send_data;
+ wait_for_buf -> drain [label="empty_buf"];
+
+ hangup -> gnutls_bye [label="tls"];
+ hangup -> hangup_unsecure;
+
+ gnutls_bye -> wait_for_eof;
+ gnutls_bye -> close [label="error"];
+
+ hangup_unsecure -> shutdown [label="duplex"];
+ hangup_unsecure -> close_one;
+
+ shutdown -> wait_for_eof;
+ shutdown -> close [label="error"];
+
+ wait_for_eof -> close_one;
+ close_one -> wait_for_eof [label="readable"];
+
+ close -> close_both;
+ close -> close_one [label="duplex"];
+
+ close_both -> end;
+ close_one -> end;
+}
diff --git a/deps/evcom/test/echo.c b/deps/evcom/test/echo.c
index 969cefb57..4e05f1483 100644
--- a/deps/evcom/test/echo.c
+++ b/deps/evcom/test/echo.c
@@ -12,7 +12,9 @@
#include <ev.h>
#include <evcom.h>
-#include <gnutls/gnutls.h>
+#if EVCOM_HAVE_GNUTLS
+# include <gnutls/gnutls.h>
+#endif
#define HOST "127.0.0.1"
#define SOCKFILE "/tmp/oi.sock"
@@ -46,7 +48,7 @@ on_peer_read (evcom_stream *stream, const void *base, size_t len)
{
if(len == 0) return;
- evcom_stream_write_simple(stream, base, len);
+ evcom_stream_write(stream, base, len);
}
static evcom_stream*
diff --git a/deps/evcom/test/test.c b/deps/evcom/test/test.c
index 9d789296f..6b4b765fa 100644
--- a/deps/evcom/test/test.c
+++ b/deps/evcom/test/test.c
@@ -16,13 +16,20 @@
# include <gnutls/gnutls.h>
#endif
-#define MARK_PROGRESS write(STDERR_FILENO, ".", 1)
+#undef MAX
+#define MAX(a,b) ((a) > (b) ? (a) : (b))
+
+#undef MIN
+#define MIN(a,b) ((a) < (b) ? (a) : (b))
+
+#define MARK_PROGRESS(c,cur,max) \
+ if (cur % (MAX(max,50)/50) == 0) write(STDERR_FILENO, c, 1)
#define SOCKFILE "/tmp/oi.sock"
#define PORT 5000
static evcom_server server;
-static int nconnections;
+static int nconnections;
static int use_tls;
static int got_server_close;
@@ -36,7 +43,7 @@ common_on_server_close (evcom_server *s)
evcom_server_detach(s);
}
-static void
+static void
common_on_peer_close (evcom_stream *stream)
{
assert(EVCOM_CLOSED == evcom_stream_state(stream));
@@ -50,14 +57,14 @@ common_on_peer_close (evcom_stream *stream)
free(stream);
}
-static void
+static void
common_on_client_timeout (evcom_stream *stream)
{
assert(stream);
printf("client connection timeout\n");
}
-static void
+static void
common_on_peer_timeout (evcom_stream *stream)
{
assert(stream);
@@ -110,12 +117,12 @@ void anon_tls_client (evcom_stream *stream)
#define PING "PING"
#define PONG "PONG"
-#define EXCHANGES 500
+#define EXCHANGES 500
#define PINGPONG_TIMEOUT 5.0
-static int successful_ping_count;
+static int successful_ping_count;
-static void
+static void
pingpong_on_peer_read (evcom_stream *stream, const void *base, size_t len)
{
if (len == 0) {
@@ -128,10 +135,10 @@ pingpong_on_peer_read (evcom_stream *stream, const void *base, size_t len)
buf[len] = 0;
printf("server got message: %s\n", buf);
- evcom_stream_write_simple(stream, PONG, sizeof PONG);
+ evcom_stream_write(stream, PONG, sizeof PONG);
}
-static void
+static void
pingpong_on_client_close (evcom_stream *stream)
{
assert(EVCOM_CLOSED == evcom_stream_state(stream));
@@ -141,7 +148,7 @@ pingpong_on_client_close (evcom_stream *stream)
evcom_stream_detach(stream);
}
-static evcom_stream*
+static evcom_stream*
pingpong_on_server_connection (evcom_server *_server, struct sockaddr *addr)
{
assert(_server == &server);
@@ -166,15 +173,15 @@ pingpong_on_server_connection (evcom_server *_server, struct sockaddr *addr)
return stream;
}
-static void
+static void
pingpong_on_client_connect (evcom_stream *stream)
{
printf("client connected. sending ping\n");
- evcom_stream_write_simple(stream, PING, sizeof PING);
+ evcom_stream_write(stream, PING, sizeof PING);
assert(EVCOM_CONNECTED_RW == evcom_stream_state(stream));
}
-static void
+static void
pingpong_on_client_read (evcom_stream *stream, const void *base, size_t len)
{
if(len == 0) {
@@ -188,17 +195,17 @@ pingpong_on_client_read (evcom_stream *stream, const void *base, size_t len)
strncpy(buf, base, len);
buf[len] = 0;
printf("client got message: %s\n", buf);
-
+
assert(strcmp(buf, PONG) == 0);
if (++successful_ping_count > EXCHANGES) {
evcom_stream_close(stream);
return;
- }
+ }
- if (successful_ping_count % (EXCHANGES/20) == 0) MARK_PROGRESS;
+ MARK_PROGRESS(".", successful_ping_count, EXCHANGES);
- evcom_stream_write_simple(stream, PING, sizeof PING);
+ evcom_stream_write(stream, PING, sizeof PING);
}
int
@@ -206,13 +213,13 @@ pingpong (struct sockaddr *address)
{
int r;
evcom_stream client;
-
+
successful_ping_count = 0;
nconnections = 0;
got_server_close = 0;
- printf("sizeof(evcom_server): %d\n", sizeof(evcom_server));
- printf("sizeof(evcom_stream): %d\n", sizeof(evcom_stream));
+ printf("sizeof(evcom_server): %d\n", (int)sizeof(evcom_server));
+ printf("sizeof(evcom_stream): %d\n", (int)sizeof(evcom_stream));
evcom_server_init(&server);
server.on_connection = pingpong_on_server_connection;
@@ -253,17 +260,17 @@ pingpong (struct sockaddr *address)
#define NCONN 50
#define CONNINT_TIMEOUT 10.0
-static void
+static void
send_bye_and_close(evcom_stream *stream, const void *base, size_t len)
{
assert(base);
assert(len == 0);
- evcom_stream_write_simple(stream, "BYE", 3);
+ evcom_stream_write(stream, "BYE", 3);
printf("server wrote bye\n");
evcom_stream_close(stream);
}
-static evcom_stream*
+static evcom_stream*
connint_on_connection(evcom_server *_server, struct sockaddr *addr)
{
assert(_server == &server);
@@ -284,21 +291,21 @@ connint_on_connection(evcom_server *_server, struct sockaddr *addr)
return stream;
}
-static void
+static void
connint_on_client_connect (evcom_stream *stream)
{
printf("on client connection\n");
evcom_stream_close(stream);
}
-static void
+static void
connint_on_client_close (evcom_stream *stream)
{
evcom_stream_close(stream); // already closed, but it shouldn't crash if we try to do it again
printf("client connection closed\n");
- if (nconnections % (NCONN/20) == 0) MARK_PROGRESS;
+ MARK_PROGRESS(".", nconnections, NCONN);
if(++nconnections == NCONN) {
evcom_server_close(&server);
@@ -308,7 +315,7 @@ connint_on_client_close (evcom_stream *stream)
evcom_stream_detach(stream);
}
-static void
+static void
connint_on_client_read (evcom_stream *stream, const void *base, size_t len)
{
if (len == 0) {
@@ -321,12 +328,12 @@ connint_on_client_read (evcom_stream *stream, const void *base, size_t len)
buf[len] = 0;
printf("client got message: %s\n", buf);
-
+
assert(strcmp(buf, "BYE") == 0);
evcom_stream_close(stream);
}
-int
+int
connint (struct sockaddr *address)
{
int r;
@@ -367,6 +374,365 @@ connint (struct sockaddr *address)
}
+static evcom_reader reader;
+static evcom_writer writer;
+static int reader_got_close = 0;
+static int reader_got_eof = 0;
+static int reader_got_hello = 0;
+static int reader_cnt = 0;
+static int writer_got_close = 0;
+#define PIPE_MSG "hello world"
+#define PIPE_CNT 5000
+
+static void
+reader_read (evcom_reader *r, const void *str, size_t len)
+{
+ assert(r == &reader);
+
+ if (len == 0) {
+ reader_got_eof = 1;
+ return;
+ }
+
+ assert(len == strlen(PIPE_MSG));
+
+ if (strncmp(str, PIPE_MSG, strlen(PIPE_MSG)) == 0) {
+ reader_got_hello = 1;
+ }
+
+ if (++reader_cnt < PIPE_CNT) {
+ MARK_PROGRESS(".", reader_cnt, PIPE_CNT);
+ evcom_writer_write(&writer, PIPE_MSG, strlen(PIPE_MSG));
+ } else {
+ evcom_writer_close(&writer);
+ }
+}
+
+static void
+reader_close (evcom_reader *r)
+{
+ assert(r == &reader);
+ reader_got_close = 1;
+ evcom_reader_detach(r);
+}
+
+static void
+writer_close (evcom_writer *w)
+{
+ assert(w == &writer);
+ writer_got_close = 1;
+ evcom_writer_detach(w);
+}
+
+int
+pipe_stream (void)
+{
+ reader_cnt = 0;
+ reader_got_close = 0;
+ reader_got_hello = 0;
+ reader_got_eof = 0;
+ writer_got_close = 0;
+
+ int pipefd[2];
+ int r = pipe(pipefd);
+ if (r < 0) {
+ perror("pipe()");
+ return -1;
+ }
+
+ evcom_reader_init(&reader);
+ reader.on_read = reader_read;
+ reader.on_close = reader_close;
+ evcom_reader_set(&reader, pipefd[0]);
+ evcom_reader_attach(EV_DEFAULT_ &reader);
+
+ evcom_writer_init(&writer);
+ writer.on_close = writer_close;
+ evcom_writer_set(&writer, pipefd[1]);
+ evcom_writer_attach(EV_DEFAULT_ &writer);
+
+ evcom_writer_write(&writer, PIPE_MSG, strlen(PIPE_MSG));
+
+ ev_loop(EV_DEFAULT_ 0);
+
+ assert(reader_got_close);
+ assert(reader_got_hello);
+ assert(reader_got_eof);
+ assert(writer_got_close);
+ assert(reader_cnt == PIPE_CNT);
+
+ return 0;
+}
+
+#define PAIR_PINGPONG_TIMEOUT 5000.0
+#define PAIR_PINGPONG_EXCHANGES 50
+static int a_got_close;
+static int a_got_connect;
+static int b_got_close;
+static int b_got_connect;
+static int pair_pingpong_cnt;
+static evcom_stream a, b;
+
+void a_connect (evcom_stream *stream)
+{
+ assert(stream == &a);
+ a_got_connect = 1;
+}
+
+void a_close (evcom_stream *stream)
+{
+ evcom_stream_detach(stream);
+ assert(stream == &a);
+ a_got_close = 1;
+
+ assert(stream->errorno == 0);
+#if EVCOM_HAVE_GNUTLS
+ if (stream->gnutls_errorno) {
+ fprintf(stderr, "\nGNUTLS ERROR: %s\n", gnutls_strerror(stream->gnutls_errorno));
+ }
+ assert(stream->gnutls_errorno == 0);
+ if (use_tls) gnutls_deinit(stream->session);
+#endif
+}
+
+void a_read (evcom_stream *stream, const void *buf, size_t len)
+{
+ assert(stream == &a);
+ if (len == 0) return;
+
+ assert(len == strlen(PONG));
+ assert(strncmp(buf, PONG, strlen(PONG)) == 0);
+
+ if (++pair_pingpong_cnt < PAIR_PINGPONG_EXCHANGES) {
+ evcom_stream_write(&a, PING, strlen(PING));
+ } else if (pair_pingpong_cnt == PAIR_PINGPONG_EXCHANGES) {
+ evcom_stream_close(stream);
+ }
+
+ MARK_PROGRESS(".", pair_pingpong_cnt, PAIR_PINGPONG_EXCHANGES);
+}
+
+void b_connect (evcom_stream *stream)
+{
+ assert(stream == &b);
+ b_got_connect = 1;
+}
+
+void b_close (evcom_stream *stream)
+{
+ evcom_stream_detach(stream);
+ assert(stream == &b);
+ b_got_close = 1;
+
+ assert(stream->errorno == 0);
+#if EVCOM_HAVE_GNUTLS
+ if (stream->gnutls_errorno) {
+ fprintf(stderr, "\nGNUTLS ERROR: %s\n", gnutls_strerror(stream->gnutls_errorno));
+ }
+ assert(stream->gnutls_errorno == 0);
+ if (use_tls) gnutls_deinit(stream->session);
+#endif
+}
+
+void b_read (evcom_stream *stream, const void *buf, size_t len)
+{
+ assert(stream == &b);
+ if (len == 0) {
+ evcom_stream_close(stream);
+ return;
+ }
+
+ assert(len == strlen(PING));
+ assert(strncmp(buf, PING, strlen(PING)) == 0);
+
+ evcom_stream_write(&b, PONG, strlen(PONG));
+}
+
+int
+pair_pingpong ()
+{
+ a_got_close = 0;
+ a_got_connect = 0;
+ b_got_close = 0;
+ b_got_connect = 0;
+ pair_pingpong_cnt = 0;
+
+ evcom_stream_init(&a, PAIR_PINGPONG_TIMEOUT);
+ a.on_close = a_close;
+ a.on_connect = a_connect;
+ a.on_read = a_read;
+#if EVCOM_HAVE_GNUTLS
+ if (use_tls) anon_tls_client(&a);
+#endif
+
+ evcom_stream_init(&b, PAIR_PINGPONG_TIMEOUT);
+ b.on_close = b_close;
+ b.on_connect = b_connect;
+ b.on_read = b_read;
+#if EVCOM_HAVE_GNUTLS
+ if (use_tls) anon_tls_server(&b);
+#endif
+
+ int r = evcom_stream_pair(&a, &b);
+ assert(r == 0);
+
+ evcom_stream_attach(EV_DEFAULT_ &a);
+ evcom_stream_attach(EV_DEFAULT_ &b);
+
+ evcom_stream_write(&a, PING, strlen(PING));
+
+ ev_loop(EV_DEFAULT_ 0);
+
+ assert(a_got_close);
+ assert(a_got_connect);
+ assert(b_got_close);
+ assert(b_got_connect);
+ assert(pair_pingpong_cnt == PAIR_PINGPONG_EXCHANGES);
+
+ return 0;
+}
+
+
+static void
+free_stream (evcom_stream *stream)
+{
+ assert(stream->errorno == 0);
+ free(stream);
+}
+
+#define ZERO_TIMEOUT 50.0
+static size_t zero_to_write = 0;
+static size_t zero_written = 0;
+static size_t zero_read = 0;
+static size_t zero_client_closed = 0;
+
+static void
+error_out (evcom_stream *stream)
+{
+ assert(stream);
+ fprintf(stderr, "peer connection timeout\n");
+ assert(0);
+}
+
+static void
+echo (evcom_stream *stream, const void *base, size_t len)
+{
+ if(len == 0) {
+ fprintf(stderr, "close");
+ evcom_stream_close(stream);
+ } else {
+ evcom_stream_write(stream, base, len);
+ }
+}
+
+static evcom_stream*
+make_echo_connection (evcom_server *server, struct sockaddr *addr)
+{
+ assert(server);
+ assert(addr);
+
+ evcom_stream *stream = malloc(sizeof(evcom_stream));
+ evcom_stream_init(stream, ZERO_TIMEOUT);
+ stream->on_read = echo;
+ stream->on_close = free_stream;
+ stream->on_timeout = error_out;
+
+#if EVCOM_HAVE_GNUTLS
+ if (use_tls) anon_tls_server(stream);
+#endif
+
+ return stream;
+}
+
+
+static void
+zero_start (evcom_stream *stream)
+{
+ evcom_stream_write(stream, "0", 1);
+ zero_written++;
+}
+
+static void
+zero_close (evcom_stream *stream)
+{
+ assert(stream);
+ zero_client_closed = 1;
+}
+
+static void
+zero_recv (evcom_stream *stream, const void *buf, size_t len)
+{
+ MARK_PROGRESS("-", zero_read, zero_to_write);
+ zero_read += len;
+
+ size_t i;
+
+ for (i = 0; i < len; i++) {
+ assert(((char*)buf)[i] == '0');
+ }
+
+ for (i = 0; i < MIN(zero_to_write - zero_written, 90000); i++) {
+ evcom_stream_write(stream, "0", 1);
+ zero_written++;
+
+ MARK_PROGRESS(".", zero_written, zero_to_write);
+
+ if (zero_written == zero_to_write) {
+
+ fprintf(stderr, "CLOSE");
+ evcom_stream_close(stream);
+ }
+ }
+
+ if (len == 0) {
+ fprintf(stderr, "finish");
+ evcom_server_close(&server);
+ }
+}
+
+int
+zero_stream (struct sockaddr *address, size_t to_write)
+{
+ int r;
+
+ assert(to_write >= 1024); // should be kind of big at least.
+ zero_to_write = to_write;
+ got_server_close = 0;
+ zero_written = 0;
+ zero_read = 0;
+ zero_client_closed = 0;
+
+ evcom_server_init(&server);
+ server.on_connection = make_echo_connection;
+ server.on_close = common_on_server_close;
+
+ evcom_server_listen(&server, address, 1000);
+ evcom_server_attach(EV_DEFAULT_ &server);
+
+ evcom_stream client;
+ evcom_stream_init(&client, ZERO_TIMEOUT);
+ client.on_read = zero_recv;
+ client.on_connect = zero_start;
+ client.on_close = zero_close;
+ client.on_timeout = error_out;
+#if EVCOM_HAVE_GNUTLS
+ if (use_tls) anon_tls_client(&client);
+#endif
+ r = evcom_stream_connect(&client, address);
+ assert(r == 0 && "problem connecting");
+ evcom_stream_attach(EV_DEFAULT_ &client);
+
+ ev_loop(EV_DEFAULT_ 0);
+
+ assert(got_server_close);
+ assert(zero_written == zero_to_write);
+ assert(zero_read == zero_to_write);
+ assert(zero_client_closed) ;
+
+ return 0;
+}
+
+
struct sockaddr *
create_unix_address (void)
{
@@ -386,6 +752,11 @@ create_unix_address (void)
void
free_unix_address (struct sockaddr *address)
{
+ struct stat tstat;
+ if (lstat(SOCKFILE, &tstat) == 0) {
+ assert(S_ISSOCK(tstat.st_mode));
+ unlink(SOCKFILE);
+ }
free(address);
}
@@ -405,48 +776,87 @@ main (void)
gnutls_anon_set_server_dh_params (server_credentials, dh_params);
#endif
+
struct sockaddr_in tcp_address;
memset(&tcp_address, 0, sizeof(struct sockaddr_in));
tcp_address.sin_family = AF_INET;
tcp_address.sin_port = htons(PORT);
tcp_address.sin_addr.s_addr = INADDR_ANY;
-
use_tls = 0;
+
+ fprintf(stderr, "zero_stream tcp: ");
+ assert(zero_stream((struct sockaddr*)&tcp_address, 5*1024*1024) == 0);
+ fprintf(stderr, "\n");
+
+ fprintf(stderr, "pipe_stream: ");
+ assert(pipe_stream() == 0);
+ fprintf(stderr, "\n");
+
+ fprintf(stderr, "pair_pingpong: ");
+ assert(pair_pingpong() == 0);
+ fprintf(stderr, "\n");
+
+ fprintf(stderr, "pingpong tcp: ");
assert(pingpong((struct sockaddr*)&tcp_address) == 0);
+ fprintf(stderr, "\n");
+
+ fprintf(stderr, "connint tcp: ");
assert(connint((struct sockaddr*)&tcp_address) == 0);
+ fprintf(stderr, "\n");
#if EVCOM_HAVE_GNUTLS
use_tls = 1;
+
+ fprintf(stderr, "zero_stream ssl: ");
+ assert(zero_stream((struct sockaddr*)&tcp_address, 50*1024) == 0);
+ fprintf(stderr, "\n");
+
+ fprintf(stderr, "pair_pingpong ssl: ");
+ assert(pair_pingpong() == 0);
+ fprintf(stderr, "\n");
+
+ fprintf(stderr, "pingpong ssl: ");
assert(pingpong((struct sockaddr*)&tcp_address) == 0);
+ fprintf(stderr, "\n");
+
+ fprintf(stderr, "connint ssl: ");
assert(connint((struct sockaddr*)&tcp_address) == 0);
-#endif
+ fprintf(stderr, "\n");
+#endif
struct sockaddr *unix_address;
-
use_tls = 0;
+ fprintf(stderr, "pingpong unix: ");
unix_address = create_unix_address();
assert(pingpong(unix_address) == 0);
free_unix_address(unix_address);
+ fprintf(stderr, "\n");
+ fprintf(stderr, "connint unix: ");
unix_address = create_unix_address();
assert(connint(unix_address) == 0);
free_unix_address(unix_address);
+ fprintf(stderr, "\n");
#if EVCOM_HAVE_GNUTLS
use_tls = 1;
+ fprintf(stderr, "pingpong unix ssl: ");
unix_address = create_unix_address();
assert(pingpong(unix_address) == 0);
free_unix_address(unix_address);
+ fprintf(stderr, "\n");
+ fprintf(stderr, "connint unix ssl: ");
unix_address = create_unix_address();
assert(connint(unix_address) == 0);
free_unix_address(unix_address);
-#endif
+ fprintf(stderr, "\n");
+#endif
return 0;
}
diff --git a/src/http.js b/src/http.js
index 773cdb2b7..b59896a20 100644
--- a/src/http.js
+++ b/src/http.js
@@ -394,7 +394,7 @@ function connectionListener (connection) {
res.should_keep_alive = should_keep_alive;
res.addListener("flush", function () {
if(flushMessageQueue(connection, responses)) {
- connection.fullClose();
+ connection.close();
}
});
responses.push(res);
diff --git a/src/net.cc b/src/net.cc
index 8d5e98433..adedb1e8e 100644
--- a/src/net.cc
+++ b/src/net.cc
@@ -62,7 +62,6 @@ Connection::Initialize (v8::Handle<v8::Object> target)
NODE_SET_PROTOTYPE_METHOD(constructor_template, "connect", Connect);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "send", Send);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Close);
- NODE_SET_PROTOTYPE_METHOD(constructor_template, "fullClose", FullClose);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "forceClose", ForceClose);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "setEncoding", SetEncoding);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "readPause", ReadPause);
@@ -116,7 +115,8 @@ Connection::Init (void)
Connection::~Connection ()
{
- assert(stream_.fd < 0 && "garbage collecting open Connection");
+ assert(stream_.recvfd < 0 && "garbage collecting open Connection");
+ assert(stream_.sendfd < 0 && "garbage collecting open Connection");
ForceClose();
}
@@ -149,7 +149,8 @@ Connection::Connect (const Arguments& args)
return ThrowException(String::New("Socket is not in CLOSED state."));
}
- assert(connection->stream_.fd < 0);
+ assert(connection->stream_.recvfd < 0);
+ assert(connection->stream_.sendfd < 0);
if (args.Length() == 0)
return ThrowException(String::New("Must specify a port."));
@@ -345,17 +346,6 @@ Connection::Close (const Arguments& args)
}
Handle<Value>
-Connection::FullClose (const Arguments& args)
-{
- HandleScope scope;
- Connection *connection = ObjectWrap::Unwrap<Connection>(args.Holder());
- assert(connection);
-
- connection->FullClose();
- return Undefined();
-}
-
-Handle<Value>
Connection::ForceClose (const Arguments& args)
{
HandleScope scope;
@@ -394,31 +384,31 @@ Connection::Send (const Arguments& args)
enum encoding enc = ParseEncoding(args[1]);
Local<String> s = args[0]->ToString();
size_t len = s->Utf8Length();
- evcom_buf *buf = node::buf_new(len);
+ char buf[len];
switch (enc) {
case RAW:
case ASCII:
- s->WriteAscii(buf->base, 0, len);
+ s->WriteAscii(buf, 0, len);
break;
case UTF8:
- s->WriteUtf8(buf->base, len);
+ s->WriteUtf8(buf, len);
break;
default:
assert(0 && "unhandled string encoding");
}
- connection->Send(buf);
+ connection->Send(buf, len);
} else if (args[0]->IsArray()) {
Handle<Array> array = Handle<Array>::Cast(args[0]);
size_t len = array->Length();
- evcom_buf *buf = node::buf_new(len);
+ char buf[len];
for (size_t i = 0; i < len; i++) {
Local<Value> int_value = array->Get(Integer::New(i));
- buf->base[i] = int_value->IntegerValue();
+ buf[i] = int_value->IntegerValue();
}
- connection->Send(buf);
+ connection->Send(buf, len);
} else return ThrowException(String::New("Bad argument"));
diff --git a/src/net.h b/src/net.h
index f71faa368..404d22ea5 100644
--- a/src/net.h
+++ b/src/net.h
@@ -24,7 +24,6 @@ protected:
static v8::Handle<v8::Value> Send (const v8::Arguments& args);
static v8::Handle<v8::Value> SendUtf8 (const v8::Arguments& args);
static v8::Handle<v8::Value> Close (const v8::Arguments& args);
- static v8::Handle<v8::Value> FullClose (const v8::Arguments& args);
static v8::Handle<v8::Value> ForceClose (const v8::Arguments& args);
static v8::Handle<v8::Value> SetEncoding (const v8::Arguments& args);
static v8::Handle<v8::Value> ReadPause (const v8::Arguments& args);
@@ -47,9 +46,8 @@ protected:
int Connect (struct sockaddr *address) {
return evcom_stream_connect (&stream_, address);
}
- void Send (evcom_buf *buf) { evcom_stream_write(&stream_, buf); }
+ void Send (const char *buf, size_t len) { evcom_stream_write(&stream_, buf, len); }
void Close (void) { evcom_stream_close(&stream_); }
- void FullClose (void) { evcom_stream_full_close(&stream_); }
void ForceClose (void) { evcom_stream_force_close(&stream_); }
void ReadPause (void) { evcom_stream_read_pause(&stream_); }
void ReadResume (void) { evcom_stream_read_resume(&stream_); }
@@ -92,7 +90,8 @@ private:
evcom_stream_detach(s);
- assert(connection->stream_.fd < 0);
+ assert(connection->stream_.recvfd < 0);
+ assert(connection->stream_.sendfd < 0);
connection->OnClose();
diff --git a/test/mjsunit/test-tcp-many-clients.js b/test/mjsunit/test-tcp-many-clients.js
index 1231562d1..624513254 100644
--- a/test/mjsunit/test-tcp-many-clients.js
+++ b/test/mjsunit/test-tcp-many-clients.js
@@ -18,7 +18,7 @@ var server = node.tcp.createServer(function (c) {
total_connections++;
print("#");
c.send(body);
- c.fullClose();
+ c.close();
});
});
server.listen(port);
@@ -29,6 +29,7 @@ function runClient (callback) {
client.setEncoding("utf8");
client.addListener("connect", function () {
+ print("c");
client.recved = "";
client.connections += 1;
});
diff --git a/test/mjsunit/test-tcp-reconnect.js b/test/mjsunit/test-tcp-reconnect.js
index 292e5fd25..379b0c268 100644
--- a/test/mjsunit/test-tcp-reconnect.js
+++ b/test/mjsunit/test-tcp-reconnect.js
@@ -36,7 +36,7 @@ function onLoad () {
client_recv_count += 1;
puts("client_recv_count " + client_recv_count);
assertEquals("hello\r\n", chunk);
- client.fullClose();
+ client.close();
});
client.addListener("close", function (had_error) {
diff --git a/test/mjsunit/test-tcp-throttle-kernel-buffer.js b/test/mjsunit/test-tcp-throttle-kernel-buffer.js
index 5639c6aa5..7092b7390 100644
--- a/test/mjsunit/test-tcp-throttle-kernel-buffer.js
+++ b/test/mjsunit/test-tcp-throttle-kernel-buffer.js
@@ -13,7 +13,7 @@ puts("start server on port " + PORT);
server = node.tcp.createServer(function (connection) {
connection.addListener("connect", function () {
connection.send(body);
- connection.fullClose();
+ connection.close();
});
});
server.listen(PORT);
diff --git a/test/mjsunit/test-tcp-throttle.js b/test/mjsunit/test-tcp-throttle.js
index b5aab9a81..37bc9bc34 100644
--- a/test/mjsunit/test-tcp-throttle.js
+++ b/test/mjsunit/test-tcp-throttle.js
@@ -5,7 +5,7 @@ N = 500;
server = node.tcp.createServer(function (connection) {
function send (j) {
if (j >= N) {
- connection.fullClose();
+ connection.close();
return;
}
setTimeout(function () {