diff options
Diffstat (limited to 'daemons/cmirrord/cluster.c')
-rw-r--r-- | daemons/cmirrord/cluster.c | 173 |
1 files changed, 135 insertions, 38 deletions
diff --git a/daemons/cmirrord/cluster.c b/daemons/cmirrord/cluster.c index b6c925a..70c76c3 100644 --- a/daemons/cmirrord/cluster.c +++ b/daemons/cmirrord/cluster.c @@ -20,10 +20,12 @@ #include <corosync/cpg.h> #include <errno.h> -#include <openais/saAis.h> -#include <openais/saCkpt.h> #include <signal.h> #include <unistd.h> +#if CMIRROR_HAS_CHECKPOINT +#include <openais/saAis.h> +#include <openais/saCkpt.h> +#endif /* Open AIS error codes */ #define str_ais_error(x) \ @@ -62,13 +64,13 @@ RQ_TYPE((x) & ~DM_ULOG_RESPONSE) static uint32_t my_cluster_id = 0xDEAD; +#if CMIRROR_HAS_CHECKPOINT static SaCkptHandleT ckpt_handle = 0; static SaCkptCallbacksT callbacks = { 0, 0 }; static SaVersionT version = { 'B', 1, 1 }; +#endif #define DEBUGGING_HISTORY 100 -//static char debugging[DEBUGGING_HISTORY][128]; -//static int idx = 0; #define LOG_SPRINT(cc, f, arg...) do { \ cc->idx++; \ cc->idx = cc->idx % DEBUGGING_HISTORY; \ @@ -77,6 +79,7 @@ static SaVersionT version = { 'B', 1, 1 }; static int log_resp_rec = 0; +#define RECOVERING_REGION_SECTION_SIZE 64 struct checkpoint_data { uint32_t requester; char uuid[CPG_MAX_NAME_LENGTH]; @@ -86,7 +89,7 @@ struct checkpoint_data { char *clean_bits; char *recovering_region; struct checkpoint_data *next; -}; +}; #define INVALID 0 #define VALID 1 @@ -128,7 +131,6 @@ static struct dm_list clog_cpg_list; int cluster_send(struct clog_request *rq) { int r; - int count=0; int found = 0; struct iovec iov; struct clog_cpg *entry; @@ -165,7 +167,10 @@ int cluster_send(struct clog_request *rq) if (entry->cpg_state != VALID) return -EINVAL; +#if CMIRROR_HAS_CHECKPOINT do { + int count = 0; + r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1); if (r != SA_AIS_ERR_TRY_AGAIN) break; @@ -189,12 +194,14 @@ int cluster_send(struct clog_request *rq) str_ais_error(r)); usleep(1000); } while (1); - - if (r == CPG_OK) +#else + r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1); +#endif + if (r == CS_OK) return 0; /* error codes found in openais/cpg.h */ - LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r)); + LOG_ERROR("cpg_mcast_joined error: %d", r); rq->u_rq.error = -EBADE; return -EBADE; @@ -419,6 +426,7 @@ static void free_checkpoint(struct checkpoint_data *cp) free(cp); } +#if CMIRROR_HAS_CHECKPOINT static int export_checkpoint(struct checkpoint_data *cp) { SaCkptCheckpointCreationAttributesT attr; @@ -587,7 +595,54 @@ rr_create_retry: return 0; } -static int import_checkpoint(struct clog_cpg *entry, int no_read) +#else +static int export_checkpoint(struct checkpoint_data *cp) +{ + int r, rq_size; + struct clog_request *rq; + + rq_size = sizeof(*rq); + rq_size += RECOVERING_REGION_SECTION_SIZE; + rq_size += cp->bitmap_size * 2; /* clean|sync_bits */ + + rq = malloc(rq_size); + if (!rq) { + LOG_ERROR("export_checkpoint: " + "Unable to allocate transfer structs"); + return -ENOMEM; + } + memset(rq, 0, rq_size); + + dm_list_init(&rq->u.list); + rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY; + rq->originator = cp->requester; + strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH); + rq->u_rq.seq = my_cluster_id; + rq->u_rq.data_size = rq_size - sizeof(*rq); + + /* Sync bits */ + memcpy(rq->u_rq.data, cp->sync_bits, cp->bitmap_size); + + /* Clean bits */ + memcpy(rq->u_rq.data + cp->bitmap_size, cp->clean_bits, cp->bitmap_size); + + /* Recovering region */ + memcpy(rq->u_rq.data + (cp->bitmap_size * 2), cp->recovering_region, + strlen(cp->recovering_region)); + + r = cluster_send(rq); + if (r) + LOG_ERROR("Failed to send checkpoint ready notice: %s", + strerror(-r)); + + free(rq); + return 0; +} +#endif /* CMIRROR_HAS_CHECKPOINT */ + +#if CMIRROR_HAS_CHECKPOINT +static int import_checkpoint(struct clog_cpg *entry, int no_read, + struct clog_request *rq __attribute__((unused))) { int rtn = 0; SaCkptCheckpointHandleT h; @@ -619,6 +674,7 @@ open_retry: if (rv != SA_AIS_OK) { LOG_ERROR("[%s] Failed to open checkpoint: %s", SHORT_UUID(entry->name.value), str_ais_error(rv)); + free(bitmap); return -EIO; /* FIXME: better error */ } @@ -647,6 +703,7 @@ init_retry: if (rv != SA_AIS_OK) { LOG_ERROR("[%s] Sync checkpoint section creation failed: %s", SHORT_UUID(entry->name.value), str_ais_error(rv)); + free(bitmap); return -EIO; /* FIXME: better error */ } @@ -740,6 +797,32 @@ no_read: return rtn; } +#else +static int import_checkpoint(struct clog_cpg *entry, int no_read, + struct clog_request *rq) +{ + int bitmap_size; + + bitmap_size = (rq->u_rq.data_size - RECOVERING_REGION_SECTION_SIZE) / 2; + if (bitmap_size < 0) { + LOG_ERROR("Checkpoint has invalid payload size."); + return -EINVAL; + } + + if (pull_state(entry->name.value, entry->luid, "sync_bits", + rq->u_rq.data, bitmap_size) || + pull_state(entry->name.value, entry->luid, "clean_bits", + rq->u_rq.data + bitmap_size, bitmap_size) || + pull_state(entry->name.value, entry->luid, "recovering_region", + rq->u_rq.data + (bitmap_size * 2), + RECOVERING_REGION_SECTION_SIZE)) { + LOG_ERROR("Error loading bitmap state from checkpoint."); + return -EIO; + } + return 0; +} +#endif /* CMIRROR_HAS_CHECKPOINT */ + static void do_checkpoints(struct clog_cpg *entry, int leaving) { struct checkpoint_data *cp; @@ -827,10 +910,11 @@ static int resend_requests(struct clog_cpg *entry) rq->u_rq.seq); rq->u_rq.data_size = 0; - kernel_send(&rq->u_rq); - + if (kernel_send(&rq->u_rq)) + LOG_ERROR("Failed to respond to kernel [%s]", + RQ_TYPE(rq->u_rq.request_type)); break; - + default: /* * If an action or a response is required, then @@ -857,13 +941,13 @@ static int resend_requests(struct clog_cpg *entry) static int do_cluster_work(void *data __attribute__((unused))) { - int r = SA_AIS_OK; + int r = CS_OK; struct clog_cpg *entry, *tmp; dm_list_iterate_items_safe(entry, tmp, &clog_cpg_list) { - r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL); - if (r != SA_AIS_OK) - LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r)); + r = cpg_dispatch(entry->handle, CS_DISPATCH_ALL); + if (r != CS_OK) + LOG_ERROR("cpg_dispatch failed: %d", r); if (entry->free_me) { free(entry); @@ -874,7 +958,7 @@ static int do_cluster_work(void *data __attribute__((unused))) resend_requests(entry); } - return (r == SA_AIS_OK) ? 0 : -1; /* FIXME: good error number? */ + return (r == CS_OK) ? 0 : -1; /* FIXME: good error number? */ } static int flush_startup_list(struct clog_cpg *entry) @@ -939,16 +1023,19 @@ static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gna struct clog_request *tmp_rq; struct clog_cpg *match; - if (clog_request_from_network(rq, msg_len) < 0) - /* Error message comes from 'clog_request_from_network' */ - return; - match = find_clog_cpg(handle); if (!match) { LOG_ERROR("Unable to find clog_cpg for cluster message"); return; } + /* + * Perform necessary endian and version compatibility conversions + */ + if (clog_request_from_network(rq, msg_len) < 0) + /* Any error messages come from 'clog_request_from_network' */ + return; + if ((nodeid == my_cluster_id) && !(rq->u_rq.request_type & DM_ULOG_RESPONSE) && (rq->u_rq.request_type != DM_ULOG_RESUME) && @@ -967,7 +1054,7 @@ static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gna } memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size); dm_list_init(&tmp_rq->u.list); - dm_list_add( &match->working_list, &tmp_rq->u.list); + dm_list_add(&match->working_list, &tmp_rq->u.list); } if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) { @@ -1020,7 +1107,8 @@ static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gna /* Redundant checkpoints ignored if match->valid */ LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u", SHORT_UUID(rq->u_rq.uuid), nodeid); - if (import_checkpoint(match, (match->state != INVALID))) { + if (import_checkpoint(match, + (match->state != INVALID), rq)) { LOG_SPRINT(match, "[%s] Failed to import checkpoint from %u", SHORT_UUID(rq->u_rq.uuid), nodeid); @@ -1144,11 +1232,11 @@ out: _RQ_TYPE(rq->u_rq.request_type), rq->originator, (response) ? "YES" : "NO"); else - LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u", + LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u, error=%d", rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), rq->originator, (response) ? "YES" : "NO", - nodeid); + nodeid, rq->u_rq.error); } } @@ -1161,7 +1249,7 @@ static void cpg_join_callback(struct clog_cpg *match, uint32_t my_pid = (uint32_t)getpid(); uint32_t lowest = match->lowest_id; struct clog_request *rq; - char dbuf[32]; + char dbuf[32] = { 0 }; /* Assign my_cluster_id */ if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid)) @@ -1177,7 +1265,6 @@ static void cpg_join_callback(struct clog_cpg *match, if (joined->nodeid == my_cluster_id) goto out; - memset(dbuf, 0, sizeof(dbuf)); for (i = 0; i < member_list_entries - 1; i++) sprintf(dbuf+strlen(dbuf), "%u-", member_list[i].nodeid); sprintf(dbuf+strlen(dbuf), "(%u)", joined->nodeid); @@ -1259,7 +1346,9 @@ static void cpg_leave_callback(struct clog_cpg *match, dm_list_del(&rq->u.list); if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) - kernel_send(&rq->u_rq); + if (kernel_send(&rq->u_rq)) + LOG_ERROR("Failed to respond to kernel [%s]", + RQ_TYPE(rq->u_rq.request_type)); free(rq); } @@ -1268,7 +1357,7 @@ static void cpg_leave_callback(struct clog_cpg *match, match->free_me = 1; match->lowest_id = 0xDEAD; match->state = INVALID; - } + } /* Remove any pending checkpoints for the leaving node. */ for (p_cp = NULL, c_cp = match->checkpoint_list; @@ -1324,7 +1413,7 @@ static void cpg_leave_callback(struct clog_cpg *match, left->nodeid); return; } - + match->lowest_id = member_list[0].nodeid; for (i = 0; i < member_list_entries; i++) if (match->lowest_id > member_list[i].nodeid) @@ -1413,6 +1502,7 @@ cpg_callbacks_t cpg_callbacks = { */ static int remove_checkpoint(struct clog_cpg *entry) { +#if CMIRROR_HAS_CHECKPOINT int len; SaNameT name; SaAisErrorT rv; @@ -1442,7 +1532,7 @@ unlink_retry: usleep(1000); goto unlink_retry; } - + if (rv != SA_AIS_OK) { LOG_ERROR("[%s] Failed to unlink checkpoint: %s", SHORT_UUID(entry->name.value), str_ais_error(rv)); @@ -1452,6 +1542,10 @@ unlink_retry: saCkptCheckpointClose(h); return 1; +#else + /* No checkpoint to remove, so 'success' */ + return 1; +#endif } int create_cluster_cpg(char *uuid, uint64_t luid) @@ -1493,14 +1587,14 @@ int create_cluster_cpg(char *uuid, uint64_t luid) SHORT_UUID(new->name.value)); r = cpg_initialize(&new->handle, &cpg_callbacks); - if (r != SA_AIS_OK) { + if (r != CS_OK) { LOG_ERROR("cpg_initialize failed: Cannot join cluster"); free(new); return -EPERM; } r = cpg_join(new->handle, &new->name); - if (r != SA_AIS_OK) { + if (r != CS_OK) { LOG_ERROR("cpg_join failed: Cannot join cluster"); free(new); return -EPERM; @@ -1541,7 +1635,7 @@ static int _destroy_cluster_cpg(struct clog_cpg *del) { int r; int state; - + LOG_COND(log_resend_requests, "[%s] I am leaving.2.....", SHORT_UUID(del->name.value)); @@ -1573,7 +1667,7 @@ static int _destroy_cluster_cpg(struct clog_cpg *del) abort_startup(del); r = cpg_leave(del->handle, &del->name); - if (r != CPG_OK) + if (r != CS_OK) LOG_ERROR("Error leaving CPG!"); return 0; } @@ -1591,24 +1685,27 @@ int destroy_cluster_cpg(char *uuid) int init_cluster(void) { +#if CMIRROR_HAS_CHECKPOINT SaAisErrorT rv; - dm_list_init(&clog_cpg_list); rv = saCkptInitialize(&ckpt_handle, &callbacks, &version); if (rv != SA_AIS_OK) return EXIT_CLUSTER_CKPT_INIT; - +#endif + dm_list_init(&clog_cpg_list); return 0; } void cleanup_cluster(void) { +#if CMIRROR_HAS_CHECKPOINT SaAisErrorT err; err = saCkptFinalize(ckpt_handle); if (err != SA_AIS_OK) LOG_ERROR("Failed to finalize checkpoint handle"); +#endif } void cluster_debug(void) |