diff options
Diffstat (limited to 'block/quorum.c')
-rw-r--r-- | block/quorum.c | 1227 |
1 files changed, 1227 insertions, 0 deletions
diff --git a/block/quorum.c b/block/quorum.c new file mode 100644 index 000000000..b10fc2089 --- /dev/null +++ b/block/quorum.c @@ -0,0 +1,1227 @@ +/* + * Quorum Block filter + * + * Copyright (C) 2012-2014 Nodalink, EURL. + * + * Author: + * BenoƮt Canet <benoit.canet@irqsave.net> + * + * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp) + * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc). + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include "qemu/cutils.h" +#include "qemu/module.h" +#include "qemu/option.h" +#include "block/block_int.h" +#include "block/qdict.h" +#include "qapi/error.h" +#include "qapi/qapi-events-block.h" +#include "qapi/qmp/qdict.h" +#include "qapi/qmp/qerror.h" +#include "qapi/qmp/qlist.h" +#include "qapi/qmp/qstring.h" +#include "crypto/hash.h" + +#define HASH_LENGTH 32 + +#define INDEXSTR_LEN 32 + +#define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold" +#define QUORUM_OPT_BLKVERIFY "blkverify" +#define QUORUM_OPT_REWRITE "rewrite-corrupted" +#define QUORUM_OPT_READ_PATTERN "read-pattern" + +/* This union holds a vote hash value */ +typedef union QuorumVoteValue { + uint8_t h[HASH_LENGTH]; /* SHA-256 hash */ + int64_t l; /* simpler 64 bits hash */ +} QuorumVoteValue; + +/* A vote item */ +typedef struct QuorumVoteItem { + int index; + QLIST_ENTRY(QuorumVoteItem) next; +} QuorumVoteItem; + +/* this structure is a vote version. A version is the set of votes sharing the + * same vote value. + * The set of votes will be tracked with the items field and its cardinality is + * vote_count. + */ +typedef struct QuorumVoteVersion { + QuorumVoteValue value; + int index; + int vote_count; + QLIST_HEAD(, QuorumVoteItem) items; + QLIST_ENTRY(QuorumVoteVersion) next; +} QuorumVoteVersion; + +/* this structure holds a group of vote versions together */ +typedef struct QuorumVotes { + QLIST_HEAD(, QuorumVoteVersion) vote_list; + bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b); +} QuorumVotes; + +/* the following structure holds the state of one quorum instance */ +typedef struct BDRVQuorumState { + BdrvChild **children; /* children BlockDriverStates */ + int num_children; /* children count */ + unsigned next_child_index; /* the index of the next child that should + * be added + */ + int threshold; /* if less than threshold children reads gave the + * same result a quorum error occurs. + */ + bool is_blkverify; /* true if the driver is in blkverify mode + * Writes are mirrored on two children devices. + * On reads the two children devices' contents are + * compared and if a difference is spotted its + * location is printed and the code aborts. + * It is useful to debug other block drivers by + * comparing them with a reference one. + */ + bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted + * block if Quorum is reached. + */ + + QuorumReadPattern read_pattern; +} BDRVQuorumState; + +typedef struct QuorumAIOCB QuorumAIOCB; + +/* Quorum will create one instance of the following structure per operation it + * performs on its children. + * So for each read/write operation coming from the upper layer there will be + * $children_count QuorumChildRequest. + */ +typedef struct QuorumChildRequest { + BlockDriverState *bs; + QEMUIOVector qiov; + uint8_t *buf; + int ret; + QuorumAIOCB *parent; +} QuorumChildRequest; + +/* Quorum will use the following structure to track progress of each read/write + * operation received by the upper layer. + * This structure hold pointers to the QuorumChildRequest structures instances + * used to do operations on each children and track overall progress. + */ +struct QuorumAIOCB { + BlockDriverState *bs; + Coroutine *co; + + /* Request metadata */ + uint64_t offset; + uint64_t bytes; + int flags; + + QEMUIOVector *qiov; /* calling IOV */ + + QuorumChildRequest *qcrs; /* individual child requests */ + int count; /* number of completed AIOCB */ + int success_count; /* number of successfully completed AIOCB */ + + int rewrite_count; /* number of replica to rewrite: count down to + * zero once writes are fired + */ + + QuorumVotes votes; + + bool is_read; + int vote_ret; + int children_read; /* how many children have been read from */ +}; + +typedef struct QuorumCo { + QuorumAIOCB *acb; + int idx; +} QuorumCo; + +static void quorum_aio_finalize(QuorumAIOCB *acb) +{ + g_free(acb->qcrs); + g_free(acb); +} + +static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b) +{ + return !memcmp(a->h, b->h, HASH_LENGTH); +} + +static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b) +{ + return a->l == b->l; +} + +static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs, + QEMUIOVector *qiov, + uint64_t offset, + uint64_t bytes, + int flags) +{ + BDRVQuorumState *s = bs->opaque; + QuorumAIOCB *acb = g_new(QuorumAIOCB, 1); + int i; + + *acb = (QuorumAIOCB) { + .co = qemu_coroutine_self(), + .bs = bs, + .offset = offset, + .bytes = bytes, + .flags = flags, + .qiov = qiov, + .votes.compare = quorum_sha256_compare, + .votes.vote_list = QLIST_HEAD_INITIALIZER(acb.votes.vote_list), + }; + + acb->qcrs = g_new0(QuorumChildRequest, s->num_children); + for (i = 0; i < s->num_children; i++) { + acb->qcrs[i].buf = NULL; + acb->qcrs[i].ret = 0; + acb->qcrs[i].parent = acb; + } + + return acb; +} + +static void quorum_report_bad(QuorumOpType type, uint64_t offset, + uint64_t bytes, char *node_name, int ret) +{ + const char *msg = NULL; + int64_t start_sector = offset / BDRV_SECTOR_SIZE; + int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE); + + if (ret < 0) { + msg = strerror(-ret); + } + + qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector, + end_sector - start_sector); +} + +static void quorum_report_failure(QuorumAIOCB *acb) +{ + const char *reference = bdrv_get_device_or_node_name(acb->bs); + int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE; + int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes, + BDRV_SECTOR_SIZE); + + qapi_event_send_quorum_failure(reference, start_sector, + end_sector - start_sector); +} + +static int quorum_vote_error(QuorumAIOCB *acb); + +static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb) +{ + BDRVQuorumState *s = acb->bs->opaque; + + if (acb->success_count < s->threshold) { + acb->vote_ret = quorum_vote_error(acb); + quorum_report_failure(acb); + return true; + } + + return false; +} + +static int read_fifo_child(QuorumAIOCB *acb); + +static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source) +{ + int i; + assert(dest->niov == source->niov); + assert(dest->size == source->size); + for (i = 0; i < source->niov; i++) { + assert(dest->iov[i].iov_len == source->iov[i].iov_len); + memcpy(dest->iov[i].iov_base, + source->iov[i].iov_base, + source->iov[i].iov_len); + } +} + +static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret) +{ + QuorumAIOCB *acb = sacb->parent; + QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE; + quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret); +} + +static void quorum_report_bad_versions(BDRVQuorumState *s, + QuorumAIOCB *acb, + QuorumVoteValue *value) +{ + QuorumVoteVersion *version; + QuorumVoteItem *item; + + QLIST_FOREACH(version, &acb->votes.vote_list, next) { + if (acb->votes.compare(&version->value, value)) { + continue; + } + QLIST_FOREACH(item, &version->items, next) { + quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes, + s->children[item->index]->bs->node_name, 0); + } + } +} + +static void quorum_rewrite_entry(void *opaque) +{ + QuorumCo *co = opaque; + QuorumAIOCB *acb = co->acb; + BDRVQuorumState *s = acb->bs->opaque; + + /* Ignore any errors, it's just a correction attempt for already + * corrupted data. + * Mask out BDRV_REQ_WRITE_UNCHANGED because this overwrites the + * area with different data from the other children. */ + bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes, + acb->qiov, acb->flags & ~BDRV_REQ_WRITE_UNCHANGED); + + /* Wake up the caller after the last rewrite */ + acb->rewrite_count--; + if (!acb->rewrite_count) { + qemu_coroutine_enter_if_inactive(acb->co); + } +} + +static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb, + QuorumVoteValue *value) +{ + QuorumVoteVersion *version; + QuorumVoteItem *item; + int count = 0; + + /* first count the number of bad versions: done first to avoid concurrency + * issues. + */ + QLIST_FOREACH(version, &acb->votes.vote_list, next) { + if (acb->votes.compare(&version->value, value)) { + continue; + } + QLIST_FOREACH(item, &version->items, next) { + count++; + } + } + + /* quorum_rewrite_entry will count down this to zero */ + acb->rewrite_count = count; + + /* now fire the correcting rewrites */ + QLIST_FOREACH(version, &acb->votes.vote_list, next) { + if (acb->votes.compare(&version->value, value)) { + continue; + } + QLIST_FOREACH(item, &version->items, next) { + Coroutine *co; + QuorumCo data = { + .acb = acb, + .idx = item->index, + }; + + co = qemu_coroutine_create(quorum_rewrite_entry, &data); + qemu_coroutine_enter(co); + } + } + + /* return true if any rewrite is done else false */ + return count; +} + +static void quorum_count_vote(QuorumVotes *votes, + QuorumVoteValue *value, + int index) +{ + QuorumVoteVersion *v = NULL, *version = NULL; + QuorumVoteItem *item; + + /* look if we have something with this hash */ + QLIST_FOREACH(v, &votes->vote_list, next) { + if (votes->compare(&v->value, value)) { + version = v; + break; + } + } + + /* It's a version not yet in the list add it */ + if (!version) { + version = g_new0(QuorumVoteVersion, 1); + QLIST_INIT(&version->items); + memcpy(&version->value, value, sizeof(version->value)); + version->index = index; + version->vote_count = 0; + QLIST_INSERT_HEAD(&votes->vote_list, version, next); + } + + version->vote_count++; + + item = g_new0(QuorumVoteItem, 1); + item->index = index; + QLIST_INSERT_HEAD(&version->items, item, next); +} + +static void quorum_free_vote_list(QuorumVotes *votes) +{ + QuorumVoteVersion *version, *next_version; + QuorumVoteItem *item, *next_item; + + QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) { + QLIST_REMOVE(version, next); + QLIST_FOREACH_SAFE(item, &version->items, next, next_item) { + QLIST_REMOVE(item, next); + g_free(item); + } + g_free(version); + } +} + +static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash) +{ + QEMUIOVector *qiov = &acb->qcrs[i].qiov; + size_t len = sizeof(hash->h); + uint8_t *data = hash->h; + + /* XXX - would be nice if we could pass in the Error ** + * and propagate that back, but this quorum code is + * restricted to just errno values currently */ + if (qcrypto_hash_bytesv(QCRYPTO_HASH_ALG_SHA256, + qiov->iov, qiov->niov, + &data, &len, + NULL) < 0) { + return -EINVAL; + } + + return 0; +} + +static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes) +{ + int max = 0; + QuorumVoteVersion *candidate, *winner = NULL; + + QLIST_FOREACH(candidate, &votes->vote_list, next) { + if (candidate->vote_count > max) { + max = candidate->vote_count; + winner = candidate; + } + } + + return winner; +} + +/* qemu_iovec_compare is handy for blkverify mode because it returns the first + * differing byte location. Yet it is handcoded to compare vectors one byte + * after another so it does not benefit from the libc SIMD optimizations. + * quorum_iovec_compare is written for speed and should be used in the non + * blkverify mode of quorum. + */ +static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b) +{ + int i; + int result; + + assert(a->niov == b->niov); + for (i = 0; i < a->niov; i++) { + assert(a->iov[i].iov_len == b->iov[i].iov_len); + result = memcmp(a->iov[i].iov_base, + b->iov[i].iov_base, + a->iov[i].iov_len); + if (result) { + return false; + } + } + + return true; +} + +static bool quorum_compare(QuorumAIOCB *acb, QEMUIOVector *a, QEMUIOVector *b) +{ + BDRVQuorumState *s = acb->bs->opaque; + ssize_t offset; + + /* This driver will replace blkverify in this particular case */ + if (s->is_blkverify) { + offset = qemu_iovec_compare(a, b); + if (offset != -1) { + fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64 + " contents mismatch at offset %" PRIu64 "\n", + acb->offset, acb->bytes, acb->offset + offset); + exit(1); + } + return true; + } + + return quorum_iovec_compare(a, b); +} + +/* Do a vote to get the error code */ +static int quorum_vote_error(QuorumAIOCB *acb) +{ + BDRVQuorumState *s = acb->bs->opaque; + QuorumVoteVersion *winner = NULL; + QuorumVotes error_votes; + QuorumVoteValue result_value; + int i, ret = 0; + bool error = false; + + QLIST_INIT(&error_votes.vote_list); + error_votes.compare = quorum_64bits_compare; + + for (i = 0; i < s->num_children; i++) { + ret = acb->qcrs[i].ret; + if (ret) { + error = true; + result_value.l = ret; + quorum_count_vote(&error_votes, &result_value, i); + } + } + + if (error) { + winner = quorum_get_vote_winner(&error_votes); + ret = winner->value.l; + } + + quorum_free_vote_list(&error_votes); + + return ret; +} + +static void quorum_vote(QuorumAIOCB *acb) +{ + bool quorum = true; + int i, j, ret; + QuorumVoteValue hash; + BDRVQuorumState *s = acb->bs->opaque; + QuorumVoteVersion *winner; + + if (quorum_has_too_much_io_failed(acb)) { + return; + } + + /* get the index of the first successful read */ + for (i = 0; i < s->num_children; i++) { + if (!acb->qcrs[i].ret) { + break; + } + } + + assert(i < s->num_children); + + /* compare this read with all other successful reads stopping at quorum + * failure + */ + for (j = i + 1; j < s->num_children; j++) { + if (acb->qcrs[j].ret) { + continue; + } + quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov); + if (!quorum) { + break; + } + } + + /* Every successful read agrees */ + if (quorum) { + quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov); + return; + } + + /* compute hashes for each successful read, also store indexes */ + for (i = 0; i < s->num_children; i++) { + if (acb->qcrs[i].ret) { + continue; + } + ret = quorum_compute_hash(acb, i, &hash); + /* if ever the hash computation failed */ + if (ret < 0) { + acb->vote_ret = ret; + goto free_exit; + } + quorum_count_vote(&acb->votes, &hash, i); + } + + /* vote to select the most represented version */ + winner = quorum_get_vote_winner(&acb->votes); + + /* if the winner count is smaller than threshold the read fails */ + if (winner->vote_count < s->threshold) { + quorum_report_failure(acb); + acb->vote_ret = -EIO; + goto free_exit; + } + + /* we have a winner: copy it */ + quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov); + + /* some versions are bad print them */ + quorum_report_bad_versions(s, acb, &winner->value); + + /* corruption correction is enabled */ + if (s->rewrite_corrupted) { + quorum_rewrite_bad_versions(acb, &winner->value); + } + +free_exit: + /* free lists */ + quorum_free_vote_list(&acb->votes); +} + +static void read_quorum_children_entry(void *opaque) +{ + QuorumCo *co = opaque; + QuorumAIOCB *acb = co->acb; + BDRVQuorumState *s = acb->bs->opaque; + int i = co->idx; + QuorumChildRequest *sacb = &acb->qcrs[i]; + + sacb->bs = s->children[i]->bs; + sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes, + &acb->qcrs[i].qiov, 0); + + if (sacb->ret == 0) { + acb->success_count++; + } else { + quorum_report_bad_acb(sacb, sacb->ret); + } + + acb->count++; + assert(acb->count <= s->num_children); + assert(acb->success_count <= s->num_children); + + /* Wake up the caller after the last read */ + if (acb->count == s->num_children) { + qemu_coroutine_enter_if_inactive(acb->co); + } +} + +static int read_quorum_children(QuorumAIOCB *acb) +{ + BDRVQuorumState *s = acb->bs->opaque; + int i; + + acb->children_read = s->num_children; + for (i = 0; i < s->num_children; i++) { + acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size); + qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov); + qemu_iovec_clone(&acb->qcrs[i].qiov, acb->qiov, acb->qcrs[i].buf); + } + + for (i = 0; i < s->num_children; i++) { + Coroutine *co; + QuorumCo data = { + .acb = acb, + .idx = i, + }; + + co = qemu_coroutine_create(read_quorum_children_entry, &data); + qemu_coroutine_enter(co); + } + + while (acb->count < s->num_children) { + qemu_coroutine_yield(); + } + + /* Do the vote on read */ + quorum_vote(acb); + for (i = 0; i < s->num_children; i++) { + qemu_vfree(acb->qcrs[i].buf); + qemu_iovec_destroy(&acb->qcrs[i].qiov); + } + + while (acb->rewrite_count) { + qemu_coroutine_yield(); + } + + return acb->vote_ret; +} + +static int read_fifo_child(QuorumAIOCB *acb) +{ + BDRVQuorumState *s = acb->bs->opaque; + int n, ret; + + /* We try to read the next child in FIFO order if we failed to read */ + do { + n = acb->children_read++; + acb->qcrs[n].bs = s->children[n]->bs; + ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes, + acb->qiov, 0); + if (ret < 0) { + quorum_report_bad_acb(&acb->qcrs[n], ret); + } + } while (ret < 0 && acb->children_read < s->num_children); + + /* FIXME: rewrite failed children if acb->children_read > 1? */ + + return ret; +} + +static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset, + uint64_t bytes, QEMUIOVector *qiov, int flags) +{ + BDRVQuorumState *s = bs->opaque; + QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags); + int ret; + + acb->is_read = true; + acb->children_read = 0; + + if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) { + ret = read_quorum_children(acb); + } else { + ret = read_fifo_child(acb); + } + quorum_aio_finalize(acb); + + return ret; +} + +static void write_quorum_entry(void *opaque) +{ + QuorumCo *co = opaque; + QuorumAIOCB *acb = co->acb; + BDRVQuorumState *s = acb->bs->opaque; + int i = co->idx; + QuorumChildRequest *sacb = &acb->qcrs[i]; + + sacb->bs = s->children[i]->bs; + sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes, + acb->qiov, acb->flags); + if (sacb->ret == 0) { + acb->success_count++; + } else { + quorum_report_bad_acb(sacb, sacb->ret); + } + acb->count++; + assert(acb->count <= s->num_children); + assert(acb->success_count <= s->num_children); + + /* Wake up the caller after the last write */ + if (acb->count == s->num_children) { + qemu_coroutine_enter_if_inactive(acb->co); + } +} + +static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset, + uint64_t bytes, QEMUIOVector *qiov, int flags) +{ + BDRVQuorumState *s = bs->opaque; + QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags); + int i, ret; + + for (i = 0; i < s->num_children; i++) { + Coroutine *co; + QuorumCo data = { + .acb = acb, + .idx = i, + }; + + co = qemu_coroutine_create(write_quorum_entry, &data); + qemu_coroutine_enter(co); + } + + while (acb->count < s->num_children) { + qemu_coroutine_yield(); + } + + quorum_has_too_much_io_failed(acb); + + ret = acb->vote_ret; + quorum_aio_finalize(acb); + + return ret; +} + +static int64_t quorum_getlength(BlockDriverState *bs) +{ + BDRVQuorumState *s = bs->opaque; + int64_t result; + int i; + + /* check that all file have the same length */ + result = bdrv_getlength(s->children[0]->bs); + if (result < 0) { + return result; + } + for (i = 1; i < s->num_children; i++) { + int64_t value = bdrv_getlength(s->children[i]->bs); + if (value < 0) { + return value; + } + if (value != result) { + return -EIO; + } + } + + return result; +} + +static coroutine_fn int quorum_co_flush(BlockDriverState *bs) +{ + BDRVQuorumState *s = bs->opaque; + QuorumVoteVersion *winner = NULL; + QuorumVotes error_votes; + QuorumVoteValue result_value; + int i; + int result = 0; + int success_count = 0; + + QLIST_INIT(&error_votes.vote_list); + error_votes.compare = quorum_64bits_compare; + + for (i = 0; i < s->num_children; i++) { + result = bdrv_co_flush(s->children[i]->bs); + if (result) { + quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0, 0, + s->children[i]->bs->node_name, result); + result_value.l = result; + quorum_count_vote(&error_votes, &result_value, i); + } else { + success_count++; + } + } + + if (success_count >= s->threshold) { + result = 0; + } else { + winner = quorum_get_vote_winner(&error_votes); + result = winner->value.l; + } + quorum_free_vote_list(&error_votes); + + return result; +} + +static bool quorum_recurse_can_replace(BlockDriverState *bs, + BlockDriverState *to_replace) +{ + BDRVQuorumState *s = bs->opaque; + int i; + + for (i = 0; i < s->num_children; i++) { + /* + * We have no idea whether our children show the same data as + * this node (@bs). It is actually highly likely that + * @to_replace does not, because replacing a broken child is + * one of the main use cases here. + * + * We do know that the new BDS will match @bs, so replacing + * any of our children by it will be safe. It cannot change + * the data this quorum node presents to its parents. + * + * However, replacing @to_replace by @bs in any of our + * children's chains may change visible data somewhere in + * there. We therefore cannot recurse down those chains with + * bdrv_recurse_can_replace(). + * (More formally, bdrv_recurse_can_replace() requires that + * @to_replace will be replaced by something matching the @bs + * passed to it. We cannot guarantee that.) + * + * Thus, we can only check whether any of our immediate + * children matches @to_replace. + * + * (In the future, we might add a function to recurse down a + * chain that checks that nothing there cares about a change + * in data from the respective child in question. For + * example, most filters do not care when their child's data + * suddenly changes, as long as their parents do not care.) + */ + if (s->children[i]->bs == to_replace) { + /* + * We now have to ensure that there is no other parent + * that cares about replacing this child by a node with + * potentially different data. + * We do so by checking whether there are any other parents + * at all, which is stricter than necessary, but also very + * simple. (We may decide to implement something more + * complex and permissive when there is an actual need for + * it.) + */ + return QLIST_FIRST(&to_replace->parents) == s->children[i] && + QLIST_NEXT(s->children[i], next_parent) == NULL; + } + } + + return false; +} + +static int quorum_valid_threshold(int threshold, int num_children, Error **errp) +{ + + if (threshold < 1) { + error_setg(errp, QERR_INVALID_PARAMETER_VALUE, + "vote-threshold", "value >= 1"); + return -ERANGE; + } + + if (threshold > num_children) { + error_setg(errp, "threshold may not exceed children count"); + return -ERANGE; + } + + return 0; +} + +static QemuOptsList quorum_runtime_opts = { + .name = "quorum", + .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head), + .desc = { + { + .name = QUORUM_OPT_VOTE_THRESHOLD, + .type = QEMU_OPT_NUMBER, + .help = "The number of vote needed for reaching quorum", + }, + { + .name = QUORUM_OPT_BLKVERIFY, + .type = QEMU_OPT_BOOL, + .help = "Trigger block verify mode if set", + }, + { + .name = QUORUM_OPT_REWRITE, + .type = QEMU_OPT_BOOL, + .help = "Rewrite corrupted block on read quorum", + }, + { + .name = QUORUM_OPT_READ_PATTERN, + .type = QEMU_OPT_STRING, + .help = "Allowed pattern: quorum, fifo. Quorum is default", + }, + { /* end of list */ } + }, +}; + +static int quorum_open(BlockDriverState *bs, QDict *options, int flags, + Error **errp) +{ + BDRVQuorumState *s = bs->opaque; + Error *local_err = NULL; + QemuOpts *opts = NULL; + const char *pattern_str; + bool *opened; + int i; + int ret = 0; + + qdict_flatten(options); + + /* count how many different children are present */ + s->num_children = qdict_array_entries(options, "children."); + if (s->num_children < 0) { + error_setg(errp, "Option children is not a valid array"); + ret = -EINVAL; + goto exit; + } + if (s->num_children < 1) { + error_setg(errp, "Number of provided children must be 1 or more"); + ret = -EINVAL; + goto exit; + } + + opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort); + if (!qemu_opts_absorb_qdict(opts, options, errp)) { + ret = -EINVAL; + goto exit; + } + + s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0); + /* and validate it against s->num_children */ + ret = quorum_valid_threshold(s->threshold, s->num_children, errp); + if (ret < 0) { + goto exit; + } + + pattern_str = qemu_opt_get(opts, QUORUM_OPT_READ_PATTERN); + if (!pattern_str) { + ret = QUORUM_READ_PATTERN_QUORUM; + } else { + ret = qapi_enum_parse(&QuorumReadPattern_lookup, pattern_str, + -EINVAL, NULL); + } + if (ret < 0) { + error_setg(errp, "Please set read-pattern as fifo or quorum"); + goto exit; + } + s->read_pattern = ret; + + if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) { + s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false); + if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) { + error_setg(errp, "blkverify=on can only be set if there are " + "exactly two files and vote-threshold is 2"); + ret = -EINVAL; + goto exit; + } + + s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE, + false); + if (s->rewrite_corrupted && s->is_blkverify) { + error_setg(errp, + "rewrite-corrupted=on cannot be used with blkverify=on"); + ret = -EINVAL; + goto exit; + } + } + + /* allocate the children array */ + s->children = g_new0(BdrvChild *, s->num_children); + opened = g_new0(bool, s->num_children); + + for (i = 0; i < s->num_children; i++) { + char indexstr[INDEXSTR_LEN]; + ret = snprintf(indexstr, INDEXSTR_LEN, "children.%d", i); + assert(ret < INDEXSTR_LEN); + + s->children[i] = bdrv_open_child(NULL, options, indexstr, bs, + &child_of_bds, BDRV_CHILD_DATA, false, + &local_err); + if (local_err) { + error_propagate(errp, local_err); + ret = -EINVAL; + goto close_exit; + } + + opened[i] = true; + } + s->next_child_index = s->num_children; + + bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED; + + g_free(opened); + goto exit; + +close_exit: + /* cleanup on error */ + for (i = 0; i < s->num_children; i++) { + if (!opened[i]) { + continue; + } + bdrv_unref_child(bs, s->children[i]); + } + g_free(s->children); + g_free(opened); +exit: + qemu_opts_del(opts); + return ret; +} + +static void quorum_close(BlockDriverState *bs) +{ + BDRVQuorumState *s = bs->opaque; + int i; + + for (i = 0; i < s->num_children; i++) { + bdrv_unref_child(bs, s->children[i]); + } + + g_free(s->children); +} + +static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs, + Error **errp) +{ + BDRVQuorumState *s = bs->opaque; + BdrvChild *child; + char indexstr[INDEXSTR_LEN]; + int ret; + + if (s->is_blkverify) { + error_setg(errp, "Cannot add a child to a quorum in blkverify mode"); + return; + } + + assert(s->num_children <= INT_MAX / sizeof(BdrvChild *)); + if (s->num_children == INT_MAX / sizeof(BdrvChild *) || + s->next_child_index == UINT_MAX) { + error_setg(errp, "Too many children"); + return; + } + + ret = snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index); + if (ret < 0 || ret >= INDEXSTR_LEN) { + error_setg(errp, "cannot generate child name"); + return; + } + s->next_child_index++; + + bdrv_drained_begin(bs); + + /* We can safely add the child now */ + bdrv_ref(child_bs); + + child = bdrv_attach_child(bs, child_bs, indexstr, &child_of_bds, + BDRV_CHILD_DATA, errp); + if (child == NULL) { + s->next_child_index--; + goto out; + } + s->children = g_renew(BdrvChild *, s->children, s->num_children + 1); + s->children[s->num_children++] = child; + +out: + bdrv_drained_end(bs); +} + +static void quorum_del_child(BlockDriverState *bs, BdrvChild *child, + Error **errp) +{ + BDRVQuorumState *s = bs->opaque; + char indexstr[INDEXSTR_LEN]; + int i; + + for (i = 0; i < s->num_children; i++) { + if (s->children[i] == child) { + break; + } + } + + /* we have checked it in bdrv_del_child() */ + assert(i < s->num_children); + + if (s->num_children <= s->threshold) { + error_setg(errp, + "The number of children cannot be lower than the vote threshold %d", + s->threshold); + return; + } + + /* We know now that num_children > threshold, so blkverify must be false */ + assert(!s->is_blkverify); + + snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index - 1); + if (!strncmp(child->name, indexstr, INDEXSTR_LEN)) { + s->next_child_index--; + } + + bdrv_drained_begin(bs); + + /* We can safely remove this child now */ + memmove(&s->children[i], &s->children[i + 1], + (s->num_children - i - 1) * sizeof(BdrvChild *)); + s->children = g_renew(BdrvChild *, s->children, --s->num_children); + bdrv_unref_child(bs, child); + + bdrv_drained_end(bs); +} + +static void quorum_gather_child_options(BlockDriverState *bs, QDict *target, + bool backing_overridden) +{ + BDRVQuorumState *s = bs->opaque; + QList *children_list; + int i; + + /* + * The generic implementation for gathering child options in + * bdrv_refresh_filename() would use the names of the children + * as specified for bdrv_open_child() or bdrv_attach_child(), + * which is "children.%u" with %u being a value + * (s->next_child_index) that is incremented each time a new child + * is added (and never decremented). Since children can be + * deleted at runtime, there may be gaps in that enumeration. + * When creating a new quorum BDS and specifying the children for + * it through runtime options, the enumeration used there may not + * have any gaps, though. + * + * Therefore, we have to create a new gap-less enumeration here + * (which we can achieve by simply putting all of the children's + * full_open_options into a QList). + * + * XXX: Note that there are issues with the current child option + * structure quorum uses (such as the fact that children do + * not really have unique permanent names). Therefore, this + * is going to have to change in the future and ideally we + * want quorum to be covered by the generic implementation. + */ + + children_list = qlist_new(); + qdict_put(target, "children", children_list); + + for (i = 0; i < s->num_children; i++) { + qlist_append(children_list, + qobject_ref(s->children[i]->bs->full_open_options)); + } +} + +static char *quorum_dirname(BlockDriverState *bs, Error **errp) +{ + /* In general, there are multiple BDSs with different dirnames below this + * one; so there is no unique dirname we could return (unless all are equal + * by chance, or there is only one). Therefore, to be consistent, just + * always return NULL. */ + error_setg(errp, "Cannot generate a base directory for quorum nodes"); + return NULL; +} + +static void quorum_child_perm(BlockDriverState *bs, BdrvChild *c, + BdrvChildRole role, + BlockReopenQueue *reopen_queue, + uint64_t perm, uint64_t shared, + uint64_t *nperm, uint64_t *nshared) +{ + BDRVQuorumState *s = bs->opaque; + + *nperm = perm & DEFAULT_PERM_PASSTHROUGH; + if (s->rewrite_corrupted) { + *nperm |= BLK_PERM_WRITE; + } + + /* + * We cannot share RESIZE or WRITE, as this would make the + * children differ from each other. + */ + *nshared = (shared & (BLK_PERM_CONSISTENT_READ | + BLK_PERM_WRITE_UNCHANGED)) + | DEFAULT_PERM_UNCHANGED; +} + +static const char *const quorum_strong_runtime_opts[] = { + QUORUM_OPT_VOTE_THRESHOLD, + QUORUM_OPT_BLKVERIFY, + QUORUM_OPT_REWRITE, + QUORUM_OPT_READ_PATTERN, + + NULL +}; + +static BlockDriver bdrv_quorum = { + .format_name = "quorum", + + .instance_size = sizeof(BDRVQuorumState), + + .bdrv_open = quorum_open, + .bdrv_close = quorum_close, + .bdrv_gather_child_options = quorum_gather_child_options, + .bdrv_dirname = quorum_dirname, + + .bdrv_co_flush_to_disk = quorum_co_flush, + + .bdrv_getlength = quorum_getlength, + + .bdrv_co_preadv = quorum_co_preadv, + .bdrv_co_pwritev = quorum_co_pwritev, + + .bdrv_add_child = quorum_add_child, + .bdrv_del_child = quorum_del_child, + + .bdrv_child_perm = quorum_child_perm, + + .bdrv_recurse_can_replace = quorum_recurse_can_replace, + + .strong_runtime_opts = quorum_strong_runtime_opts, +}; + +static void bdrv_quorum_init(void) +{ + if (!qcrypto_hash_supports(QCRYPTO_HASH_ALG_SHA256)) { + /* SHA256 hash support is required for quorum device */ + return; + } + bdrv_register(&bdrv_quorum); +} + +block_init(bdrv_quorum_init); |