diff options
author | Jeonghoon Park <jh1979.park@samsung.com> | 2018-06-11 17:46:58 +0900 |
---|---|---|
committer | Jeonghoon Park <jh1979.park@samsung.com> | 2018-06-11 17:46:58 +0900 |
commit | 2e81786ae6e280e13b2ce727ae860f76ad965df7 (patch) | |
tree | 16f6dcce928781bb26d6bb67084da993c8e71575 /daemon | |
parent | 0f7cbdf526b1a54653117acd21d50df501806db2 (diff) | |
download | tizen-things-daemon-2e81786ae6e280e13b2ce727ae860f76ad965df7.tar.gz tizen-things-daemon-2e81786ae6e280e13b2ce727ae860f76ad965df7.tar.bz2 tizen-things-daemon-2e81786ae6e280e13b2ce727ae860f76ad965df7.zip |
add command manager module
Change-Id: I51869c3060dfc8b556ca119d7f4173836e04daab
Diffstat (limited to 'daemon')
-rw-r--r-- | daemon/include/ttd-cmd-mgr.h | 29 | ||||
-rw-r--r-- | daemon/src/ttd-cmd-mgr.c | 469 |
2 files changed, 498 insertions, 0 deletions
diff --git a/daemon/include/ttd-cmd-mgr.h b/daemon/include/ttd-cmd-mgr.h new file mode 100644 index 0000000..05bc144 --- /dev/null +++ b/daemon/include/ttd-cmd-mgr.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2018 Samsung Electronics Co., Ltd. + * + * Licensed under the Flora License, Version 1.1 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://floralicense.org/license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef __TTD_CMD_MRG_H__ +#define __TTD_CMD_MRG_H__ + +#include "ttd-cmd-type.h" + +int ttd_cmd_mgr_init(void); +int ttd_cmd_mgr_fini(void); +int ttd_cmd_mgr_get_cmd(void); +int ttd_cmd_mgr_push_result(const char *id, + ttd_cmd_result_e result, const char *data); + +#endif /* __TTD_CMD_MRG_H__ */ diff --git a/daemon/src/ttd-cmd-mgr.c b/daemon/src/ttd-cmd-mgr.c new file mode 100644 index 0000000..caf0434 --- /dev/null +++ b/daemon/src/ttd-cmd-mgr.c @@ -0,0 +1,469 @@ +/* + * Copyright (c) 2018 Samsung Electronics Co., Ltd. + * + * Licensed under the Flora License, Version 1.1 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://floralicense.org/license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glib.h> +#include "ttd-log.h" +#include "ttd-cmd-mgr.h" +#include "ttd-cmd.h" +#include "ttd-cmd-type.h" +#include "ttd-cmd-func.h" +#include "ttd-http.h" +#include "ttd-parse-cmd.h" +#include "ttd-build-json.h" +#include "common-util.h" + +#ifndef SERVER_URL +/* TODO : remove it after test */ +#define TEST_SERVER_URL "http://test.showiot.xyz/api/cmd?&target=test-page-device&owner=test-page&state=created" +#define SERVER_URL TEST_SERVER_URL +#endif + +#define CMD_MGR_GET_INTERVAL_SEC (3600 * 3) +#define RESULT_WAIT_IN_SEC (10) +#define RESULT_WAIT_TIME (RESULT_WAIT_IN_SEC * 1000000) +#define RESULT_WAIT_TRY_MAX (6) + +typedef struct _ttd_result_data_s { + char *cmd_id; + ttd_cmd_result_e result; + char *data; +} ttd_result_data_s; + +struct __cmd_mgr_h { + GAsyncQueue *cmd_id_queue; + GHashTable *cmd_hash; + GAsyncQueue *result_queue; + GMutex mutex; + GThread *get_thread; + int get_thread_running; + int get_by_request; + GMutex get_mutex; + GCond get_cond; + GThread *launch_thread; + int launch_thread_running; +}; + +/* Use singleton */ +static struct __cmd_mgr_h *g_handle = NULL; + +static const char *__ttd_get_cloud_url(void) +{ + const char *url = NULL; + + /* TODO : get cloud url */ + url = SERVER_URL; + + return url; +} + +static void __result_item_free(gpointer data) +{ + ttd_result_data_s *item = data; + if (!item) + return; + + if (item->cmd_id) + _D("free result item[%s]", item->cmd_id); + g_free(item->cmd_id); + g_free(item->data); + + g_free(item); +} + +int ttd_cmd_mgr_push_result(const char *id, + ttd_cmd_result_e result, const char *data) +{ + ttd_result_data_s *result_item = NULL; + + retvm_if(!g_handle, -1, "cmd mgr is not initialized yet"); + retvm_if(!id, -1, "cmd id is NULL"); + + result_item = g_try_new0(ttd_result_data_s, 1); + + result_item->cmd_id = g_strdup(id); + result_item->result = result; + if (data) + result_item->data = g_strdup(data); + + g_async_queue_push(g_handle->result_queue, result_item); + + return 0; +} + +int ttd_cmd_mgr_get_cmd(void) +{ + retvm_if(!g_handle, -1, "cmd mgr is not initialized yet"); + _D("unblocking get_thread"); + g_mutex_lock(&g_handle->get_mutex); + g_handle->get_by_request = 1; + g_cond_broadcast(&g_handle->get_cond); + g_mutex_unlock(&g_handle->get_mutex); + // wait here to get result of getting cmd or not ??? + + return 0; +} + +static int _get_cloud_cmd(struct __cmd_mgr_h *handle, long *res_code) +{ + int ret = 0; + char *cmd = NULL; + long r_code = 0; + GList *cmd_list = NULL; + GList *l = NULL; + + retv_if(!handle, -1); + + ret = ttd_http_get_cloud_cmd(__ttd_get_cloud_url(), &cmd, &r_code); + if (ret) { + _E("failed to get cmd [%ld]", r_code); + if (res_code) + *res_code = r_code; + return -1; + } + if (res_code) + *res_code = r_code; + + retvm_if(!cmd, 0, "there is no new cmd now"); + + ret = ttd_parse_json_to_cmd(cmd, &cmd_list); + if (ret) { + _E("failed to parse cmd"); + g_free(cmd); + return -1; + } + g_free(cmd); + cmd = NULL; + + for (l = cmd_list; l != NULL; l = l->next) { + ttd_cmd_data *cmd_data = NULL; + ttd_cmd_data *item = NULL; + const char *cmd_id = NULL; + + cmd_data = (ttd_cmd_data *)l->data; + continue_if(!cmd_data); + + cmd_id = ttd_cmd_get_id(cmd_data); + + g_mutex_lock(&handle->mutex); + item = g_hash_table_lookup(handle->cmd_hash, cmd_id); + if (!item) { + _D("cmd[%s] is pushed in cmd queue", cmd_id); + g_async_queue_push(handle->cmd_id_queue, g_strdup(cmd_id)); + g_hash_table_insert(handle->cmd_hash, g_strdup(cmd_id), cmd_data); + } else { + _D("cmd[%s] is already in cmd queue", cmd_id); + ttd_cmd_free(cmd_data); + } + g_mutex_unlock(&handle->mutex); + } + g_list_free(cmd_list); + + return 0; +} + +static gpointer _get_thread(gpointer data) +{ + struct __cmd_mgr_h *handle = data; + + g_mutex_lock(&handle->get_mutex); + while (g_atomic_int_get(&(handle->get_thread_running))) { + int ret = 0; + guint64 until = 0; + unsigned int retry = 0; + + until = common_get_monotonic_coarse_time() + + CMD_MGR_GET_INTERVAL_SEC * G_TIME_SPAN_SECOND; + _D("thread blocked for 3 hours"); + g_cond_wait_until(&handle->get_cond, &handle->get_mutex, until); + _D("thread unblocked"); + + if (g_handle->get_by_request) + retry = 5; + + do { + long res_code = 0; + + ret = _get_cloud_cmd(handle, &res_code); + if (ret) + _E("failed to get cmd - %ld", res_code); + else + _D("res_code: %ld", res_code); + + if (res_code == 200) // HTTP OK - success to get cmd + break; + + retry--; + g_usleep(G_USEC_PER_SEC); // sleep in 1 second before retry + } while(retry); + + g_handle->get_by_request = 0; + } + g_mutex_unlock(&handle->get_mutex); + + return NULL; +} + +static gpointer _launch_thread(gpointer data) +{ + struct __cmd_mgr_h *handle = data; + + while (g_atomic_int_get(&(handle->launch_thread_running))) { + char *cmd_id = NULL; + ttd_cmd_data *cmd_data = NULL; + char *report = NULL; + long r_code = 0; + int ret = 0; + ttd_cmd_launch_func launch_func = NULL; + ttd_cmd_type_e cmd_type = TTD_CMD_TYPE_UNKNOWN; + int result_wait = 1; + + // get pop oldest cmd from cmd_id_queue + _D("block launch thread"); + cmd_id = g_async_queue_pop(handle->cmd_id_queue); + if (!cmd_id) { + _D("unblock launch thread"); + _D("cmd queue is empty"); + goto DONE_N_WAIT; + } + _D("unblock launch thread"); + + // get cmd data from cmd_hash + g_mutex_lock(&handle->mutex); + cmd_data = g_hash_table_lookup(handle->cmd_hash, cmd_id); + g_mutex_unlock(&handle->mutex); + if (!cmd_data) { + _E("data for cmd[%s] is not exist", cmd_id); + goto DONE_N_WAIT; + } + + // report 'running' state of the cmd id to cloud + cmd_type = ttd_cmd_get_type(cmd_data); + report = ttd_build_json_create_report(cmd_id, cmd_type, + TTD_CMD_STATE_RUNNING, 0, "state update", NULL); + _D("report - %s", report); + ret = ttd_http_post_cmd_result( + __ttd_get_cloud_url(), report, &r_code); + if (r_code != 200) { + _E("failed to post report [%ld]", r_code); + g_free(report); + goto DONE_N_WAIT; + } + g_free(report); + report = NULL; + ttd_cmd_set_state(cmd_data, TTD_CMD_STATE_RUNNING); + + // execute cmd + launch_func = ttd_cmd_get_launch_func(cmd_type); + if (!launch_func) { + _E("cmd[%s] no proper launch function", cmd_id); + report = ttd_build_json_create_report(cmd_id, cmd_type, + TTD_CMD_STATE_FAILED, 0, "no proper launch function", NULL); + _D("report - %s", report); + ttd_http_post_cmd_result(__ttd_get_cloud_url(), report, NULL); + g_free(report); + report = NULL; + goto DONE_N_WAIT; + } + + ret = launch_func(cmd_data); + if (ret) { + _E("cmd[%s] launch failed", cmd_id); + report = ttd_build_json_create_report(cmd_id, cmd_type, + TTD_CMD_STATE_FAILED, 0, "command launch failed", NULL); + _D("report - %s", report); + ttd_http_post_cmd_result(__ttd_get_cloud_url(), report, NULL); + g_free(report); + report = NULL; + goto DONE_N_WAIT; + } + + while (result_wait) { + ttd_result_data_s *result_item = NULL; + static unsigned int count = 0; + + result_item = g_async_queue_timeout_pop( + g_handle->result_queue, RESULT_WAIT_TIME); + + if (!result_item) { + count++; + + if (count <= RESULT_WAIT_TRY_MAX) + continue; + + /* timeout to wait result, report fail */ + report = ttd_build_json_create_report(cmd_id, cmd_type, + TTD_CMD_STATE_FAILED, 0, "timeout to wait result ", NULL); + _D("report - %s", report); + ttd_http_post_cmd_result(__ttd_get_cloud_url(), report, NULL); + g_free(report); + report = NULL; + + result_wait = 0; + break; + } + + switch (result_item->result) { + case TTD_CMD_RESULT_RUNNING: + /* report running and wait more result */ + report = ttd_build_json_create_report(cmd_id, cmd_type, + TTD_CMD_STATE_RUNNING, 0, + "report in progress", result_item->data); + _D("report - %s", report); + ttd_http_post_cmd_result(__ttd_get_cloud_url(), report, NULL); + g_free(report); + report = NULL; + break; + case TTD_CMD_RESULT_SUCCESS: + /* report done */ + report = ttd_build_json_create_report(cmd_id, cmd_type, + TTD_CMD_STATE_DONE, 0, + "done", result_item->data); + _D("report - %s", report); + ttd_http_post_cmd_result(__ttd_get_cloud_url(), report, NULL); + g_free(report); + report = NULL; + + result_wait = 0; + break; + case TTD_CMD_RESULT_FAIL: + /* report fail */ + report = ttd_build_json_create_report(cmd_id, cmd_type, + TTD_CMD_STATE_FAILED, 0, + "failed to process command", result_item->data); + _D("report - %s", report); + ttd_http_post_cmd_result(__ttd_get_cloud_url(), report, NULL); + g_free(report); + report = NULL; + result_wait = 0; + break; + /* unhandled states */ + default: + break; + } + __result_item_free(result_item); + } + +DONE_N_WAIT: + if (cmd_id) { + g_mutex_lock(&handle->mutex); + g_hash_table_remove(handle->cmd_hash, cmd_id); + g_mutex_unlock(&handle->mutex); + g_free(cmd_id); + } + } + + return NULL; +} + +static void __free_cmd_mgr_handle(void) +{ + if (!g_handle) + return; + + g_mutex_clear(&g_handle->mutex); + + if (g_handle->launch_thread) { + g_atomic_int_set(&(g_handle->launch_thread_running), 0); + g_thread_join(g_handle->launch_thread); + } + + if (g_handle->get_thread) { + g_atomic_int_set(&(g_handle->get_thread_running), 0); + g_thread_join(g_handle->get_thread); + } + g_mutex_clear(&g_handle->get_mutex); + g_cond_clear(&g_handle->get_cond); + + if (g_handle->result_queue) + g_async_queue_unref(g_handle->result_queue); + + if (g_handle->cmd_id_queue) + g_async_queue_unref(g_handle->cmd_id_queue); + + if (g_handle->cmd_hash) { + g_hash_table_remove_all(g_handle->cmd_hash); + g_hash_table_unref(g_handle->cmd_hash); + } + + g_free(g_handle); + g_handle = NULL; +} + +static gboolean __get_cmd_first_time(gpointer data) +{ + ttd_cmd_mgr_get_cmd(); + + return G_SOURCE_REMOVE; +} + +int ttd_cmd_mgr_init(void) +{ + GError *error = NULL; + + retvm_if(g_handle, -1, "cmd mgr already initialized, finalized it first"); + + g_handle = g_try_malloc0(sizeof(struct __cmd_mgr_h)); + retvm_if(!g_handle, -1, "failed to malloc"); + + g_handle->cmd_id_queue = g_async_queue_new_full((GDestroyNotify)g_free); + goto_if(!g_handle->cmd_id_queue, ERROR_N_EXIT); + + g_handle->cmd_hash = g_hash_table_new_full(g_str_hash, g_str_equal, + g_free, (GDestroyNotify)ttd_cmd_free); + goto_if(!g_handle->cmd_hash, ERROR_N_EXIT); + + g_handle->result_queue = g_async_queue_new_full(__result_item_free); + goto_if(!g_handle->result_queue, ERROR_N_EXIT); + + g_mutex_init(&g_handle->mutex); + + g_atomic_int_set(&(g_handle->get_thread_running), 1); + g_mutex_init(&g_handle->get_mutex); + g_cond_init(&g_handle->get_cond); + + g_handle->get_thread = + g_thread_try_new(NULL, (GThreadFunc)_get_thread, g_handle, &error); + if (!g_handle->get_thread) { + _E("failed to create get thread - %s", + error && error->message ? error->message : "Unknown Error"); + goto ERROR_N_EXIT; + } + + g_atomic_int_set(&(g_handle->launch_thread_running), 1); + g_handle->launch_thread = + g_thread_try_new(NULL, (GThreadFunc)_launch_thread, g_handle, &error); + if (!g_handle->launch_thread) { + _E("failed to create launch_thread - %s", + error && error->message ? error->message : "Unknown Error"); + goto ERROR_N_EXIT; + } + + // need it ?? + g_idle_add(__get_cmd_first_time, NULL); + + return 0; + +ERROR_N_EXIT: + g_error_free(error); + __free_cmd_mgr_handle(); + return -1; +} + +int ttd_cmd_mgr_fini(void) +{ + __free_cmd_mgr_handle(); + return 0; +} |