/* * Copyright (c) 2013 Samsung Electronics Co., Ltd All Rights Reserved * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an AS IS BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define _GNU_SOURCE #include #include #include #include // pthread_kill #include // ESRCH #include #include #include #include #include #include #include #include #include #include #include #include #include extern void *g_db_handle; static pthread_mutex_t g_dp_queue_manager_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t g_dp_queue_manager_cond = PTHREAD_COND_INITIALIZER; static pthread_t g_dp_queue_manager_tid = 0; static dp_queue_fmt *g_dp_queue_network_all = NULL; static dp_queue_fmt *g_dp_queue_network_wifi = NULL; static dp_queue_fmt *g_dp_queue_network_data_network = NULL; static dp_queue_fmt *g_dp_queue_network_wifi_direct = NULL; static dp_queue_fmt **__dp_queue_manager_get_queue(int network) { switch (network) { case DP_NETWORK_ALL: //TRACE_DEBUG("network all"); return &g_dp_queue_network_all; case DP_NETWORK_WIFI: //TRACE_DEBUG("network wifi only"); return &g_dp_queue_network_wifi; case DP_NETWORK_DATA_NETWORK: //TRACE_DEBUG("network data network only"); return &g_dp_queue_network_data_network; case DP_NETWORK_WIFI_DIRECT: //TRACE_DEBUG("network wifi-direct"); return &g_dp_queue_network_wifi_direct; default: break; } return NULL; } int dp_queue_manager_push_queue(void *slot, void *request) { dp_request_fmt *requestp = request; if (slot == NULL || request == NULL) { TRACE_DEBUG("check address client:%p request:%p id:%d", slot, request, (request == NULL ? 0 : requestp->id)); return -1; } dp_queue_fmt **queue = __dp_queue_manager_get_queue(requestp->network_type); if (requestp->state != DP_STATE_QUEUED) { TRACE_ERROR("check id:%d state:%s", requestp->id, dp_print_state(requestp->state)); return -1; } if (dp_queue_push(queue, slot, request) < 0) { TRACE_ERROR("failed to push to queue id:%d", requestp->id); return -1; } return 0; } void dp_queue_manager_clear_queue(void *request) { dp_request_fmt *requestp = request; if (request == NULL) { TRACE_DEBUG("check address request:%p id:%d", request, (request == NULL ? 0 : requestp->id)); return ; } dp_queue_fmt **queue = __dp_queue_manager_get_queue(requestp->network_type); dp_queue_clear(queue, request); } // return 0:stale 1:fresh -1:error static int __dp_queue_manager_check_cache_freshness(const char *file_name) { int errorcode = DP_ERROR_NONE; int id = dp_db_get_cache_file_id(g_db_handle, file_name, &errorcode); if (id < 0) { TRACE_ERROR("failed to get cache file id"); return -1; } long max_age = 0; if (dp_db_get_property_int(g_db_handle, id, DP_TABLE_CACHE_FILE, DP_DB_COL_CACHE_MAX_AGE, (long *)&max_age, &errorcode) < 0 || max_age == 0) { TRACE_DEBUG("failed to get max_age"); return 0; } long last_update = 0; if (dp_db_get_property_int(g_db_handle, id, DP_TABLE_CACHE_FILE, DP_DB_COL_CACHE_LAST_UPDATE, (long *)&last_update, &errorcode) < 0 || last_update == 0) { TRACE_DEBUG("failed to get last_update"); return 0; } long current_age = difftime(time(NULL), last_update); if (current_age >= max_age) return 0; return 1; } static int __dp_queue_send_request_for_cache_validation(const char *file_name, dp_client_slots_fmt *slot, dp_request_fmt *request) { int errorcode = DP_ERROR_INVALID_PARAMETER; if (!file_name || !slot || !request) { TRACE_ERROR("NULL CHECK!: file_name, slot or request"); return errorcode; } errorcode = DP_ERROR_NONE; char *etag = NULL; unsigned length = 0; if (dp_db_get_file_cache_string(g_db_handle, file_name, DP_DB_COL_ETAG, (unsigned char **)&etag, &length, &errorcode) < 0 || !etag) { TRACE_DEBUG("failed to get etag"); return errorcode; } const char *header_field = "If-None-Match"; const char *header_value = etag; int check_field = dp_db_check_duplicated_string(slot->client.dbhandle, request->id, DP_TABLE_HEADERS, DP_DB_COL_HEADER_FIELD, 0, header_field, &errorcode); if (check_field < 0) { errorcode = DP_ERROR_DISK_BUSY; } else { if (check_field == 0) { // insert if (dp_db_new_header(slot->client.dbhandle, request->id, header_field, header_value, &errorcode) < 0) { TRACE_ERROR("failed to set header"); errorcode = DP_ERROR_DISK_BUSY; } } else { // update if (dp_db_update_header(slot->client.dbhandle, request->id, header_field, header_value, &errorcode) < 0) { TRACE_ERROR("failed to set header"); errorcode = DP_ERROR_DISK_BUSY; } } if (errorcode == DP_ERROR_NONE) errorcode = dp_start_agent_download(slot, request); } if (etag) free(etag); return errorcode; } static char *__get_ori_file_name(const char *file_name) { int errorcode = DP_ERROR_INVALID_PARAMETER; if (!file_name) { TRACE_ERROR("NULL CHECK!: file_name"); return NULL; } char *ori_file = NULL; unsigned len = 0; if (dp_db_get_file_cache_string(g_db_handle, file_name, DP_DB_COL_FILENAME, (unsigned char **)&ori_file, &len, &errorcode) < 0 || !ori_file) { return NULL; } return ori_file; } static int __dp_queue_manager_try_cache_download(dp_client_slots_fmt *slot, dp_request_fmt *request) { int errorcode = DP_ERROR_NONE; char *url = NULL; unsigned length = 0; if (dp_db_get_property_string(slot->client.dbhandle, request->id, DP_TABLE_REQUEST, DP_DB_COL_URL, (unsigned char **)&url, &length, &errorcode) < 0 || !url) { TRACE_ERROR("faild to get url. id:%d", request->id); return errorcode; } char *file_name = NULL; unsigned len = 0; int pkg_exist = 0; int ret = dp_db_get_cache_file_name(g_db_handle, slot->pkgname, url, &file_name, &len, &pkg_exist, &errorcode); if (ret <= 0) { TRACE_DEBUG("no cache file name. id:%d", request->id); free(url); return errorcode; } else { if (pkg_exist == 0) { if (dp_db_new_app_cache(g_db_handle, slot->pkgname, url, (const char *)file_name, false, &errorcode) < 0) { TRACE_ERROR("failed to add new app cache. id:%d", request->id); goto done; } } int fresh = __dp_queue_manager_check_cache_freshness(file_name); if (fresh < 0) { TRACE_ERROR("failed to check cache freshness. id:%d", request->id); errorcode = DP_ERROR_INVALID_PARAMETER; goto done; } if (fresh == 1) { char *ori_file = __get_ori_file_name(file_name); if (!ori_file) { TRACE_ERROR("failed to get pure file name. id:%d", request->id); errorcode = DP_ERROR_NO_DATA; goto done; } request->file_name = ori_file; // call cache agent start function errorcode = dp_start_cache_download(file_name, slot, request); } else { // stale errorcode = __dp_queue_send_request_for_cache_validation(file_name, slot, request); } } done: free(url); free(file_name); return errorcode; } // if return negative, queue-manager try again. static int __dp_queue_manager_try_download(dp_client_slots_fmt *slot, dp_request_fmt *request) { int errorcode = DP_ERROR_NONE; int result = 0; if (slot == NULL || request == NULL) { TRACE_DEBUG("check address client:%p request:%p id:%d", slot, request, (request == NULL ? 0 : request->id)); // return 0 to ignore this call. return 0; } if (request->state != DP_STATE_QUEUED) { TRACE_ERROR("check id %d state:%d", request->id, request->state); return 0; } // check startcount request->startcount++; request->access_time = (int)time(NULL); if (dp_db_replace_property(slot->client.dbhandle, request->id, DP_TABLE_LOGGING, DP_DB_COL_STARTCOUNT, (void *)&request->startcount, 0, 0, &errorcode) < 0) { TRACE_ERROR("failed to set startcount"); return -1; } errorcode = DP_ERROR_NONE; int cache = 0; if (dp_db_get_property_int(slot->client.dbhandle, request->id, DP_TABLE_REQUEST, DP_DB_COL_CACHE, (int *)&cache, &errorcode) < 0 || cache < 0) { TRACE_ERROR("unable to get cache value for id:%d", request->id); cache = 0; } if (cache && dp_is_alive_cache_download(request->cache_agent_id) > 0) { errorcode = dp_resume_cache_download(request->cache_agent_id); } else if (dp_is_alive_download(request->agent_id) > 0) { errorcode = dp_resume_agent_download(request->agent_id); } else { if (cache) { if (__dp_queue_manager_try_cache_download(slot, request) != DP_ERROR_NONE) errorcode = dp_start_agent_download(slot, request); } else { // call agent start function errorcode = dp_start_agent_download(slot, request); } } if (errorcode == DP_ERROR_NONE) { request->state = DP_STATE_CONNECTING; } else if (errorcode == DP_ERROR_TOO_MANY_DOWNLOADS || errorcode == DP_ERROR_DISK_BUSY || errorcode == DP_ERROR_OUT_OF_MEMORY) { TRACE_ERROR("push queue id:%d error:%s", request->id, dp_print_errorcode(errorcode)); // PENDED request->state = DP_STATE_QUEUED; result = -1; // try again. } else if (errorcode == DP_ERROR_INVALID_STATE) { // by resume // ignore this request result = -1; TRACE_ERROR("failed to resume id:%d", request->id); request->agent_id = -1; // workaround. da_agent will an object for this agent_id later } else if (errorcode != DP_ERROR_NONE) { request->state = DP_STATE_FAILED; } request->error = errorcode; if (result == 0) { // it's not for retrying int sqlerror = DP_ERROR_NONE; if (dp_db_update_logging(slot->client.dbhandle, request->id, request->state, request->error, &sqlerror) < 0) { TRACE_ERROR("logging failure id:%d error:%d", request->id, sqlerror); } if (errorcode != DP_ERROR_NONE && request->state_cb == 1) { // announce state TRACE_ERROR("notify id:%d error:%s", request->id, dp_print_errorcode(errorcode)); if (dp_notify_feedback(slot->client.notify, slot, request->id, request->state, request->error, 0) < 0) { TRACE_ERROR("disable state callback by IO_ERROR id:%d", request->id); request->state_cb = 0; } } } return result; } static int __dp_queue_manager_check_queue(dp_queue_fmt **queue) { dp_client_slots_fmt *slot = NULL; dp_request_fmt *request = NULL; while (dp_queue_pop(queue, (void *)&slot, (void *)&request) == 0) { // pop a request from queue. TRACE_DEBUG("queue-manager pop a request"); if (slot == NULL || request == NULL) { TRACE_DEBUG("queue error client:%p request:%p id:%d", slot, request, (request == NULL ? 0 : request->id)); continue; } CLIENT_MUTEX_LOCK(&slot->mutex); TRACE_DEBUG("queue info slot:%p request:%p id:%d", slot, request, (request == NULL ? 0 : request->id)); int errorcode = DP_ERROR_NONE; int download_state = DP_STATE_NONE; if (slot != NULL && request != NULL && dp_db_get_property_int(slot->client.dbhandle, request->id, DP_TABLE_LOGGING, DP_DB_COL_STATE, &download_state, &errorcode) < 0) { TRACE_ERROR("deny checking id:%d db state:%s memory:%s", request->id, dp_print_state(download_state), dp_print_state(request->state)); errorcode = DP_ERROR_ID_NOT_FOUND; } if (download_state == DP_STATE_QUEUED && __dp_queue_manager_try_download(slot, request) < 0) { // if failed to start, push at the tail of queue. try again. if (dp_queue_push(queue, slot, request) < 0) { TRACE_ERROR("failed to push to queue id:%d", request->id); int errorcode = DP_ERROR_NONE; if (dp_db_update_logging(slot->client.dbhandle, request->id, DP_STATE_FAILED, DP_ERROR_QUEUE_FULL, &errorcode) < 0) TRACE_ERROR("failed to update log id:%d", request->id); request->state = DP_STATE_FAILED; request->error = DP_ERROR_QUEUE_FULL; } CLIENT_MUTEX_UNLOCK(&slot->mutex); return -1; // return negative for taking a break } CLIENT_MUTEX_UNLOCK(&slot->mutex); slot = NULL; request = NULL; } return 0; } static void *__dp_queue_manager(void *arg) { pthread_cond_init(&g_dp_queue_manager_cond, NULL); if (dp_init_agent() != DP_ERROR_NONE) { TRACE_ERROR("failed to init agent"); pthread_cond_destroy(&g_dp_queue_manager_cond); pthread_exit(NULL); return 0; } do { if (g_dp_queue_manager_tid <= 0) { TRACE_INFO("queue-manager is closed by other thread"); break; } // check wifi_direct first if (dp_network_is_wifi_direct() == 1 && __dp_queue_manager_check_queue(&g_dp_queue_network_wifi_direct) < 0) { TRACE_ERROR("download-agent is busy, try again after 15 seconds"); } else { // enter here if disable wifi-direct or download-agent is available int network_status = dp_network_get_status(); if (network_status != DP_NETWORK_OFF) { TRACE_INFO("queue-manager try to check queue network:%d", network_status); if (g_dp_queue_network_all != NULL && __dp_queue_manager_check_queue(&g_dp_queue_network_all) < 0) { TRACE_ERROR("download-agent is busy, try again after 15 seconds"); } else { dp_queue_fmt **queue = __dp_queue_manager_get_queue(network_status); if (__dp_queue_manager_check_queue(queue) < 0) TRACE_ERROR("download-agent is busy, try again after 15 seconds"); } } } struct timeval now; struct timespec ts; gettimeofday(&now, NULL); ts.tv_sec = now.tv_sec + 5; ts.tv_nsec = now.tv_usec * 1000; CLIENT_MUTEX_LOCK(&g_dp_queue_manager_mutex); pthread_cond_timedwait(&g_dp_queue_manager_cond, &g_dp_queue_manager_mutex, &ts); CLIENT_MUTEX_UNLOCK(&g_dp_queue_manager_mutex); } while (g_dp_queue_manager_tid > 0); TRACE_INFO("queue-manager's working is done"); dp_deinit_agent(); dp_queue_clear_all(&g_dp_queue_network_all); pthread_cond_destroy(&g_dp_queue_manager_cond); pthread_exit(NULL); return 0; } static int __dp_queue_manager_start() { if (g_dp_queue_manager_tid == 0 || pthread_kill(g_dp_queue_manager_tid, 0) == ESRCH) { TRACE_DEBUG("try to create queue-manager"); if (pthread_create(&g_dp_queue_manager_tid, NULL, __dp_queue_manager, NULL) != 0) { TRACE_ERROR("failed to create queue-manager"); return -1; } } return 0; } void dp_queue_manager_wake_up() { if (g_dp_queue_manager_tid > 0 && pthread_kill(g_dp_queue_manager_tid, 0) != ESRCH) { int locked = CLIENT_MUTEX_TRYLOCK(&g_dp_queue_manager_mutex); if (locked == 0 || locked == EBUSY) { pthread_cond_signal(&g_dp_queue_manager_cond); CLIENT_MUTEX_UNLOCK(&g_dp_queue_manager_mutex); } } else { __dp_queue_manager_start(); } } void dp_queue_manager_kill() { if (g_dp_queue_manager_tid > 0 && pthread_kill(g_dp_queue_manager_tid, 0) != ESRCH) { //send signal to queue thread void *status; pthread_t tid; tid = g_dp_queue_manager_tid; CLIENT_MUTEX_LOCK(&g_dp_queue_manager_mutex); g_dp_queue_manager_tid = 0; pthread_cond_signal(&g_dp_queue_manager_cond); CLIENT_MUTEX_UNLOCK(&g_dp_queue_manager_mutex); pthread_join(tid, &status); } }