summaryrefslogtreecommitdiff
path: root/src/common/mainloop.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/mainloop.c')
-rw-r--r--src/common/mainloop.c2625
1 files changed, 2625 insertions, 0 deletions
diff --git a/src/common/mainloop.c b/src/common/mainloop.c
new file mode 100644
index 0000000..5702c15
--- /dev/null
+++ b/src/common/mainloop.c
@@ -0,0 +1,2625 @@
+/*
+ * Copyright (c) 2012-2014, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Intel Corporation nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <unistd.h>
+#include <errno.h>
+#include <time.h>
+#include <signal.h>
+#include <limits.h>
+#include <stdarg.h>
+#include <sys/epoll.h>
+#include <sys/signalfd.h>
+#include <sys/socket.h>
+
+#include <murphy/common/macros.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+#include <murphy/common/list.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/common/json.h>
+#include <murphy/common/msg.h>
+#include <murphy/common/mainloop.h>
+
+#define USECS_PER_SEC (1000 * 1000)
+#define USECS_PER_MSEC (1000)
+#define NSECS_PER_USEC (1000)
+
+/*
+ * I/O watches
+ */
+
/*
 * A single I/O watch on a file descriptor. Several watches on the same
 * fd share one epoll registration: the first watch added becomes the
 * 'master' (linked to the mainloop via hook), subsequent ones become
 * 'slaves' chained to the master through the slave hook.
 */
struct mrp_io_watch_s {
    mrp_list_hook_t hook; /* to list of watches */
    mrp_list_hook_t deleted; /* to list of pending delete */
    int (*free)(void *ptr); /* cb to free memory */
    mrp_mainloop_t *ml; /* mainloop */
    int fd; /* file descriptor to watch */
    mrp_io_event_t events; /* events of interest */
    mrp_io_watch_cb_t cb; /* user callback */
    void *user_data; /* opaque user data */
    struct pollfd *pollfd; /* associated pollfd */
    mrp_list_hook_t slave; /* watches with the same fd */
    int wrhup; /* EPOLLHUPs delivered */
};

/* a watch is a master iff it is linked to the mainloop's watch list,
 * and a slave iff it is linked into some master's slave ring */
#define is_master(w) !mrp_list_empty(&(w)->hook)
#define is_slave(w) !mrp_list_empty(&(w)->slave)
+
+
+/*
+ * timers
+ */
+
/*
 * A periodic timer. Timers are kept on the mainloop's timer list
 * sorted by expiration time (see insert_timer); 'expire' is an
 * absolute CLOCK_MONOTONIC timestamp in microseconds.
 */
struct mrp_timer_s {
    mrp_list_hook_t hook; /* to list of timers */
    mrp_list_hook_t deleted; /* to list of pending delete */
    int (*free)(void *ptr); /* cb to free memory */
    mrp_mainloop_t *ml; /* mainloop */
    unsigned int msecs; /* timer interval */
    uint64_t expire; /* next expiration time */
    mrp_timer_cb_t cb; /* user callback */
    void *user_data; /* opaque user data */
};
+
+
+/*
+ * deferred callbacks
+ */
+
/*
 * A deferred callback, invoked on every mainloop iteration while
 * enabled. Disabled entries are parked on the mainloop's
 * inactive_deferred list until re-enabled.
 */
struct mrp_deferred_s {
    mrp_list_hook_t hook; /* to list of cbs */
    mrp_list_hook_t deleted; /* to list of pending delete */
    int (*free)(void *ptr); /* cb to free memory */
    mrp_mainloop_t *ml; /* mainloop */
    mrp_deferred_cb_t cb; /* user callback */
    void *user_data; /* opaque user data */
    int inactive : 1; /* whether disabled (tested for truth only;
                       * NOTE(review): signed 1-bit field stores
                       * TRUE as -1 on most ABIs — harmless here) */
};
+
+
+/*
+ * signal handlers
+ */
+
/*
 * A signal handler dispatched from the mainloop's shared signalfd
 * (see setup_sighandlers/dispatch_signals).
 */
struct mrp_sighandler_s {
    mrp_list_hook_t hook; /* to list of handlers */
    mrp_list_hook_t deleted; /* to list of pending delete */
    int (*free)(void *ptr); /* cb to free memory */
    mrp_mainloop_t *ml; /* mainloop */
    int signum; /* signal number */
    mrp_sighandler_cb_t cb; /* user callback */
    void *user_data; /* opaque user data */
};
+
+
+/*
+ * wakeup notifications
+ */
+
/*
 * A wakeup notification: invoked when the mainloop wakes up, rate
 * limited by a low-pass filter ('lpf'/'next', in usecs) and optionally
 * forced by an interval timer to fire at least every so often.
 */
struct mrp_wakeup_s {
    mrp_list_hook_t hook; /* to list of wakeup cbs */
    mrp_list_hook_t deleted; /* to list of pending delete */
    int (*free)(void *ptr); /* cb to free memory */
    mrp_mainloop_t *ml; /* mainloop */
    mrp_wakeup_event_t events; /* wakeup event mask */
    uint64_t lpf; /* wakeup at most this often */
    uint64_t next; /* next wakeup time */
    mrp_timer_t *timer; /* forced interval timer */
    mrp_wakeup_cb_t cb; /* user callback */
    void *user_data; /* opaque user data */
};

/* Deletion marks an entry by NULLing its callback and relinking it to
 * the mainloop's deleted list; the actual free happens later in the
 * dispatch cycle. A NULL cb is therefore the 'deleted' flag. */
#define mark_deleted(o) do { \
        (o)->cb = NULL; \
        mrp_list_append(&(o)->ml->deleted, &(o)->deleted); \
    } while (0)

#define is_deleted(o) ((o)->cb == NULL)
+
+
+/*
+ * any of the above data structures linked to the list of deleted items
+ *
+ * When deleted, the above data structures are first unlinked from their
+ * native list and linked to the special list of deleted entries. At an
+ * appropriate point upon every iteration of the main loop this list is
+ * checked and all entries are freed. This structure is used to get a
+ * pointer to the real structure that we need free. For this to work link
+ * hooks in all of the above structures need to be kept at the same offset
+ * as it is in deleted_t.
+ */
+
/* Common prefix of all deletable entry types above; the hook/deleted/
 * free members must stay at identical offsets in every such struct so
 * a pending-delete entry can be freed through this view. */
typedef struct {
    mrp_list_hook_t hook; /* unfreed deleted items */
    mrp_list_hook_t deleted; /* to list of pending delete */
    int (*free)(void *ptr); /* cb to free memory */
} deleted_t;
+
+
+/*
+ * file descriptor table
+ *
+ * We do not want to associate direct pointers to related data structures
+ * with epoll. We might get delivered pending events for deleted fds (at
+ * least for unix domain sockets this seems to be the case) and with direct
+ * pointers we'd get delivered a dangling pointer together with the event.
+ * Instead we keep these structures in an fd table and use the fd to look
+ * up the associated data structure for events. We ignore events for which
+ * no data structure is found. In the fd table we keep a fixed size direct
+ * table for a small amount of fds (we expect to be using at most in the
+ * vast majority of cases) and we hash in the rest.
+ */
+
/* fds below FDTBL_SIZE are looked up via the direct-index table 't';
 * larger fds go through the hash table 'h'. */
#define FDTBL_SIZE 64

typedef struct {
    void *t[FDTBL_SIZE]; /* direct slots for small fds */
    mrp_htbl_t *h; /* hashed entries for large fds */
} fdtbl_t;
+
+
+/*
+ * external mainloops
+ */
+
/*
 * An external event loop pumped by us: the subloop's fds are collected
 * into a private epoll set which is in turn watched ('w') from our
 * main epoll fd, so subloop activity wakes our loop.
 */
struct mrp_subloop_s {
    mrp_list_hook_t hook; /* to list of subloops */
    mrp_list_hook_t deleted; /* to list of pending delete */
    int (*free)(void *ptr); /* cb to free memory */
    mrp_mainloop_t *ml; /* main loop */
    mrp_subloop_ops_t *cb; /* subloop glue callbacks */
    void *user_data; /* opaque subloop data */
    int epollfd; /* epollfd for this subloop */
    struct epoll_event *events; /* epoll event buffer */
    int nevent; /* epoll event buffer size */
    fdtbl_t *fdtbl; /* file descriptor table */
    mrp_io_watch_t *w; /* watch for epollfd */
    struct pollfd *pollfds; /* pollfds for this subloop */
    int npollfd; /* number of pollfds */
    int pending; /* pending events */
    int poll; /* need to poll for events */
};
+
+
+/*
+ * event busses
+ */
+
/*
 * A named event bus attached to a mainloop; event watches subscribe
 * to it and queued events are pumped to them.
 */
struct mrp_event_bus_s {
    char *name; /* bus name */
    mrp_list_hook_t hook; /* to list of busses */
    mrp_mainloop_t *ml; /* associated mainloop */
    mrp_list_hook_t watches; /* event watches on this bus */
    int busy; /* whether pumping events */
    int dead; /* marked for deletion while busy */
};
+
+
+/*
+ * event watches
+ */
+
/*
 * A subscription on an event bus: cb is invoked for events matching
 * mask. 'dead' defers removal while the bus is being pumped.
 */
struct mrp_event_watch_s {
    mrp_list_hook_t hook; /* to list of event watches */
    mrp_event_bus_t *bus; /* associated event bus */
    mrp_event_mask_t mask; /* mask of watched events */
    mrp_event_watch_cb_t cb; /* notification callback */
    void *user_data; /* opaque user data */
    int dead : 1; /* marked for deletion */
};
+
+
+/*
+ * pending events
+ */
+
/* An emitted-but-not-yet-delivered event sitting on the mainloop's
 * event queue until pump_events delivers it. */
typedef struct {
    mrp_list_hook_t hook; /* to event queue */
    mrp_event_bus_t *bus; /* bus for this event */
    uint32_t id; /* event id */
    int format; /* attached data format */
    void *data; /* attached data */
} pending_event_t;
+
+
+/*
+ * main loop
+ */
+
/*
 * The mainloop proper. Everything is multiplexed over a single epoll
 * fd; signals come in via a signalfd, and the whole loop can itself be
 * embedded in an external 'superloop' via super_ops.
 */
struct mrp_mainloop_s {
    int epollfd; /* our epoll descriptor */
    struct epoll_event *events; /* epoll event buffer */
    int nevent; /* epoll event buffer size */
    fdtbl_t *fdtbl; /* file descriptor table */

    mrp_list_hook_t iowatches; /* list of I/O watches */
    int niowatch; /* number of I/O watches */
    mrp_io_event_t iomode; /* default event trigger mode */

    mrp_list_hook_t timers; /* list of timers */
    mrp_timer_t *next_timer; /* next expiring timer */

    mrp_list_hook_t deferred; /* list of deferred cbs */
    mrp_list_hook_t inactive_deferred; /* inactive defferred cbs */

    mrp_list_hook_t wakeups; /* list of wakeup cbs */

    int poll_timeout; /* next poll timeout */
    int poll_result; /* return value from poll */

    int sigfd; /* signal polling fd */
    sigset_t sigmask; /* signal mask */
    mrp_io_watch_t *sigwatch; /* sigfd I/O watch */
    mrp_list_hook_t sighandlers; /* signal handlers */

