summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjusung <jusung07.son@samsung.com>2021-11-17 17:37:14 +0900
committerjusung <jusung07.son@samsung.com>2021-11-17 17:41:28 +0900
commitebea41881cd8efd7e809673cc60a79c4ee137f37 (patch)
tree27d18c4d0493d7bb7bc0e03bd62f46942ccbda7c
parent59d895bc68472aec7fbc420fcba9ea62bc293fba (diff)
downloadmessage-port-ebea41881cd8efd7e809673cc60a79c4ee137f37.tar.gz
message-port-ebea41881cd8efd7e809673cc60a79c4ee137f37.tar.bz2
message-port-ebea41881cd8efd7e809673cc60a79c4ee137f37.zip
Fix send function implemenation
After queuing the message, message port checks whether writing is now possible or not. Change-Id: I549d66272891c59c0f2cb255f8300c898c5c44d3 Signed-off-by: jusung <jusung07.son@samsung.com>
-rw-r--r--src/message_port_common.c6
-rw-r--r--src/message_port_local.c82
2 files changed, 73 insertions, 15 deletions
diff --git a/src/message_port_common.c b/src/message_port_common.c
index aa601d1..80ea560 100644
--- a/src/message_port_common.c
+++ b/src/message_port_common.c
@@ -55,6 +55,7 @@ int write_socket(int fd,
unsigned int left = nbytes;
ssize_t nb;
int retry_cnt = 0;
+ int tmp_errno;
*sequence += 1;
*bytes_write = 0;
@@ -67,9 +68,10 @@ int write_socket(int fd,
retry_cnt++;
continue;
}
- LOGE("write_socket: ...error fd %d: errno %d\n", fd, errno);
+ tmp_errno = errno;
+ LOGE("write_socket: ...error fd %d: errno %d\n", fd, tmp_errno);
- if (errno == EWOULDBLOCK || errno == EAGAIN)
+ if (tmp_errno == EWOULDBLOCK || tmp_errno == EAGAIN)
return MESSAGE_PORT_ERROR_RESOURCE_UNAVAILABLE;
return MESSAGE_PORT_ERROR_IO_ERROR;
diff --git a/src/message_port_local.c b/src/message_port_local.c
index 6de9c79..c153444 100644
--- a/src/message_port_local.c
+++ b/src/message_port_local.c
@@ -24,6 +24,7 @@
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
+#include <poll.h>
#include <glib.h>
#include <gio/gio.h>
@@ -731,12 +732,39 @@ static bool __validate_delay_port_info(delay_port_info *delay_info)
return true;
}
+
/* LCOV_EXCL_START */
+static int __pop_delayed_message(port_list_info_s *port_info)
+{
+ delay_message_info_s *message;
+ int ret;
+
+ if (port_info->delayed_message_list == NULL)
+ return MESSAGE_PORT_ERROR_NONE;
+
+ message = g_list_nth_data(port_info->delayed_message_list, 0);
+ ret = __send_delayed_message(port_info->send_sock_fd, message);
+
+ if (ret != MESSAGE_PORT_ERROR_NONE)
+ return ret;
+
+ port_info->delayed_message_size -= message->size;
+ port_info->delayed_message_list = g_list_remove(port_info->delayed_message_list, message);
+ _LOGI("pop : delayed_message_size (%d), count(%d)",
+ port_info->delayed_message_size,
+ g_list_length(port_info->delayed_message_list));
+
+ __free_delay_message_info(message);
+
+ return MESSAGE_PORT_ERROR_NONE;
+
+}
+
static gboolean __process_delayed_message(gint fd, GIOCondition cond, gpointer data)
{
delay_port_info *delay_info = (delay_port_info *)data;
port_list_info_s *port_info = delay_info->port_info;
- delay_message_info_s *message;
+
int ret;
if (port_info == NULL)
@@ -755,9 +783,7 @@ static gboolean __process_delayed_message(gint fd, GIOCondition cond, gpointer d
message_port_unlock_mutex();
return G_SOURCE_REMOVE;
} else {
- message = g_list_nth_data(port_info->delayed_message_list, 0);
- ret = __send_delayed_message(port_info->send_sock_fd, message);
-
+ ret = __pop_delayed_message(port_info);
if (ret == MESSAGE_PORT_ERROR_RESOURCE_UNAVAILABLE) {
message_port_unlock_mutex();
return G_SOURCE_CONTINUE;
@@ -766,11 +792,6 @@ static gboolean __process_delayed_message(gint fd, GIOCondition cond, gpointer d
message_port_unlock_mutex();
return G_SOURCE_REMOVE;
}
-
- port_info->delayed_message_size -= message->size;
-
- port_info->delayed_message_list = g_list_remove(port_info->delayed_message_list, message);
- __free_delay_message_info(message);
}
message_port_unlock_mutex();
@@ -780,7 +801,7 @@ static gboolean __process_delayed_message(gint fd, GIOCondition cond, gpointer d
/* LCOV_EXCL_STOP */
/* LCOV_EXCL_START */
-static int __insert_delayed_message(port_list_info_s *port_info,
+static int __push_delayed_message(port_list_info_s *port_info,
int sequence,
bundle_raw *kb_data,
int data_len,
@@ -869,6 +890,22 @@ out:
}
/* LCOV_EXCL_STOP */
+static bool __can_write(int fd)
+{
+ struct pollfd fds[1];
+ fds[0].fd = fd;
+ fds[0].events = POLLOUT;
+ fds[0].revents = 0;
+ int ret = poll(fds, 1, 100);
+ if (ret == 0 || ret < 0) {
+ _LOGI("poll() is failed. fd(%d), ret(%d) error(%s)",
+ fd, ret, ret == 0 ? "timed out" : "");
+ return false;
+ }
+
+ return true;
+}
+
static int __message_port_send_async(port_list_info_s *port_info, bundle *kb, const char *local_port,
bool local_trusted, bool is_bidirection)
{
@@ -927,11 +964,30 @@ static int __message_port_send_async(port_list_info_s *port_info, bundle *kb, co
}
out:
+
if (ret == MESSAGE_PORT_ERROR_RESOURCE_UNAVAILABLE) {
- ret = __insert_delayed_message(port_info, sequence, kb_data, data_len, nb,
+ ret = __push_delayed_message(port_info, sequence, kb_data, data_len, nb,
local_port, local_trusted, is_bidirection);
- if (ret != MESSAGE_PORT_ERROR_NONE)
- ret = MESSAGE_PORT_ERROR_IO_ERROR;
+ if (ret != MESSAGE_PORT_ERROR_NONE) {
+ if (kb_data)
+ free(kb_data);
+ return MESSAGE_PORT_ERROR_IO_ERROR;
+ }
+
+ if (__can_write(port_info->send_sock_fd)) {
+ while (g_list_length(port_info->delayed_message_list) != 0) {
+ ret = __pop_delayed_message(port_info);
+ if (ret != MESSAGE_PORT_ERROR_NONE) {
+ if (ret == MESSAGE_PORT_ERROR_RESOURCE_UNAVAILABLE) {
+ ret = MESSAGE_PORT_ERROR_NONE;
+ } else if (ret == MESSAGE_PORT_ERROR_IO_ERROR) {
+ g_source_remove(port_info->delay_src_id);
+ port_info->delay_src_id = 0;
+ }
+ break;
+ }
+ }
+ }
}
if (kb_data)