summaryrefslogtreecommitdiff
path: root/tts-mqtt-test/src/paho-mqtt/Thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'tts-mqtt-test/src/paho-mqtt/Thread.c')
-rw-r--r--tts-mqtt-test/src/paho-mqtt/Thread.c462
1 files changed, 462 insertions, 0 deletions
diff --git a/tts-mqtt-test/src/paho-mqtt/Thread.c b/tts-mqtt-test/src/paho-mqtt/Thread.c
new file mode 100644
index 0000000..3fd78dc
--- /dev/null
+++ b/tts-mqtt-test/src/paho-mqtt/Thread.c
@@ -0,0 +1,462 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial implementation
+ * Ian Craggs, Allan Stockdill-Mander - async client updates
+ * Ian Craggs - bug #415042 - start Linux thread as disconnected
+ * Ian Craggs - fix for bug #420851
+ * Ian Craggs - change MacOS semaphore implementation
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief Threading related functions
+ *
+ * Used to create platform independent threading functions
+ */
+
+
+#include "../paho-mqtt/Thread.h"
+#if defined(THREAD_UNIT_TESTS)
+#define NOSTACKTRACE
+#endif
+#include "../paho-mqtt/StackTrace.h"
+
+#undef malloc
+#undef realloc
+#undef free
+
+#if !defined(WIN32) && !defined(WIN64)
+#include <errno.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#include <limits.h>
+#endif
+#include <stdlib.h>
+
+#include "../paho-mqtt/OsWrapper.h"
+
+/**
+ * Start a new thread
+ * @param fn the function to run, must be of the correct signature
+ * @param parameter pointer to the function parameter, can be NULL
+ * @return the new thread
+ */
+thread_type Thread_start(thread_fn fn, void* parameter)
+{
+#if defined(WIN32) || defined(WIN64)
+ thread_type thread = NULL;
+#else
+ thread_type thread = 0;
+ pthread_attr_t attr;
+#endif
+
+ FUNC_ENTRY;
+#if defined(WIN32) || defined(WIN64)
+ thread = CreateThread(NULL, 0, fn, parameter, 0, NULL);
+#else
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ if (pthread_create(&thread, &attr, fn, parameter) != 0)
+ thread = 0;
+ pthread_attr_destroy(&attr);
+#endif
+ FUNC_EXIT;
+ return thread;
+}
+
+
+/**
+ * Create a new mutex
+ * @return the new mutex
+ */
+mutex_type Thread_create_mutex(void)
+{
+ mutex_type mutex = NULL;
+ int rc = 0;
+
+ FUNC_ENTRY;
+ #if defined(WIN32) || defined(WIN64)
+ mutex = CreateMutex(NULL, 0, NULL);
+ if (mutex == NULL)
+ rc = GetLastError();
+ #else
+ mutex = malloc(sizeof(pthread_mutex_t));
+ rc = pthread_mutex_init(mutex, NULL);
+ #endif
+ FUNC_EXIT_RC(rc);
+ return mutex;
+}
+
+
+/**
+ * Lock a mutex which has alrea
+ * @return completion code, 0 is success
+ */
+int Thread_lock_mutex(mutex_type mutex)
+{
+ int rc = -1;
+
+ /* don't add entry/exit trace points as the stack log uses mutexes - recursion beckons */
+ #if defined(WIN32) || defined(WIN64)
+ /* WaitForSingleObject returns WAIT_OBJECT_0 (0), on success */
+ rc = WaitForSingleObject(mutex, INFINITE);
+ #else
+ rc = pthread_mutex_lock(mutex);
+ #endif
+
+ return rc;
+}
+
+
+/**
+ * Unlock a mutex which has already been locked
+ * @param mutex the mutex
+ * @return completion code, 0 is success
+ */
+int Thread_unlock_mutex(mutex_type mutex)
+{
+ int rc = -1;
+
+ /* don't add entry/exit trace points as the stack log uses mutexes - recursion beckons */
+ #if defined(WIN32) || defined(WIN64)
+ /* if ReleaseMutex fails, the return value is 0 */
+ if (ReleaseMutex(mutex) == 0)
+ rc = GetLastError();
+ else
+ rc = 0;
+ #else
+ rc = pthread_mutex_unlock(mutex);
+ #endif
+
+ return rc;
+}
+
+
+/**
+ * Destroy a mutex which has already been created
+ * @param mutex the mutex
+ */
+void Thread_destroy_mutex(mutex_type mutex)
+{
+ int rc = 0;
+
+ FUNC_ENTRY;
+ #if defined(WIN32) || defined(WIN64)
+ rc = CloseHandle(mutex);
+ #else
+ rc = pthread_mutex_destroy(mutex);
+ free(mutex);
+ #endif
+ FUNC_EXIT_RC(rc);
+}
+
+
+/**
+ * Get the thread id of the thread from which this function is called
+ * @return thread id, type varying according to OS
+ */
+thread_id_type Thread_getid(void)
+{
+ #if defined(WIN32) || defined(WIN64)
+ return GetCurrentThreadId();
+ #else
+ return pthread_self();
+ #endif
+}
+
+
+/**
+ * Create a new semaphore
+ * @return the new condition variable
+ */
+sem_type Thread_create_sem(void)
+{
+ sem_type sem = NULL;
+ int rc = 0;
+
+ FUNC_ENTRY;
+ #if defined(WIN32) || defined(WIN64)
+ sem = CreateEvent(
+ NULL, /* default security attributes */
+ FALSE, /* manual-reset event? */
+ FALSE, /* initial state is nonsignaled */
+ NULL /* object name */
+ );
+ #elif defined(OSX)
+ sem = dispatch_semaphore_create(0L);
+ rc = (sem == NULL) ? -1 : 0;
+ #else
+ sem = malloc(sizeof(sem_t));
+ rc = sem_init(sem, 0, 0);
+ #endif
+ FUNC_EXIT_RC(rc);
+ return sem;
+}
+
+
+/**
+ * Wait for a semaphore to be posted, or timeout.
+ * @param sem the semaphore
+ * @param timeout the maximum time to wait, in milliseconds
+ * @return completion code
+ */
+int Thread_wait_sem(sem_type sem, int timeout)
+{
+/* sem_timedwait is the obvious call to use, but seemed not to work on the Viper,
+ * so I've used trywait in a loop instead. Ian Craggs 23/7/2010
+ */
+ int rc = -1;
+#if !defined(WIN32) && !defined(WIN64) && !defined(OSX)
+#define USE_TRYWAIT
+#if defined(USE_TRYWAIT)
+ int i = 0;
+ int interval = 10000; /* 10000 microseconds: 10 milliseconds */
+ int count = (1000 * timeout) / interval; /* how many intervals in timeout period */
+#else
+ struct timespec ts;
+#endif
+#endif
+
+ FUNC_ENTRY;
+ #if defined(WIN32) || defined(WIN64)
+ rc = WaitForSingleObject(sem, timeout < 0 ? 0 : timeout);
+ #elif defined(OSX)
+ rc = (int)dispatch_semaphore_wait(sem, dispatch_time(DISPATCH_TIME_NOW, (int64_t)timeout*1000000L));
+ #elif defined(USE_TRYWAIT)
+ while (++i < count && (rc = sem_trywait(sem)) != 0)
+ {
+ if (rc == -1 && ((rc = errno) != EAGAIN))
+ {
+ rc = 0;
+ break;
+ }
+ usleep(interval); /* microseconds - .1 of a second */
+ }
+ #else
+ if (clock_gettime(CLOCK_REALTIME, &ts) != -1)
+ {
+ ts.tv_sec += timeout;
+ rc = sem_timedwait(sem, &ts);
+ }
+ #endif
+
+ FUNC_EXIT_RC(rc);
+ return rc;
+}
+
+
+/**
+ * Check to see if a semaphore has been posted, without waiting.
+ * @param sem the semaphore
+ * @return 0 (false) or 1 (true)
+ */
+int Thread_check_sem(sem_type sem)
+{
+#if defined(WIN32) || defined(WIN64)
+ return WaitForSingleObject(sem, 0) == WAIT_OBJECT_0;
+#elif defined(OSX)
+ return dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW) == 0;
+#else
+ int semval = -1;
+ sem_getvalue(sem, &semval);
+ return semval > 0;
+#endif
+}
+
+
+/**
+ * Post a semaphore
+ * @param sem the semaphore
+ * @return completion code
+ */
+int Thread_post_sem(sem_type sem)
+{
+ int rc = 0;
+
+ FUNC_ENTRY;
+ #if defined(WIN32) || defined(WIN64)
+ if (SetEvent(sem) == 0)
+ rc = GetLastError();
+ #elif defined(OSX)
+ rc = (int)dispatch_semaphore_signal(sem);
+ #else
+ if (sem_post(sem) == -1)
+ rc = errno;
+ #endif
+
+ FUNC_EXIT_RC(rc);
+ return rc;
+}
+
+
+/**
+ * Destroy a semaphore which has already been created
+ * @param sem the semaphore
+ */
+int Thread_destroy_sem(sem_type sem)
+{
+ int rc = 0;
+
+ FUNC_ENTRY;
+ #if defined(WIN32) || defined(WIN64)
+ rc = CloseHandle(sem);
+ #elif defined(OSX)
+ dispatch_release(sem);
+ #else
+ rc = sem_destroy(sem);
+ free(sem);
+ #endif
+ FUNC_EXIT_RC(rc);
+ return rc;
+}
+
+
+#if !defined(WIN32) && !defined(WIN64)
+/**
+ * Create a new condition variable
+ * @return the condition variable struct
+ */
+cond_type Thread_create_cond(void)
+{
+ cond_type condvar = NULL;
+ int rc = 0;
+
+ FUNC_ENTRY;
+ condvar = malloc(sizeof(cond_type_struct));
+ rc = pthread_cond_init(&condvar->cond, NULL);
+ rc = pthread_mutex_init(&condvar->mutex, NULL);
+
+ FUNC_EXIT_RC(rc);
+ return condvar;
+}
+
+/**
+ * Signal a condition variable
+ * @return completion code
+ */
+int Thread_signal_cond(cond_type condvar)
+{
+ int rc = 0;
+
+ pthread_mutex_lock(&condvar->mutex);
+ rc = pthread_cond_signal(&condvar->cond);
+ pthread_mutex_unlock(&condvar->mutex);
+
+ return rc;
+}
+
+/**
+ * Wait with a timeout (seconds) for condition variable
+ * @return completion code
+ */
+int Thread_wait_cond(cond_type condvar, int timeout)
+{
+ FUNC_ENTRY;
+ int rc = 0;
+ struct timespec cond_timeout;
+ struct timeval cur_time;
+
+ gettimeofday(&cur_time, NULL);
+
+ cond_timeout.tv_sec = cur_time.tv_sec + timeout;
+ cond_timeout.tv_nsec = cur_time.tv_usec * 1000;
+
+ pthread_mutex_lock(&condvar->mutex);
+ rc = pthread_cond_timedwait(&condvar->cond, &condvar->mutex, &cond_timeout);
+ pthread_mutex_unlock(&condvar->mutex);
+
+ FUNC_EXIT_RC(rc);
+ return rc;
+}
+
+/**
+ * Destroy a condition variable
+ * @return completion code
+ */
+int Thread_destroy_cond(cond_type condvar)
+{
+ int rc = 0;
+
+ rc = pthread_mutex_destroy(&condvar->mutex);
+ rc = pthread_cond_destroy(&condvar->cond);
+ free(condvar);
+
+ return rc;
+}
+#endif
+
+
+#if defined(THREAD_UNIT_TESTS)
+
+#include <stdio.h>
+
+thread_return_type secondary(void* n)
+{
+ int rc = 0;
+
+ /*
+ cond_type cond = n;
+
+ printf("Secondary thread about to wait\n");
+ rc = Thread_wait_cond(cond);
+ printf("Secondary thread returned from wait %d\n", rc);*/
+
+ sem_type sem = n;
+
+ printf("Secondary thread about to wait\n");
+ rc = Thread_wait_sem(sem);
+ printf("Secondary thread returned from wait %d\n", rc);
+
+ printf("Secondary thread about to wait\n");
+ rc = Thread_wait_sem(sem);
+ printf("Secondary thread returned from wait %d\n", rc);
+ printf("Secondary check sem %d\n", Thread_check_sem(sem));
+
+ return 0;
+}
+
+
+int main(int argc, char *argv[])
+{
+ int rc = 0;
+
+ sem_type sem = Thread_create_sem();
+
+ printf("check sem %d\n", Thread_check_sem(sem));
+
+ printf("post secondary\n");
+ rc = Thread_post_sem(sem);
+ printf("posted secondary %d\n", rc);
+
+ printf("check sem %d\n", Thread_check_sem(sem));
+
+ printf("Starting secondary thread\n");
+ Thread_start(secondary, (void*)sem);
+
+ sleep(3);
+ printf("check sem %d\n", Thread_check_sem(sem));
+
+ printf("post secondary\n");
+ rc = Thread_post_sem(sem);
+ printf("posted secondary %d\n", rc);
+
+ sleep(3);
+
+ printf("Main thread ending\n");
+}
+
+#endif