    mrp_list_hook_t subloops; /* external main loops */

    mrp_list_hook_t deleted; /* unfreed deleted items */
    int quit; /* TRUE if _quit called */
    int exit_code; /* returned from _run */

    mrp_superloop_ops_t *super_ops; /* superloop options */
    void *super_data; /* superloop glue data */
    void *iow; /* superloop epollfd watch */
    void *timer; /* superloop timer */
    void *work; /* superloop deferred work */

    mrp_list_hook_t busses; /* known event busses */
    mrp_list_hook_t eventq; /* pending events */
    mrp_deferred_t *eventd; /* deferred event pump cb */
};
+
+
/* process-wide event registry and the global, synchronous 'bus'
 * (watches not attached to any particular mrp_event_bus_t) */
static mrp_event_def_t *events; /* registered events */
static int nevent; /* number of events */
static MRP_LIST_HOOK (ewatches); /* global, synchronous 'bus' */
+
+
+static void dump_pollfds(const char *prefix, struct pollfd *fds, int nfd);
+static void adjust_superloop_timer(mrp_mainloop_t *ml);
+static size_t poll_events(void *id, mrp_mainloop_t *ml, void **bufp);
+static void pump_events(mrp_deferred_t *d, void *user_data);
+
+/*
+ * fd table manipulation
+ */
+
/*
 * Hash table comparison callback for fd keys. The fds are smuggled
 * directly in the key pointers, so recover and compare them as
 * integers: subtracting two unrelated pointers (the original
 * 'key2 - key1') is undefined behavior in C and the ptrdiff_t result
 * could be silently truncated to int on LP64 platforms.
 */
static int fd_cmp(const void *key1, const void *key2)
{
    int fd1 = (int)(ptrdiff_t)key1;
    int fd2 = (int)(ptrdiff_t)key2;

    /* preserve the original (key2 - key1) sign convention */
    return fd2 - fd1;
}
+
+
/*
 * Hash callback for fd keys: the fd itself (stored in the pointer)
 * is already a fine hash value.
 */
static uint32_t fd_hash(const void *key)
{
    return (uint32_t)(ptrdiff_t)key;
}
+
+
+
+static fdtbl_t *fdtbl_create(void)
+{
+ fdtbl_t *ft;
+ mrp_htbl_config_t hcfg;
+
+ if ((ft = mrp_allocz(sizeof(*ft))) != NULL) {
+ mrp_clear(&hcfg);
+
+ hcfg.comp = fd_cmp;
+ hcfg.hash = fd_hash;
+ hcfg.free = NULL;
+ hcfg.nbucket = 16;
+
+ ft->h = mrp_htbl_create(&hcfg);
+
+ if (ft->h != NULL)
+ return ft;
+ else
+ mrp_free(ft);
+ }
+
+ return NULL;
+}
+
+
+static void fdtbl_destroy(fdtbl_t *ft)
+{
+ if (ft != NULL) {
+ mrp_htbl_destroy(ft->h, FALSE);
+ mrp_free(ft);
+ }
+}
+
+
+static void *fdtbl_lookup(fdtbl_t *ft, int fd)
+{
+ if (fd >= 0 && ft != NULL) {
+ if (fd < FDTBL_SIZE)
+ return ft->t[fd];
+ else
+ return mrp_htbl_lookup(ft->h, (void *)(ptrdiff_t)fd);
+ }
+
+ return NULL;
+}
+
+
+static int fdtbl_insert(fdtbl_t *ft, int fd, void *ptr)
+{
+ if (fd >= 0 && ft != NULL) {
+ if (fd < FDTBL_SIZE) {
+ if (ft->t[fd] == NULL) {
+ ft->t[fd] = ptr;
+ return 0;
+ }
+ else
+ errno = EEXIST;
+ }
+ else {
+ if (mrp_htbl_insert(ft->h, (void *)(ptrdiff_t)fd, ptr))
+ return 0;
+ else
+ errno = EEXIST;
+ }
+ }
+ else
+ errno = EINVAL;
+
+ return -1;
+}
+
+
+static void fdtbl_remove(fdtbl_t *ft, int fd)
+{
+ if (fd >= 0 && ft != NULL) {
+ if (fd < FDTBL_SIZE)
+ ft->t[fd] = NULL;
+ else
+ mrp_htbl_remove(ft->h, (void *)(ptrdiff_t)fd, FALSE);
+ }
+}
+
+
+/*
+ * I/O watches
+ */
+
/*
 * Compute the combined epoll event mask for all watches sharing the
 * master's fd, skipping 'ignore' (used when one of the watches is
 * being removed). Even when the master itself is ignored, its
 * edge-trigger bit is kept so the registration's trigger mode does
 * not change under the remaining watches.
 */
static uint32_t epoll_event_mask(mrp_io_watch_t *master, mrp_io_watch_t *ignore)
{
    mrp_io_watch_t *w;
    mrp_list_hook_t *p, *n;
    uint32_t mask;

    mask = (master != ignore ?
            master->events : master->events & MRP_IO_TRIGGER_EDGE);

    mrp_list_foreach(&master->slave, p, n) {
        w = mrp_list_entry(p, typeof(*w), slave);

        if (w != ignore)
            mask |= w->events;
    }

    mrp_debug("epoll event mask for I/O watch %p: %d", master, mask);

    return mask;
}
+
+
/*
 * Attach a second (or later) watch for an fd that is already
 * registered: widen the existing epoll registration with the slave's
 * events and link the slave into the master's slave ring.
 * Returns 0 on success, -1 on epoll failure.
 */
static int epoll_add_slave(mrp_io_watch_t *master, mrp_io_watch_t *slave)
{
    mrp_mainloop_t *ml = master->ml;
    struct epoll_event evt;

    evt.events = epoll_event_mask(master, NULL) | slave->events;
    evt.data.u64 = 0;
    evt.data.fd = master->fd;

    if (epoll_ctl(ml->epollfd, EPOLL_CTL_MOD, master->fd, &evt) == 0) {
        mrp_list_append(&master->slave, &slave->slave);

        return 0;
    }

    return -1;
}
+
+
/*
 * Register a watch with epoll. If the fd is new it becomes the master
 * for that fd (added to the fd table and the mainloop's watch list);
 * if the fd is already tracked (fdtbl_insert fails with EEXIST) the
 * watch is attached as a slave of the existing master instead.
 * Returns 0 on success, -1 on failure.
 */
static int epoll_add(mrp_io_watch_t *w)
{
    mrp_mainloop_t *ml = w->ml;
    mrp_io_watch_t *master;
    struct epoll_event evt;

    if (fdtbl_insert(ml->fdtbl, w->fd, w) == 0) {
        evt.events = w->events;
        evt.data.u64 = 0; /* init full union for valgrind... */
        evt.data.fd = w->fd;

        if (epoll_ctl(ml->epollfd, EPOLL_CTL_ADD, w->fd, &evt) == 0) {
            mrp_list_append(&ml->iowatches, &w->hook);
            ml->niowatch++;

            return 0;
        }
        else
            fdtbl_remove(ml->fdtbl, w->fd);
    }
    else {
        if (errno == EEXIST) {
            master = fdtbl_lookup(ml->fdtbl, w->fd);

            if (master != NULL)
                return epoll_add_slave(master, w);
        }
    }

    return -1;
}
+
+
/*
 * Drop a watch's interest from epoll. The registration is shared per
 * fd, so this recomputes the remaining event mask without w: if no
 * watched events remain the fd is removed from epoll and the fd table
 * entirely, otherwise the registration is just narrowed. EBADF/ENOENT
 * are tolerated since the application may have closed the fd already.
 * Returns 0 on success, -1 on failure.
 */
static int epoll_del(mrp_io_watch_t *w)
{
    mrp_mainloop_t *ml = w->ml;
    mrp_io_watch_t *master;
    struct epoll_event evt;
    int status;

    if (is_master(w))
        master = w;
    else
        master = fdtbl_lookup(ml->fdtbl, w->fd);

    if (master != NULL) {
        evt.events = epoll_event_mask(master, w);
        evt.data.u64 = 0; /* init full union for valgrind... */
        evt.data.fd = w->fd;

        if ((evt.events & MRP_IO_EVENT_ALL) == 0) {
            fdtbl_remove(ml->fdtbl, w->fd);
            status = epoll_ctl(ml->epollfd, EPOLL_CTL_DEL, w->fd, &evt);

            /* a closed/unknown fd is already gone from epoll: count it */
            if (status == 0 || (errno == EBADF || errno == ENOENT))
                ml->niowatch--;
        }
        else
            status = epoll_ctl(ml->epollfd, EPOLL_CTL_MOD, w->fd, &evt);

        if (status == 0 || (errno == EBADF || errno == ENOENT))
            return 0;
        else
            mrp_log_error("Failed to update epoll for deleted I/O watch %p "
                          "(fd %d, %d: %s).", w, w->fd, errno, strerror(errno));
    }
    else {
        mrp_log_error("Failed to find master for deleted I/O watch %p "
                      "(fd %d).", w, w->fd);
        errno = EINVAL;
    }

    return -1;
}
+
+
/*
 * Free callback for I/O watches. If the freed watch is the master for
 * its fd, the first slave (if any) is promoted: it is linked to the
 * mainloop's watch list and takes over the fd table slot. The freed
 * watch is then unlinked from the slave ring and released.
 * Always returns TRUE (entry consumed).
 */
static int free_io_watch(void *ptr)
{
    mrp_io_watch_t *w = (mrp_io_watch_t *)ptr;
    mrp_mainloop_t *ml = w->ml;
    mrp_io_watch_t *master;

    master = fdtbl_lookup(ml->fdtbl, w->fd);

    if (master == w) {
        fdtbl_remove(ml->fdtbl, w->fd);

        if (!mrp_list_empty(&w->slave)) {
            /* relink first slave as new master to mainloop */
            master = mrp_list_entry(w->slave.next, typeof(*master), slave);
            mrp_list_append(&ml->iowatches, &master->hook);

            fdtbl_insert(ml->fdtbl, master->fd, master);
        }
    }

    /* unlinking w from the slave ring keeps the remaining slaves
     * chained to the promoted master */
    mrp_list_delete(&w->slave);
    mrp_free(w);

    return TRUE;
}
+
+
/*
 * Create an I/O watch for fd, invoking cb for the requested events.
 * The trigger mode (level/edge) can be passed in the event mask; if
 * none is given, the mainloop's default mode applies. Returns the new
 * watch, or NULL on bad arguments, allocation or epoll failure.
 */
