From 8cc8e513633a1a8b12c416e32fb5362fcf4d65dd Mon Sep 17 00:00:00 2001
From: Christine Caulfield <ccaulfie@redhat.com>
Date: Thu, 5 Mar 2015 16:45:15 +0000
Subject: [PATCH] cpg: Add support for messages larger than 1MB
If a cpg client sends a message larger than 1MB (actually slightly
less, to allow for internal buffers), cpg will now fragment that into
several corosync messages before sending it around the ring.
cpg_mcast_joined() can now return CS_ERR_INTERRUPT which means that the
cpg membership was disrupted during the send operation and the message
needs to be resent.
The new API call cpg_max_atomic_msgsize_get() returns the maximum size
of a message that will not be fragmented internally.
A new test program, cpghum, was written to stress test this functionality;
it checks message integrity and order of receipt.
Signed-off-by: Christine Caulfield <ccaulfie@redhat.com>
Reviewed-by: Jan Friesse <jfriesse@redhat.com>
---
configure.ac | 1 +
corosync.spec.in | 1 +
exec/cpg.c | 182 +++++++++++++++++++++++++++++++++++++++++++-
include/corosync/cpg.h | 7 ++
include/corosync/ipc_cpg.h | 35 ++++++++-
lib/cpg.c | 171 ++++++++++++++++++++++++++++++++++++++++-
test/Makefile.am | 3 +-
7 files changed, 393 insertions(+), 7 deletions(-)
diff --git a/configure.ac b/configure.ac
index 0c371aa..b394329 100644
--- a/configure.ac
+++ b/configure.ac
@@ -163,6 +163,7 @@ AC_CHECK_LIB([pthread], [pthread_create])
AC_CHECK_LIB([socket], [socket])
AC_CHECK_LIB([nsl], [t_open])
AC_CHECK_LIB([rt], [sched_getscheduler])
+AC_CHECK_LIB([z], [crc32])
# Checks for library functions.
AC_FUNC_ALLOCA
diff --git a/corosync.spec.in b/corosync.spec.in
index 3ca75b7..a2ba584 100644
--- a/corosync.spec.in
+++ b/corosync.spec.in
@@ -40,6 +40,7 @@ Conflicts: openais <= 0.89, openais-devel <= 0.89
BuildRequires: groff
BuildRequires: libqb-devel
BuildRequires: nss-devel
+BuildRequires: zlib-devel
%if %{with runautogen}
BuildRequires: autoconf automake libtool
%endif
diff --git a/exec/cpg.c b/exec/cpg.c
index 1c6fbb9..a18b850 100644
--- a/exec/cpg.c
+++ b/exec/cpg.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
*
* All rights reserved.
*
@@ -83,7 +83,8 @@ enum cpg_message_req_types {
MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
MESSAGE_REQ_EXEC_CPG_MCAST = 3,
MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
- MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5
+ MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5,
+ MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6,
};
struct zcb_mapped {
@@ -156,6 +157,8 @@ struct cpg_pd {
enum cpd_state cpd_state;
unsigned int flags;
int initial_totem_conf_sent;
+ uint64_t transition_counter; /* These two are used when sending fragmented messages */
+ uint64_t initial_transition_counter;
struct list_head list;
struct list_head iteration_instance_list_head;
struct list_head zcb_mapped_list_head;
@@ -224,6 +227,10 @@ static void message_handler_req_exec_cpg_mcast (
const void *message,
unsigned int nodeid);
+static void message_handler_req_exec_cpg_partial_mcast (
+ const void *message,
+ unsigned int nodeid);
+
static void message_handler_req_exec_cpg_downlist_old (
const void *message,
unsigned int nodeid);
@@ -238,6 +245,8 @@ static void exec_cpg_joinlist_endian_convert (void *msg);
static void exec_cpg_mcast_endian_convert (void *msg);
+static void exec_cpg_partial_mcast_endian_convert (void *msg);
+
static void exec_cpg_downlist_endian_convert_old (void *msg);
static void exec_cpg_downlist_endian_convert (void *msg);
@@ -250,6 +259,8 @@ static void message_handler_req_lib_cpg_finalize (void *conn, const void *messag
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
+
static void message_handler_req_lib_cpg_membership (void *conn,
const void *message);
@@ -383,7 +394,10 @@ static struct corosync_lib_handler cpg_lib_engine[] =
.lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
-
+ { /* 12 */
+ .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
+ .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
+ },
};
@@ -413,6 +427,10 @@ static struct corosync_exec_handler cpg_exec_engine[] =
.exec_handler_fn = message_handler_req_exec_cpg_downlist,
.exec_endian_convert_fn = exec_cpg_downlist_endian_convert
},
+ { /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
+ .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
+ .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
+ },
};
struct corosync_service_engine cpg_service_engine = {
@@ -457,6 +475,17 @@ struct req_exec_cpg_mcast {
mar_uint8_t message[] __attribute__((aligned(8)));
};
+struct req_exec_cpg_partial_mcast {
+ struct qb_ipc_request_header header __attribute__((aligned(8)));
+ mar_cpg_name_t group_name __attribute__((aligned(8)));
+ mar_uint32_t msglen __attribute__((aligned(8)));
+ mar_uint32_t fraglen __attribute__((aligned(8)));
+ mar_uint32_t pid __attribute__((aligned(8)));
+ mar_uint32_t type __attribute__((aligned(8)));
+ mar_message_source_t source __attribute__((aligned(8)));
+ mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
struct req_exec_cpg_downlist_old {
struct qb_ipc_request_header header __attribute__((aligned(8)));
mar_uint32_t left_nodes __attribute__((aligned(8)));
@@ -740,6 +769,7 @@ static int notify_lib_joinlist(
cpd->cpd_state == CPD_STATE_LEAVE_STARTED) {
api->ipc_dispatch_send (cpd->conn, buf, size);
+ cpd->transition_counter++;
}
if (left_list_entries) {
if (left_list[0].pid == cpd->pid &&
@@ -1186,6 +1216,19 @@ static void exec_cpg_mcast_endian_convert (void *msg)
swab_mar_message_source_t (&req_exec_cpg_mcast->source);
}
+static void exec_cpg_partial_mcast_endian_convert (void *msg)
+{
+ struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = msg;
+
+ swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
+ swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
+ req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
+ req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
+ req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
+ req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
+ swab_mar_message_source_t (&req_exec_cpg_mcast->source);
+}
+
static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
struct list_head *iter;
@@ -1453,6 +1496,68 @@ static void message_handler_req_exec_cpg_mcast (
}
}
+static void message_handler_req_exec_cpg_partial_mcast (
+ const void *message,
+ unsigned int nodeid)
+{
+ const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
+ struct res_lib_cpg_partial_deliver_callback res_lib_cpg_mcast;
+ int msglen = req_exec_cpg_mcast->fraglen;
+ struct list_head *iter, *pi_iter;
+ struct cpg_pd *cpd;
+ struct iovec iovec[2];
+ int known_node = 0;
+
+ log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
+
+ res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK;
+ res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
+ res_lib_cpg_mcast.fraglen = msglen;
+ res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
+ res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
+ res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
+ res_lib_cpg_mcast.nodeid = nodeid;
+
+ memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
+ sizeof(mar_cpg_name_t));
+ iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
+ iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
+
+ iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
+ iovec[1].iov_len = msglen;
+
+ for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
+ cpd = list_entry(iter, struct cpg_pd, list);
+ iter = iter->next;
+
+ if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
+ && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
+
+ if (!known_node) {
+ /* Try to find, if we know the node */
+ for (pi_iter = process_info_list_head.next;
+ pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
+
+ struct process_info *pi = list_entry (pi_iter, struct process_info, list);
+
+ if (pi->nodeid == nodeid &&
+ mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
+ known_node = 1;
+ break;
+ }
+ }
+ }
+
+ if (!known_node) {
+ log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
+ return ;
+ }
+
+ api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
+ }
+ }
+}
+
static int cpg_exec_send_downlist(void)
{
@@ -1864,6 +1969,77 @@ static void message_handler_req_lib_cpg_zc_free (
res_header.size);
}
+/* Fragmented mcast message from the library */
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
+{
+ const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
+ struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
+ mar_cpg_name_t group_name = cpd->group_name;
+
+ struct iovec req_exec_cpg_iovec[2];
+ struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
+ struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
+ int msglen = req_lib_cpg_mcast->fraglen;
+ int result;
+ cs_error_t error = CS_ERR_NOT_EXIST;
+
+ log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
+ log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
+
+ switch (cpd->cpd_state) {
+ case CPD_STATE_UNJOINED:
+ error = CS_ERR_NOT_EXIST;
+ break;
+ case CPD_STATE_LEAVE_STARTED:
+ error = CS_ERR_NOT_EXIST;
+ break;
+ case CPD_STATE_JOIN_STARTED:
+ error = CS_OK;
+ break;
+ case CPD_STATE_JOIN_COMPLETED:
+ error = CS_OK;
+ break;
+ }
+
+ res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
+ res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;
+
+ if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
+ cpd->initial_transition_counter = cpd->transition_counter;
+ }
+ if (cpd->transition_counter != cpd->initial_transition_counter) {
+ error = CS_ERR_INTERRUPT;
+ }
+
+ if (error == CS_OK) {
+ req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
+ req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
+ MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST);
+ req_exec_cpg_mcast.pid = cpd->pid;
+ req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
+ req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
+ req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
+ api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
+ memcpy(&req_exec_cpg_mcast.group_name, &group_name,
+ sizeof(mar_cpg_name_t));
+
+ req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
+ req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
+ req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
+ req_exec_cpg_iovec[1].iov_len = msglen;
+
+ result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
+ assert(result == 0);
+ } else {
+ log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
+ conn, group_name.value, cpd->cpd_state, error);
+ }
+
+ res_lib_cpg_partial_send.header.error = error;
+ api->ipc_response_send (conn, &res_lib_cpg_partial_send,
+ sizeof (res_lib_cpg_partial_send));
+}
+
/* Mcast message from the library */
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
{
diff --git a/include/corosync/cpg.h b/include/corosync/cpg.h
index 55fc4b8..f66fb14 100644
--- a/include/corosync/cpg.h
+++ b/include/corosync/cpg.h
@@ -186,6 +186,13 @@ cs_error_t cpg_fd_get (
int *fd);
/**
+ * Get maximum size of a message that will not be fragmented
+ */
+cs_error_t cpg_max_atomic_msgsize_get (
+ cpg_handle_t handle,
+ uint32_t *size);
+
+/**
* Get contexts for a CPG handle
*/
cs_error_t cpg_context_get (
diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
index a95335a..5008acf 100644
--- a/include/corosync/ipc_cpg.h
+++ b/include/corosync/ipc_cpg.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2006-2011 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
*
* All rights reserved.
*
@@ -55,6 +55,7 @@ enum req_cpg_types {
MESSAGE_REQ_CPG_ZC_ALLOC = 9,
MESSAGE_REQ_CPG_ZC_FREE = 10,
MESSAGE_REQ_CPG_ZC_EXECUTE = 11,
+ MESSAGE_REQ_CPG_PARTIAL_MCAST = 12,
};
enum res_cpg_types {
@@ -75,6 +76,8 @@ enum res_cpg_types {
MESSAGE_RES_CPG_ZC_ALLOC = 14,
MESSAGE_RES_CPG_ZC_FREE = 15,
MESSAGE_RES_CPG_ZC_EXECUTE = 16,
+ MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK = 17,
+ MESSAGE_RES_CPG_PARTIAL_SEND = 18,
};
enum lib_cpg_confchg_reason {
@@ -85,6 +88,12 @@ enum lib_cpg_confchg_reason {
CONFCHG_CPG_REASON_PROCDOWN = 5
};
+enum lib_cpg_partial_types {
+ LIBCPG_PARTIAL_FIRST = 1,
+ LIBCPG_PARTIAL_CONTINUED = 2,
+ LIBCPG_PARTIAL_LAST = 3,
+};
+
typedef struct {
uint32_t length __attribute__((aligned(8)));
char value[CPG_MAX_NAME_LENGTH] __attribute__((aligned(8)));
@@ -200,6 +209,10 @@ struct res_lib_cpg_local_get {
mar_uint32_t local_nodeid __attribute__((aligned(8)));
};
+struct res_lib_cpg_partial_send {
+ struct qb_ipc_response_header header __attribute__((aligned(8)));
+};
+
struct req_lib_cpg_mcast {
struct qb_ipc_response_header header __attribute__((aligned(8)));
mar_uint32_t guarantee __attribute__((aligned(8)));
@@ -207,6 +220,15 @@ struct req_lib_cpg_mcast {
mar_uint8_t message[] __attribute__((aligned(8)));
};
+struct req_lib_cpg_partial_mcast {
+ struct qb_ipc_response_header header __attribute__((aligned(8)));
+ mar_uint32_t guarantee __attribute__((aligned(8)));
+ mar_uint32_t msglen __attribute__((aligned(8)));
+ mar_uint32_t fraglen __attribute__((aligned(8)));
+ mar_uint32_t type __attribute__((aligned(8)));
+ mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
struct res_lib_cpg_mcast {
struct qb_ipc_response_header header __attribute__((aligned(8)));
};
@@ -223,6 +245,17 @@ struct res_lib_cpg_deliver_callback {
mar_uint8_t message[] __attribute__((aligned(8)));
};
+struct res_lib_cpg_partial_deliver_callback {
+ struct qb_ipc_response_header header __attribute__((aligned(8)));
+ mar_cpg_name_t group_name __attribute__((aligned(8)));
+ mar_uint32_t msglen __attribute__((aligned(8)));
+ mar_uint32_t fraglen __attribute__((aligned(8)));
+ mar_uint32_t nodeid __attribute__((aligned(8)));
+ mar_uint32_t pid __attribute__((aligned(8)));
+ mar_uint32_t type __attribute__((aligned(8)));
+ mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
struct res_lib_cpg_flowcontrol_callback {
struct qb_ipc_response_header header __attribute__((aligned(8)));
mar_uint32_t flow_control_state __attribute__((aligned(8)));
diff --git a/lib/cpg.c b/lib/cpg.c
index 4b92f44..037e8a9 100644
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -1,7 +1,7 @@
/*
* vi: set autoindent tabstop=4 shiftwidth=4 :
*
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
*
* All rights reserved.
*
@@ -70,6 +70,12 @@
#endif
/*
+ * Maximum number of times to retry a send when transmitting
+ * a large message fragment
+ */
+#define MAX_RETRIES 100
+
+/*
* ZCB files have following umask (umask is same as used in libqb)
*/
#define CPG_MEMORY_MAP_UMASK 077
@@ -83,6 +89,14 @@ struct cpg_inst {
cpg_model_v1_data_t model_v1_data;
};
struct list_head iteration_list_head;
+ uint32_t max_msg_size;
+ char *assembly_buf;
+ uint32_t assembly_buf_ptr;
+ int assembling; /* Flag that says we have started assembling a message.
+ * It's here to catch the situation where a node joins
+ * the cluster/group in the middle of a CPG message send
+ * so we don't pass on a partial message to the client.
+ */
};
static void cpg_inst_free (void *inst);
@@ -210,6 +224,8 @@ cs_error_t cpg_model_initialize (
}
}
+ /* Allow space for corosync internal headers */
+ cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
cpg_inst->model_data.model = model;
cpg_inst->context = context;
@@ -291,6 +307,25 @@ cs_error_t cpg_fd_get (
return (error);
}
+cs_error_t cpg_max_atomic_msgsize_get (
+ cpg_handle_t handle,
+ uint32_t *size)
+{
+ cs_error_t error;
+ struct cpg_inst *cpg_inst;
+
+ error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
+ if (error != CS_OK) {
+ return (error);
+ }
+
+ *size = cpg_inst->max_msg_size;
+
+ hdb_handle_put (&cpg_handle_t_db, handle);
+
+ return (error);
+}
+
cs_error_t cpg_context_get (
cpg_handle_t handle,
void **context)
@@ -339,6 +374,7 @@ cs_error_t cpg_dispatch (
struct cpg_inst *cpg_inst;
struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
+ struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
struct cpg_inst cpg_inst_copy;
struct qb_ipc_response_header *dispatch_data;
@@ -361,7 +397,7 @@ cs_error_t cpg_dispatch (
/*
* Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
- * wait indefinately for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
+ * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
*/
if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
timeout = 0;
@@ -428,6 +464,43 @@ cs_error_t cpg_dispatch (
res_cpg_deliver_callback->msglen);
break;
+ case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
+ res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
+
+ marshall_from_mar_cpg_name_t (
+ &group_name,
+ &res_cpg_partial_deliver_callback->group_name);
+
+ if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
+ /*
+ * Allocate a buffer to contain a full message.
+ */
+ cpg_inst->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
+ if (!cpg_inst->assembly_buf) {
+ error = CS_ERR_NO_MEMORY;
+ goto error_put;
+ }
+ cpg_inst->assembling = 1;
+ cpg_inst->assembly_buf_ptr = 0;
+ }
+ if (cpg_inst->assembling) {
+ memcpy(cpg_inst->assembly_buf + cpg_inst->assembly_buf_ptr,
+ res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
+ cpg_inst->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
+
+ if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
+ cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
+ &group_name,
+ res_cpg_partial_deliver_callback->nodeid,
+ res_cpg_partial_deliver_callback->pid,
+ cpg_inst->assembly_buf,
+ res_cpg_partial_deliver_callback->msglen);
+ free(cpg_inst->assembly_buf);
+ cpg_inst->assembling = 0;
+ }
+ }
+ break;
+
case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
break;
@@ -921,6 +994,12 @@ cs_error_t cpg_zcb_mcast_joined (
if (error != CS_OK) {
return (error);
}
+
+ if (msg_len > IPC_REQUEST_SIZE) {
+ error = CS_ERR_TOO_BIG;
+ goto error_exit;
+ }
+
req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
msg_len;
@@ -957,6 +1036,88 @@ error_exit:
return (error);
}
+static cs_error_t send_fragments (
+ struct cpg_inst *cpg_inst,
+ cpg_guarantee_t guarantee,
+ size_t msg_len,
+ const struct iovec *iovec,
+ unsigned int iov_len)
+{
+ int i;
+ cs_error_t error = CS_OK;
+ struct iovec iov[2];
+ struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
+ struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
+ size_t sent = 0;
+ size_t iov_sent = 0;
+ int retry_count;
+
+ req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
+ req_lib_cpg_mcast.guarantee = guarantee;
+ req_lib_cpg_mcast.msglen = msg_len;
+
+ iov[0].iov_base = (void *)&req_lib_cpg_mcast;
+ iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
+
+ i=0;
+ iov_sent = 0 ;
+ qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
+
+ while (error == CS_OK && sent < msg_len) {
+
+ retry_count = 0;
+ if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
+ iov[1].iov_len = cpg_inst->max_msg_size;
+ }
+ else {
+ iov[1].iov_len = iovec[i].iov_len - iov_sent;
+ }
+
+ if (sent == 0) {
+ req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
+ }
+ else if ((sent + iov[1].iov_len) == msg_len) {
+ req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
+ }
+ else {
+ req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
+ }
+
+ req_lib_cpg_mcast.fraglen = iov[1].iov_len;
+ req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
+ iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
+
+ resend:
+ error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
+ &res_lib_cpg_partial_send,
+ sizeof (res_lib_cpg_partial_send));
+
+ if (error == CS_ERR_TRY_AGAIN) {
+ fprintf(stderr, "sleep. counter=%d\n", retry_count);
+ if (++retry_count > MAX_RETRIES) {
+ goto error_exit;
+ }
+ usleep(10000);
+ goto resend;
+ }
+
+ iov_sent += iov[1].iov_len;
+ sent += iov[1].iov_len;
+
+ /* Next iovec */
+ if (iov_sent >= iovec[i].iov_len) {
+ i++;
+ iov_sent = 0;
+ }
+ error = res_lib_cpg_partial_send.header.error;
+ }
+error_exit:
+ qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
+
+ return error;
+}
+
+
cs_error_t cpg_mcast_joined (
cpg_handle_t handle,
cpg_guarantee_t guarantee,
@@ -979,6 +1140,11 @@ cs_error_t cpg_mcast_joined (
msg_len += iovec[i].iov_len;
}
+ if (msg_len > cpg_inst->max_msg_size) {
+ error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
+ goto error_exit;
+ }
+
req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
msg_len;
@@ -994,6 +1160,7 @@ cs_error_t cpg_mcast_joined (
error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
+error_exit:
hdb_handle_put (&cpg_handle_t_db, handle);
return (error);
diff --git a/test/Makefile.am b/test/Makefile.am
index c19e506..bb11518 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -34,7 +34,7 @@ MAINTAINERCLEANFILES = Makefile.in
EXTRA_DIST = ploadstart.sh
-noinst_PROGRAMS = cpgverify testcpg testcpg2 cpgbench \
+noinst_PROGRAMS = cpgverify testcpg testcpg2 cpgbench cpghum \
testquorum testvotequorum1 testvotequorum2 \
stress_cpgfdget stress_cpgcontext cpgbound testsam \
testcpgzc cpgbenchzc testzcgc stress_cpgzc
@@ -48,6 +48,7 @@ testzcgc_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
stress_cpgzc_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
stress_cpgfdget_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
stress_cpgcontext_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
+cpghum_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la -lz
testquorum_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libquorum.la
testvotequorum1_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
testvotequorum2_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
--
1.7.1