summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--AUTHORSbin112 -> 26479 bytes
-rwxr-xr-xinclude/ccatalog.h1
-rwxr-xr-xinclude/cprocessor_module.h22
-rwxr-xr-xinclude/cworker.h5
-rwxr-xr-xinclude/sf_common.h38
-rw-r--r--libsf-common.manifest6
-rw-r--r--packaging/libsf-common.manifest5
-rw-r--r--packaging/libsf-common.spec10
-rwxr-xr-xsrc/ccatalog.cpp28
-rwxr-xr-xsrc/cipc_worker.cpp21
-rwxr-xr-xsrc/cprocessor_module.cpp178
-rwxr-xr-xsrc/csock.cpp53
-rwxr-xr-xsrc/cworker.cpp109
13 files changed, 281 insertions, 195 deletions
diff --git a/AUTHORS b/AUTHORS
index 4a9d6ba..cb0b5dc 100644
--- a/AUTHORS
+++ b/AUTHORS
Binary files differ
diff --git a/include/ccatalog.h b/include/ccatalog.h
index 15ac885..3c6c95c 100755
--- a/include/ccatalog.h
+++ b/include/ccatalog.h
@@ -44,7 +44,6 @@ public:
char *value(char *group, char *name);
char *value(char *group, char *name, int idx);
- char *value(char *group, char *name, void *handle);
int count_of_values(char *group, char *name);
void *iterate_init(void);
diff --git a/include/cprocessor_module.h b/include/cprocessor_module.h
index 04ba069..b8bd123 100755
--- a/include/cprocessor_module.h
+++ b/include/cprocessor_module.h
@@ -30,6 +30,10 @@ class cprocessor_module : public cmodule
public:
static const int SF_PLUGIN_PROCESSOR = SF_PLUGIN_BASE + 20;
+ struct interval_list_t : public clist {
+ unsigned int interval;
+ };
+
cprocessor_module();
virtual ~cprocessor_module();
@@ -59,24 +63,20 @@ public:
virtual int get_property(unsigned int property_level , void *property_struct ) = 0;
virtual int get_struct_value(unsigned int struct_type , void *struct_values) = 0;
+ int add_interval_to_list(int interval, unsigned long polling_interval);
+ int del_interval_to_list(int interval, unsigned long polling_interval);
+ int check_hz(int time_ms);
+ int norm_interval(int time_ms);
+
void lock(void);
void unlock(void);
protected:
- struct event_callback_t : public clist
- {
- void *(*handler)(cprocessor_module *inst, void *data);
- void *data;
- cprocessor_module *inst;
- bool (*rm_cb_data)(void *data);
- };
-
- event_callback_t *m_cb_head;
- event_callback_t *m_cb_tail;
-
cworker *m_worker;
cmutex m_mutex;
+ interval_list_t *m_interval_list_head;
+ interval_list_t *m_interval_list_tail;
};
#endif
diff --git a/include/cworker.h b/include/cworker.h
index 830d0ee..95ab743 100755
--- a/include/cworker.h
+++ b/include/cworker.h
@@ -35,13 +35,15 @@ public:
STARTED = 0x02,
STOP = 0x03,
TERMINATE = 0x04,
- ENUM_LAST = 0x05,
+ INITIAL = 0x05,
+ ENUM_LAST = 0x06,
};
cworker(void);
bool start(void);
bool stop(void);
+ bool stopped(void);
bool terminate(void);
worker_state_s state(void);
@@ -69,6 +71,7 @@ private:
static void *started(void *data);
pthread_mutex_t mutex_lock;
+ pthread_cond_t th_cond;
};
diff --git a/include/sf_common.h b/include/sf_common.h
index 9f4540f..f10ad4a 100755
--- a/include/sf_common.h
+++ b/include/sf_common.h
@@ -29,7 +29,7 @@
#include <cpacket.h>
#include <csock.h>
-#define DEFAULT_SENSOR_KEY_PREFIX "memory/sensor/"
+#define DEFAULT_SENSOR_KEY_PREFIX "memory/private/sensor/"
#define MAX_KEY_LEN 30
#define MAX_DATA_STREAM_SIZE (sizeof(cmd_get_struct_t) + sizeof(base_data_struct) + 8) /*always check this size when a new cmd_type struct added*/
#define MAX_VALUE_SIZE 12
@@ -61,21 +61,25 @@ enum data_unit_idx_t {
IDX_UNIT_LEVEL_1_TO_10,
IDX_UNIT_STATE_ON_OFF,
IDX_UNIT_DEGREE_PER_SECOND,
+ IDX_UNIT_HECTOPASCAL,
+ IDX_UNIT_CELSIUS,
+ IDX_UNIT_METER,
IDX_UNIT_VENDOR_UNIT = 100,
IDX_UNIT_FILTER_CONVERTED,
};
enum sensor_id_t{
- ID_UNKNOWN = 0x0000,
- ID_ACCEL = 0x0001,
- ID_GEOMAG = 0x0002,
- ID_LUMINANT = 0x0004,
- ID_PROXI = 0x0008,
- ID_THERMER = 0x0010,
+ ID_UNKNOWN = 0x0000,
+ ID_ACCEL = 0x0001,
+ ID_GEOMAG = 0x0002,
+ ID_LUMINANT = 0x0004,
+ ID_PROXI = 0x0008,
+ ID_THERMER = 0x0010,
ID_GYROSCOPE = 0x0020,
- ID_PRESSURE = 0x0040,
+ ID_PRESSURE = 0x0040,
ID_MOTION_ENGINE = 0x0080,
+ ID_FUSION = 0x0100,
};
enum packet_type_t {
@@ -102,9 +106,25 @@ enum reg_type_t {
REG_ADD = 0x01,
REG_DEL = 0x02,
REG_CHK = 0x03,
+ REG_CHG = 0x04,
};
-
+enum poll_value_t {
+ POLL_100HZ = 100,
+ POLL_50HZ = 50,
+ POLL_25HZ = 25,
+ POLL_20HZ = 20,
+ POLL_10HZ = 10,
+ POLL_5HZ = 5,
+ POLL_1HZ = 1,
+ POLL_100HZ_MS = 10,
+ POLL_50HZ_MS = 20,
+ POLL_25HZ_MS = 40,
+ POLL_20HZ_MS = 50,
+ POLL_10HZ_MS = 100,
+ POLL_5HZ_MS = 200,
+ POLL_1HZ_MS = 1000,
+};
typedef struct {
int id; //!< Select sensor
diff --git a/libsf-common.manifest b/libsf-common.manifest
new file mode 100644
index 0000000..41a9320
--- /dev/null
+++ b/libsf-common.manifest
@@ -0,0 +1,6 @@
+<manifest>
+ <request>
+ <domain name="_"/>
+ </request>
+</manifest>
+
diff --git a/packaging/libsf-common.manifest b/packaging/libsf-common.manifest
new file mode 100644
index 0000000..017d22d
--- /dev/null
+++ b/packaging/libsf-common.manifest
@@ -0,0 +1,5 @@
+<manifest>
+ <request>
+ <domain name="_"/>
+ </request>
+</manifest>
diff --git a/packaging/libsf-common.spec b/packaging/libsf-common.spec
index bf48681..21e815d 100644
--- a/packaging/libsf-common.spec
+++ b/packaging/libsf-common.spec
@@ -1,11 +1,12 @@
+#sbs-git:slp/pkgs/l/libsf-common libsf-common 0.3.6 07588b34636f76e6457efb6d65e9318513c5957c
Name: libsf-common
Summary: Commonly used code and defintions for the sensor framework
-Version: 0.3.6
+Version: 0.3.18
Release: 1
Group: TO_BE/FILLED_IN
-License: TO BE FILLED IN
-Source0: libsf-common-%{version}.tar.bz2
-BuildRequires: cmake
+License: Apache 2.0
+Source0: libsf-common-%{version}.tar.gz
+BuildRequires: cmake, libattr-devel
BuildRequires: pkgconfig(dlog)
@@ -39,6 +40,7 @@ rm -rf %{buildroot}
%make_install
%files
+%manifest libsf-common.manifest
%{_libdir}/libsf_common.so
diff --git a/src/ccatalog.cpp b/src/ccatalog.cpp
index 9f6d285..9821988 100755
--- a/src/ccatalog.cpp
+++ b/src/ccatalog.cpp
@@ -297,6 +297,7 @@ bool ccatalog::unload(void)
}
+
char *ccatalog::value(char *group, char *name)
{
group_t *grp;
@@ -325,33 +326,6 @@ char *ccatalog::value(char *group, char *name)
return NULL;
}
-char *ccatalog::value(char *group, char *name, void *handle)
-{
- group_t *grp = (group_t*)handle;
-
- while (grp) {
- if (!strcmp(group, grp->name)) {
- break;
- }
-
- grp = (group_t*)grp->next();
- }
-
- if (grp) {
- desc_t *desc;
- desc = grp->head;
-
- while (desc) {
- if (!strcmp(name, desc->name)) {
- return desc->value;
- }
- desc = (desc_t*)desc->next();
- }
- }
-
- return NULL;
-}
-
char *ccatalog::value(char *group, char *name, int idx)
{
group_t *grp;
diff --git a/src/cipc_worker.cpp b/src/cipc_worker.cpp
index f93074f..2799ed5 100755
--- a/src/cipc_worker.cpp
+++ b/src/cipc_worker.cpp
@@ -134,31 +134,28 @@ void *cipc_worker::started(void *data)
inst->m_state = TERMINATE;
pthread_mutex_unlock(&(inst->mutex_lock));
inst->m_func[STOP](inst->m_context);
+ delete inst;
return NULL;
}
}while(state == START && inst->m_state == START);
DBG("\n\n\n############Client worker thread END############\n\n\n");
+ pthread_mutex_lock(&(inst->mutex_lock));
+ inst->m_state = STOPPED;
+ pthread_mutex_unlock(&(inst->mutex_lock));
+ delete inst;
return NULL;
}
bool cipc_worker::stop(void)
{
ipc_worker_state_s state;
- pthread_mutex_lock(&(mutex_lock));
- m_state = STOP;
- pthread_mutex_unlock(&(mutex_lock));
- DBG("Stop function [Client worker]\n");
-
+
state = (ipc_worker_state_s)(int)m_func[STOP](m_context);
- if (state == TERMINATE) {
- pthread_mutex_lock(&(mutex_lock));
- m_state = TERMINATE;
- pthread_mutex_unlock(&(mutex_lock));
- delete this;
- return false;
- }
+ pthread_mutex_lock(&(mutex_lock));
+ m_state = TERMINATE;
+ pthread_mutex_unlock(&(mutex_lock));
return true;
}
diff --git a/src/cprocessor_module.cpp b/src/cprocessor_module.cpp
index dd08236..7725477 100755
--- a/src/cprocessor_module.cpp
+++ b/src/cprocessor_module.cpp
@@ -57,10 +57,13 @@
#include <cfilter_module.h>
#include <cprocessor_module.h>
+#define BASE_GATHERING_INTERVAL 1000
+#define MS_TO_US 1000
+
cprocessor_module::cprocessor_module()
-: m_cb_head(NULL)
-, m_cb_tail(NULL)
-, m_worker(NULL)
+: m_worker(NULL)
+, m_interval_list_head(NULL)
+, m_interval_list_tail(NULL)
{
ctype::set_type(SF_PLUGIN_PROCESSOR);
@@ -107,28 +110,6 @@ bool cprocessor_module::stop(void)
bool cprocessor_module::add_event_callback(void *(*handler)(cprocessor_module *, void *), void *data, bool (*rm_cb_data)(void *data))
{
- event_callback_t *item;
-
- try {
- item = new event_callback_t;
- } catch (...) {
- ERR("Failed to allocate memory\n");
- return false;
- }
-
- item->handler = handler;
- item->inst = this;
- item->data = data;
- item->rm_cb_data = rm_cb_data;
-
- if (m_cb_head == NULL || m_cb_tail == NULL) {
- m_cb_tail = m_cb_head = item;
- } else {
- item->link(clist::AFTER, m_cb_tail);
- m_cb_tail = item;
- }
- DBG("Event ADDED =======================\n");
-
return true;
}
@@ -136,64 +117,13 @@ bool cprocessor_module::add_event_callback(void *(*handler)(cprocessor_module *
bool cprocessor_module::rm_event_callback(void *(*handler)(cprocessor_module *, void*), void *data)
{
- event_callback_t *iterator;
- event_callback_t *next;
- bool found = false;
-
- iterator = m_cb_head;
- while (iterator) {
- next = (event_callback_t*)iterator->next();
-
- if (iterator->handler == handler && iterator->data == data) {
- event_callback_t *prev = (event_callback_t*)iterator->prev();
-
- iterator->unlink();
-
- if (iterator == m_cb_tail) {
- m_cb_tail = prev;
- }
-
- if (iterator == m_cb_head) {
- m_cb_head = next;
- }
-
- if (iterator->rm_cb_data) {
- iterator->rm_cb_data(iterator->data);
- }
-
- delete iterator;
- iterator=NULL;
- found = true;
- break;
- }
-
- iterator = next;
- }
-
- return found;
+ return true;
}
void cprocessor_module::wakeup_all_client(void)
{
- event_callback_t *iterator;
- event_callback_t *next;
-
- lock();
- iterator = m_cb_head;
- while (iterator) {
- next = (event_callback_t*)iterator->next();
-
- if (this == iterator->inst) {
- iterator->handler(this, iterator->data);
- rm_event_callback(iterator->handler, iterator->data);
- DBG("Wakeup all client $$$$$$$$$$$$$$$$$$$$$$$$4\n");
- }
-
- iterator = next;
- }
- unlock();
}
void cprocessor_module::lock(void)
@@ -208,4 +138,98 @@ void cprocessor_module::unlock(void)
m_mutex.unlock();
}
+int cprocessor_module::add_interval_to_list(int interval, unsigned long polling_interval)
+{
+ interval_list_t *interval_data = new interval_list_t;
+ interval_data->interval = interval;
+
+ if(!m_interval_list_head && !m_interval_list_tail){
+ m_interval_list_head = m_interval_list_tail = interval_data;
+ } else{
+ interval_data->link(clist::AFTER, m_interval_list_tail);
+ m_interval_list_tail = interval_data;
+ }
+
+ return (polling_interval > interval * MS_TO_US) ? 0 : -1;
+}
+
+int cprocessor_module::del_interval_to_list(int interval, unsigned long polling_interval)
+{
+ interval_list_t *iterator = m_interval_list_head;
+ unsigned int min_interval = BASE_GATHERING_INTERVAL;
+
+ //find interval passed by param and remove
+ while(iterator){
+ if(interval == iterator->interval)
+ {
+ if(iterator->next() == NULL)
+ m_interval_list_tail = (interval_list_t*)iterator->prev();
+ if(iterator->prev() == NULL)
+ m_interval_list_head = (interval_list_t*)iterator->next();
+ iterator->unlink();
+ delete iterator;
+ break;
+ }
+ iterator = (interval_list_t*)iterator->next();
+ }
+
+ //find min interval value in list
+ interval_list_t *iterator_min_search = m_interval_list_head;
+
+ while(iterator_min_search){
+ if(iterator_min_search->interval <= min_interval){
+ min_interval = iterator_min_search->interval;
+ }
+ iterator_min_search = (interval_list_t*)iterator_min_search->next();
+ }
+
+ return min_interval;
+}
+
+
+int cprocessor_module::check_hz(int time_ms)
+{
+ if(time_ms >= POLL_1HZ_MS)
+ return POLL_1HZ;
+ else if (time_ms < POLL_1HZ_MS && time_ms >= POLL_5HZ_MS)
+ return POLL_5HZ;
+ else if (time_ms < POLL_5HZ_MS && time_ms >= POLL_10HZ_MS)
+ return POLL_10HZ;
+ else if (time_ms < POLL_10HZ_MS && time_ms >= POLL_20HZ_MS)
+ return POLL_20HZ;
+ else if (time_ms < POLL_20HZ_MS && time_ms >= POLL_25HZ_MS)
+ return POLL_25HZ;
+ else if (time_ms < POLL_25HZ_MS && time_ms >= POLL_50HZ_MS)
+ return POLL_50HZ;
+ else if (time_ms < POLL_50HZ_MS && time_ms >= POLL_100HZ_MS)
+ return POLL_100HZ;
+ else
+ {
+ DBG("Cannot support input time [%d]",time_ms);
+ return -1;
+ }
+}
+
+int cprocessor_module::norm_interval(int time_ms)
+{
+ if(time_ms >= POLL_1HZ_MS)
+ return POLL_1HZ_MS; //1000ms
+ else if (time_ms < POLL_1HZ_MS && time_ms >= POLL_5HZ_MS)
+ return POLL_5HZ_MS; //200ms
+ else if (time_ms < POLL_5HZ_MS && time_ms >= POLL_10HZ_MS)
+ return POLL_10HZ_MS; //100ms
+ else if (time_ms < POLL_10HZ_MS && time_ms >= POLL_20HZ_MS)
+ return POLL_20HZ_MS; //50ms
+ else if (time_ms < POLL_20HZ_MS && time_ms >= POLL_25HZ_MS)
+ return POLL_25HZ_MS; //40ms
+ else if (time_ms < POLL_25HZ_MS && time_ms >= POLL_50HZ_MS)
+ return POLL_50HZ_MS; // 20ms
+ else if (time_ms < POLL_50HZ_MS && time_ms >= POLL_100HZ_MS)
+ return POLL_100HZ_MS;//10ms
+ else
+ {
+ DBG("Cannot support input time [%d]",time_ms);
+ return -1;
+ }
+}
//! End of a file
diff --git a/src/csock.cpp b/src/csock.cpp
index 8a14d89..5f189eb 100755
--- a/src/csock.cpp
+++ b/src/csock.cpp
@@ -49,7 +49,8 @@
extern int errno;
-
+const int MAX_CONNECT = 10;
+const int DELAY_FOR_CONNECT = 10000;
csock::csock(int handle, int mode)
@@ -85,6 +86,7 @@ csock::csock(char *name, int mode, int port, int server)
int domain;
int type;
sockaddr *sock_ptr = NULL;
+ struct timeval tv;
m_start=NULL;
m_running=NULL;
@@ -212,6 +214,14 @@ csock::csock(char *name, int mode, int port, int server)
}
}
}
+ else
+ {
+ tv.tv_sec = 3;
+ tv.tv_usec = 0;
+
+ if( setsockopt(m_handle, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) != 0)
+ ERR("setsockopt fail");
+ }
}
@@ -248,6 +258,7 @@ void *csock::client_ctx(void)
bool csock::connect_to_server(void)
{
int type = (m_mode & SOCK_UDP) ? SOCK_DGRAM : SOCK_STREAM;
+ int i = 0;
if (type == SOCK_STREAM) {
int len;
@@ -261,12 +272,23 @@ bool csock::connect_to_server(void)
sock_ptr = (struct sockaddr*)&m_addr;
}
- if (connect(m_handle, sock_ptr, len) < 0) {
- ERR("connect fail , m_handle : %d , sock_ptr : %p , len : %d ,%s\n",m_handle , sock_ptr , len, strerror(errno));
- close(m_handle);
- m_handle = -1;
- return false;
+ for(i = 0 ; i < MAX_CONNECT ; i++)
+ {
+ if (connect(m_handle, sock_ptr, len) == 0)
+ {
+ return true;
+ }
+ else
+ {
+ DBG("wait for accept worker");
+ usleep(DELAY_FOR_CONNECT);
+ }
}
+
+ ERR("connect fail , m_handle : %d , sock_ptr : %p , len : %d ,%s\n",m_handle , sock_ptr , len, strerror(errno));
+ close(m_handle);
+ m_handle = -1;
+ return false;
}
return true;
@@ -428,6 +450,7 @@ bool csock::recv(void *buffer, int size)
ssize_t recv_size;
int total_recv_size = 0;
+ DBG("Recv message : data size is %d\n", size);
if (m_handle < 0) {
ERR("Invalid handle\n");
return false;
@@ -438,6 +461,12 @@ bool csock::recv(void *buffer, int size)
return true;
}
+ if (size < 0)
+ {
+ ERR("invalid size of packet");
+ return false;
+ }
+
if (m_mode & SOCK_UDP) {
sockaddr *sock_ptr;
socklen_t len;
@@ -456,8 +485,8 @@ bool csock::recv(void *buffer, int size)
DBG("recvfrom %s\n", (char*)buffer);
} else {
do {
- recv_size = read(m_handle,
- (char*)buffer + total_recv_size, size - total_recv_size);
+ recv_size = ::recv(m_handle, (char*)buffer + total_recv_size, size - total_recv_size, MSG_NOSIGNAL |MSG_WAITALL);
+
if (recv_size <= 0) {
ERR("Error recv_size check fail , recv_size : %d\n",recv_size);
close(m_handle);
@@ -488,6 +517,12 @@ bool csock::send(void *buffer, int size)
if (size == 0) {
return true;
}
+
+ if (size < 0)
+ {
+ ERR("invalid size of packet");
+ return false;
+ }
if (m_handle < 0) {
ERR("Invalid handle\n");
@@ -520,7 +555,7 @@ bool csock::send(void *buffer, int size)
} else if (m_mode & SOCK_TCP) {
DBG("TCP send enabled\n");
- send_size = write(m_handle, buffer, size);
+ send_size = ::send(m_handle, buffer, size, MSG_NOSIGNAL);
if (send_size <= 0) {
ERR("Error send_size check fail , send_size : %d\n",send_size);
close(m_handle);
diff --git a/src/cworker.cpp b/src/cworker.cpp
index 09143fe..81e0c78 100755
--- a/src/cworker.cpp
+++ b/src/cworker.cpp
@@ -37,25 +37,44 @@
#include <cworker.h>
#include <common.h>
-cworker::cworker(void)
-: m_state(STOPPED)
-, m_context(NULL)
-, mutex_lock(PTHREAD_MUTEX_INITIALIZER)
+ cworker::cworker(void)
+ : m_state(INITIAL)
+ , m_context(NULL)
+ , mutex_lock(PTHREAD_MUTEX_INITIALIZER)
+ , th_cond(PTHREAD_COND_INITIALIZER)
{
register int i;
-
+ int ret;
for (i = 0; i < ENUM_LAST; i ++) {
m_func[i] = NULL;
}
-
+
+ ret = pthread_mutex_init(&mutex_lock, NULL);
+ if (ret != 0) {
+ ERR("pthread_mutex_init : %s",strerror(errno));
+ throw EINVAL;
+ }
+
+ ret = pthread_cond_init(&th_cond, NULL);
+ if (ret != 0) {
+ ERR("pthread_cond_init : %s",strerror(errno));
+ throw EINVAL;
+ }
DBG("processor worker created\n");
}
cworker::~cworker(void)
{
DBG("----------Processor WORKER TERMINATED--------\n");
-
+
+ pthread_mutex_lock(&(mutex_lock));
+
m_state = TERMINATE;
+ pthread_cond_signal(&th_cond);
+
+ pthread_mutex_unlock(&(mutex_lock));
+
+ pthread_cond_destroy(&th_cond);
if (m_func[TERMINATE])
m_func[TERMINATE](m_context);
@@ -68,41 +87,41 @@ bool cworker::start(void)
int ret = 0;
pthread_mutex_lock(&(mutex_lock));
- if (m_state == START) {
- ERR("Already started\n");
- pthread_mutex_unlock(&(mutex_lock));
- return false;
- }
- m_state = START;
- pthread_mutex_unlock(&(mutex_lock));
-
- DBG("cworker start\n");
-
- ret = pthread_create(&m_thid, NULL, started, this);
-
- if(ret != 0)
+ if(m_state == INITIAL)
{
- pthread_mutex_lock(&(mutex_lock));
- m_state = STOP;
pthread_mutex_unlock(&(mutex_lock));
- ERR("thread create fail\n");
- return false;
- }
- else
- {
- ret = pthread_detach(m_thid);
+ ret = pthread_create(&m_thid, NULL, started, this);
+
if(ret != 0)
{
- ERR("thread detach fail\n");
+ pthread_mutex_lock(&(mutex_lock));
+ m_state = INITIAL;
+ pthread_mutex_unlock(&(mutex_lock));
+ ERR("thread create fail\n");
return false;
}
else
{
- DBG("Thread creation for Processor worker END\n");
+ pthread_detach(m_thid);
}
+
+ pthread_mutex_lock(&(mutex_lock));
}
-
+ else if (m_state == START) {
+ ERR("Already started\n");
+ pthread_mutex_unlock(&(mutex_lock));
+ return false;
+ }
+
+ ret = pthread_cond_signal(&th_cond);
+ if (ret != 0) {
+ ERR("pthread_cond_wait : %s",strerror(errno));
+ }
+
+ m_state = START;
+ pthread_mutex_unlock(&(mutex_lock));
+
return true;
}
@@ -118,22 +137,17 @@ bool cworker::terminate(void)
void *cworker::started(void *data)
{
cworker *inst = (cworker*)data;
- worker_state_s state;
+ worker_state_s state = STOPPED;
do
{
state = (worker_state_s)(int)inst->m_func[STARTED](inst->m_context);
- if (state == STOPPED) {
- pthread_mutex_lock(&(inst->mutex_lock));
- inst->m_state = STOP;
- pthread_mutex_unlock(&(inst->mutex_lock));
- ERR("Abnormal Situation: processor_plugin->working() returned STOPPED\n");
- return NULL;
- }
- }while(state == STARTED && inst->m_state == START);
- DBG("\n\n\n#############Processor worker thread END###########\n\n\n");
-
+ if(state == STOPPED || inst->m_state == STOP)
+ inst->stopped();
+
+ }while(state != TERMINATE);
+
return NULL;
}
@@ -145,10 +159,17 @@ bool cworker::stop(void)
pthread_mutex_unlock(&(mutex_lock));
return false;
}
-
m_state = STOP;
pthread_mutex_unlock(&(mutex_lock));
- DBG("Stop function for Processor worker END ");
+
+ return true;
+}
+
+bool cworker::stopped(void)
+{
+ pthread_mutex_lock(&mutex_lock);
+ pthread_cond_wait(&th_cond, &mutex_lock);
+ pthread_mutex_unlock(&mutex_lock);
return true;
}