mrp_io_watch_t *mrp_add_io_watch(mrp_mainloop_t *ml, int fd,
                                 mrp_io_event_t events,
                                 mrp_io_watch_cb_t cb, void *user_data)
{
    mrp_io_watch_t *w;

    if (fd < 0 || cb == NULL)
        return NULL;

    if ((w = mrp_allocz(sizeof(*w))) != NULL) {
        mrp_list_init(&w->hook);
        mrp_list_init(&w->deleted);
        mrp_list_init(&w->slave);
        w->ml = ml;
        w->fd = fd;
        w->events = events & MRP_IO_EVENT_ALL;

        /* resolve the effective trigger mode for this watch */
        switch (events & MRP_IO_TRIGGER_MASK) {
        case 0:
            if (ml->iomode == MRP_IO_TRIGGER_EDGE)
                w->events |= MRP_IO_TRIGGER_EDGE;
            break;
        case MRP_IO_TRIGGER_EDGE:
            w->events |= MRP_IO_TRIGGER_EDGE;
            break;
        case MRP_IO_TRIGGER_LEVEL:
            break;
        default:
            mrp_log_warning("Invalid I/O event trigger mode 0x%x.",
                            events & MRP_IO_TRIGGER_MASK);
            break;
        }

        w->cb = cb;
        w->user_data = user_data;
        w->free = free_io_watch;

        if (epoll_add(w) != 0) {
            mrp_free(w);
            w = NULL;
        }
        else
            mrp_debug("added I/O watch %p (fd %d, events 0x%x)", w, w->fd, w->events);
    }

    return w;
}
+
+
/*
 * Delete an I/O watch. Safe to call with NULL or an already-deleted
 * watch.
 */
void mrp_del_io_watch(mrp_io_watch_t *w)
{
    /*
     * Notes: It is not safe to free the watch here as there might be
     *     a delivered but unprocessed epoll event with a pointer
     *     to the watch. We just mark it deleted and take care of
     *     the actual deletion in the dispatching loop.
     */

    if (w != NULL && !is_deleted(w)) {
        mrp_debug("marking I/O watch %p (fd %d) deleted", w, w->fd);

        mark_deleted(w);
        w->events = 0;

        /* drop our interest from epoll right away */
        epoll_del(w);
    }
}
+
+
+mrp_mainloop_t *mrp_get_io_watch_mainloop(mrp_io_watch_t *w)
+{
+ return w ? w->ml : NULL;
+}
+
+
+int mrp_set_io_event_mode(mrp_mainloop_t *ml, mrp_io_event_t mode)
+{
+ if (mode == MRP_IO_TRIGGER_LEVEL || mode == MRP_IO_TRIGGER_EDGE) {
+ ml->iomode = mode;
+ return TRUE;
+ }
+ else {
+ mrp_log_error("Invalid I/O event mode 0x%x.", mode);
+ return FALSE;
+ }
+}
+
+
+mrp_io_event_t mrp_get_io_event_mode(mrp_mainloop_t *ml)
+{
+ return ml->iomode ? ml->iomode : MRP_IO_TRIGGER_LEVEL;
+}
+
+
+/*
+ * timers
+ */
+
+static uint64_t time_now(void)
+{
+ struct timespec ts;
+ uint64_t now;
+
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ now = ts.tv_sec * USECS_PER_SEC;
+ now += ts.tv_nsec / NSECS_PER_USEC;
+
+ return now;
+}
+
+
+static inline int usecs_to_msecs(uint64_t usecs)
+{
+ int msecs;
+
+ msecs = (usecs + USECS_PER_MSEC - 1) / USECS_PER_MSEC;
+
+ return msecs;
+}
+
+
/*
 * Insert a timer into the mainloop's timer list, which is kept sorted
 * by ascending expiration time, and update ml->next_timer. The
 * superloop timer is re-armed only when the new timer becomes the
 * earliest one.
 */
static void insert_timer(mrp_timer_t *t)
{
    mrp_mainloop_t *ml = t->ml;
    mrp_list_hook_t *p, *n;
    mrp_timer_t *t1, *next;
    int inserted;

    /*
     * Notes:
     *     If there is ever a need to run a large number of
     *     simultaneous timers, we need to change this to a
     *     self-balancing data structure, eg. an red-black tree.
     */

    inserted = FALSE;
    next = NULL;
    mrp_list_foreach(&ml->timers, p, n) {
        t1 = mrp_list_entry(p, mrp_timer_t, hook);

        if (!is_deleted(t1)) {
            /* list is sorted: insert before the first later expiry */
            if (t->expire <= t1->expire) {
                mrp_list_prepend(p->prev, &t->hook);
                inserted = TRUE;
                break;
            }
            /* remember the earliest surviving timer seen so far */
            if (next == NULL)
                next = t1;
        }
    }

    if (!inserted)
        mrp_list_append(&ml->timers, &t->hook);

    /* if t went in before every live timer, next is still NULL and
     * t itself is the new earliest timer */
    if (next)
        ml->next_timer = next;
    else {
        ml->next_timer = t;
        adjust_superloop_timer(ml);
    }
}
+
+
+static inline void rearm_timer(mrp_timer_t *t)
+{
+ mrp_list_delete(&t->hook);
+ t->expire = time_now() + t->msecs * USECS_PER_MSEC;
+ insert_timer(t);
+}
+
+
+static mrp_timer_t *find_next_timer(mrp_mainloop_t *ml)
+{
+ mrp_list_hook_t *p, *n;
+ mrp_timer_t *t = NULL;
+
+ mrp_list_foreach(&ml->timers, p, n) {
+ t = mrp_list_entry(p, typeof(*t), hook);
+
+ if (!is_deleted(t))
+ break;
+ else
+ t = NULL;
+ }
+
+ ml->next_timer = t;
+ return t;
+}
+
+
+static int free_timer(void *ptr)
+{
+ mrp_timer_t *t = (mrp_timer_t *)ptr;
+
+ mrp_free(t);
+
+ return TRUE;
+}
+
+
+
+mrp_timer_t *mrp_add_timer(mrp_mainloop_t *ml, unsigned int msecs,
+ mrp_timer_cb_t cb, void *user_data)
+{
+ mrp_timer_t *t;
+
+ if (cb == NULL)
+ return NULL;
+
+ if ((t = mrp_allocz(sizeof(*t))) != NULL) {
+ mrp_list_init(&t->hook);
+ mrp_list_init(&t->deleted);
+ t->ml = ml;
+ t->expire = time_now() + msecs * USECS_PER_MSEC;
+ t->msecs = msecs;
+ t->cb = cb;
+ t->user_data = user_data;
+ t->free = free_timer;
+
+ insert_timer(t);
+ }
+
+ return t;
+}
+
+
+void mrp_mod_timer(mrp_timer_t *t, unsigned int msecs)
+{
+ if (t != NULL && !is_deleted(t)) {
+ if (msecs != MRP_TIMER_RESTART)
+ t->msecs = msecs;
+
+ rearm_timer(t);
+ }
+}
+
+
/*
 * Delete a timer. Safe with NULL or an already-deleted timer.
 */
void mrp_del_timer(mrp_timer_t *t)
{
    /*
     * Notes: It is not safe to simply free this entry here as we might
     *     be dispatching with this entry being the next to process.
     *     We check for this and if it is not the case we relink this
     *     to the list of deleted items which will be then processed
     *     at end of the mainloop iteration. Otherwise we only mark the
     *     this entry for deletion and the rest will be taken care of in
     *     dispatch_timers().
     */

    if (t != NULL && !is_deleted(t)) {
        mrp_debug("marking timer %p deleted", t);

        mark_deleted(t);

        /* if this was the next timer to fire, pick a new one and
         * re-arm the superloop timer accordingly */
        if (t->ml->next_timer == t) {
            find_next_timer(t->ml);
            adjust_superloop_timer(t->ml);
        }
    }
}
+
+
+mrp_mainloop_t *mrp_get_timer_mainloop(mrp_timer_t *t)
+{
+ return t ? t->ml : NULL;
+}
+
+
+/*
+ * deferred/idle callbacks
+ */
+
+mrp_deferred_t *mrp_add_deferred(mrp_mainloop_t *ml, mrp_deferred_cb_t cb,
+ void *user_data)
+{
+ mrp_deferred_t *d;
+
+ if (cb == NULL)
+ return NULL;
+
+ if ((d = mrp_allocz(sizeof(*d))) != NULL) {
+ mrp_list_init(&d->hook);
+ mrp_list_init(&d->deleted);
+ d->ml = ml;
+ d->cb = cb;
+ d->user_data = user_data;
+
+ mrp_list_append(&ml->deferred, &d->hook);
+ adjust_superloop_timer(ml);
+ }
+
+ return d;
+}
+
+
+void mrp_del_deferred(mrp_deferred_t *d)
+{
+ /*
+ * Notes: It is not safe to simply free this entry here as we might
+ * be dispatching with this entry being the next to process.
+ * We just mark this here deleted and take care of the rest
+ * in the dispatching loop.
+ */
+
+ if (d != NULL && !is_deleted(d)) {
+ mrp_debug("marking deferred %p deleted", d);
+ mark_deleted(d);
+ }
+}
+
+
+void mrp_disable_deferred(mrp_deferred_t *d)
+{
+ if (d != NULL)
+ d->inactive = TRUE;
+}
+
+
/*
 * Relink a deferred cb that was flagged inactive (mrp_disable_deferred)
 * from the active list to the inactive list. The flag is re-checked
 * here (MRP_LIKELY it is still set) because the cb may have been
 * re-enabled before we got around to moving it.
 */
static inline void disable_deferred(mrp_deferred_t *d)
{
    if (MRP_LIKELY(d->inactive)) {
        mrp_list_delete(&d->hook);
        mrp_list_append(&d->ml->inactive_deferred, &d->hook);
    }

}
+
+
+void mrp_enable_deferred(mrp_deferred_t *d)
+{
+ if (d != NULL) {
+ if (!is_deleted(d)) {
+ d->inactive = FALSE;
+ mrp_list_delete(&d->hook);
+ mrp_list_append(&d->ml->deferred, &d->hook);
+ }
+ }
+}
+
+
+mrp_mainloop_t *mrp_get_deferred_mainloop(mrp_deferred_t *d)
+{
+ return d ? d->ml : NULL;
+}
+
+
+/*
+ * signal notifications
+ */
+
/*
 * I/O watch callback for the mainloop's signalfd: drain all queued
 * siginfo entries and invoke every registered, non-deleted handler
 * matching each delivered signal number. (signalfd reads return whole
 * signalfd_siginfo structs, so checking read() > 0 suffices.)
 */
