summaryrefslogtreecommitdiff
path: root/daemons/cmirrord/cluster.c
diff options
context:
space:
mode:
Diffstat (limited to 'daemons/cmirrord/cluster.c')
-rw-r--r--daemons/cmirrord/cluster.c173
1 files changed, 135 insertions, 38 deletions
diff --git a/daemons/cmirrord/cluster.c b/daemons/cmirrord/cluster.c
index b6c925a..70c76c3 100644
--- a/daemons/cmirrord/cluster.c
+++ b/daemons/cmirrord/cluster.c
@@ -20,10 +20,12 @@
#include <corosync/cpg.h>
#include <errno.h>
-#include <openais/saAis.h>
-#include <openais/saCkpt.h>
#include <signal.h>
#include <unistd.h>
+#if CMIRROR_HAS_CHECKPOINT
+#include <openais/saAis.h>
+#include <openais/saCkpt.h>
+#endif
/* Open AIS error codes */
#define str_ais_error(x) \
@@ -62,13 +64,13 @@
RQ_TYPE((x) & ~DM_ULOG_RESPONSE)
static uint32_t my_cluster_id = 0xDEAD;
+#if CMIRROR_HAS_CHECKPOINT
static SaCkptHandleT ckpt_handle = 0;
static SaCkptCallbacksT callbacks = { 0, 0 };
static SaVersionT version = { 'B', 1, 1 };
+#endif
#define DEBUGGING_HISTORY 100
-//static char debugging[DEBUGGING_HISTORY][128];
-//static int idx = 0;
#define LOG_SPRINT(cc, f, arg...) do { \
cc->idx++; \
cc->idx = cc->idx % DEBUGGING_HISTORY; \
@@ -77,6 +79,7 @@ static SaVersionT version = { 'B', 1, 1 };
static int log_resp_rec = 0;
+#define RECOVERING_REGION_SECTION_SIZE 64
struct checkpoint_data {
uint32_t requester;
char uuid[CPG_MAX_NAME_LENGTH];
@@ -86,7 +89,7 @@ struct checkpoint_data {
char *clean_bits;
char *recovering_region;
struct checkpoint_data *next;
-};
+};
#define INVALID 0
#define VALID 1
@@ -128,7 +131,6 @@ static struct dm_list clog_cpg_list;
int cluster_send(struct clog_request *rq)
{
int r;
- int count=0;
int found = 0;
struct iovec iov;
struct clog_cpg *entry;
@@ -165,7 +167,10 @@ int cluster_send(struct clog_request *rq)
if (entry->cpg_state != VALID)
return -EINVAL;
+#if CMIRROR_HAS_CHECKPOINT
do {
+ int count = 0;
+
r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
if (r != SA_AIS_ERR_TRY_AGAIN)
break;
@@ -189,12 +194,14 @@ int cluster_send(struct clog_request *rq)
str_ais_error(r));
usleep(1000);
} while (1);
-
- if (r == CPG_OK)
+#else
+ r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
+#endif
+ if (r == CS_OK)
return 0;
/* error codes found in openais/cpg.h */
- LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r));
+ LOG_ERROR("cpg_mcast_joined error: %d", r);
rq->u_rq.error = -EBADE;
return -EBADE;
@@ -419,6 +426,7 @@ static void free_checkpoint(struct checkpoint_data *cp)
free(cp);
}
+#if CMIRROR_HAS_CHECKPOINT
static int export_checkpoint(struct checkpoint_data *cp)
{
SaCkptCheckpointCreationAttributesT attr;
@@ -587,7 +595,54 @@ rr_create_retry:
return 0;
}
-static int import_checkpoint(struct clog_cpg *entry, int no_read)
+#else
+/*
+ * export_checkpoint (variant used when CMIRROR_HAS_CHECKPOINT is not set)
+ * @cp: checkpoint data to ship to the requesting node
+ *
+ * Packs the checkpoint into a single DM_ULOG_CHECKPOINT_READY request and
+ * multicasts it with cluster_send().  The payload layout is:
+ *
+ *   [sync_bits: bitmap_size][clean_bits: bitmap_size]
+ *   [recovering_region: RECOVERING_REGION_SECTION_SIZE, NUL padded]
+ *
+ * Returns: 0 once the request has been built (a cluster_send() failure is
+ *          logged but not propagated), -EINVAL if the recovering region
+ *          string does not fit its section, -ENOMEM on allocation failure.
+ */
+static int export_checkpoint(struct checkpoint_data *cp)
+{
+	int r, rq_size;
+	size_t rr_len;
+	struct clog_request *rq;
+
+	/*
+	 * The recovering region travels in a fixed-size section.  Refuse
+	 * anything that would not fit together with its terminating NUL
+	 * rather than writing past the end of the request buffer.
+	 */
+	rr_len = strlen(cp->recovering_region);
+	if (rr_len >= RECOVERING_REGION_SECTION_SIZE) {
+		LOG_ERROR("export_checkpoint: "
+			  "Recovering region string too long");
+		return -EINVAL;
+	}
+
+	rq_size = sizeof(*rq);
+	rq_size += RECOVERING_REGION_SECTION_SIZE;
+	rq_size += cp->bitmap_size * 2; /* clean|sync_bits */
+
+	rq = malloc(rq_size);
+	if (!rq) {
+		LOG_ERROR("export_checkpoint: "
+			  "Unable to allocate transfer structs");
+		return -ENOMEM;
+	}
+	/* Zeroing also NUL-pads the recovering region section. */
+	memset(rq, 0, rq_size);
+
+	dm_list_init(&rq->u.list);
+	rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY;
+	rq->originator = cp->requester;
+	strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH);
+	rq->u_rq.seq = my_cluster_id;
+	rq->u_rq.data_size = rq_size - sizeof(*rq);
+
+	/* Sync bits */
+	memcpy(rq->u_rq.data, cp->sync_bits, cp->bitmap_size);
+
+	/* Clean bits */
+	memcpy(rq->u_rq.data + cp->bitmap_size, cp->clean_bits, cp->bitmap_size);
+
+	/* Recovering region */
+	memcpy(rq->u_rq.data + (cp->bitmap_size * 2), cp->recovering_region,
+	       rr_len);
+
+	r = cluster_send(rq);
+	if (r)
+		LOG_ERROR("Failed to send checkpoint ready notice: %s",
+			  strerror(-r));
+
+	free(rq);
+	return 0;
+}
+#endif /* CMIRROR_HAS_CHECKPOINT */
+
+#if CMIRROR_HAS_CHECKPOINT
+static int import_checkpoint(struct clog_cpg *entry, int no_read,
+ struct clog_request *rq __attribute__((unused)))
{
int rtn = 0;
SaCkptCheckpointHandleT h;
@@ -619,6 +674,7 @@ open_retry:
if (rv != SA_AIS_OK) {
LOG_ERROR("[%s] Failed to open checkpoint: %s",
SHORT_UUID(entry->name.value), str_ais_error(rv));
+ free(bitmap);
return -EIO; /* FIXME: better error */
}
@@ -647,6 +703,7 @@ init_retry:
if (rv != SA_AIS_OK) {
LOG_ERROR("[%s] Sync checkpoint section creation failed: %s",
SHORT_UUID(entry->name.value), str_ais_error(rv));
+ free(bitmap);
return -EIO; /* FIXME: better error */
}
@@ -740,6 +797,32 @@ no_read:
return rtn;
}
+#else
+/*
+ * import_checkpoint (variant used when CMIRROR_HAS_CHECKPOINT is not set)
+ * @entry:   the clog_cpg the checkpoint belongs to
+ * @no_read: non-zero when the caller already holds valid state, in which
+ *           case the (redundant) checkpoint is ignored
+ * @rq:      the CHECKPOINT_READY request carrying the checkpoint payload
+ *
+ * The payload is expected to hold two equally sized bitmaps followed by a
+ * RECOVERING_REGION_SECTION_SIZE byte section; each part is handed to
+ * pull_state() under the names "sync_bits", "clean_bits" and
+ * "recovering_region".
+ *
+ * Returns: 0 on success (or when skipped), -EINVAL if the payload is too
+ *          short to hold the recovering region section, -EIO if any
+ *          pull_state() call fails.
+ */
+static int import_checkpoint(struct clog_cpg *entry, int no_read,
+			     struct clog_request *rq)
+{
+	int bitmap_size;
+
+	/* Redundant checkpoint: state was already loaded, nothing to do. */
+	if (no_read)
+		return 0;
+
+	/*
+	 * Validate before subtracting: if data_size is unsigned, a short
+	 * payload would wrap around to a huge positive bitmap size instead
+	 * of going negative.
+	 */
+	if (rq->u_rq.data_size < RECOVERING_REGION_SECTION_SIZE) {
+		LOG_ERROR("Checkpoint has invalid payload size.");
+		return -EINVAL;
+	}
+
+	bitmap_size = (rq->u_rq.data_size - RECOVERING_REGION_SECTION_SIZE) / 2;
+
+	if (pull_state(entry->name.value, entry->luid, "sync_bits",
+		       rq->u_rq.data, bitmap_size) ||
+	    pull_state(entry->name.value, entry->luid, "clean_bits",
+		       rq->u_rq.data + bitmap_size, bitmap_size) ||
+	    pull_state(entry->name.value, entry->luid, "recovering_region",
+		       rq->u_rq.data + (bitmap_size * 2),
+		       RECOVERING_REGION_SECTION_SIZE)) {
+		LOG_ERROR("Error loading bitmap state from checkpoint.");
+		return -EIO;
+	}
+	return 0;
+}
+#endif /* CMIRROR_HAS_CHECKPOINT */
+
static void do_checkpoints(struct clog_cpg *entry, int leaving)
{
struct checkpoint_data *cp;
@@ -827,10 +910,11 @@ static int resend_requests(struct clog_cpg *entry)
rq->u_rq.seq);
rq->u_rq.data_size = 0;
- kernel_send(&rq->u_rq);
-
+ if (kernel_send(&rq->u_rq))
+ LOG_ERROR("Failed to respond to kernel [%s]",
+ RQ_TYPE(rq->u_rq.request_type));
break;
-
+
default:
/*
* If an action or a response is required, then
@@ -857,13 +941,13 @@ static int resend_requests(struct clog_cpg *entry)
static int do_cluster_work(void *data __attribute__((unused)))
{
- int r = SA_AIS_OK;
+ int r = CS_OK;
struct clog_cpg *entry, *tmp;
dm_list_iterate_items_safe(entry, tmp, &clog_cpg_list) {
- r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
- if (r != SA_AIS_OK)
- LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
+ r = cpg_dispatch(entry->handle, CS_DISPATCH_ALL);
+ if (r != CS_OK)
+ LOG_ERROR("cpg_dispatch failed: %d", r);
if (entry->free_me) {
free(entry);
@@ -874,7 +958,7 @@ static int do_cluster_work(void *data __attribute__((unused)))
resend_requests(entry);
}
- return (r == SA_AIS_OK) ? 0 : -1; /* FIXME: good error number? */
+ return (r == CS_OK) ? 0 : -1; /* FIXME: good error number? */
}
static int flush_startup_list(struct clog_cpg *entry)
@@ -939,16 +1023,19 @@ static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gna
struct clog_request *tmp_rq;
struct clog_cpg *match;
- if (clog_request_from_network(rq, msg_len) < 0)
- /* Error message comes from 'clog_request_from_network' */
- return;
-
match = find_clog_cpg(handle);
if (!match) {
LOG_ERROR("Unable to find clog_cpg for cluster message");
return;
}
+ /*
+ * Perform necessary endian and version compatibility conversions
+ */
+ if (clog_request_from_network(rq, msg_len) < 0)
+ /* Any error messages come from 'clog_request_from_network' */
+ return;
+
if ((nodeid == my_cluster_id) &&
!(rq->u_rq.request_type & DM_ULOG_RESPONSE) &&
(rq->u_rq.request_type != DM_ULOG_RESUME) &&
@@ -967,7 +1054,7 @@ static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gna
}
memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
dm_list_init(&tmp_rq->u.list);
- dm_list_add( &match->working_list, &tmp_rq->u.list);
+ dm_list_add(&match->working_list, &tmp_rq->u.list);
}
if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) {
@@ -1020,7 +1107,8 @@ static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gna
/* Redundant checkpoints ignored if match->valid */
LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u",
SHORT_UUID(rq->u_rq.uuid), nodeid);
- if (import_checkpoint(match, (match->state != INVALID))) {
+ if (import_checkpoint(match,
+ (match->state != INVALID), rq)) {
LOG_SPRINT(match,
"[%s] Failed to import checkpoint from %u",
SHORT_UUID(rq->u_rq.uuid), nodeid);
@@ -1144,11 +1232,11 @@ out:
_RQ_TYPE(rq->u_rq.request_type),
rq->originator, (response) ? "YES" : "NO");
else
- LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u",
+ LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u, error=%d",
rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type),
rq->originator, (response) ? "YES" : "NO",
- nodeid);
+ nodeid, rq->u_rq.error);
}
}
@@ -1161,7 +1249,7 @@ static void cpg_join_callback(struct clog_cpg *match,
uint32_t my_pid = (uint32_t)getpid();
uint32_t lowest = match->lowest_id;
struct clog_request *rq;
- char dbuf[32];
+ char dbuf[32] = { 0 };
/* Assign my_cluster_id */
if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid))
@@ -1177,7 +1265,6 @@ static void cpg_join_callback(struct clog_cpg *match,
if (joined->nodeid == my_cluster_id)
goto out;
- memset(dbuf, 0, sizeof(dbuf));
for (i = 0; i < member_list_entries - 1; i++)
sprintf(dbuf+strlen(dbuf), "%u-", member_list[i].nodeid);
sprintf(dbuf+strlen(dbuf), "(%u)", joined->nodeid);
@@ -1259,7 +1346,9 @@ static void cpg_leave_callback(struct clog_cpg *match,
dm_list_del(&rq->u.list);
if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND)
- kernel_send(&rq->u_rq);
+ if (kernel_send(&rq->u_rq))
+ LOG_ERROR("Failed to respond to kernel [%s]",
+ RQ_TYPE(rq->u_rq.request_type));
free(rq);
}
@@ -1268,7 +1357,7 @@ static void cpg_leave_callback(struct clog_cpg *match,
match->free_me = 1;
match->lowest_id = 0xDEAD;
match->state = INVALID;
- }
+ }
/* Remove any pending checkpoints for the leaving node. */
for (p_cp = NULL, c_cp = match->checkpoint_list;
@@ -1324,7 +1413,7 @@ static void cpg_leave_callback(struct clog_cpg *match,
left->nodeid);
return;
}
-
+
match->lowest_id = member_list[0].nodeid;
for (i = 0; i < member_list_entries; i++)
if (match->lowest_id > member_list[i].nodeid)
@@ -1413,6 +1502,7 @@ cpg_callbacks_t cpg_callbacks = {
*/
static int remove_checkpoint(struct clog_cpg *entry)
{
+#if CMIRROR_HAS_CHECKPOINT
int len;
SaNameT name;
SaAisErrorT rv;
@@ -1442,7 +1532,7 @@ unlink_retry:
usleep(1000);
goto unlink_retry;
}
-
+
if (rv != SA_AIS_OK) {
LOG_ERROR("[%s] Failed to unlink checkpoint: %s",
SHORT_UUID(entry->name.value), str_ais_error(rv));
@@ -1452,6 +1542,10 @@ unlink_retry:
saCkptCheckpointClose(h);
return 1;
+#else
+ /* No checkpoint to remove, so 'success' */
+ return 1;
+#endif
}
int create_cluster_cpg(char *uuid, uint64_t luid)
@@ -1493,14 +1587,14 @@ int create_cluster_cpg(char *uuid, uint64_t luid)
SHORT_UUID(new->name.value));
r = cpg_initialize(&new->handle, &cpg_callbacks);
- if (r != SA_AIS_OK) {
+ if (r != CS_OK) {
LOG_ERROR("cpg_initialize failed: Cannot join cluster");
free(new);
return -EPERM;
}
r = cpg_join(new->handle, &new->name);
- if (r != SA_AIS_OK) {
+ if (r != CS_OK) {
LOG_ERROR("cpg_join failed: Cannot join cluster");
free(new);
return -EPERM;
@@ -1541,7 +1635,7 @@ static int _destroy_cluster_cpg(struct clog_cpg *del)
{
int r;
int state;
-
+
LOG_COND(log_resend_requests, "[%s] I am leaving.2.....",
SHORT_UUID(del->name.value));
@@ -1573,7 +1667,7 @@ static int _destroy_cluster_cpg(struct clog_cpg *del)
abort_startup(del);
r = cpg_leave(del->handle, &del->name);
- if (r != CPG_OK)
+ if (r != CS_OK)
LOG_ERROR("Error leaving CPG!");
return 0;
}
@@ -1591,24 +1685,27 @@ int destroy_cluster_cpg(char *uuid)
int init_cluster(void)
{
+#if CMIRROR_HAS_CHECKPOINT
SaAisErrorT rv;
- dm_list_init(&clog_cpg_list);
rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
if (rv != SA_AIS_OK)
return EXIT_CLUSTER_CKPT_INIT;
-
+#endif
+ dm_list_init(&clog_cpg_list);
return 0;
}
void cleanup_cluster(void)
{
+#if CMIRROR_HAS_CHECKPOINT
SaAisErrorT err;
err = saCkptFinalize(ckpt_handle);
if (err != SA_AIS_OK)
LOG_ERROR("Failed to finalize checkpoint handle");
+#endif
}
void cluster_debug(void)