static void dispatch_signals(mrp_io_watch_t *w, int fd,
                             mrp_io_event_t events, void *user_data)
{
    mrp_mainloop_t *ml = mrp_get_io_watch_mainloop(w);
    struct signalfd_siginfo sig;
    mrp_list_hook_t *p, *n;
    mrp_sighandler_t *h;
    int signum;

    MRP_UNUSED(events);
    MRP_UNUSED(user_data);

    while (read(fd, &sig, sizeof(sig)) > 0) {
        signum = sig.ssi_signo;

        mrp_list_foreach(&ml->sighandlers, p, n) {
            h = mrp_list_entry(p, typeof(*h), hook);

            if (!is_deleted(h)) {
                if (h->signum == signum)
                    h->cb(h, signum, h->user_data);
            }
        }
    }
}
+
+
+static int setup_sighandlers(mrp_mainloop_t *ml)
+{
+ if (ml->sigfd == -1) {
+ sigemptyset(&ml->sigmask);
+
+ ml->sigfd = signalfd(-1, &ml->sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
+
+ if (ml->sigfd == -1)
+ return FALSE;
+
+ ml->sigwatch = mrp_add_io_watch(ml, ml->sigfd, MRP_IO_EVENT_IN,
+ dispatch_signals, NULL);
+
+ if (ml->sigwatch == NULL) {
+ close(ml->sigfd);
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+}
+
+
/*
 * Register a handler for signum. Requires setup_sighandlers to have
 * created the mainloop's signalfd (sigfd != -1). The signal is added
 * to the signalfd's mask and blocked for normal delivery so it is
 * routed through the fd instead. Returns the handler, or NULL on bad
 * arguments, missing signalfd, or allocation failure.
 */
mrp_sighandler_t *mrp_add_sighandler(mrp_mainloop_t *ml, int signum,
                                     mrp_sighandler_cb_t cb, void *user_data)
{
    mrp_sighandler_t *s;

    if (cb == NULL || ml->sigfd == -1)
        return NULL;

    if ((s = mrp_allocz(sizeof(*s))) != NULL) {
        mrp_list_init(&s->hook);
        mrp_list_init(&s->deleted);
        s->ml = ml;
        s->signum = signum;
        s->cb = cb;
        s->user_data = user_data;

        mrp_list_append(&ml->sighandlers, &s->hook);
        sigaddset(&ml->sigmask, s->signum);
        /* widen the existing signalfd's mask and block the signal */
        signalfd(ml->sigfd, &ml->sigmask, SFD_NONBLOCK|SFD_CLOEXEC);
        sigprocmask(SIG_BLOCK, &ml->sigmask, NULL);
    }

    return s;
}
+
+
/*
 * Recompute the process signal mask from the remaining (non-deleted)
 * handlers: unblock everything we had blocked, rebuild the mask, then
 * block the new set.
 *     NOTE(review): the signalfd's own mask is not narrowed here
 * (unlike the widening done in mrp_add_sighandler) — verify that is
 * intentional; unblocked signals are simply no longer queued to it.
 */
static void recalc_sigmask(mrp_mainloop_t *ml)
{
    mrp_list_hook_t *p, *n;
    mrp_sighandler_t *h;

    sigprocmask(SIG_UNBLOCK, &ml->sigmask, NULL);
    sigemptyset(&ml->sigmask);

    mrp_list_foreach(&ml->sighandlers, p, n) {
        h = mrp_list_entry(p, typeof(*h), hook);
        if (!is_deleted(h))
            sigaddset(&ml->sigmask, h->signum);
    }

    sigprocmask(SIG_BLOCK, &ml->sigmask, NULL);
}
+
+
+void mrp_del_sighandler(mrp_sighandler_t *h)
+{
+ if (h != NULL && !is_deleted(h)) {
+ mrp_debug("marking sighandler %p deleted", h);
+
+ mark_deleted(h);
+ recalc_sigmask(h->ml);
+ }
+}
+
+
+mrp_mainloop_t *mrp_get_sighandler_mainloop(mrp_sighandler_t *h)
+{
+ return h ? h->ml : NULL;
+}
+
+
+/*
+ * wakeup notifications
+ */
+
/*
 * Deliver a wakeup notification unless the low-pass filter suppresses
 * it (at most one delivery per 'lpf' usecs). After a delivery the next
 * allowed time is pushed forward and the forced-wakeup timer, if any,
 * is restarted so the forced interval counts from this delivery.
 */
static void wakeup_cb(mrp_wakeup_t *w, mrp_wakeup_event_t event, uint64_t now)
{
    if (w->next > now) {
        mrp_debug("skipping wakeup %p because of low-pass filter", w);
        return;
    }

    w->cb(w, event, w->user_data);

    if (w->lpf != MRP_WAKEUP_NOLIMIT)
        w->next = now + w->lpf;

    if (w->timer != NULL)
        mrp_mod_timer(w->timer, MRP_TIMER_RESTART);
}
+
+
/*
 * Timer callback enforcing the 'force' interval of a wakeup: deliver
 * a LIMIT wakeup unless the wakeup was deleted since the timer armed.
 */
static void forced_wakeup_cb(mrp_timer_t *t, void *user_data)
{
    mrp_wakeup_t *w = (mrp_wakeup_t *)user_data;

    MRP_UNUSED(t);

    if (is_deleted(w))
        return;

    mrp_debug("dispatching forced wakeup cb %p", w);

    wakeup_cb(w, MRP_WAKEUP_EVENT_LIMIT, time_now());
}
+
+
+mrp_wakeup_t *mrp_add_wakeup(mrp_mainloop_t *ml, mrp_wakeup_event_t events,
+ unsigned int lpf_msecs, unsigned int force_msecs,
+ mrp_wakeup_cb_t cb, void *user_data)
+{
+ mrp_wakeup_t *w;
+
+ if (cb == NULL)
+ return NULL;
+
+ if (lpf_msecs > force_msecs && force_msecs != MRP_WAKEUP_NOLIMIT)
+ return NULL;
+
+ if ((w = mrp_allocz(sizeof(*w))) != NULL) {
+ mrp_list_init(&w->hook);
+ mrp_list_init(&w->deleted);
+ w->ml = ml;
+ w->events = events;
+ w->cb = cb;
+ w->user_data = user_data;
+
+ w->lpf = lpf_msecs * USECS_PER_MSEC;
+
+ if (lpf_msecs != MRP_WAKEUP_NOLIMIT)
+ w->next = time_now() + w->lpf;
+
+ if (force_msecs != MRP_WAKEUP_NOLIMIT) {
+ w->timer = mrp_add_timer(ml, force_msecs, forced_wakeup_cb, w);
+
+ if (w->timer == NULL) {
+ mrp_free(w);
+ return NULL;
+ }
+ }
+
+ mrp_list_append(&ml->wakeups, &w->hook);
+ }
+
+ return w;
+}
+
+
+void mrp_del_wakeup(mrp_wakeup_t *w)
+{
+ /*
+ * Notes: It is not safe to simply free this entry here as we might
+ * be dispatching with this entry being the next to process.
+ * We just mark this here deleted and take care of the rest
+ * in the dispatching loop.
+ */
+
+ if (w != NULL && !is_deleted(w)) {
+ mrp_debug("marking wakeup %p deleted", w);
+ mark_deleted(w);
+ }
+}
+
+
+mrp_mainloop_t *mrp_get_wakeup_mainloop(mrp_wakeup_t *w)
+{
+ return w ? w->ml : NULL;
+}
+
+
+/*
+ * external mainloops we pump
+ */
+
+static int free_subloop(void *ptr)
+{
+ mrp_subloop_t *sl = (mrp_subloop_t *)ptr;
+
+ mrp_debug("freeing subloop %p", sl);
+
+ mrp_free(sl->pollfds);
+ mrp_free(sl->events);
+ mrp_free(sl);
+
+ return TRUE;
+}
+
+
+static void subloop_event_cb(mrp_io_watch_t *w, int fd, mrp_io_event_t events,
+ void *user_data)
+{
+ mrp_subloop_t *sl = (mrp_subloop_t *)user_data;
+
+ MRP_UNUSED(w);
+ MRP_UNUSED(fd);
+ MRP_UNUSED(events);
+
+ mrp_debug("subloop %p has events, setting poll to TRUE", sl);
+
+ sl->poll = TRUE;
+}
+
+
+mrp_subloop_t *mrp_add_subloop(mrp_mainloop_t *ml, mrp_subloop_ops_t *ops,
+ void *user_data)
+{
+ mrp_subloop_t *sl;
+
+ if (ops == NULL || user_data == NULL)
+ return NULL;
+
+ if ((sl = mrp_allocz(sizeof(*sl))) != NULL) {
+ mrp_list_init(&sl->hook);
+ mrp_list_init(&sl->deleted);
+ sl->free = free_subloop;
+ sl->ml = ml;
+ sl->cb = ops;
+ sl->user_data = user_data;
+ sl->epollfd = epoll_create1(EPOLL_CLOEXEC);
+ sl->fdtbl = fdtbl_create();
+
+ if (sl->epollfd >= 0 && sl->fdtbl != NULL) {
+ sl->w = mrp_add_io_watch(ml, sl->epollfd, MRP_IO_EVENT_IN,
+ subloop_event_cb, sl);
+
+ if (sl->w != NULL)
+ mrp_list_append(&ml->subloops, &sl->hook);
+ else
+ goto fail;
+ }
+ else {
+ fail:
+ close(sl->epollfd);
+ fdtbl_destroy(sl->fdtbl);
+ mrp_free(sl);
+ sl = NULL;
+ }
+ }
+
+ return sl;
+}
+
+
/*
 * Detach and delete a subloop. Safe with NULL or already-deleted.
 */
void mrp_del_subloop(mrp_subloop_t *sl)
{
    struct epoll_event dummy;
    int i;

    /*
     * Notes: It is not safe to free the loop here as there might be
     *     a delivered but unprocessed epoll event with a pointers
     *     to the loops pollfds. However, since we do not dispatch
     *     loops by traversing the list of loops, it is safe to relink
     *     it to the list of data structures to be deleted at the
     *     end of the next main loop iteration. So we just remove the
     *     pollfds from epoll, mark this as deleted and relink it.
     */

    if (sl != NULL && !is_deleted(sl)) {
        mrp_debug("deactivating and marking subloop %p deleted", sl);

        mrp_del_io_watch(sl->w);

        /* XXX TODO: Why ? close(sl->epollfd) should be enough... */
        /* (dummy is never read by EPOLL_CTL_DEL; older kernels merely
         * required a non-NULL event pointer) */
        for (i = 0; i < sl->npollfd; i++)
            epoll_ctl(sl->epollfd, EPOLL_CTL_DEL, sl->pollfds[i].fd, &dummy);

        close(sl->epollfd);
        sl->epollfd = -1;
        fdtbl_destroy(sl->fdtbl);
        sl->fdtbl = NULL;

        mark_deleted(sl);
    }
}
+
+
+/*
+ * external mainloop that pumps us
+ */
+
+
+static void super_io_cb(void *super_data, void *id, int fd,
+ mrp_io_event_t events, void *user_data)
+{
+ mrp_mainloop_t *ml = (mrp_mainloop_t *)user_data;
+ mrp_superloop_ops_t *ops = ml->super_ops;
+
+ MRP_UNUSED(super_data);
+ MRP_UNUSED(id);
+ MRP_UNUSED(fd);
+ MRP_UNUSED(events);
+
+ ops->mod_defer(ml->super_data, ml->work, TRUE);
+}
+
+
+static void super_timer_cb(void *super_data, void *id, void *user_data)
+{
+ mrp_mainloop_t *ml = (mrp_mainloop_t *)user_data;
+ mrp_superloop_ops_t *ops = ml->super_ops;
+
+ MRP_UNUSED(super_data);
+ MRP_UNUSED(id);
+
+ ops->mod_defer(ml->super_data, ml->work, TRUE);
+}
+
+
/*
 * Superloop deferred-work callback: run one non-blocking poll +
 * dispatch cycle of our mainloop, then either re-arm the superloop
 * timer/defer for the next cycle or, if mrp_mainloop_quit was called,
 * tear down our hooks in the superloop.
 */
static void super_work_cb(void *super_data, void *id, void *user_data)
{
    mrp_mainloop_t *ml = (mrp_mainloop_t *)user_data;
    mrp_superloop_ops_t *ops = ml->super_ops;
    unsigned int timeout;

    MRP_UNUSED(super_data);
    MRP_UNUSED(id);

    mrp_mainloop_poll(ml, FALSE);
    mrp_mainloop_dispatch(ml);

    if (!ml->quit) {
        mrp_mainloop_prepare(ml);

        /*
         * Notes:
         *
         *     Some mainloop abstractions (eg. the one in PulseAudio)
         *     have deferred callbacks that starve all other event
         *     processing until no more deferred callbacks are pending.
         *     For this reason, we cannot map our deferred callbacks
         *     directly to superloop deferred callbacks (in some cases
         *     this could starve the superloop indefinitely). Hence, if
         *     we have enabled deferred callbacks, we arm our timer with
         *     0 timeout to let the superloop do one round of its event
         *     processing.
         */

        timeout = mrp_list_empty(&ml->deferred) ? ml->poll_timeout : 0;
        ops->mod_timer(ml->super_data, ml->timer, timeout);
        ops->mod_defer(ml->super_data, ml->work, FALSE);
    }
    else {
        /* quitting: remove all our hooks from the superloop */
        ops->del_io(ml->super_data, ml->iow);
        ops->del_timer(ml->super_data, ml->timer);
        ops->del_defer(ml->super_data, ml->work);

        ml->iow = NULL;
        ml->timer = NULL;
        ml->work = NULL;
    }
}
+
+
+static void adjust_superloop_timer(mrp_mainloop_t *ml)
+{
+ mrp_superloop_ops_t *ops = ml->super_ops;
+ unsigned int timeout;
+
+ if (ops == NULL)
+ return;
+
+ mrp_mainloop_prepare(ml);
+ timeout = mrp_list_empty(&ml->deferred) ? ml->poll_timeout : 0;
+ ops->mod_timer(ml->super_data, ml->timer, timeout);
+}
+
+
/*
 * Embed this mainloop into an external 'superloop': register our
 * epollfd, a timer, and a deferred work callback with it so the
 * superloop drives our poll/dispatch cycle. Fails (FALSE) if a
 * superloop is already set or any registration fails.
 *     NOTE(review): this writes ops->poll_events, i.e. it mutates the
 * caller-owned ops table — callers must not share a const/static ops
 * struct across loops expecting it untouched.
 */
int mrp_set_superloop(mrp_mainloop_t *ml, mrp_superloop_ops_t *ops,
                      void *loop_data)
{
    mrp_io_event_t events;
    int timeout;

    if (ml->super_ops == NULL) {
        if (ops->poll_io != NULL)
            ops->poll_events = poll_events;

        ml->super_ops = ops;
        ml->super_data = loop_data;

        mrp_mainloop_prepare(ml);

        events = MRP_IO_EVENT_IN | MRP_IO_EVENT_OUT | MRP_IO_EVENT_HUP;
        ml->iow = ops->add_io(ml->super_data, ml->epollfd, events,
                              super_io_cb, ml);
        ml->work = ops->add_defer(ml->super_data, super_work_cb, ml);

        /*
         * Notes:
         *
         *     Some mainloop abstractions (eg. the one in PulseAudio)
         *     have deferred callbacks that starve all other event
         *     processing until no more deferred callbacks are pending.
         *     For this reason, we cannot map our deferred callbacks
         *     directly to superloop deferred callbacks (in some cases
         *     this could starve the superloop indefinitely). Hence, if
         *     we have enabled deferred callbacks, we arm our timer with
         *     0 timeout to let the superloop do one round of its event
         *     processing.
         */

        timeout = mrp_list_empty(&ml->deferred) ? ml->poll_timeout : 0;
        ml->timer = ops->add_timer(ml->super_data, timeout, super_timer_cb, ml);

        if (ml->iow != NULL && ml->timer != NULL && ml->work != NULL)
            return TRUE;
        else
            mrp_clear_superloop(ml);    /* roll back partial setup */
    }

    return FALSE;
}
+
+
+int mrp_clear_superloop(mrp_mainloop_t *ml)
+{
+ mrp_superloop_ops_t *ops = ml->super_ops;
+ void *data = ml->super_data;
+
+ if (ops != NULL) {
+ if (ml->iow != NULL) {
+ ops->del_io(data, ml->iow);
+ ml->iow = NULL;
+ }
+
+ if (ml->work != NULL) {
+ ops->del_defer(data, ml->work);
+ ml->work = NULL;
+ }
+
+ if (ml->timer != NULL) {
+ ops->del_timer(data, ml->timer);
+ ml->timer = NULL;
+ }
+
+ ml->super_ops = NULL;
+ ml->super_data = NULL;
+
+ ops->unregister(data);
+
+ return TRUE;
+ }
+ else
+ return FALSE;
+}
+
+
+int mrp_mainloop_unregister(mrp_mainloop_t *ml)
+{
+ return mrp_clear_superloop(ml);
+}
+
+
+/*
+ * mainloop
+ */
+
+static void purge_io_watches(mrp_mainloop_t *ml)
+{
+ mrp_list_hook_t *p, *n, *sp, *sn;
+ mrp_io_watch_t *w, *s;
+
+ mrp_list_foreach(&ml->iowatches, p, n) {
+ w = mrp_list_entry(p, typeof(*w), hook);
+ mrp_list_delete(&w->hook);
+ mrp_list_delete(&w->deleted);
+
+ mrp_list_foreach(&w->slave, sp, sn) {
+ s = mrp_list_entry(sp, typeof(*s), slave);
+ mrp_list_delete(&s->slave);
+ mrp_free(s);
+ }
+
+ mrp_free(w);
+ }
+}
+
+
+static void purge_timers(mrp_mainloop_t *ml)
+{
+ mrp_list_hook_t *p, *n;
+ mrp_timer_t *t;
+
+ mrp_list_foreach(&ml->timers, p, n) {
+ t = mrp_list_entry(p, typeof(*t), hook);
+ mrp_list_delete(&t->hook);
+ mrp_list_delete(&t->deleted);
+ mrp_free(t);
+ }
+}
+
+
+static void purge_deferred(mrp_mainloop_t *ml)
+{
+ mrp_list_hook_t *p, *n;
+ mrp_deferred_t *d;
+
+ mrp_list_foreach(&ml->deferred, p, n) {
+ d = mrp_list_entry(p, typeof(*d), hook);
+ mrp_list_delete(&d->hook);
+ mrp_list_delete(&d->deleted);
+ mrp_free(d);
+ }
+
+ mrp_list_foreach(&ml->inactive_deferred, p, n) {
+ d = mrp_list_entry(p, typeof(*d), hook);
+ mrp_list_delete(&d->hook);
+ mrp_list_delete(&d->deleted);
+ mrp_free(d);
+ }
+}
+
+
+static void purge_sighandlers(mrp_mainloop_t *ml)
+{
+ mrp_list_hook_t *p, *n;
+ mrp_sighandler_t *s;
+
+ mrp_list_foreach(&ml->sighandlers, p, n) {
+ s = mrp_list_entry(p, typeof(*s), hook);
+ mrp_list_delete(&s->hook);
+ mrp_list_delete(&s->deleted);
+ mrp_free(s);
+ }
+}
+
+
+static void purge_wakeups(mrp_mainloop_t *ml)
+{
+ mrp_list_hook_t *p, *n;
+ mrp_wakeup_t *w;
+
+ mrp_list_foreach(&ml->wakeups, p, n) {
+ w = mrp_list_entry(p, typeof(*w), hook);
+ mrp_list_delete(&w->hook);
+ mrp_list_delete(&w->deleted);
+ mrp_free(w);
+ }
+}
+
+
/*
 * Free all objects that were marked deleted during dispatching.
 *
 * Objects with a free callback are released through it; if the
 * callback reports failure the object is kept for a later retry.
 */
static void purge_deleted(mrp_mainloop_t *ml)
{
    mrp_list_hook_t *p, *n;
    deleted_t *d;

    mrp_list_foreach(&ml->deleted, p, n) {
        d = mrp_list_entry(p, typeof(*d), deleted);
        mrp_list_delete(&d->deleted);
        mrp_list_delete(&d->hook);
        if (d->free == NULL) {
            mrp_debug("purging deleted object %p", d);
            mrp_free(d);
        }
        else {
            mrp_debug("purging deleted object %p (free cb: %p)", d, d->free);
            if (!d->free(d)) {
                mrp_log_error("Failed to free purged item %p.", d);
                /*
                 * NOTE(review): p equals &d->deleted at this point, so
                 * this prepends the hook onto itself rather than back
                 * onto ml->deleted - presumably the intent was to keep
                 * the item queued for a retry; confirm against the
                 * mrp_list_prepend() semantics.
                 */
                mrp_list_prepend(p, &d->deleted);
            }
        }
    }
}
+
+
+static void purge_subloops(mrp_mainloop_t *ml)
+{
+ mrp_list_hook_t *p, *n;
+ mrp_subloop_t *sl;
+
+ mrp_list_foreach(&ml->subloops, p, n) {
+ sl = mrp_list_entry(p, typeof(*sl), hook);
+ mrp_list_delete(&sl->hook);
+ mrp_list_delete(&sl->deleted);
+ free_subloop(sl);
+ }
+}
+
+
+mrp_mainloop_t *mrp_mainloop_create(void)
+{
+ mrp_mainloop_t *ml;
+
+ if ((ml = mrp_allocz(sizeof(*ml))) != NULL) {
+ ml->epollfd = epoll_create1(EPOLL_CLOEXEC);
+ ml->sigfd = -1;
+ ml->fdtbl = fdtbl_create();
+
+ if (ml->epollfd >= 0 && ml->fdtbl != NULL) {
+ mrp_list_init(&ml->iowatches);
+ mrp_list_init(&ml->timers);
+ mrp_list_init(&ml->deferred);
+ mrp_list_init(&ml->inactive_deferred);
+ mrp_list_init(&ml->sighandlers);
+ mrp_list_init(&ml->wakeups);
+ mrp_list_init(&ml->deleted);
+ mrp_list_init(&ml->subloops);
+ mrp_list_init(&ml->busses);
+ mrp_list_init(&ml->eventq);
+
+ ml->eventd = mrp_add_deferred(ml, pump_events, ml);
+ if (ml->eventd == NULL)
+ goto fail;
+ mrp_disable_deferred(ml->eventd);
+
+ if (!setup_sighandlers(ml))
+ goto fail;
+ }
+ else {
+ fail:
+ close(ml->epollfd);
+ fdtbl_destroy(ml->fdtbl);
+ mrp_free(ml);
+ ml = NULL;
+ }
+ }
+
+
+
+ return ml;
+}
+
+
/*
 * Destroy a mainloop, releasing all registered hooks and resources.
 *
 * Detaches from any superloop first, then purges every category of
 * hook (I/O watches, timers, deferred callbacks, signal handlers,
 * wakeups, subloops) and finally any objects pending deletion.
 *
 * NOTE(review): entries still queued on ml->eventq and busses on
 * ml->busses are not purged here and appear to leak - confirm whether
 * these are guaranteed to be empty by destruction time.
 */
void mrp_mainloop_destroy(mrp_mainloop_t *ml)
{
    if (ml != NULL) {
        mrp_clear_superloop(ml);
        purge_io_watches(ml);
        purge_timers(ml);
        purge_deferred(ml);
        purge_sighandlers(ml);
        purge_wakeups(ml);
        purge_subloops(ml);
        purge_deleted(ml);

        close(ml->sigfd);
        close(ml->epollfd);
        fdtbl_destroy(ml->fdtbl);

        mrp_free(ml->events);
        mrp_free(ml);
    }
}
+
+
/*
 * Prepare a poll-based subloop (eg. GMainLoop) for polling via epoll.
 *
 * Runs the subloop's prepare and query callbacks, mirrors the queried
 * pollfd set into the subloop's dedicated epoll fd (diffing against
 * the previous set to avoid redundant epoll_ctl calls) and returns the
 * poll timeout requested by the query callback.
 */
static int prepare_subloop(mrp_subloop_t *sl)
{
    /*
     * Notes:
     *
     *     If we have a relatively large number of file descriptors to
     *     poll but typically only a small fraction of them has pending
     *     events per mainloop iteration epoll has significant advantages
     *     over poll. This is the main reason why our mainloop uses epoll.
     *     However, there is a considerable amount of pain one needs to
     *     go through to integrate an external poll-based (sub-)mainloop
     *     (e.g. glib's GMainLoop) with an epoll-based mainloop. I mean,
     *     just look at the code below !
     *
     *     If it eventually turns out that we typically only have a small
     *     number of file descriptors while at the same time we practically
     *     always need to pump GMainLoop, it is probably a good idea to
     *     bite the bullet and change our mainloop to be poll-based as well.
     *     But let's not go there yet...
     */

    struct epoll_event evt;
    struct pollfd *fds, *pollfds;
    int timeout;
    int nfd, npollfd, n, i;
    int nmatch;
    int fd, idx;

    MRP_UNUSED(dump_pollfds);

    mrp_debug("preparing subloop %p", sl);

    pollfds = sl->pollfds;
    npollfd = sl->npollfd;

    /* a subloop claiming readiness from prepare is dispatched right away */
    if (sl->cb->prepare(sl->user_data)) {
        mrp_debug("subloop %p prepare reported ready, dispatching it", sl);
        sl->cb->dispatch(sl->user_data);
    }
    sl->poll = FALSE;

    /* query the current fd set, growing the buffer until everything fits;
     * timeout is filled in by the query callback */
    nfd = npollfd;
    fds = nfd ? mrp_allocz(nfd * sizeof(*fds)) : NULL;

    MRP_ASSERT(nfd == 0 || fds != NULL, "failed to allocate pollfd's");

    while ((n = sl->cb->query(sl->user_data, fds, nfd, &timeout)) > nfd) {
        fds = mrp_reallocz(fds, nfd, n);
        nfd = n;
        MRP_ASSERT(fds != NULL, "failed to allocate pollfd's");
    }
    nfd = n;

#if 0
    printf("-------------------------\n");
    dump_pollfds("old: ", sl->pollfds, sl->npollfd);
    dump_pollfds("new: ", fds, nfd);
    printf("-------------------------\n");
#endif

    /*
     * skip over the identical portion of the old and new pollfd's
     */

    for (i = nmatch = 0; i < npollfd && i < n; i++, nmatch++) {
        if (fds[i].fd != pollfds[i].fd ||
            fds[i].events != pollfds[i].events)
            break;
        else
            fds[i].revents = pollfds[i].revents = 0;
    }

    /* identical sets: keep the old registrations as they are */
    if (nmatch == npollfd && npollfd == nfd) {
        mrp_free(fds);
        goto out;
    }

    /*
     * replace file descriptors with the new set (remove old, add new)
     */

    for (i = 0; i < npollfd; i++) {
        fd = pollfds[i].fd;
        fdtbl_remove(sl->fdtbl, fd);
        if (epoll_ctl(sl->epollfd, EPOLL_CTL_DEL, fd, &evt) < 0) {
            /* EBADF/ENOENT just mean the fd was already closed/removed */
            if (errno != EBADF && errno != ENOENT)
                mrp_log_error("Failed to delete subloop fd %d from epoll "
                              "(%d: %s)", fd, errno, strerror(errno));
        }
    }

    for (i = 0; i < nfd; i++) {
        fd = fds[i].fd;
        idx = i + 1;            /* fd table stores index + 1 so 0 == absent */

        evt.events = fds[i].events;
        evt.data.u64 = 0; /* init full union for valgrind... */
        evt.data.fd = fd;

        if (fdtbl_insert(sl->fdtbl, fd, (void *)(ptrdiff_t)idx) == 0) {
            if (epoll_ctl(sl->epollfd, EPOLL_CTL_ADD, fd, &evt) != 0) {
                mrp_log_error("Failed to add subloop fd %d to epoll "
                              "(%d: %s)", fd, errno, strerror(errno));
            }
        }
        else {
            mrp_log_error("Failed to add subloop fd %d to fd table "
                          "(%d: %s)", fd, errno, strerror(errno));
        }

        fds[i].revents = 0;
    }

    mrp_free(sl->pollfds);
    sl->pollfds = fds;
    sl->npollfd = nfd;

    /*
     * resize event buffer if needed
     */

    if (sl->nevent < nfd) {
        sl->nevent = nfd;
        sl->events = mrp_realloc(sl->events, sl->nevent * sizeof(*sl->events));

        MRP_ASSERT(sl->events != NULL || sl->nevent == 0,
                   "can't allocate epoll event buffer");
    }

 out:
    mrp_debug("subloop %p: fds: %d, timeout: %d, poll: %s",
              sl, sl->npollfd, timeout, sl->poll ? "TRUE" : "FALSE");

    return timeout;
}
+
+
+static int prepare_subloops(mrp_mainloop_t *ml)
+{
+ mrp_list_hook_t *p, *n;
+ mrp_subloop_t *sl;
+ int ext_timeout, min_timeout;
+
+ min_timeout = INT_MAX;
+
+ mrp_list_foreach(&ml->subloops, p, n) {
+ sl = mrp_list_entry(p, typeof(*sl), hook);
+
+ if (!is_deleted(sl)) {
+ ext_timeout = prepare_subloop(sl);
+ min_timeout = MRP_MIN(min_timeout, ext_timeout);
+ }
+ else
+ mrp_debug("skipping deleted subloop %p", sl);
+ }
+
+ return min_timeout;
+}
+
+
#if 0
/*
 * Compiled-out debugging helper: dump all timers and sanity-check that
 * ml->next_timer really is the nearest pending (non-deleted) timer.
 */
static inline void dump_timers(mrp_mainloop_t *ml)
{
    mrp_timer_t *t;
    mrp_list_hook_t *p, *n;
    int i;
    mrp_timer_t *next = NULL;

    mrp_debug("timer dump:");
    i = 0;
    mrp_list_foreach(&ml->timers, p, n) {
        t = mrp_list_entry(p, typeof(*t), hook);

        mrp_debug(" #%d: %p, @%u, next %llu (%s)", i, t, t->msecs, t->expire,
                  is_deleted(t) ? "DEAD" : "alive");

        if (!is_deleted(t) && next == NULL)
            next = t;

        i++;
    }

    mrp_debug("next timer: %p", ml->next_timer);
    mrp_debug("poll timer: %d", ml->poll_timeout);

    if (next != NULL && ml->next_timer != NULL &&
        !is_deleted(ml->next_timer) && next != ml->next_timer) {
        mrp_debug("*** BUG ml->next_timer is not the nearest !!! ***");
        if (getenv("__MURPHY_TIMER_CHECK_ABORT") != NULL)
            abort();
    }
}
#endif
+
+
/*
 * Prepare the mainloop for polling.
 *
 * Calculates the effective poll timeout: 0 if deferred callbacks are
 * pending, otherwise the time left until the nearest timer, combined
 * with the smallest timeout requested by the subloops (-1 meaning
 * 'block indefinitely'). Also makes sure the epoll event buffer is
 * large enough for all I/O watches. Always returns TRUE.
 */
int mrp_mainloop_prepare(mrp_mainloop_t *ml)
{
    mrp_timer_t *next_timer;
    int timeout, ext_timeout;
    uint64_t now;

    if (!mrp_list_empty(&ml->deferred)) {
        timeout = 0;                     /* pending deferred: don't block */
    }
    else {
        next_timer = ml->next_timer;

        if (next_timer == NULL)
            timeout = -1;                /* no timers: no time limit */
        else {
            now = time_now();
            if (MRP_UNLIKELY(next_timer->expire <= now))
                timeout = 0;             /* nearest timer already expired */
            else
                timeout = usecs_to_msecs(next_timer->expire - now);
        }
    }

    ext_timeout = prepare_subloops(ml);

    /* combine our timeout with the subloops', -1 meaning 'no limit' */
    if (ext_timeout != -1 && timeout != -1)
        ml->poll_timeout = MRP_MIN(timeout, ext_timeout);
    else if (ext_timeout == -1)
        ml->poll_timeout = timeout;
    else
        ml->poll_timeout = ext_timeout;

    if (ml->nevent < ml->niowatch) {
        ml->nevent = ml->niowatch;
        ml->events = mrp_realloc(ml->events, ml->nevent * sizeof(*ml->events));

        MRP_ASSERT(ml->events != NULL, "can't allocate epoll event buffer");
    }

    mrp_debug("mainloop %p prepared: %d I/O watches, timeout %d", ml,
              ml->niowatch, ml->poll_timeout);

    return TRUE;
}
+
+
+static size_t poll_events(void *id, mrp_mainloop_t *ml, void **bufp)
+{
+ void *buf;
+ int n;
+
+ if (MRP_UNLIKELY(id != ml->iow)) {
+ mrp_log_error("superloop polling with invalid I/O watch (%p != %p)",
+ id, ml->iow);
+ *bufp = NULL;
+ return 0;
+ }
+
+ buf = mrp_allocz(ml->nevent * sizeof(ml->events[0]));
+
+ if (buf != NULL) {
+ n = epoll_wait(ml->epollfd, buf, ml->nevent, 0);
+
+ if (n < 0)
+ n = 0;
+ }
+ else
+ n = 0;
+
+ *bufp = buf;
+ return n * sizeof(ml->events[0]);
+}
+
+
+int mrp_mainloop_poll(mrp_mainloop_t *ml, int may_block)
+{
+ int n, timeout;
+
+ timeout = may_block && mrp_list_empty(&ml->deferred) ? ml->poll_timeout : 0;
+
+ if (ml->nevent > 0) {
+ if (ml->super_ops == NULL || ml->super_ops->poll_io == NULL) {
+ mrp_debug("polling %d descriptors with timeout %d",
+ ml->nevent, timeout);
+
+ n = epoll_wait(ml->epollfd, ml->events, ml->nevent, timeout);
+
+ if (n < 0 && errno == EINTR)
+ n = 0;
+ }
+ else {
+ mrp_superloop_ops_t *super_ops = ml->super_ops;
+ void *super_data = ml->super_data;
+ void *id = ml->iow;
+ void *buf = ml->events;
+ size_t size = ml->nevent * sizeof(ml->events[0]);
+
+ size = super_ops->poll_io(super_data, id, buf, size);
+ n = size / sizeof(ml->events[0]);
+
+ MRP_ASSERT(n * sizeof(ml->events[0]) == size,
+ "superloop passed us a partial epoll_event");
+ }
+
+ mrp_debug("mainloop %p has %d/%d I/O events waiting", ml, n,
+ ml->nevent);
+
+ ml->poll_result = n;
+ }
+ else {
+ /*
+ * Notes: Practically we should never branch here because
+ * we always have at least ml->sigfd registered for epoll.
+ */
+ if (timeout > 0)
+ usleep(timeout * USECS_PER_MSEC);
+
+ ml->poll_result = 0;
+ }
+
+ return TRUE;
+}
+
+
/*
 * Poll a subloop's mirrored fds (non-blocking) and propagate the
 * results back into its pollfd array. Returns the number of fds with
 * pending events, or 0 if the subloop is not flagged for polling.
 */
static int poll_subloop(mrp_subloop_t *sl)
{
    struct epoll_event *e;
    struct pollfd *pfd;
    int fd, idx, n, i;

    if (sl->poll) {
        n = epoll_wait(sl->epollfd, sl->events, sl->nevent, 0);

        if (n < 0 && errno == EINTR)
            n = 0;

        /* map each ready fd back to its pollfd slot; the fd table stores
         * index + 1 so that 0 (a NULL lookup result) means 'not found' */
        for (i = 0, e = sl->events; i < n; i++, e++) {
            fd = e->data.fd;
            idx = ((int)(ptrdiff_t)fdtbl_lookup(sl->fdtbl, fd)) - 1;

            if (0 <= idx && idx < sl->npollfd) {
                pfd = sl->pollfds + idx;
                pfd->revents = e->events;
            }
        }

        mrp_debug("subloop %p has %d fds ready", sl, sl->npollfd);

        return n;
    }
    else {
        mrp_debug("subloop %p has poll flag off", sl);

        return 0;
    }
}
+
+
+static void dispatch_wakeup(mrp_mainloop_t *ml)
+{
+ mrp_list_hook_t *p, *n;
+ mrp_wakeup_t *w;
+ mrp_wakeup_event_t event;
+ uint64_t now;
+
+ if (ml->poll_timeout == 0) {
+ mrp_debug("skipping wakeup callbacks (poll timeout was 0)");
+ return;
+ }
+
+ if (ml->poll_result == 0) {
+ mrp_debug("woken up by timeout");
+ event = MRP_WAKEUP_EVENT_TIMER;
+ }
+ else {
+ mrp_debug("woken up by I/O (or signal)");
+ event = MRP_WAKEUP_EVENT_IO;
+ }
+
+ now = time_now();
+
+ mrp_list_foreach(&ml->wakeups, p, n) {
+ w = mrp_list_entry(p, typeof(*w), hook);
+
+ if (!(w->events & event))
+ continue;
+
+ if (!is_deleted(w)) {
+ mrp_debug("dispatching wakeup cb %p", w);
+ wakeup_cb(w, event, now);
+ }
+ else
+ mrp_debug("skipping deleted wakeup cb %p", w);
+
+ if (ml->quit)
+ break;
+ }
+}
+
+
/*
 * Dispatch all enabled deferred callbacks.
 */
static void dispatch_deferred(mrp_mainloop_t *ml)
{
    mrp_list_hook_t *p, *n;
    mrp_deferred_t *d;

    mrp_list_foreach(&ml->deferred, p, n) {
        d = mrp_list_entry(p, typeof(*d), hook);

        if (!is_deleted(d) && !d->inactive) {
            mrp_debug("dispatching active deferred cb %p", d);
            d->cb(d, d->user_data);
        }
        else
            mrp_debug("skipping %s deferred cb %p",
                      is_deleted(d) ? "deleted" : "inactive", d);

        /* a callback marked inactive (possibly from within its own
         * invocation above) is only moved off the active list here -
         * presumably via disable_deferred(); confirm its semantics */
        if (!is_deleted(d) && d->inactive)
            disable_deferred(d);

        if (ml->quit)
            break;
    }
}
+
+
/*
 * Dispatch all expired timers.
 *
 * The iteration stops at the first non-deleted timer that has not yet
 * expired, which implies the timer list is kept sorted by expiry -
 * consistent with the ml->next_timer bookkeeping elsewhere.
 */
static void dispatch_timers(mrp_mainloop_t *ml)
{
    mrp_list_hook_t *p, *n;
    mrp_timer_t *t;
    uint64_t now;

    now = time_now();

    mrp_list_foreach(&ml->timers, p, n) {
        t = mrp_list_entry(p, typeof(*t), hook);

        if (!is_deleted(t)) {
            if (t->expire <= now) {
                mrp_debug("dispatching expired timer %p", t);

                t->cb(t, t->user_data);

                /* re-arm unless the callback deleted the timer */
                if (!is_deleted(t))
                    rearm_timer(t);
            }
            else
                break;
        }
        else
            mrp_debug("skipping deleted timer %p", t);

        if (ml->quit)
            break;
    }
}
+
+
+static void dispatch_subloops(mrp_mainloop_t *ml)
+{
+ mrp_list_hook_t *p, *n;
+ mrp_subloop_t *sl;
+
+ mrp_list_foreach(&ml->subloops, p, n) {
+ sl = mrp_list_entry(p, typeof(*sl), hook);
+
+ if (!is_deleted(sl)) {
+ poll_subloop(sl);
+
+ if (sl->cb->check(sl->user_data, sl->pollfds,
+ sl->npollfd)) {
+ mrp_debug("dispatching subloop %p", sl);
+ sl->cb->dispatch(sl->user_data);
+ }
+ else
+ mrp_debug("skipping subloop %p, check said no", sl);
+ }
+ }
+}
+
+
/*
 * Dispatch an I/O event to the slave watches chained to a master watch.
 *
 * Each watch 'consumes' the event bits it was registered for: a slave
 * only sees bits not already consumed by the master or earlier slaves,
 * and dispatching stops once all bits are accounted for.
 */
static void dispatch_slaves(mrp_io_watch_t *w, struct epoll_event *e)
{
    mrp_io_watch_t *s;
    mrp_list_hook_t *p, *n;
    mrp_io_event_t events;

    /* strip the IN/OUT bits the master watch already handled */
    events = e->events & ~(MRP_IO_EVENT_INOUT & w->events);

    mrp_list_foreach(&w->slave, p, n) {
        if (events == MRP_IO_EVENT_NONE)
            break;

        s = mrp_list_entry(p, typeof(*s), slave);

        if (!is_deleted(s)) {
            mrp_debug("dispatching slave I/O watch %p (fd %d)", s, s->fd);
            s->cb(s, s->fd, events, s->user_data);
        }
        else
            mrp_debug("skipping slave I/O watch %p (fd %d)", s, s->fd);

        /* mark this slave's IN/OUT bits as consumed */
        events &= ~(MRP_IO_EVENT_INOUT & s->events);
    }
}
+
+
/*
 * Dispatch the I/O events collected by the last poll to their watches,
 * then pump the subloops.
 */
static void dispatch_poll_events(mrp_mainloop_t *ml)
{
    struct epoll_event *e;
    mrp_io_watch_t *w, *tblw;
    int i, fd;

    for (i = 0, e = ml->events; i < ml->poll_result; i++, e++) {
        fd = e->data.fd;
        w = fdtbl_lookup(ml->fdtbl, fd);

        if (w == NULL) {
            mrp_debug("ignoring event for deleted fd %d", fd);
            continue;
        }

        if (!is_deleted(w)) {
            mrp_debug("dispatching I/O watch %p (fd %d)", w, fd);
            w->cb(w, w->fd, e->events, w->user_data);
        }
        else
            mrp_debug("skipping deleted I/O watch %p (fd %d)", w, fd);

        if (!mrp_list_empty(&w->slave))
            dispatch_slaves(w, e);

        if (e->events & EPOLLRDHUP) {
            /* peer closed: stop polling - unless the callback above
             * closed the fd and it got reused by another watch */
            tblw = fdtbl_lookup(ml->fdtbl, w->fd);

            if (tblw == w) {
                mrp_debug("forcibly stop polling fd %d for watch %p", w->fd, w);
                epoll_del(w);
            }
            else if (tblw != NULL)
                mrp_debug("don't stop polling reused fd %d of watch %p",
                          w->fd, w);
        }
        else {
            if ((e->events & EPOLLHUP) && !is_deleted(w)) {
                /*
                 * Notes:
                 *
                 *    If the user does not react to EPOLLHUPs delivered
                 *    we stop monitoring the fd to avoid sitting in an
                 *    infinite busy loop just delivering more EPOLLHUP
                 *    notifications...
                 */

                if (w->wrhup++ > 5) {
                    tblw = fdtbl_lookup(ml->fdtbl, w->fd);

                    if (tblw == w) {
                        mrp_debug("forcibly stop polling fd %d for watch %p",
                                  w->fd, w);
                        epoll_del(w);
                    }
                    else if (tblw != NULL)
                        mrp_debug("don't stop polling reused fd %d of watch %p",
                                  w->fd, w);
                }
            }
        }

        if (ml->quit)
            break;
    }

    if (ml->quit)
        return;

    dispatch_subloops(ml);

    mrp_debug("done dispatching poll events");
}
+
+
+int mrp_mainloop_dispatch(mrp_mainloop_t *ml)
+{
+ dispatch_wakeup(ml);
+
+ if (ml->quit)
+ goto quit;
+
+ dispatch_deferred(ml);
+
+ if (ml->quit)
+ goto quit;
+
+ dispatch_timers(ml);
+
+ if (ml->quit)
+ goto quit;
+
+ dispatch_poll_events(ml);
+
+ quit:
+ purge_deleted(ml);
+
+ return !ml->quit;
+}
+
+
+int mrp_mainloop_iterate(mrp_mainloop_t *ml)
+{
+ return
+ mrp_mainloop_prepare(ml) &&
+ mrp_mainloop_poll(ml, TRUE) &&
+ mrp_mainloop_dispatch(ml) &&
+ !ml->quit;
+}
+
+
+int mrp_mainloop_run(mrp_mainloop_t *ml)
+{
+ while (mrp_mainloop_iterate(ml))
+ ;
+
+ return ml->exit_code;
+}
+
+
+void mrp_mainloop_quit(mrp_mainloop_t *ml, int exit_code)
+{
+ ml->exit_code = exit_code;
+ ml->quit = TRUE;
+}
+
+
+/*
+ * debugging routines
+ */
+
+
/*
 * Debugging helper: print a set of pollfds (fd/event-mask pairs).
 */
static void dump_pollfds(const char *prefix, struct pollfd *fds, int nfd)
{
    const char *sep = "";
    int i;

    printf("%s (%d): ", prefix, nfd);

    for (i = 0; i < nfd; i++) {
        printf("%s%d/0x%x", sep, fds[i].fd, fds[i].events);
        sep = ", ";
    }

    printf("\n");
}
+
+
+/*
+ * event bus and events
+ */
+
+static inline void *ref_event_data(void *data, int format)
+{
+ switch (format & MRP_EVENT_FORMAT_MASK) {
+ case MRP_EVENT_FORMAT_JSON:
+ return mrp_json_ref((mrp_json_t *)data);
+ case MRP_EVENT_FORMAT_MSG:
+ return mrp_msg_ref((mrp_msg_t *)data);
+ default:
+ return data;
+ }
+}
+
+
+static inline void unref_event_data(void *data, int format)
+{
+ switch (format & MRP_EVENT_FORMAT_MASK) {
+ case MRP_EVENT_FORMAT_JSON:
+ mrp_json_unref((mrp_json_t *)data);
+ break;
+ case MRP_EVENT_FORMAT_MSG:
+ mrp_msg_unref((mrp_msg_t *)data);
+ break;
+ default:
+ break;
+ }
+}
+
+
+mrp_event_bus_t *mrp_event_bus_get(mrp_mainloop_t *ml, const char *name)
+{
+ mrp_list_hook_t *p, *n;
+ mrp_event_bus_t *bus;
+
+ if (name == NULL || !strcmp(name, MRP_GLOBAL_BUS_NAME))
+ return MRP_GLOBAL_BUS;
+
+ mrp_list_foreach(&ml->busses, p, n) {
+ bus = mrp_list_entry(p, typeof(*bus), hook);
+
+ if (!strcmp(bus->name, name))
+ return bus;
+ }
+
+ bus = mrp_allocz(sizeof(*bus));
+
+ if (bus == NULL)
+ return NULL;
+
+ bus->name = mrp_strdup(name);
+
+ if (bus->name == NULL) {
+ mrp_free(bus);
+ return NULL;
+ }
+
+ mrp_list_init(&bus->hook);
+ mrp_list_init(&bus->watches);
+ bus->ml = ml;
+
+ mrp_list_append(&ml->busses, &bus->hook);
+
+ return bus;
+}
+
+
/*
 * Look up (or register) the id of an event by name.
 *
 * Returns the existing id if the name is known, otherwise registers a
 * new event and returns its freshly assigned id. Returns 0 (the id of
 * the builtin 'unknown' event, see init_events) on allocation failure.
 */
uint32_t mrp_event_id(const char *name)
{
    mrp_event_def_t *e;
    int i;

    if (events != NULL)
        for (i = 0, e = events; i < nevent; i++, e++)
            if (!strcmp(e->name, name))
                return e->id;

    /* mrp_reallocz is presumably a macro that grows and reassigns
     * 'events' in place - TODO confirm its semantics */
    if (!mrp_reallocz(events, nevent, nevent + 1))
        return 0;

    e = events + nevent;

    e->id = nevent;
    e->name = mrp_strdup(name);

    if (e->name == NULL) {
        /* shrink the table back to its previous size on failure */
        mrp_reallocz(events, nevent + 1, nevent);
        return 0;
    }

    nevent++;

    return e->id;
}
+
+
+const char *mrp_event_name(uint32_t id)
+{
+ if ((int)id < nevent)
+ return events[id].name;
+ else
+ return MRP_EVENT_UNKNOWN_NAME;
+}
+
+
+char *mrp_event_dump_mask(mrp_event_mask_t *mask, char *buf, size_t size)
+{
+ char *p, *t;
+ int l, n, id;
+
+ p = buf;
+ l = (int)size;
+ t = "";
+
+ MRP_MASK_FOREACH_SET(mask, id, 1) {
+ n = snprintf(p, l, "%s%s", t, mrp_event_name(id));
+ t = "|";
+
+ if (n >= l)
+ return "<insufficient mask dump buffer>";
+
+ p += n;
+ l -= n;
+ }
+
+ return buf;
+}
+
+
+mrp_event_watch_t *mrp_event_add_watch(mrp_event_bus_t *bus, uint32_t id,
+ mrp_event_watch_cb_t cb, void *user_data)
+{
+ mrp_list_hook_t *watches = bus ? &bus->watches : &ewatches;
+ mrp_event_watch_t *w;
+
+ w = mrp_allocz(sizeof(*w));
+
+ if (w == NULL)
+ return NULL;
+
+ mrp_list_init(&w->hook);
+ mrp_mask_init(&w->mask);
+ w->bus = bus;
+ w->cb = cb;
+ w->user_data = user_data;
+
+ if (!mrp_mask_set(&w->mask, id)) {
+ mrp_free(w);
+ return NULL;
+ }
+
+ mrp_list_append(watches, &w->hook);
+
+ mrp_debug("added event watch %p for event %d (%s) on bus %s", w, id,
+ mrp_event_name(id), bus ? bus->name : MRP_GLOBAL_BUS_NAME);
+
+ return w;
+}
+
+
+mrp_event_watch_t *mrp_event_add_watch_mask(mrp_event_bus_t *bus,
+ mrp_event_mask_t *mask,
+ mrp_event_watch_cb_t cb,
+ void *user_data)
+{
+ mrp_list_hook_t *watches = bus ? &bus->watches : &ewatches;
+ mrp_event_watch_t *w;
+ char events[512];
+
+ w = mrp_allocz(sizeof(*w));
+
+ if (w == NULL)
+ return NULL;
+
+ mrp_list_init(&w->hook);
+ mrp_mask_init(&w->mask);
+ w->bus = bus;
+ w->cb = cb;
+ w->user_data = user_data;
+
+ if (!mrp_mask_copy(&w->mask, mask)) {
+ mrp_free(w);
+ return NULL;
+ }
+
+ mrp_list_append(watches, &w->hook);
+
+ mrp_debug("added event watch %p for events <%s> on bus %s", w,
+ mrp_event_dump_mask(&w->mask, events, sizeof(events)),
+ bus ? bus->name : MRP_GLOBAL_BUS_NAME);
+
+ return w;
+}
+
+
+void mrp_event_del_watch(mrp_event_watch_t *w)
+{
+ if (w == NULL)
+ return;
+
+ if (w->bus != NULL && w->bus->busy) {
+ w->dead = TRUE;
+ w->bus->dead++;
+ return;
+ }
+
+ mrp_list_delete(&w->hook);
+ mrp_mask_reset(&w->mask);
+ mrp_free(w);
+}
+
+
+void bus_purge_dead(mrp_event_bus_t *bus)
+{
+ mrp_event_watch_t *w;
+ mrp_list_hook_t *p, *n;
+
+ if (!bus->dead)
+ return;
+
+ mrp_list_foreach(&bus->watches, p, n) {
+ w = mrp_list_entry(p, typeof(*w), hook);
+
+ if (!w->dead)
+ continue;
+
+ mrp_list_delete(&w->hook);
+ mrp_mask_reset(&w->mask);
+ mrp_free(w);
+ }
+
+ bus->dead = 0;
+}
+
+
+static int queue_event(mrp_event_bus_t *bus, uint32_t id, void *data,
+ mrp_event_flag_t flags)
+{
+ pending_event_t *e;
+
+ e = mrp_allocz(sizeof(*e));
+
+ if (e == NULL)
+ return -1;
+
+ mrp_list_init(&e->hook);
+ e->bus = bus;
+ e->id = id;
+ e->format = flags & MRP_EVENT_FORMAT_MASK;
+ e->data = ref_event_data(data, e->format);
+ mrp_list_append(&bus->ml->eventq, &e->hook);
+
+ mrp_enable_deferred(bus->ml->eventd);
+
+ return 0;
+}
+
+
/*
 * Synchronously deliver an event to all matching watches.
 *
 * With bus == NULL only synchronous emission to the global watch list
 * is allowed. The bus is marked busy during delivery so that watches
 * deleted from within a callback are only marked dead (see
 * mrp_event_del_watch) and reaped by bus_purge_dead() afterwards.
 * Returns 0 on success, -1 with errno set on invalid usage.
 */
static int emit_event(mrp_event_bus_t *bus, uint32_t id, void *data,
                      mrp_event_flag_t flags)
{
    mrp_list_hook_t *watches;
    mrp_event_watch_t *w;
    mrp_list_hook_t *p, *n;

    if (bus)
        watches = &bus->watches;
    else {
        /* the global watch list only supports synchronous emission */
        if (!(flags & MRP_EVENT_SYNCHRONOUS)) {
            errno = EINVAL;
            return -1;
        }
        watches = &ewatches;
    }

    if (bus)
        bus->busy++;

    mrp_debug("emitting event 0x%x (%s) on bus <%s>", id, mrp_event_name(id),
              bus ? bus->name : MRP_GLOBAL_BUS_NAME);

    mrp_list_foreach(watches, p, n) {
        w = mrp_list_entry(p, typeof(*w), hook);

        if (w->dead)
            continue;

        if (mrp_mask_test(&w->mask, id))
            w->cb(w, id, flags & MRP_EVENT_FORMAT_MASK, data, w->user_data);
    }

    if (bus) {
        bus->busy--;

        /* delivery done (including nested emits): reap dead watches */
        if (!bus->busy)
            bus_purge_dead(bus);
    }

    return 0;
}
+
+
/*
 * Deferred callback draining the queue of asynchronously emitted
 * events.
 *
 * Event callbacks may queue further events; the queue is re-checked
 * before disabling the deferred callback so those get delivered
 * within the same pump run.
 */
static void pump_events(mrp_deferred_t *d, void *user_data)
{
    mrp_mainloop_t *ml = (mrp_mainloop_t *)user_data;
    mrp_list_hook_t *p, *n;
    pending_event_t *e;

 pump:
    mrp_list_foreach(&ml->eventq, p, n) {
        e = mrp_list_entry(p, typeof(*e), hook);

        emit_event(e->bus, e->id, e->data, e->format);

        mrp_list_delete(&e->hook);
        unref_event_data(e->data, e->format);

        mrp_free(e);
    }

    /* callbacks may have queued more events during the loop above */
    if (!mrp_list_empty(&ml->eventq))
        goto pump;

    mrp_disable_deferred(d);
}
+
+
+int mrp_emit_event(mrp_event_bus_t *bus, uint32_t id, mrp_event_flag_t flags,
+ void *data)
+{
+ int status;
+
+ if (flags & MRP_EVENT_SYNCHRONOUS) {
+ ref_event_data(data, flags);
+ status = emit_event(bus, id, data, flags);
+ unref_event_data(data, flags);
+
+ return status;
+ }
+ else {
+ if (bus != NULL)
+ return queue_event(bus, id, data, flags);
+
+ errno = EOPNOTSUPP;
+ return -1;
+ }
+}
+
+
/*
 * Emit an event with an mrp_msg_t payload built from varargs.
 *
 * The first variadic argument is the initial field tag; it is fetched
 * as unsigned int since anything narrower is promoted in varargs. A
 * zero tag yields a NULL message payload. The message reference is
 * dropped after emission (queued emission holds its own reference).
 */
int _mrp_event_emit_msg(mrp_event_bus_t *bus, uint32_t id,
                        mrp_event_flag_t flags, ...)
{
    mrp_msg_t *msg;
    uint16_t tag;
    va_list ap;
    int status;

    va_start(ap, flags);
    tag = va_arg(ap, unsigned int);
    msg = tag ? mrp_msg_createv(tag, ap) : NULL;
    va_end(ap);

    /* force the message format bit regardless of what the caller set */
    flags &= ~MRP_EVENT_FORMAT_MASK;
    status = mrp_emit_event(bus, id, flags | MRP_EVENT_FORMAT_MSG, msg);
    mrp_msg_unref(msg);

    return status;
}
+
+
/*
 * Constructor: register the builtin 'unknown' event at startup so it
 * gets the reserved id MRP_EVENT_UNKNOWN (the first id handed out by
 * mrp_event_id).
 */
MRP_INIT static void init_events(void)
{
    MRP_ASSERT(mrp_event_id(MRP_EVENT_UNKNOWN_NAME) == MRP_EVENT_UNKNOWN,
               "reserved id 0x%x for builtin event <%s> already taken",
               MRP_EVENT_UNKNOWN, MRP_EVENT_UNKNOWN_NAME);
}