[prev in list] [next in list] [prev in thread] [next in thread]
List: hail-devel
Subject: [PATCH] Re: [PATCHv2 2/2] cld: kill CLD_MAX_PKT_MSG, add CLD_MAX_PAYLOAD_SZ
From: Jeff Garzik <jeff () garzik ! org>
Date: 2010-02-03 23:20:42
Message-ID: 4B6A04CA.7040705 () garzik ! org
[Download RAW message or body]
On 02/03/2010 08:45 AM, Colin McCabe wrote:
> Get rid of CLD_MAX_PKT_MSG. It only existed so that we could use static arrays
> in a few places.
>
> Create CLD_MAX_PAYLOAD_SZ to represent the maximum size of a message that the
> API user can GET or PUT. Reducing this constant could break users who
> relied on the old maximum data size, so we should try not to do it often.
>
> Signed-off-by: Colin McCabe<cmccabe@alumni.cmu.edu>
> ---
> include/cld_msg.h | 7 ++++---
> include/cldc.h | 4 ++--
> lib/cldc.c | 24 +++++++++++++++++-------
> server/msg.c | 7 +++++++
> server/session.c | 12 ++++++++----
> 5 files changed, 38 insertions(+), 16 deletions(-)
applied
I've attached the latest cld->cld.rpcgen working diff, current as of commit
commit a592ee77012f3c32d775c1e67fff66c944e7b5fe
Author: Colin McCabe <cmccabe@alumni.cmu.edu>
Date: Wed Feb 3 18:17:24 2010 -0500
libcldc: use typed message completion callback
Signed-off-by: Colin McCabe <cmccabe@alumni.cmu.edu>
Signed-off-by: Jeff Garzik <jgarzik@redhat.com>
["patch" (text/plain)]
diff -X /garz/tmp/dontdiff -urNp cld/.gitignore cld.rpcgen/.gitignore
--- cld/.gitignore 2010-01-29 00:36:25.000000000 -0500
+++ cld.rpcgen/.gitignore 2010-01-22 18:15:07.000000000 -0500
@@ -32,5 +32,9 @@ cld-config.h*
cscope.*
ncscope.*
+# XDR files
+*_rpc.h
+*_rpc_xdr.c
+
# ignore Doxygen output directory
gendoc
diff -X /garz/tmp/dontdiff -urNp cld/include/cldc.h cld.rpcgen/include/cldc.h
--- cld/include/cldc.h 2010-02-03 17:54:01.000000000 -0500
+++ cld.rpcgen/include/cldc.h 2010-02-03 15:57:56.000000000 -0500
@@ -22,7 +22,7 @@
#include <sys/types.h>
#include <stdbool.h>
#include <glib.h>
-#include <cld_msg.h>
+#include <cld_msg_rpc.h>
#include <cld_common.h>
#include <hail_log.h>
@@ -35,34 +35,30 @@ struct cldc_call_opts {
void *private;
/* private; lib-owned */
- enum cld_msg_ops op;
- union {
- struct {
- struct cld_msg_get_resp resp;
- const char *buf;
- unsigned int size;
- char inode_name[CLD_INODE_NAME_MAX + 1];
- } get;
- } u;
+ struct cld_msg_get_resp resp;
};
+void cldc_call_opts_get_data(struct cldc_call_opts *copts,
+ char **data, size_t *data_len);
+
struct cldc_pkt_info {
int pkt_len;
+ int hdr_len;
int retries;
+ char user[CLD_MAX_USERNAME];
/* must be at end of struct */
- struct cld_packet pkt;
- uint8_t data[0];
+ char data[0];
};
/** an outgoing message, from client to server */
struct cldc_msg {
uint64_t xid;
-
+ enum cld_msg_op op;
struct cldc_session *sess;
ssize_t (*cb)(struct cldc_msg *, const void *, size_t,
- enum cle_err_codes);
+ enum cle_err_codes resp_rc);
void *cb_private;
struct cldc_call_opts copts;
@@ -71,18 +67,15 @@ struct cldc_msg {
time_t expire_time;
- int data_len;
int n_pkts;
- uint8_t *data;
-
/* must be at end of struct */
struct cldc_pkt_info *pkt_info[0];
};
/** an open file handle associated with a session */
struct cldc_fh {
- uint64_t fh_le; /* fh id, LE */
+ uint64_t fh;
struct cldc_session *sess;
bool valid;
};
@@ -129,8 +122,11 @@ struct cldc_session {
bool confirmed;
+ enum cld_msg_op msg_buf_op;
unsigned int msg_buf_len;
char msg_buf[CLD_MAX_MSG_SZ];
+ char payload[CLD_MAX_PAYLOAD_SZ];
+ char inode_name_temp[CLD_INODE_NAME_MAX];
};
/** Information for a single CLD server host */
diff -X /garz/tmp/dontdiff -urNp cld/include/cld_common.h cld.rpcgen/include/cld_common.h
--- cld/include/cld_common.h 2010-02-03 00:13:44.000000000 -0500
+++ cld.rpcgen/include/cld_common.h 2010-02-03 15:58:19.000000000 -0500
@@ -3,7 +3,6 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Copyright 2010, Colin McCabe
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -24,6 +23,7 @@
#include <stdbool.h>
#include <string.h>
#include <time.h>
+#include <cld_msg_rpc.h>
#define CLD_ALIGN8(n) ((8 - ((n) & 7)) & 7)
@@ -91,7 +91,7 @@ extern int __cld_authsign(struct hail_lo
const void *buf, size_t buf_len, void *sha);
/* Returns a constant string representing a message operation */
-extern const char *__cld_opstr(enum cld_msg_ops op);
+extern const char *__cld_opstr(enum cld_msg_op);
/*
* We use a unified format for sid so it can be searched in log files (* in vi).
diff -X /garz/tmp/dontdiff -urNp cld/include/cld_msg.h cld.rpcgen/include/cld_msg.h
--- cld/include/cld_msg.h 2010-02-03 17:33:54.000000000 -0500
+++ cld.rpcgen/include/cld_msg.h 1969-12-31 19:00:00.000000000 -0500
@@ -1,245 +0,0 @@
-#ifndef __CLD_MSG_H__
-#define __CLD_MSG_H__
-
-/*
- * Copyright 2009 Red Hat, Inc.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; see the file COPYING. If not, write to
- * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
- *
- */
-
-#include <stdint.h>
-
-#define CLD_PKT_MAGIC "CLDc1pkt"
-#define CLD_MSG_MAGIC "CLDc1msg"
-
-enum {
- CLD_MAGIC_SZ = 8, /**< length of magic number */
- CLD_SID_SZ = 8, /**< length of session id */
-
- CLD_INODE_NAME_MAX = 256, /**< max total pathname len */
-
- CLD_MAX_USERNAME = 32, /**< includes req. nul */
- CLD_MAX_SECRET_KEY = 128, /**< includes req. nul */
-
- CLD_MAX_PKT_MSG_SZ = 1024,
- CLD_MAX_PAYLOAD_SZ = 131072, /**< maximum size of data that users
- can GET or PUT */
- CLD_MAX_MSG_SZ = 196608, /**< maximum total
- msg size, including all packets */
-};
-
-/** available RPC operations */
-enum cld_msg_ops {
- /* client -> server */
- CMO_NOP = 0, /**< no op */
- CMO_NEW_SESS = 1, /**< new session */
- CMO_OPEN = 2, /**< open file */
- CMO_GET_META = 3, /**< get metadata */
- CMO_GET = 4, /**< get metadata + data */
- CMO_PUT = 6, /**< put data */
- CMO_CLOSE = 7, /**< close file */
- CMO_DEL = 8, /**< delete file */
- CMO_LOCK = 9, /**< lock */
- CMO_UNLOCK = 10, /**< unlock */
- CMO_TRYLOCK = 11, /**< trylock */
- CMO_ACK = 12, /**< ack of seqid rx'd */
- CMO_END_SESS = 13, /**< end session */
-
- /* server -> client */
- CMO_PING = 30, /**< server to client ping */
- CMO_NOT_MASTER = 31, /**< I am not the master! */
- CMO_EVENT = 32, /**< server->cli async event */
- CMO_ACK_FRAG = 33, /**< ack partial msg */
-};
-
-/** CLD error codes */
-enum cle_err_codes {
- CLE_OK = 0, /**< success / no error */
- CLE_SESS_EXISTS = 1, /**< session exists */
- CLE_SESS_INVAL = 2, /**< session doesn't exist */
- CLE_DB_ERR = 3, /**< db error */
- CLE_BAD_PKT = 4, /**< invalid/corrupted packet */
- CLE_INODE_INVAL = 5, /**< inode doesn't exist */
- CLE_NAME_INVAL = 6, /**< inode name invalid */
- CLE_OOM = 7, /**< server out of memory */
- CLE_FH_INVAL = 8, /**< file handle invalid */
- CLE_DATA_INVAL = 9, /**< invalid data pkt */
- CLE_LOCK_INVAL = 10, /**< invalid lock */
- CLE_LOCK_CONFLICT = 11, /**< conflicting lock held */
- CLE_LOCK_PENDING = 12, /**< lock waiting to be acq. */
- CLE_MODE_INVAL = 13, /**< op incompat. w/ file mode */
- CLE_INODE_EXISTS = 14, /**< inode exists */
- CLE_DIR_NOTEMPTY = 15, /**< dir not empty */
- CLE_INTERNAL_ERR = 16, /**< nonspecific internal err */
- CLE_TIMEOUT = 17, /**< session timed out */
- CLE_SIG_INVAL = 18, /**< HMAC sig bad / auth failed */
-};
-
-/** availble OPEN mode flags */
-enum cld_open_modes {
- COM_READ = (1 << 0), /**< read */
- COM_WRITE = (1 << 1), /**< write */
- COM_LOCK = (1 << 2), /**< lock */
- COM_ACL = (1 << 3), /**< ACL update */
- COM_CREATE = (1 << 4), /**< create file, if not exist */
- COM_EXCL = (1 << 5), /**< fail create if file exists */
- COM_DIRECTORY = (1 << 6), /**< operate on a directory */
-};
-
-/** potential events client may receive */
-enum cld_events {
- CE_UPDATED = (1 << 0), /**< contents updated */
- CE_DELETED = (1 << 1), /**< inode deleted */
- CE_LOCKED = (1 << 2), /**< lock acquired */
- CE_MASTER_FAILOVER = (1 << 3), /**< master failover */
- CE_SESS_FAILED = (1 << 4),
-};
-
-/** LOCK flags */
-enum cld_lock_flags {
- CLF_SHARED = (1 << 0), /**< a shared (read) lock */
-};
-
-/** CLD packet flags */
-enum cld_packet_flags {
- CPF_FIRST = (1 << 0), /**< first fragment */
- CPF_LAST = (1 << 1), /**< last fragment */
-};
-
-/** header for each packet */
-struct cld_packet {
- uint8_t magic[CLD_MAGIC_SZ]; /**< magic number; constant */
- uint64_t seqid; /**< sequence id */
- uint8_t sid[CLD_SID_SZ]; /**< client id */
- uint32_t flags; /**< CPF_xxx flags */
- uint8_t res[4];
- char user[CLD_MAX_USERNAME]; /**< authenticated user */
-};
-
-/** header for each message */
-struct cld_msg_hdr {
- uint8_t magic[CLD_MAGIC_SZ]; /**< magic number; constant */
- uint64_t xid; /**< opaque message id */
- uint8_t op; /**< operation code */
- uint8_t res1[7];
-};
-
-/** standard response for each message */
-struct cld_msg_resp {
- struct cld_msg_hdr hdr;
-
- uint32_t code; /**< error code, CLE_xxx */
- uint32_t rsv; /**< reserved */
- uint64_t xid_in; /**< C->S xid */
-};
-
-/** ACK-FRAG message */
-struct cld_msg_ack_frag {
- struct cld_msg_hdr hdr;
-
- uint64_t seqid; /**< sequence id to ack */
-};
-
-/** OPEN message */
-struct cld_msg_open {
- struct cld_msg_hdr hdr;
-
- uint32_t mode; /**< open mode, COM_xxx */
- uint32_t events; /**< events mask, CE_xxx */
- uint16_t name_len; /**< length of file name */
- uint8_t res[6];
- /* inode name */
-};
-
-/** OPEN message response */
-struct cld_msg_open_resp {
- struct cld_msg_resp resp;
-
- uint64_t fh; /**< handle opened */
-};
-
-/** GET message */
-struct cld_msg_get {
- struct cld_msg_hdr hdr;
-
- uint64_t fh; /**< open file handle */
-};
-
-/** GET message response */
-struct cld_msg_get_resp {
- struct cld_msg_resp resp;
-
- uint64_t inum; /**< unique inode number */
- uint32_t ino_len; /**< inode name len */
- uint32_t size; /**< data size */
- uint64_t version; /**< inode version */
- uint64_t time_create; /**< creation time */
- uint64_t time_modify; /**< last modification time */
- uint32_t flags; /**< inode flags; CIFL_xxx */
- uint8_t res[4];
- /* inode name */
-};
-
-/** PUT message */
-struct cld_msg_put {
- struct cld_msg_hdr hdr;
-
- uint64_t fh; /**< open file handle */
- uint32_t data_size; /**< total size of data */
- uint8_t res[4];
-};
-
-/** CLOSE message */
-struct cld_msg_close {
- struct cld_msg_hdr hdr;
-
- uint64_t fh; /**< open file handle */
-};
-
-/** DEL message */
-struct cld_msg_del {
- struct cld_msg_hdr hdr;
-
- uint16_t name_len; /**< length of file name */
- uint8_t res[6];
- /* inode name */
-};
-
-/** UNLOCK message */
-struct cld_msg_unlock {
- struct cld_msg_hdr hdr;
-
- uint64_t fh; /**< open file handle */
-};
-
-/** LOCK message */
-struct cld_msg_lock {
- struct cld_msg_hdr hdr;
-
- uint64_t fh; /**< open file handle */
- uint32_t flags; /**< CLF_xxx */
- uint8_t res[4];
-};
-
-/** Server-to-client EVENT message */
-struct cld_msg_event {
- struct cld_msg_hdr hdr;
-
- uint64_t fh; /**< open file handle */
- uint32_t events; /**< CE_xxx */
- uint8_t res[4];
-};
-
-#endif /* __CLD_MSG_H__ */
diff -X /garz/tmp/dontdiff -urNp cld/include/cld_pkt.h cld.rpcgen/include/cld_pkt.h
--- cld/include/cld_pkt.h 1969-12-31 19:00:00.000000000 -0500
+++ cld.rpcgen/include/cld_pkt.h 2010-02-03 00:12:58.000000000 -0500
@@ -0,0 +1,56 @@
+#ifndef __CLD_PKT_H__
+#define __CLD_PKT_H__
+
+/*
+ * Copyright 2010, Colin McCabe
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <openssl/sha.h>
+#include <cld_msg_rpc.h>
+#include <stdbool.h>
+
+/* @file cld_pkt.h
+ *
+ * This file has some definitions and helper functions pertaining to the CLD
+ * network protocol. Unlike cld_msg.x, it's not an XDR file.
+ */
+
+/* Returns a string representation of a packet header
+ *
+ * @param scratch (out param) buffer of length
+ * PKT_HDR_TO_STR_SCRATCH_LEN
+ * @param pkt_hdr packet header
+ * @param pkt_len length of packet
+ *
+ * @return pointer to 'scratch'
+ */
+const char *__cld_pkt_hdr_to_str(char *scratch,
+ const char *pkt_hdr, size_t pkt_len);
+
+void __cld_dump_buf(const void *buf, size_t len);
+
+/** Footer that appears at the end of each packet */
+struct __attribute__((packed)) cld_pkt_ftr {
+ uint64_t seqid; /**< packet sequence ID */
+ char sha[SHA_DIGEST_LENGTH]; /**< packet signature */
+};
+
+/** Length of the packet footer. This size is fixed */
+#define CLD_PKT_FTR_LEN sizeof(struct cld_pkt_ftr)
+
+#define PKT_HDR_TO_STR_SCRATCH_LEN 128
+
+#endif
diff -X /garz/tmp/dontdiff -urNp cld/include/Makefile.am cld.rpcgen/include/Makefile.am
--- cld/include/Makefile.am 2010-02-02 20:22:12.000000000 -0500
+++ cld.rpcgen/include/Makefile.am 2010-02-02 22:51:08.000000000 -0500
@@ -1,5 +1,5 @@
EXTRA_DIST = cld-private.h
-include_HEADERS = cldc.h hail_log.h cld_common.h cld_msg.h
+include_HEADERS = cldc.h hail_log.h cld_common.h cld_pkt.h
diff -X /garz/tmp/dontdiff -urNp cld/lib/cldc.c cld.rpcgen/lib/cldc.c
--- cld/lib/cldc.c 2010-02-03 18:03:31.000000000 -0500
+++ cld.rpcgen/lib/cldc.c 2010-02-03 17:49:51.000000000 -0500
@@ -35,6 +35,8 @@
#include <glib.h>
#include <cld-private.h>
#include <cldc.h>
+#include <cld_pkt.h>
+#include <cld_msg_rpc.h>
#include <syslog.h>
enum {
@@ -47,12 +49,7 @@ enum {
static const char *user_key(struct cldc_session *sess, const char *user);
static int sess_send_pkt(struct cldc_session *sess,
- const struct cld_packet *pkt, size_t pkt_len);
-
-static const struct cld_msg_hdr def_msg_ack = {
- .magic = CLD_MSG_MAGIC,
- .op = CMO_ACK,
-};
+ const void *pkt, size_t pkt_len);
#ifndef HAVE_STRNLEN
static size_t strnlen(const char *s, size_t maxlen)
@@ -80,6 +77,13 @@ static size_t strnlen(const char *s, siz
#define EBADE 52
#endif
+void cldc_call_opts_get_data(struct cldc_call_opts *copts,
+ char **data, size_t *data_len)
+{
+ *data = copts->resp.data.data_val;
+ *data_len = copts->resp.data.data_len;
+}
+
static void cldc_errlog(int prio, const char *fmt, ...)
{
char buf[200];
@@ -93,48 +97,72 @@ static void cldc_errlog(int prio, const
static int ack_seqid(struct cldc_session *sess, uint64_t seqid_le)
{
- struct cld_packet *pkt;
- struct cld_msg_hdr *resp;
- size_t pkt_len;
+ XDR xdrs;
+ size_t hdr_len, total_len;
+ char buf[CLD_MAX_PKT_MSG_SZ];
+ struct cld_pkt_hdr pkt;
+ struct cld_pkt_ftr *foot;
int ret;
+ static const char * const magic = CLD_PKT_MAGIC;
const char *secret_key;
- pkt_len = sizeof(*pkt) + sizeof(*resp) + SHA_DIGEST_LENGTH;
- pkt = alloca(pkt_len);
- memset(pkt, 0, pkt_len);
-
- memcpy(pkt->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
- pkt->seqid = seqid_le;
- memcpy(pkt->sid, sess->sid, CLD_SID_SZ);
- pkt->flags = cpu_to_le32(CPF_FIRST | CPF_LAST);
- strncpy(pkt->user, sess->user, CLD_MAX_USERNAME - 1);
+ /* Construct ACK packet */
+ memset(&pkt, 0, sizeof(struct cld_pkt_hdr));
+ memcpy(&pkt.magic, magic, sizeof(pkt.magic));
+ memcpy(&pkt.sid, sess->sid, CLD_SID_SZ);
+ pkt.user = sess->user;
+ pkt.mi.order = CLD_PKT_ORD_FIRST_LAST;
+ pkt.mi.cld_pkt_msg_info_u.mi.xid = 0;
+ pkt.mi.cld_pkt_msg_info_u.mi.op = CMO_ACK;
+
+ /* Serialize packet */
+ xdrmem_create(&xdrs, (char *)buf,
+ sizeof(buf) - CLD_PKT_FTR_LEN, XDR_ENCODE);
+ if (!xdr_cld_pkt_hdr(&xdrs, &pkt)) {
+ HAIL_DEBUG(&sess->log, "%s: failed to encode header "
+ "for ack_seqid %llu",
+ __func__,
+ (unsigned long long) seqid_le);
+ xdr_destroy(&xdrs);
+ return -1009;
+ }
- resp = (struct cld_msg_hdr *) (pkt + 1);
- memcpy(resp, &def_msg_ack, sizeof(*resp));
+ /* Fill in footer */
+ hdr_len = xdr_getpos(&xdrs);
+ total_len = hdr_len + CLD_PKT_FTR_LEN;
+ foot = (struct cld_pkt_ftr *)(buf + hdr_len);
+ foot->seqid = seqid_le;
+ xdr_destroy(&xdrs);
secret_key = user_key(sess, sess->user);
ret = __cld_authsign(&sess->log, secret_key,
- pkt, pkt_len - SHA_DIGEST_LENGTH,
- (uint8_t *)pkt + pkt_len - SHA_DIGEST_LENGTH);
+ buf, total_len - SHA_DIGEST_LENGTH, foot->sha);
if (ret) {
HAIL_ERR(&sess->log, "%s: authsign failed: %d",
__func__, ret);
return ret;
}
- return sess_send_pkt(sess, pkt, pkt_len);
+ return sess_send_pkt(sess, buf, total_len);
}
static int rxmsg_generic(struct cldc_session *sess,
- const struct cld_packet *pkt,
- const void *msgbuf, size_t buflen)
+ const struct cld_pkt_hdr *pkt,
+ const struct cld_pkt_ftr *foot)
{
- const struct cld_msg_resp *resp = msgbuf;
+ XDR xdrs;
+ struct cld_msg_generic_resp resp;
struct cldc_msg *req = NULL;
GList *tmp;
- if (buflen < sizeof(*resp))
+ xdrmem_create(&xdrs, sess->msg_buf, sess->msg_buf_len, XDR_DECODE);
+ if (!xdr_cld_msg_generic_resp(&xdrs, &resp)) {
+ HAIL_DEBUG(&sess->log, "%s: failed to decode "
+ "cld_msg_generic_resp", __func__);
+ xdr_destroy(&xdrs);
return -1008;
+ }
+ xdr_destroy(&xdrs);
/* Find out which outbound message this was a response to */
tmp = sess->out_msg;
@@ -144,10 +172,10 @@ static int rxmsg_generic(struct cldc_ses
HAIL_DEBUG(&sess->log, "%s: comparing req->xid (%llu) "
"with resp.xid_in (%llu)",
__func__,
- (unsigned long long) le64_to_cpu(req->xid),
- (unsigned long long) le64_to_cpu(resp->xid_in));
+ (unsigned long long) req->xid,
+ (unsigned long long) resp.xid_in);
- if (req->xid == resp->xid_in)
+ if (req->xid == resp.xid_in)
break;
tmp = tmp->next;
}
@@ -155,7 +183,7 @@ static int rxmsg_generic(struct cldc_ses
HAIL_DEBUG(&sess->log, "%s: no match found with "
"xid_in %llu",
__func__,
- (unsigned long long) le64_to_cpu(resp->xid_in));
+ (unsigned long long) resp.xid_in);
return -1005;
}
@@ -168,28 +196,37 @@ static int rxmsg_generic(struct cldc_ses
req->done = true;
if (req->cb) {
- ssize_t rc = req->cb(req, msgbuf, buflen, resp->code);
+ ssize_t rc = req->cb(req, sess->msg_buf,
+ sess->msg_buf_len, resp.code);
if (rc < 0)
return rc;
}
}
- return ack_seqid(sess, pkt->seqid);
+ return ack_seqid(sess, foot->seqid);
}
static int rxmsg_ack_frag(struct cldc_session *sess,
- const struct cld_packet *pkt,
- const void *msgbuf, size_t buflen)
+ const struct cld_pkt_hdr *pkt,
+ const struct cld_pkt_ftr *foot)
{
- const struct cld_msg_ack_frag *ack_msg = msgbuf;
+ XDR xdrs;
+ struct cld_msg_ack_frag ack_msg;
GList *tmp;
- if (buflen < sizeof(*ack_msg))
+ xdrmem_create(&xdrs, sess->msg_buf, sess->msg_buf_len, XDR_DECODE);
+ memset(&ack_msg, 0, sizeof(ack_msg));
+ if (!xdr_cld_msg_ack_frag(&xdrs, &ack_msg)) {
+ HAIL_INFO(&sess->log, "%s: failed to decode ack_msg",
+ __func__);
+ xdr_destroy(&xdrs);
return -1008;
+ }
+ xdr_destroy(&xdrs);
HAIL_INFO(&sess->log, "%s: seqid %llu, want to ack",
__func__,
- (unsigned long long) ack_msg->seqid);
+ (unsigned long long) ack_msg.seqid);
tmp = sess->out_msg;
while (tmp) {
@@ -201,18 +238,21 @@ static int rxmsg_ack_frag(struct cldc_se
for (i = 0; i < req->n_pkts; i++) {
struct cldc_pkt_info *pi;
+ struct cld_pkt_ftr *f;
uint64_t seqid;
pi = req->pkt_info[i];
if (!pi)
continue;
- seqid = pi->pkt.seqid;
- if (seqid != ack_msg->seqid)
+ f = (struct cld_pkt_ftr *)
+ pi->data + (pi->pkt_len - CLD_PKT_FTR_LEN);
+ seqid = le64_to_cpu(f->seqid);
+ if (seqid != ack_msg.seqid)
continue;
HAIL_DEBUG(&sess->log, "%s: seqid %llu, expiring",
__func__,
- (unsigned long long) ack_msg->seqid);
+ (unsigned long long) ack_msg.seqid);
req->pkt_info[i] = NULL;
free(pi);
@@ -223,19 +263,26 @@ static int rxmsg_ack_frag(struct cldc_se
}
static int rxmsg_event(struct cldc_session *sess,
- const struct cld_packet *pkt,
- const void *msgbuf, size_t buflen)
+ const struct cld_pkt_hdr *pkt,
+ const struct cld_pkt_ftr *foot)
{
- const struct cld_msg_event *ev = msgbuf;
+ XDR xdrs;
+ struct cld_msg_event ev;
struct cldc_fh *fh = NULL;
int i;
- if (buflen < sizeof(*ev))
+ xdrmem_create(&xdrs, sess->msg_buf, sess->msg_buf_len, XDR_DECODE);
+ if (!xdr_cld_msg_event(&xdrs, &ev)) {
+ HAIL_INFO(&sess->log, "%s: failed to decode cld_msg_event",
+ __func__);
+ xdr_destroy(&xdrs);
return -1008;
+ }
+ xdr_destroy(&xdrs);
for (i = 0; i < sess->fh->len; i++) {
fh = &g_array_index(sess->fh, struct cldc_fh, i);
- if (fh->fh_le == ev->fh)
+ if (fh->fh == ev.fh)
break;
else
fh = NULL;
@@ -244,19 +291,11 @@ static int rxmsg_event(struct cldc_sessi
if (!fh)
return -1011;
- sess->ops->event(sess->private, sess, fh, le32_to_cpu(ev->events));
+ sess->ops->event(sess->private, sess, fh, ev.events);
return 0;
}
-static int rxmsg_not_master(struct cldc_session *sess,
- const struct cld_packet *pkt,
- const void *msgbuf, size_t buflen)
-{
- HAIL_DEBUG(&sess->log, "FIXME: not-master message received");
- return -1055; /* FIXME */
-}
-
static void cldc_msg_free(struct cldc_msg *msg)
{
int i;
@@ -264,8 +303,6 @@ static void cldc_msg_free(struct cldc_ms
if (!msg)
return;
- free(msg->data);
-
for (i = 0; i < msg->n_pkts; i++)
free(msg->pkt_info[i]);
@@ -304,128 +341,121 @@ static const char *user_key(struct cldc_
return sess->secret_key;
}
-static int cldc_receive_msg(struct cldc_session *sess,
- const struct cld_packet *pkt,
- size_t pkt_len)
+static int rx_complete(struct cldc_session *sess,
+ const struct cld_pkt_hdr *pkt,
+ const struct cld_pkt_ftr *foot)
{
- const struct cld_msg_hdr *msg = (struct cld_msg_hdr *) sess->msg_buf;
- size_t msglen = sess->msg_buf_len;
-
- if (memcmp(msg->magic, CLD_MSG_MAGIC, sizeof(msg->magic))) {
- HAIL_DEBUG(&sess->log, "%s: bad msg magic", __func__);
- return -EPROTO;
+ switch (sess->msg_buf_op) {
+ case CMO_ACK:
+ HAIL_INFO(&sess->log, "%s: received unexpected ACK",
+ __func__);
+ return -EBADRQC;
+ case CMO_PING:
+ /* send out an ACK */
+ return ack_seqid(sess, foot->seqid);
+ case CMO_NOT_MASTER:
+ HAIL_INFO(&sess->log, "FIXME: not-master message received");
+ return -1055; /* FIXME */
+ case CMO_EVENT:
+ return rxmsg_event(sess, pkt, foot);
+ case CMO_ACK_FRAG:
+ return rxmsg_ack_frag(sess, pkt, foot);
+ default:
+ return rxmsg_generic(sess, pkt, foot);
}
+}
- switch(msg->op) {
- case CMO_NOP:
- case CMO_CLOSE:
- case CMO_DEL:
- case CMO_LOCK:
- case CMO_UNLOCK:
- case CMO_TRYLOCK:
- case CMO_PUT:
+/** Accepts a packet's sequence ID.
+ * Depending on the message op, this may involve initializing the session's
+ * sequence ID, validating that the packet's ID is in range, or doing nothing.
+ *
+ * @param sess The session
+ * @param seqid The sequence ID
+ * @param op The message op
+ *
+ * @return 0 on success; error code otherwise
+ */
+static int accept_seqid(struct cldc_session *sess, uint64_t seqid,
+ enum cld_msg_op op)
+{
+ switch (op) {
case CMO_NEW_SESS:
- case CMO_END_SESS:
- case CMO_OPEN:
- case CMO_GET_META:
- case CMO_GET:
- return rxmsg_generic(sess, pkt, msg, msglen);
+ /* CMO_NEW_SESS initializes the session's sequence id */
+ sess->next_seqid_in = seqid + 1;
+ sess->next_seqid_in_tr =
+ sess->next_seqid_in - CLDC_MSG_REMEMBER;
+ HAIL_DEBUG(&sess->log, "%s: setting next_seqid_in to %llu",
+ __func__, (unsigned long long) seqid);
+ return 0;
+
case CMO_NOT_MASTER:
- return rxmsg_not_master(sess, pkt, msg, msglen);
case CMO_ACK_FRAG:
- return rxmsg_ack_frag(sess, pkt, msg, msglen);
- case CMO_EVENT:
- return rxmsg_event(sess, pkt, msg, msglen);
- case CMO_PING:
- return ack_seqid(sess, pkt->seqid);
- case CMO_ACK:
- return -EBADRQC;
- }
+ /* Ignore sequence ID of these types */
+ return 0;
- /* unknown op code */
- return -EBADRQC;
+ default:
+ /* verify that the sequence id is in range */
+ if (seqid == sess->next_seqid_in) {
+ sess->next_seqid_in++;
+ sess->next_seqid_in_tr++;
+ return 0;
+ }
+
+ if (seqid_in_range(seqid,
+ sess->next_seqid_in_tr,
+ sess->next_seqid_in)) {
+ return 0;
+ }
+
+ return -EBADSLT;
+ }
}
int cldc_receive_pkt(struct cldc_session *sess,
const void *net_addr, size_t net_addrlen,
const void *pktbuf, size_t pkt_len)
{
- const struct cld_packet *pkt = pktbuf;
- const struct cld_msg_hdr *msg = (struct cld_msg_hdr *) (pkt + 1);
const char *secret_key;
- size_t msglen;
struct timeval tv;
time_t current_time;
+ struct cld_pkt_hdr pkt;
+ unsigned int hdr_len, msg_len;
+ const struct cld_pkt_ftr *foot;
uint64_t seqid;
- uint32_t pkt_flags;
- bool first_frag, last_frag, have_new_sess, no_seqid;
- bool have_get;
+ XDR xdrs;
int ret;
gettimeofday(&tv, NULL);
current_time = tv.tv_sec;
- if (pkt_len < (sizeof(*pkt) + SHA_DIGEST_LENGTH)) {
- HAIL_DEBUG(&sess->log, "%s: msg too short", __func__);
+ /* Decode the packet header */
+ if (pkt_len < CLD_PKT_FTR_LEN) {
+ HAIL_DEBUG(&sess->log, "%s: packet too short to have a "
+ "well-formed footer", __func__);
return -EPROTO;
}
-
- msglen = pkt_len - sizeof(*pkt) - SHA_DIGEST_LENGTH;
-
- pkt_flags = le32_to_cpu(pkt->flags);
- first_frag = pkt_flags & CPF_FIRST;
- last_frag = pkt_flags & CPF_LAST;
- have_get = first_frag && (msg->op == CMO_GET);
- have_new_sess = first_frag && (msg->op == CMO_NEW_SESS);
- no_seqid = first_frag && ((msg->op == CMO_NOT_MASTER) ||
- (msg->op == CMO_ACK_FRAG));
-
- if (sess->log.verbose) {
- if (have_get) {
- struct cld_msg_get_resp *dp;
- dp = (struct cld_msg_get_resp *) msg;
- HAIL_DEBUG(&sess->log, "%s(len %u, op %s"
- ", seqid %llu, user %s, size %u)",
- __func__,
- (unsigned int) pkt_len,
- __cld_opstr(msg->op),
- (unsigned long long) le64_to_cpu(pkt->seqid),
- pkt->user,
- le32_to_cpu(dp->size));
- } else if (have_new_sess) {
- struct cld_msg_resp *dp;
- dp = (struct cld_msg_resp *) msg;
- HAIL_DEBUG(&sess->log, "%s(len %u, op %s"
- ", seqid %llu, user %s, xid_in %llu)",
- __func__,
- (unsigned int) pkt_len,
- __cld_opstr(msg->op),
- (unsigned long long) le64_to_cpu(pkt->seqid),
- pkt->user,
- (unsigned long long) le64_to_cpu(dp->xid_in));
- } else {
- HAIL_DEBUG(&sess->log, "%s(len %u, "
- "flags %s%s, op %s, seqid %llu, user %s)",
- __func__,
- (unsigned int) pkt_len,
- first_frag ? "F" : "",
- last_frag ? "L" : "",
- first_frag ? __cld_opstr(msg->op) : "n/a",
- (unsigned long long) le64_to_cpu(pkt->seqid),
- pkt->user);
- }
+ xdrmem_create(&xdrs, (void *)pktbuf,
+ pkt_len - CLD_PKT_FTR_LEN, XDR_DECODE);
+ memset(&pkt, 0, sizeof(pkt));
+ if (!xdr_cld_pkt_hdr(&xdrs, &pkt)) {
+ HAIL_DEBUG(&sess->log, "%s: failed to decode packet header",
+ __func__);
+ xdr_destroy(&xdrs);
+ return -EPROTO;
}
-
- if (memcmp(pkt->magic, CLD_PKT_MAGIC, sizeof(pkt->magic))) {
+ hdr_len = xdr_getpos(&xdrs);
+ xdr_destroy(&xdrs);
+ if (memcmp(&pkt.magic, CLD_PKT_MAGIC, sizeof(pkt.magic))) {
HAIL_DEBUG(&sess->log, "%s: bad pkt magic", __func__);
return -EPROTO;
}
/* check HMAC signature */
- secret_key = user_key(sess, pkt->user);
+ foot = (const struct cld_pkt_ftr *)
+ (((char *)pktbuf) + (pkt_len - CLD_PKT_FTR_LEN));
+ secret_key = user_key(sess, pkt.user);
ret = __cld_authcheck(&sess->log, secret_key,
- pkt, pkt_len - SHA_DIGEST_LENGTH,
- (uint8_t *)pkt + pkt_len - SHA_DIGEST_LENGTH);
+ pktbuf, pkt_len - SHA_DIGEST_LENGTH, foot->sha);
if (ret) {
HAIL_DEBUG(&sess->log, "%s: invalid auth (ret=%d)",
__func__, ret);
@@ -443,48 +473,39 @@ int cldc_receive_pkt(struct cldc_session
if (current_time >= sess->msg_scan_time)
sess_expire_outmsg(sess, current_time);
- if (first_frag)
- sess->msg_buf_len = 0;
-
- if ((sess->msg_buf_len + msglen) > CLD_MAX_MSG_SZ) {
- HAIL_DEBUG(&sess->log, "%s: bad pkt length", __func__);
- return -EPROTO;
+ if (pkt.mi.order & CLD_PKT_IS_FIRST) {
+ /* This packet begins a new message.
+ * Determine the new message's op */
+ sess->msg_buf_op = pkt.mi.cld_pkt_msg_info_u.mi.op;
}
- memcpy(sess->msg_buf + sess->msg_buf_len, msg, msglen);
- sess->msg_buf_len += msglen;
-
- /* verify (or set, for new-sess) sequence id */
- seqid = le64_to_cpu(pkt->seqid);
- if (have_new_sess) {
- sess->next_seqid_in = seqid + 1;
- sess->next_seqid_in_tr =
- sess->next_seqid_in - CLDC_MSG_REMEMBER;
-
- HAIL_DEBUG(&sess->log, "%s: "
- "setting next_seqid_in to %llu",
+ seqid = le64_to_cpu(foot->seqid);
+ ret = accept_seqid(sess, seqid, sess->msg_buf_op);
+ if (ret) {
+ HAIL_DEBUG(&sess->log, "%s: bad seqid %llu",
__func__, (unsigned long long) seqid);
- } else if (!no_seqid) {
- if (seqid != sess->next_seqid_in) {
- if (seqid_in_range(seqid,
- sess->next_seqid_in_tr,
- sess->next_seqid_in))
- return ack_seqid(sess, pkt->seqid);
-
- HAIL_DEBUG(&sess->log, "%s: bad seqid %llu",
- __func__, (unsigned long long) seqid);
- return -EBADSLT;
- }
- sess->next_seqid_in++;
- sess->next_seqid_in_tr++;
+ return ret;
}
+ if (pkt.mi.order & CLD_PKT_IS_FIRST)
+ sess->msg_buf_len = 0;
+ msg_len = pkt_len - hdr_len - CLD_PKT_FTR_LEN;
+ if ((sess->msg_buf_len + msg_len) > CLD_MAX_MSG_SZ) {
+ HAIL_DEBUG(&sess->log, "%s: message too long", __func__);
+ return -EPROTO;
+ }
+ memcpy(sess->msg_buf + sess->msg_buf_len, pktbuf + hdr_len, msg_len);
+ sess->msg_buf_len += msg_len;
sess->expire_time = current_time + CLDC_SESS_EXPIRE;
- if (!last_frag)
- return sess ? ack_seqid(sess, pkt->seqid) : 0;
-
- return cldc_receive_msg(sess, pkt, pkt_len);
+ if (pkt.mi.order & CLD_PKT_IS_LAST) {
+ HAIL_DEBUG(&sess->log, "%s: receiving complete message of "
+ "op %s", __func__,
+ __cld_opstr(sess->msg_buf_op));
+ return rx_complete(sess, &pkt, foot);
+ } else {
+ return ack_seqid(sess, foot->seqid);
+ }
}
static void sess_next_seqid(struct cldc_session *sess, uint64_t *seqid)
@@ -493,19 +514,49 @@ static void sess_next_seqid(struct cldc_
*seqid = rc;
}
+/**
+ * creates a new cldc_msg
+ *
+ * @param sess The session
+ * @param copts The call options
+ * @param op The op of message to create
+ * @param xdrproc The XDR function to use to create the message body
+ * @param data The data to pass to xdrproc
+ *
+ * @return The cldc message, or NULL on error,
+ */
static struct cldc_msg *cldc_new_msg(struct cldc_session *sess,
const struct cldc_call_opts *copts,
- enum cld_msg_ops op,
- size_t msg_len)
+ enum cld_msg_op op,
+ xdrproc_t xdrproc, const void *data)
{
struct cldc_msg *msg;
- struct cld_msg_hdr *hdr;
struct timeval tv;
- int n_pkts, i, data_left;
- void *p;
+ size_t i, body_len, n_pkts;
+ char *body;
+ XDR xbdy;
+
+ /* Encode the message body */
+ body_len = xdr_sizeof(xdrproc, (void *)data);
+ body = alloca(body_len);
+ xdrmem_create(&xbdy, body, body_len, XDR_ENCODE);
+ if (!xdrproc(&xbdy, (void *)data)) {
+ HAIL_DEBUG(&sess->log, "%s: failed to encode "
+ "message", __func__);
+ xdr_destroy(&xbdy);
+ return NULL;
+ }
+ xdr_destroy(&xbdy);
- n_pkts = msg_len / CLD_MAX_PKT_MSG_SZ;
- n_pkts += ((msg_len % CLD_MAX_PKT_MSG_SZ) ? 1 : 0);
+ if (body_len == 0)
+ /* Some packets (like ACKS) just have a header, and no message
+ * body. */
+ n_pkts = 1;
+ else {
+ /* round up */
+ n_pkts = (body_len + CLD_MAX_PKT_MSG_SZ - 1) /
+ CLD_MAX_PKT_MSG_SZ;
+ }
/* Create cldc_msg */
msg = calloc(1, sizeof(*msg) +
@@ -513,57 +564,65 @@ static struct cldc_msg *cldc_new_msg(str
if (!msg)
return NULL;
- msg->data = calloc(1, msg_len);
- if (!msg->data) {
- free(msg);
- return NULL;
- }
-
msg->n_pkts = n_pkts;
__cld_rand64(&msg->xid);
-
+ msg->op = op;
msg->sess = sess;
-
if (copts)
memcpy(&msg->copts, copts, sizeof(msg->copts));
-
gettimeofday(&tv, NULL);
msg->expire_time = tv.tv_sec + CLDC_MSG_EXPIRE;
- msg->data_len = msg_len;
-
- p = msg->data;
- data_left = msg_len;
for (i = 0; i < msg->n_pkts; i++) {
+ XDR xhdr;
+ struct cld_pkt_hdr pkt;
struct cldc_pkt_info *pi;
- int pkt_len;
+ int hdr_len, body_chunk_len, pkt_len;
- pkt_len = MIN(data_left, CLD_MAX_PKT_MSG_SZ);
+ /* Set up packet header */
+ memcpy(&pkt.magic, CLD_PKT_MAGIC, sizeof(pkt.magic));
+ memcpy(&pkt.sid, sess->sid, CLD_SID_SZ);
+ pkt.user = sess->user;
+ if (i == 0) {
+ if (i == (msg->n_pkts - 1))
+ pkt.mi.order = CLD_PKT_ORD_FIRST_LAST;
+ else
+ pkt.mi.order = CLD_PKT_ORD_FIRST;
+ pkt.mi.cld_pkt_msg_info_u.mi.xid = msg->xid;
+ pkt.mi.cld_pkt_msg_info_u.mi.op = op;
+ } else {
+ if (i == (msg->n_pkts - 1))
+ pkt.mi.order = CLD_PKT_ORD_LAST;
+ else
+ pkt.mi.order = CLD_PKT_ORD_MID;
+ }
- pi = calloc(1, sizeof(*pi) + pkt_len + SHA_DIGEST_LENGTH);
+ /* Allocate memory */
+ hdr_len = xdr_sizeof((xdrproc_t)xdr_cld_pkt_hdr, &pkt);
+ body_chunk_len = MIN(body_len, CLD_MAX_PKT_MSG_SZ);
+ pkt_len = hdr_len + body_chunk_len + CLD_PKT_FTR_LEN;
+ pi = calloc(1, sizeof(*pi) + pkt_len);
if (!pi)
goto err_out;
-
pi->pkt_len = pkt_len;
+ msg->pkt_info[i] = pi;
+ strncpy(pi->user, sess->user, CLD_MAX_USERNAME - 1);
- memcpy(pi->pkt.magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
- memcpy(pi->pkt.sid, sess->sid, CLD_SID_SZ);
- strncpy(pi->pkt.user, sess->user, CLD_MAX_USERNAME - 1);
-
- if (i == 0)
- pi->pkt.flags |= cpu_to_le32(CPF_FIRST);
- if (i == (msg->n_pkts - 1))
- pi->pkt.flags |= cpu_to_le32(CPF_LAST);
+ /* Fill in the packet header */
+ xdrmem_create(&xhdr, (char *)pi->data, hdr_len, XDR_ENCODE);
+ if (!xdr_cld_pkt_hdr(&xhdr, &pkt)) {
+ HAIL_DEBUG(&sess->log, "%s: failed to encode header "
+ "for packet %zu", __func__, i);
+ xdr_destroy(&xhdr);
+ goto err_out;
+ }
- msg->pkt_info[i] = pi;
- data_left -= pkt_len;
+ /* Fill in the body */
+ memcpy(pi->data + hdr_len, body, body_chunk_len);
+ body += body_chunk_len;
+ body_len -= body_chunk_len;
}
- hdr = (struct cld_msg_hdr *) msg->data;
- memcpy(&hdr->magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ);
- hdr->op = op;
- hdr->xid = msg->xid;
-
return msg;
err_out:
@@ -602,32 +661,8 @@ static void sess_expire(struct cldc_sess
}
static int sess_send_pkt(struct cldc_session *sess,
- const struct cld_packet *pkt, size_t pkt_len)
+ const void *pkt, size_t pkt_len)
{
- if (sess->log.verbose) {
- uint32_t flags = le32_to_cpu(pkt->flags);
- bool first = (flags & CPF_FIRST);
- bool last = (flags & CPF_LAST);
- uint8_t op = CMO_NOP;
-
- if (first) {
- struct cld_msg_hdr *hdr;
-
- hdr = (struct cld_msg_hdr *) (pkt + 1);
- op = hdr->op;
- }
-
- HAIL_DEBUG(&sess->log,
- "%s(len %zu, flags %s%s, "
- "op %s, seqid %llu)",
- __func__,
- pkt_len,
- first ? "F" : "",
- last ? "L" : "",
- first ? __cld_opstr(op) : "n/a",
- (unsigned long long) le64_to_cpu(pkt->seqid));
- }
-
return sess->ops->pkt_send(sess->private,
sess->addr, sess->addr_len,
pkt, pkt_len);
@@ -656,18 +691,12 @@ static int sess_timer(struct cldc_sessio
for (i = 0; i < msg->n_pkts; i++) {
struct cldc_pkt_info *pi;
- int total_pkt_len;
pi = msg->pkt_info[i];
if (!pi)
continue;
-
- total_pkt_len = sizeof(struct cld_packet) +
- pi->pkt_len + SHA_DIGEST_LENGTH;
-
pi->retries++;
-
- sess_send_pkt(sess, &pi->pkt, total_pkt_len);
+ sess_send_pkt(sess, pi->data, pi->pkt_len);
}
}
@@ -679,40 +708,31 @@ static int sess_timer(struct cldc_sessio
static int sess_send(struct cldc_session *sess, struct cldc_msg *msg)
{
int ret, i;
- int data_left;
- void *p;
const char *secret_key;
secret_key = user_key(sess, sess->user);
- p = msg->data;
- data_left = msg->data_len;
for (i = 0; i < msg->n_pkts; i++) {
struct cldc_pkt_info *pi;
- int total_pkt_len;
+ struct cld_pkt_ftr *foot;
pi = msg->pkt_info[i];
- memcpy(pi->data, p, pi->pkt_len);
-
- total_pkt_len = sizeof(struct cld_packet) +
- pi->pkt_len + SHA_DIGEST_LENGTH;
/* Add the sequence number to the end of the packet */
- sess_next_seqid(sess, &pi->pkt.seqid);
-
- p += pi->pkt_len;
- data_left -= pi->pkt_len;
+ foot = (struct cld_pkt_ftr *)
+ (pi->data + pi->pkt_len - CLD_PKT_FTR_LEN);
+ memset(foot, 0, CLD_PKT_FTR_LEN);
+ sess_next_seqid(sess, &foot->seqid);
/* Add the signature to the end of the packet */
ret = __cld_authsign(&sess->log, secret_key,
- &pi->pkt, total_pkt_len-SHA_DIGEST_LENGTH,
- ((uint8_t *)&pi->pkt + total_pkt_len) -
- SHA_DIGEST_LENGTH);
+ pi->data,
+ pi->pkt_len - SHA_DIGEST_LENGTH,foot->sha);
if (ret)
return ret;
/* attempt first send */
- if (sess_send_pkt(sess, &pi->pkt, total_pkt_len) < 0)
+ if (sess_send_pkt(sess, pi->data, pi->pkt_len) < 0)
return -EIO;
}
@@ -762,7 +782,7 @@ int cldc_end_sess(struct cldc_session *s
/* create END-SESS message */
msg = cldc_new_msg(sess, copts, CMO_END_SESS,
- sizeof(struct cld_msg_hdr));
+ (xdrproc_t)xdr_void, NULL);
if (!msg)
return -ENOMEM;
@@ -774,10 +794,8 @@ int cldc_end_sess(struct cldc_session *s
static ssize_t new_sess_cb(struct cldc_msg *msg, const void *resp_p,
size_t resp_len, enum cle_err_codes resp_rc)
{
- struct cldc_session *sess = msg->sess;
-
if (resp_rc == CLE_OK)
- sess->confirmed = true;
+ msg->sess->confirmed = true;
if (msg->copts.cb)
return msg->copts.cb(&msg->copts, resp_rc);
@@ -830,7 +848,7 @@ int cldc_new_sess(const struct cldc_ops
/* create NEW-SESS message */
msg = cldc_new_msg(sess, copts, CMO_NEW_SESS,
- sizeof(struct cld_msg_hdr));
+ (xdrproc_t)xdr_void, NULL);
if (!msg) {
sess_free(sess);
return -ENOMEM;
@@ -877,7 +895,7 @@ int cldc_nop(struct cldc_session *sess,
/* create NOP message */
msg = cldc_new_msg(sess, copts, CMO_NOP,
- sizeof(struct cld_msg_hdr));
+ (xdrproc_t)xdr_void, NULL);
if (!msg)
return -ENOMEM;
@@ -890,8 +908,7 @@ int cldc_del(struct cldc_session *sess,
const char *pathname)
{
struct cldc_msg *msg;
- struct cld_msg_del *del;
- void *p;
+ struct cld_msg_del del;
size_t plen;
if (!sess->confirmed)
@@ -906,33 +923,33 @@ int cldc_del(struct cldc_session *sess,
return -EINVAL;
/* create DEL message */
+ del.inode_name = (char *)pathname;
msg = cldc_new_msg(sess, copts, CMO_DEL,
- sizeof(struct cld_msg_del) + strlen(pathname));
+ (xdrproc_t)xdr_cld_msg_del, &del);
if (!msg)
return -ENOMEM;
msg->cb = generic_end_cb;
- /* fill in DEL-specific name_len, name info */
- del = (struct cld_msg_del *) msg->data;
- del->name_len = cpu_to_le16(plen);
- p = del;
- p += sizeof(struct cld_msg_del);
- memcpy(p, pathname, plen);
-
return sess_send(sess, msg);
}
static ssize_t open_end_cb(struct cldc_msg *msg, const void *resp_p,
size_t resp_len, enum cle_err_codes resp_rc)
{
- const struct cld_msg_open_resp *resp = resp_p;
- struct cldc_fh *fh = msg->cb_private;
-
if (resp_rc == CLE_OK) {
- if (resp_len < sizeof(*resp))
- return -1010;
- fh->fh_le = resp->fh;
+ struct cldc_fh *fh = msg->cb_private;
+ XDR xdrs;
+ struct cld_msg_open_resp resp;
+
+ xdrmem_create(&xdrs, (void *)resp_p, resp_len, XDR_DECODE);
+ memset(&resp, 0, sizeof(resp));
+ if (!xdr_cld_msg_open_resp(&xdrs, &resp)) {
+ xdr_destroy(&xdrs);
+ return -1009;
+ }
+
+ fh->fh = resp.fh;
fh->valid = true;
}
@@ -948,9 +965,8 @@ int cldc_open(struct cldc_session *sess,
uint32_t events, struct cldc_fh **fh_out)
{
struct cldc_msg *msg;
- struct cld_msg_open *open;
+ struct cld_msg_open open;
struct cldc_fh fh, *fhtmp;
- void *p;
size_t plen;
int fh_idx;
@@ -968,8 +984,11 @@ int cldc_open(struct cldc_session *sess,
return -EINVAL;
/* create OPEN message */
+ open.mode = open_mode;
+ open.events = events;
+ open.inode_name = (char *)pathname;
msg = cldc_new_msg(sess, copts, CMO_OPEN,
- sizeof(struct cld_msg_open) + strlen(pathname));
+ (xdrproc_t)xdr_cld_msg_open, &open);
if (!msg)
return -ENOMEM;
@@ -984,15 +1003,6 @@ int cldc_open(struct cldc_session *sess,
msg->cb = open_end_cb;
msg->cb_private = fhtmp;
- /* fill in OPEN-specific info */
- open = (struct cld_msg_open *) msg->data;
- open->mode = cpu_to_le32(open_mode);
- open->events = cpu_to_le32(events);
- open->name_len = cpu_to_le16(plen);
- p = open;
- p += sizeof(struct cld_msg_open);
- memcpy(p, pathname, plen);
-
*fh_out = fhtmp;
return sess_send(sess, msg);
@@ -1002,7 +1012,7 @@ int cldc_close(struct cldc_fh *fh, const
{
struct cldc_session *sess;
struct cldc_msg *msg;
- struct cld_msg_close *close_msg;
+ struct cld_msg_close close_msg;
if (!fh->valid)
return -EINVAL;
@@ -1010,8 +1020,9 @@ int cldc_close(struct cldc_fh *fh, const
sess = fh->sess;
/* create CLOSE message */
+ close_msg.fh = fh->fh;
msg = cldc_new_msg(sess, copts, CMO_CLOSE,
- sizeof(struct cld_msg_close));
+ (xdrproc_t)xdr_cld_msg_close, &close_msg);
if (!msg)
return -ENOMEM;
@@ -1020,10 +1031,6 @@ int cldc_close(struct cldc_fh *fh, const
msg->cb = generic_end_cb;
- /* fill in CLOSE-specific fh info */
- close_msg = (struct cld_msg_close *) msg->data;
- close_msg->fh = fh->fh_le;
-
return sess_send(sess, msg);
}
@@ -1032,7 +1039,7 @@ int cldc_lock(struct cldc_fh *fh, const
{
struct cldc_session *sess;
struct cldc_msg *msg;
- struct cld_msg_lock *lock;
+ struct cld_msg_lock lock;
if (!fh->valid)
return -EINVAL;
@@ -1040,19 +1047,16 @@ int cldc_lock(struct cldc_fh *fh, const
sess = fh->sess;
/* create LOCK message */
+ lock.fh = fh->fh;
+ lock.flags = lock_flags;
msg = cldc_new_msg(sess, copts,
wait_for_lock ? CMO_LOCK : CMO_TRYLOCK,
- sizeof(struct cld_msg_lock));
+ (xdrproc_t)xdr_cld_msg_lock, &lock);
if (!msg)
return -ENOMEM;
msg->cb = generic_end_cb;
- /* fill in LOCK-specific info */
- lock = (struct cld_msg_lock *) msg->data;
- lock->fh = fh->fh_le;
- lock->flags = cpu_to_le32(lock_flags);
-
return sess_send(sess, msg);
}
@@ -1060,7 +1064,7 @@ int cldc_unlock(struct cldc_fh *fh, cons
{
struct cldc_session *sess;
struct cldc_msg *msg;
- struct cld_msg_unlock *unlock;
+ struct cld_msg_unlock unlock;
if (!fh->valid)
return -EINVAL;
@@ -1068,17 +1072,14 @@ int cldc_unlock(struct cldc_fh *fh, cons
sess = fh->sess;
/* create UNLOCK message */
+ unlock.fh = fh->fh;
msg = cldc_new_msg(sess, copts, CMO_UNLOCK,
- sizeof(struct cld_msg_unlock));
+ (xdrproc_t)xdr_cld_msg_unlock, &unlock);
if (!msg)
return -ENOMEM;
msg->cb = generic_end_cb;
- /* fill in UNLOCK-specific info */
- unlock = (struct cld_msg_unlock *) msg->data;
- unlock->fh = fh->fh_le;
-
return sess_send(sess, msg);
}
@@ -1087,7 +1088,7 @@ int cldc_put(struct cldc_fh *fh, const s
{
struct cldc_session *sess;
struct cldc_msg *msg;
- struct cld_msg_put *put;
+ struct cld_msg_put put;
if (!data || !data_len || data_len > CLD_MAX_PAYLOAD_SZ)
return -EINVAL;
@@ -1098,17 +1099,14 @@ int cldc_put(struct cldc_fh *fh, const s
sess = fh->sess;
/* create PUT message */
+ put.fh = fh->fh;
+ put.data.data_len = data_len;
+ put.data.data_val = (char *)data;
msg = cldc_new_msg(sess, copts, CMO_PUT,
- sizeof(struct cld_msg_put) + data_len);
+ (xdrproc_t)xdr_cld_msg_put, &put);
if (!msg)
return -ENOMEM;
- put = (struct cld_msg_put *) msg->data;
- put->fh = fh->fh_le;
- put->data_size = cpu_to_le32(data_len);
-
- memcpy((put + 1), data, data_len);
-
msg->cb = generic_end_cb;
sess_send(sess, msg);
@@ -1116,67 +1114,39 @@ int cldc_put(struct cldc_fh *fh, const s
return 0;
}
-#undef XC32
-#undef XC64
-#define XC32(name) \
- o->name = le32_to_cpu(resp->name)
-#define XC64(name) \
- o->name = le64_to_cpu(resp->name)
-
static ssize_t get_end_cb(struct cldc_msg *msg, const void *resp_p,
size_t resp_len, enum cle_err_codes resp_rc)
{
- const struct cld_msg_get_resp *resp = resp_p;
- struct cld_msg_get_resp *o = NULL;
-
if (resp_rc == CLE_OK) {
- bool get_body;
-
- o = &msg->copts.u.get.resp;
+ XDR xin;
+ struct cld_msg_get_resp *resp = &msg->copts.resp;
- get_body = (resp->resp.hdr.op == CMO_GET);
- msg->copts.op = CMO_GET;
-
- /* copy-and-swap */
- XC64(inum);
- XC32(ino_len);
- XC32(size);
- XC64(version);
- XC64(time_create);
- XC64(time_modify);
- XC32(flags);
-
- /* copy inode name */
- if (o->ino_len <= CLD_INODE_NAME_MAX) {
- size_t diffsz;
- const void *p;
-
- p = (resp + 1);
- memcpy(&msg->copts.u.get.inode_name, p, o->ino_len);
-
- p += o->ino_len;
- diffsz = p - resp_p;
-
- /* point to internal buffer holding GET data */
- msg->copts.u.get.buf = msg->sess->msg_buf + diffsz;
- msg->copts.u.get.size = msg->sess->msg_buf_len - diffsz;
- } else {
- o->ino_len = 0; /* Probably full of garbage */
+ /* Parse GET response.
+ * Avoid memory allocation in xdr_string by pointing
+ * variable-length elements at static buffers. */
+ xdrmem_create(&xin, (void *)resp_p, resp_len, XDR_DECODE);
+ memset(resp, 0, sizeof(struct cld_msg_get_resp));
+ resp->inode_name = msg->sess->inode_name_temp;
+ resp->data.data_val = msg->sess->payload;
+ resp->data.data_len = 0;
+ if (!xdr_cld_msg_get_resp(&xin, resp)) {
+ xdr_destroy(&xin);
+ return -1009;
}
+ xdr_destroy(&xin);
}
if (msg->copts.cb)
return msg->copts.cb(&msg->copts, resp_rc);
return 0;
}
-#undef XC
int cldc_get(struct cldc_fh *fh, const struct cldc_call_opts *copts,
bool metadata_only)
{
struct cldc_session *sess;
struct cldc_msg *msg;
- struct cld_msg_get *get;
+ struct cld_msg_get get;
if (!fh->valid)
return -EINVAL;
@@ -1184,17 +1154,14 @@ int cldc_get(struct cldc_fh *fh, const s
sess = fh->sess;
/* create GET message */
+ get.fh = fh->fh;
msg = cldc_new_msg(sess, copts, CMO_GET,
- sizeof(struct cld_msg_get));
+ (xdrproc_t)xdr_cld_msg_get, &get);
if (!msg)
return -ENOMEM;
msg->cb = get_end_cb;
- /* fill in GET-specific info */
- get = (struct cld_msg_get *) msg->data;
- get->fh = fh->fh_le;
-
return sess_send(sess, msg);
}
Binary files cld/lib/cldc-dns.o and cld.rpcgen/lib/cldc-dns.o differ
Binary files cld/lib/cldc.o and cld.rpcgen/lib/cldc.o differ
Binary files cld/lib/cldc-udp.o and cld.rpcgen/lib/cldc-udp.o differ
diff -X /garz/tmp/dontdiff -urNp cld/lib/cld_msg_rpc.x cld.rpcgen/lib/cld_msg_rpc.x
--- cld/lib/cld_msg_rpc.x 1969-12-31 19:00:00.000000000 -0500
+++ cld.rpcgen/lib/cld_msg_rpc.x 2010-02-03 16:03:36.000000000 -0500
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2010, Colin McCabe
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+const CLD_PKT_MAGIC = "CLDc1pkt";
+const CLD_SID_SZ = 8;
+
+const CLD_INODE_NAME_MAX = 256; /**< max total pathname len */
+
+const CLD_MAX_USERNAME = 32;
+
+const CLD_MAX_PKT_MSG_SZ = 1024; /**< The maximum number of message bytes we'll
+ put in a single packet */
+
+const CLD_MAX_PAYLOAD_SZ = 131072; /**< Maximum length of the data that can be
+ sent with get or put. In some sense,
+ this is part of cld's API, and
+ shouldn't be changed lightly. */
+
+const CLD_MAX_MSG_SZ = 196608; /**< Maximum size of a single message
+ including all packets. */
+
+const CLD_MAX_SECRET_KEY = 128; /**< includes req. nul */
+
+/** available RPC operations */
+enum cld_msg_op {
+ /* client -> server */
+ CMO_NOP = 0, /**< no op */
+ CMO_NEW_SESS = 1, /**< new session */
+ CMO_OPEN = 2, /**< open file */
+ CMO_GET_META = 3, /**< get metadata */
+ CMO_GET = 4, /**< get metadata + data */
+ CMO_PUT = 6, /**< put data */
+ CMO_CLOSE = 7, /**< close file */
+ CMO_DEL = 8, /**< delete file */
+ CMO_LOCK = 9, /**< lock */
+ CMO_UNLOCK = 10, /**< unlock */
+ CMO_TRYLOCK = 11, /**< trylock */
+ CMO_ACK = 12, /**< ack of seqid rx'd */
+ CMO_END_SESS = 13, /**< end session */
+
+ /* server -> client */
+ CMO_PING = 14, /**< server to client ping */
+ CMO_NOT_MASTER = 15, /**< I am not the master! */
+ CMO_EVENT = 16, /**< server->cli async event */
+ CMO_ACK_FRAG = 17, /**< ack partial msg */
+
+ CMO_AFTER_LAST
+};
+
+/** CLD error codes */
+enum cle_err_codes {
+ CLE_OK = 0, /**< success / no error */
+ CLE_SESS_EXISTS = 1, /**< session exists */
+ CLE_SESS_INVAL = 2, /**< session doesn't exist */
+ CLE_DB_ERR = 3, /**< db error */
+ CLE_BAD_PKT = 4, /**< invalid/corrupted packet */
+ CLE_INODE_INVAL = 5, /**< inode doesn't exist */
+ CLE_NAME_INVAL = 6, /**< inode name invalid */
+ CLE_OOM = 7, /**< server out of memory */
+ CLE_FH_INVAL = 8, /**< file handle invalid */
+ CLE_DATA_INVAL = 9, /**< invalid data pkt */
+ CLE_LOCK_INVAL = 10, /**< invalid lock */
+ CLE_LOCK_CONFLICT = 11, /**< conflicting lock held */
+ CLE_LOCK_PENDING = 12, /**< lock waiting to be acq. */
+ CLE_MODE_INVAL = 13, /**< op incompat. w/ file mode */
+ CLE_INODE_EXISTS = 14, /**< inode exists */
+ CLE_DIR_NOTEMPTY = 15, /**< dir not empty */
+ CLE_INTERNAL_ERR = 16, /**< nonspecific internal err */
+ CLE_TIMEOUT = 17, /**< session timed out */
+ CLE_SIG_INVAL = 18 /**< HMAC sig bad / auth failed */
+};
+
+/** availble OPEN mode flags */
+enum cld_open_modes {
+ COM_READ = 0x01, /**< read */
+ COM_WRITE = 0x02, /**< write */
+ COM_LOCK = 0x04, /**< lock */
+ COM_ACL = 0x08, /**< ACL update */
+ COM_CREATE = 0x10, /**< create file, if not exist */
+ COM_EXCL = 0x20, /**< fail create if file exists */
+ COM_DIRECTORY = 0x40 /**< operate on a directory */
+};
+
+/** potential events client may receive */
+enum cld_events {
+ CE_UPDATED = 0x01, /**< contents updated */
+ CE_DELETED = 0x02, /**< inode deleted */
+ CE_LOCKED = 0x04, /**< lock acquired */
+ CE_MASTER_FAILOVER = 0x08, /**< master failover */
+ CE_SESS_FAILED = 0x10
+};
+
+/** LOCK flags */
+enum cld_lock_flags {
+ CLF_SHARED = 0x01 /**< a shared (read) lock */
+};
+
+/** Describes whether a packet begins, continues, or ends a message. */
+enum cld_pkt_order_t {
+ CLD_PKT_ORD_MID = 0x0,
+ CLD_PKT_ORD_FIRST = 0x1,
+ CLD_PKT_ORD_LAST = 0x2,
+ CLD_PKT_ORD_FIRST_LAST = 0x3
+};
+const CLD_PKT_IS_FIRST = 0x1;
+const CLD_PKT_IS_LAST = 0x2;
+
+/** Information that appears only in the first packet */
+struct cld_pkt_msg_infos {
+ hyper xid; /**< opaque message id */
+ enum cld_msg_op op; /**< message operation */
+};
+
+/** Information about the message contained in this packet */
+union cld_pkt_msg_info switch (enum cld_pkt_order_t order) {
+ case CLD_PKT_ORD_MID:
+ case CLD_PKT_ORD_LAST:
+ void;
+ case CLD_PKT_ORD_FIRST:
+ case CLD_PKT_ORD_FIRST_LAST:
+ struct cld_pkt_msg_infos mi;
+};
+
+/** header for each packet */
+struct cld_pkt_hdr {
+ hyper magic; /**< magic number; constant */
+ hyper sid; /**< client id */
+ string user<CLD_MAX_USERNAME>; /**< authenticated user */
+ struct cld_pkt_msg_info mi;
+};
+
+/** generic response for PUT, CLOSE, DEL, LOCK, UNLOCK */
+struct cld_msg_generic_resp {
+ enum cle_err_codes code; /**< error code, CLE_xxx */
+ hyper xid_in; /**< C->S xid */
+};
+
+/** ACK-FRAG message */
+struct cld_msg_ack_frag {
+ hyper seqid; /**< sequence id to ack */
+};
+
+/** OPEN message */
+struct cld_msg_open {
+ int mode; /**< open mode, COM_xxx */
+ int events; /**< events mask, CE_xxx */
+ string inode_name<CLD_INODE_NAME_MAX>;
+};
+
+/** OPEN message response */
+struct cld_msg_open_resp {
+ struct cld_msg_generic_resp msg;
+ hyper fh; /**< handle opened */
+};
+
+/** GET message */
+struct cld_msg_get {
+ hyper fh; /**< open file handle */
+};
+
+/** GET message response */
+struct cld_msg_get_resp {
+ struct cld_msg_generic_resp msg;
+ hyper inum; /**< unique inode number */
+ hyper vers; /**< inode version */
+ hyper time_create; /**< creation time */
+ hyper time_modify; /**< last modification time */
+ int flags; /**< inode flags; CIFL_xxx */
+ string inode_name<CLD_INODE_NAME_MAX>;
+ opaque data<CLD_MAX_PAYLOAD_SZ>;
+};
+
+/** PUT message */
+struct cld_msg_put {
+ hyper fh; /**< open file handle */
+ opaque data<CLD_MAX_PAYLOAD_SZ>;
+};
+
+/** CLOSE message */
+struct cld_msg_close {
+ hyper fh; /**< open file handle */
+};
+
+/** DEL message */
+struct cld_msg_del {
+ string inode_name<CLD_INODE_NAME_MAX>;
+};
+
+/** UNLOCK message */
+struct cld_msg_unlock {
+ uint64_t fh; /**< open file handle */
+};
+
+/** LOCK message */
+struct cld_msg_lock {
+ hyper fh; /**< open file handle */
+ int flags; /**< CLF_xxx */
+};
+
+/** Server-to-client EVENT message */
+struct cld_msg_event {
+ hyper fh; /**< open file handle */
+ int events; /**< CE_xxx */
+};
Binary files cld/lib/cld_msg_rpc_xdr.o and cld.rpcgen/lib/cld_msg_rpc_xdr.o differ
diff -X /garz/tmp/dontdiff -urNp cld/lib/common.c cld.rpcgen/lib/common.c
--- cld/lib/common.c 2010-01-29 00:36:25.000000000 -0500
+++ cld.rpcgen/lib/common.c 2010-01-22 18:15:07.000000000 -0500
@@ -26,7 +26,11 @@
#include <errno.h>
#include <glib.h>
#include <cld-private.h>
-#include "cld_msg.h"
+#include <openssl/sha.h>
+#include <openssl/hmac.h>
+#include "cld_msg_rpc.h"
+#include <hail_log.h>
+#include <syslog.h>
/* duplicated from tools/cldcli.c; put in common header somewhere? */
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
Binary files cld/lib/common.o and cld.rpcgen/lib/common.o differ
Binary files cld/lib/libcldc.a and cld.rpcgen/lib/libcldc.a differ
Binary files cld/lib/.libs/libcldc.a and cld.rpcgen/lib/.libs/libcldc.a differ
Binary files cld/lib/libtimer.a and cld.rpcgen/lib/libtimer.a differ
Binary files cld/lib/libtimer.o and cld.rpcgen/lib/libtimer.o differ
diff -X /garz/tmp/dontdiff -urNp cld/lib/Makefile.am cld.rpcgen/lib/Makefile.am
--- cld/lib/Makefile.am 2010-02-03 00:17:47.000000000 -0500
+++ cld.rpcgen/lib/Makefile.am 2010-02-02 23:10:51.000000000 -0500
@@ -1,10 +1,21 @@
+BUILT_SOURCES = cld_msg_rpc.h
+
EXTRA_DIST =
libcldc.pc.in libcldc-uninstalled.pc.in
INCLUDES = -I$(top_srcdir)/include \
@GLIB_CFLAGS@
+mostlyclean-local:
+ rm -f *_rpc.h *_rpc_xdr.c
+
+%_rpc.h: %_rpc.x
+ rpcgen -h $< > $@
+
+%_rpc_xdr.c: %_rpc.x
+ rpcgen -c $< > $@
+
LINK = $(LIBTOOL) --mode=link $(CC) $(CFLAGS) $(LDFLAGS) -o $@
lib_LTLIBRARIES = libcldc.la
@@ -15,7 +26,8 @@ libcldc_la_SOURCES = \
cldc-dns.c \
common.c \
libtimer.c \
- pkt.c
+ pkt.c \
+ cld_msg_rpc_xdr.c
libcldc_la_LDFLAGS = \
-version-info $(LIBCLDC_CURRENT):$(LIBCLDC_REVISION):$(LIBCLDC_AGE) \
@@ -25,3 +37,4 @@ libcldc_la_LDFLAGS = \
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libcldc.pc
+include_HEADERS = cld_msg_rpc.h
diff -X /garz/tmp/dontdiff -urNp cld/lib/pkt.c cld.rpcgen/lib/pkt.c
--- cld/lib/pkt.c 2010-02-03 01:15:26.000000000 -0500
+++ cld.rpcgen/lib/pkt.c 2010-02-03 16:27:08.000000000 -0500
@@ -17,11 +17,16 @@
*/
#include <string.h>
+#include <stdio.h>
#include <errno.h>
+#include <glib.h>
#include <syslog.h>
#include <openssl/sha.h>
#include <openssl/hmac.h>
-#include <cldc.h>
+#include <cld-private.h>
+#include "cld_pkt.h"
+#include "cld_msg_rpc.h"
+#include <hail_log.h>
int __cld_authcheck(struct hail_log *log, const char *key,
const void *buf, size_t buf_len, const void *sha)
@@ -68,7 +73,7 @@ int __cld_authsign(struct hail_log *log,
return 0;
}
-const char *__cld_opstr(enum cld_msg_ops op)
+const char *__cld_opstr(enum cld_msg_op op)
{
switch (op) {
case CMO_NOP: return "CMO_NOP";
@@ -92,3 +97,96 @@ const char *__cld_opstr(enum cld_msg_ops
}
}
+const char *__cld_pkt_hdr_to_str(char *scratch,
+ const char *pkt_hdr, size_t pkt_len)
+{
+ XDR xin;
+ struct cld_pkt_hdr pkt;
+ bool bad_magic;
+ char temp[50], temp2[50];
+ uint64_t seqid;
+ struct cld_pkt_ftr *foot;
+ size_t hdr_len;
+
+ temp[0] = '\0';
+ temp2[0] = '\0';
+ foot = (struct cld_pkt_ftr *)(pkt_hdr + pkt_len - CLD_PKT_FTR_LEN);
+ seqid = le64_to_cpu(foot->seqid);
+
+ if (pkt_len <= CLD_PKT_FTR_LEN) {
+ snprintf(scratch, PKT_HDR_TO_STR_SCRATCH_LEN,
+ "[MALFORMED: only %zu bytes]", pkt_len);
+ return scratch;
+ }
+ xdrmem_create(&xin, (void *)pkt_hdr, pkt_len - CLD_PKT_FTR_LEN,
+ XDR_DECODE);
+ memset(&pkt, 0, sizeof(pkt));
+ if (!xdr_cld_pkt_hdr(&xin, &pkt)) {
+ xdr_destroy(&xin);
+ snprintf(scratch, PKT_HDR_TO_STR_SCRATCH_LEN,
+ "[MALFORMED: can't parse]");
+ return scratch;
+ }
+ hdr_len = xdr_getpos(&xin);
+ xdr_destroy(&xin);
+
+ bad_magic = !!(memcmp(&pkt.magic, CLD_PKT_MAGIC, sizeof(pkt.magic)));
+ if (pkt.mi.order & CLD_PKT_IS_FIRST) {
+ struct cld_pkt_msg_infos *infos =
+ &pkt.mi.cld_pkt_msg_info_u.mi;
+ snprintf(temp, sizeof(temp), "[TYPE:%s, XID:%llx]",
+ __cld_opstr(infos->op),
+ (unsigned long long) infos->xid);
+ switch (infos->op) {
+ case CMO_ACK_FRAG: {
+ XDR x;
+ struct cld_msg_ack_frag ack;
+ memset(&ack, 0, sizeof(ack));
+ xdrmem_create(&x, ((char *)pkt_hdr) + hdr_len,
+ pkt_len - hdr_len - CLD_PKT_FTR_LEN,
+ XDR_DECODE);
+ if (!xdr_cld_msg_ack_frag(&x, &ack)) {
+ xdr_destroy(&x);
+ snprintf(temp2, sizeof(temp2), "{MALFORMED}");
+ break;
+ }
+ snprintf(temp2, sizeof(temp2), "{seqid:%llx}",
+ (unsigned long long) ack.seqid);
+ xdr_destroy(&x);
+ break;
+ }
+ default:
+ break;
+ }
+ } else {
+ snprintf(temp, sizeof(temp), "[CONT]");
+ }
+
+ snprintf(scratch, PKT_HDR_TO_STR_SCRATCH_LEN,
+ "<%s%s%s> "
+ "%s USER:'%s' SEQID:%llx %s",
+ ((pkt.mi.order & CLD_PKT_IS_FIRST) ? "1st" : ""),
+ ((pkt.mi.order & CLD_PKT_IS_LAST) ? "End" : ""),
+ (bad_magic ? "B" : ""),
+ temp, pkt.user,
+ (unsigned long long) seqid,
+ temp2);
+ xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
+ return scratch;
+}
+
+void __cld_dump_buf(const void *buf, size_t len)
+{
+ const unsigned char *buff = buf;
+ size_t off = 0;
+ do {
+ int i;
+ for (i = 0; i < 8; i++) {
+ if (!len)
+ break;
+ printf("%02x ", buff[off++]);
+ len--;
+ }
+ printf("\n");
+ } while (len);
+}
Binary files cld/lib/pkt.o and cld.rpcgen/lib/pkt.o differ
Binary files cld/server/cld and cld.rpcgen/server/cld differ
Binary files cld/server/cldbadm and cld.rpcgen/server/cldbadm differ
Binary files cld/server/cldbadm.o and cld.rpcgen/server/cldbadm.o differ
diff -X /garz/tmp/dontdiff -urNp cld/server/cldb.h cld.rpcgen/server/cldb.h
--- cld/server/cldb.h 2010-01-29 00:36:25.000000000 -0500
+++ cld.rpcgen/server/cldb.h 2010-01-22 18:29:11.000000000 -0500
@@ -23,7 +23,7 @@
#include <stdbool.h>
#include <db.h>
#include <cld-private.h>
-#include <cld_msg.h>
+#include <cld_msg_rpc.h>
typedef uint64_t cldino_t;
Binary files cld/server/cldb.o and cld.rpcgen/server/cldb.o differ
diff -X /garz/tmp/dontdiff -urNp cld/server/cld.h cld.rpcgen/server/cld.h
--- cld/server/cld.h 2010-02-03 01:02:41.000000000 -0500
+++ cld.rpcgen/server/cld.h 2010-02-03 16:46:41.000000000 -0500
@@ -25,7 +25,7 @@
#include <poll.h>
#include <glib.h>
#include "cldb.h"
-#include <cld_msg.h>
+#include <cld_msg_rpc.h>
#include <cld_common.h>
#include <hail_log.h>
@@ -72,20 +72,12 @@ struct session {
bool dead; /* session has ended */
/* huge buffer should always come last */
+ enum cld_msg_op msg_op;
+ uint64_t msg_xid;
unsigned int msg_buf_len;
char msg_buf[CLD_MAX_MSG_SZ];
};
-struct msg_params {
- int sock_fd;
- const struct client *cli;
- struct session *sess;
-
- const struct cld_packet *pkt;
- const void *msg;
- size_t msg_len;
-};
-
struct server_stats {
unsigned long poll; /* num. polls */
unsigned long event; /* events dispatched */
@@ -122,29 +114,59 @@ struct server {
struct server_stats stats; /* global statistics */
};
+struct pkt_info {
+ struct cld_pkt_hdr *pkt;
+ struct session *sess;
+ uint64_t seqid;
+ uint64_t xid;
+ enum cld_msg_op op;
+ size_t hdr_len;
+};
+
/* msg.c */
extern int inode_lock_rescan(DB_TXN *txn, cldino_t inum);
-extern void msg_open(struct msg_params *);
-extern void msg_put(struct msg_params *);
-extern void msg_close(struct msg_params *);
-extern void msg_del(struct msg_params *);
-extern void msg_unlock(struct msg_params *);
-extern void msg_lock(struct msg_params *, bool);
-extern void msg_ack(struct msg_params *);
-extern void msg_get(struct msg_params *, bool);
+extern void msg_get(struct session *sess, const void *v);
+extern void msg_open(struct session *sess, const void *v);
+extern void msg_put(struct session *sess, const void *v);
+extern void msg_close(struct session *sess, const void *v);
+extern void msg_del(struct session *sess, const void *v);
+extern void msg_unlock(struct session *sess, const void *v);
+extern void msg_lock(struct session *sess, const void *v);
+extern void msg_ack(struct session *sess, uint64_t seqid);
/* session.c */
extern uint64_t next_seqid_le(uint64_t *seq);
-extern void pkt_init_pkt(struct cld_packet *dest, const struct cld_packet *src);
extern guint sess_hash(gconstpointer v);
extern gboolean sess_equal(gconstpointer _a, gconstpointer _b);
-extern void msg_new_sess(struct msg_params *, const struct client *);
-extern void msg_end_sess(struct msg_params *, const struct client *);
+extern void msg_new_sess(int sock_fd, const struct client *cli,
+ const struct pkt_info *info);
+extern void msg_end_sess(struct session *sess, uint64_t xid);
extern struct raw_session *session_new_raw(const struct session *sess);
extern void sessions_free(void);
-extern bool sess_sendmsg(struct session *sess, const void *msg_, size_t msglen,
- void (*done_cb)(struct session_outpkt *),
- void *done_data);
+
+/** Send a message as part of a session.
+ *
+ * @param sess The session
+ * @param xdrproc The XDR function to use to serialize the data
+ * @param xdrdata The message data
+ * @param op The op of the message
+ * @param done_cb The callback to call when the message has been acked
+ * @param done_data The data to give to done_cb
+ *
+ * @return true only if the message was sent
+ */
+extern bool sess_sendmsg(struct session *sess,
+ xdrproc_t xdrproc, const void *xdrdata, enum cld_msg_op op,
+ void (*done_cb)(struct session_outpkt *), void *done_data);
+
+/** Send a generic response message.
+ *
+ * @param sess The session
+ * @param code The error code to send
+ */
+extern void sess_sendresp_generic(struct session *sess,
+ enum cle_err_codes code);
+
extern int session_dispose(DB_TXN *txn, struct session *sess);
extern int session_remove_locks(DB_TXN *txn, uint8_t *sid, uint64_t fh,
cldino_t inum, bool *waiter);
@@ -156,12 +178,27 @@ extern struct hail_log srv_log;
extern struct timeval current_time;
extern int udp_tx(int sock_fd, struct sockaddr *, socklen_t,
const void *, size_t);
-extern void resp_copy(struct cld_msg_resp *resp, const struct cld_msg_hdr *src);
-extern void resp_err(struct session *sess,
- const struct cld_msg_hdr *src, enum cle_err_codes errcode);
-extern void resp_ok(struct session *sess, const struct cld_msg_hdr *src);
extern const char *user_key(const char *user);
+/** Transmit a single packet.
+ *
+ * This function doesn't provide error-retransmission logic.
+ * It can't handle messages that are bigger than a single packet.
+ *
+ * @param fd Socket to send the response on
+ * @param cli Client address data
+ * @param sid The session-id to use. Must be of length CLD_SID_SZ
+ * @param seqid The sequence id to use
+ * @param xdrproc The XDR function to use to serialize the data
+ * @param xdrdata The message data
+ * @param op The op of message to send
+ *
+ * @return true only on success
+ */
+extern void simple_sendmsg(int fd, const struct client *cli,
+ uint64_t sid, const char *username, uint64_t seqid,
+ xdrproc_t xdrproc, const void *xdrdata, enum cld_msg_op op);
+
/* util.c */
extern int write_pid_file(const char *pid_fn);
extern void syslogerr(const char *prefix);
Binary files cld/server/cld_msg_rpc_xdr.o and cld.rpcgen/server/cld_msg_rpc_xdr.o differ
Binary files cld/server/common.o and cld.rpcgen/server/common.o differ
Binary files cld/server/libtimer.o and cld.rpcgen/server/libtimer.o differ
diff -X /garz/tmp/dontdiff -urNp cld/server/Makefile.am cld.rpcgen/server/Makefile.am
--- cld/server/Makefile.am 2010-02-03 00:14:51.000000000 -0500
+++ cld.rpcgen/server/Makefile.am 2010-02-02 23:10:54.000000000 -0500
@@ -1,5 +1,6 @@
INCLUDES = -I$(top_srcdir)/include \
+ -I$(top_srcdir)/lib \
@GLIB_CFLAGS@ \
-DCLD_LIBDIR=\""$(libdir)"\" \
-DCLD_LOCAL_STATE_DIR="\"$(localstatedir)\""
@@ -8,7 +9,8 @@ sbin_PROGRAMS = cld cldbadm
cld_SOURCES = cldb.h cld.h \
../lib/common.c ../lib/libtimer.c ../lib/pkt.c \
- cldb.c msg.c server.c session.c util.c
+ cldb.c msg.c server.c session.c util.c \
+ ../lib/cld_msg_rpc_xdr.c
cld_LDADD = @CRYPTO_LIBS@ @GLIB_LIBS@ @DB4_LIBS@
cldbadm_SOURCES = cldb.h cldbadm.c
diff -X /garz/tmp/dontdiff -urNp cld/server/msg.c cld.rpcgen/server/msg.c
--- cld/server/msg.c 2010-02-03 18:10:00.000000000 -0500
+++ cld.rpcgen/server/msg.c 2010-02-03 01:23:17.000000000 -0500
@@ -25,6 +25,8 @@
#include <syslog.h>
#include <openssl/sha.h>
#include <cld-private.h>
+#include <cld_common.h>
+#include <cld_msg_rpc.h>
#include "cld.h"
enum {
@@ -247,12 +249,11 @@ static int inode_notify(DB_TXN *txn, cld
}
memset(&me, 0, sizeof(me));
- memcpy(me.hdr.magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ);
- me.hdr.op = CMO_EVENT;
- me.fh = h.fh;
- me.events = cpu_to_le32(deleted ? CE_DELETED : CE_UPDATED);
+ me.fh = le64_to_cpu(h.fh);
+ me.events = deleted ? CE_DELETED : CE_UPDATED;
- if (!sess_sendmsg(sess, &me, sizeof(me), NULL, NULL))
+ if (!sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_event,
+ (void *)&me, CMO_EVENT, NULL, NULL))
break;
}
@@ -375,12 +376,11 @@ int inode_lock_rescan(DB_TXN *txn, cldin
}
memset(&me, 0, sizeof(me));
- memcpy(me.hdr.magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ);
- me.hdr.op = CMO_EVENT;
- me.fh = lock.fh;
- me.events = cpu_to_le32(CE_LOCKED);
+ me.fh = le64_to_cpu(lock.fh);
+ me.events = CE_LOCKED;
- if (!sess_sendmsg(sess, &me, sizeof(me), NULL, NULL))
+ if (!sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_event,
+ (void *)&me, CMO_EVENT, NULL, NULL))
break;
}
@@ -388,12 +388,10 @@ int inode_lock_rescan(DB_TXN *txn, cldin
return rc;
}
-void msg_get(struct msg_params *mp, bool metadata_only)
+void msg_get(struct session *sess, const void *v)
{
- const struct cld_msg_get *msg = mp->msg;
- struct cld_msg_get_resp *resp;
- size_t resp_len;
- uint64_t fh;
+ const struct cld_msg_get *get = v;
+ struct cld_msg_get_resp resp;
struct raw_handle *h = NULL;
struct raw_inode *inode = NULL;
enum cle_err_codes resp_rc = CLE_OK;
@@ -401,17 +399,10 @@ void msg_get(struct msg_params *mp, bool
uint32_t name_len, inode_size;
uint32_t omode;
int rc;
- struct session *sess = mp->sess;
DB_ENV *dbenv = cld_srv.cldb.env;
DB_TXN *txn;
- void *p;
-
- /* make sure input data as large as expected */
- if (mp->msg_len < sizeof(*msg))
- return;
-
- /* get filehandle from input msg */
- fh = le64_to_cpu(msg->fh);
+ void *data_mem;
+ char *inode_name;
rc = dbenv->txn_begin(dbenv, NULL, &txn, 0);
if (rc) {
@@ -421,7 +412,7 @@ void msg_get(struct msg_params *mp, bool
}
/* read handle from db */
- rc = cldb_handle_get(txn, sess->sid, fh, &h, 0);
+ rc = cldb_handle_get(txn, sess->sid, get->fh, &h, 0);
if (rc) {
resp_rc = CLE_FH_INVAL;
goto err_out;
@@ -442,42 +433,30 @@ void msg_get(struct msg_params *mp, bool
goto err_out;
}
- name_len = le32_to_cpu(inode->ino_len);
inode_size = le32_to_cpu(inode->size);
-
- resp_len = sizeof(*resp) + name_len +
- (metadata_only ? 0 : inode_size);
- resp = alloca(resp_len);
- if (!resp) {
- resp_rc = CLE_OOM;
- goto err_out;
- }
-
- HAIL_DEBUG(&srv_log, "%s: sizeof(resp) %zu, name_len %u, "
- "inode->size %u, resp_len %zu",
- __func__,
- sizeof(*resp), name_len,
- inode_size, resp_len);
+ HAIL_DEBUG(&srv_log, "GET-DEBUG: inode->size %u\n", inode_size);
/* return response containing inode metadata */
- memset(resp, 0, resp_len);
- resp_copy(&resp->resp, mp->msg);
- resp->inum = inode->inum;
- resp->ino_len = inode->ino_len;
- resp->size = inode->size;
- resp->version = inode->version;
- resp->time_create = inode->time_create;
- resp->time_modify = inode->time_modify;
- resp->flags = inode->flags;
+ memset(&resp, 0, sizeof(resp));
+ resp.msg.code = CLE_OK;
+ resp.msg.xid_in = sess->msg_xid;
+ resp.inum = le64_to_cpu(inode->inum);
+ resp.vers = le64_to_cpu(inode->version);
+ resp.time_create = le64_to_cpu(inode->time_create);
+ resp.time_modify = le64_to_cpu(inode->time_modify);
+ resp.flags = le32_to_cpu(inode->flags);
- p = (resp + 1);
- memcpy(p, (inode + 1), name_len);
+ name_len = le32_to_cpu(inode->ino_len);
+ inode_name = alloca(name_len + 1);
+ snprintf(inode_name, name_len + 1, "%s", (char *)(inode + 1));
+ resp.inode_name = inode_name;
- p += name_len;
+ resp.data.data_len = 0;
+ resp.data.data_val = NULL;
/* send data, if requested */
- if (!metadata_only) {
- void *data_mem;
+ data_mem = NULL;
+ if (sess->msg_op == CMO_GET) {
size_t data_mem_len;
rc = cldb_data_get(txn, inum, &data_mem, &data_mem_len,
@@ -486,22 +465,23 @@ void msg_get(struct msg_params *mp, bool
/* treat not-found as zero length file, as we may
* not yet have created the data record
*/
- if (rc == DB_NOTFOUND) {
- resp->size = 0;
- resp_len -= inode_size;
- } else if (rc || (data_mem_len != inode_size)) {
- if (!rc)
- free(data_mem);
- resp_rc = CLE_DB_ERR;
- goto err_out;
- } else {
- memcpy(p, data_mem, data_mem_len);
-
- free(data_mem);
+ if (rc != DB_NOTFOUND) {
+ if (rc || (data_mem_len != inode_size)) {
+ if (!rc)
+ free(data_mem);
+ resp_rc = CLE_DB_ERR;
+ goto err_out;
+ } else {
+ resp.data.data_len = data_mem_len;
+ resp.data.data_val = data_mem;
+ }
}
}
- sess_sendmsg(sess, resp, resp_len, NULL, NULL);
+ sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_get_resp,
+ (void *)&resp, CMO_GET, NULL, NULL);
+ if (data_mem)
+ free(data_mem);
rc = txn->commit(txn, 0);
if (rc)
@@ -516,15 +496,14 @@ err_out:
if (rc)
dbenv->err(dbenv, rc, "msg_get txn abort");
err_out_noabort:
- resp_err(sess, mp->msg, resp_rc);
+ sess_sendresp_generic(sess, resp_rc);
free(h);
free(inode);
}
-void msg_open(struct msg_params *mp)
+void msg_open(struct session *sess, const void *v)
{
- const struct cld_msg_open *msg = mp->msg;
- struct session *sess = mp->sess;
+ const struct cld_msg_open *open = v;
struct cld_msg_open_resp resp;
const char *name;
struct raw_session *raw_sess = NULL;
@@ -535,24 +514,12 @@ void msg_open(struct msg_params *mp)
struct pathname_info pinfo;
void *parent_data = NULL;
size_t parent_len;
- uint32_t msg_mode, msg_events;
uint64_t fh;
cldino_t inum;
enum cle_err_codes resp_rc = CLE_OK;
DB_ENV *dbenv = cld_srv.cldb.env;
DB_TXN *txn;
- /* make sure input data as large as expected */
- if (mp->msg_len < sizeof(*msg))
- return;
-
- msg_mode = le32_to_cpu(msg->mode);
- msg_events = le32_to_cpu(msg->events);
- name_len = le16_to_cpu(msg->name_len);
-
- if (mp->msg_len < (sizeof(*msg) + name_len))
- return;
-
rc = dbenv->txn_begin(dbenv, NULL, &txn, 0);
if (rc) {
dbenv->err(dbenv, rc, "DB_ENV->txn_begin");
@@ -560,11 +527,12 @@ void msg_open(struct msg_params *mp)
goto err_out_noabort;
}
- name = mp->msg + sizeof(*msg);
+ name = open->inode_name;
+ name_len = strlen(name);
- create = msg_mode & COM_CREATE;
- excl = msg_mode & COM_EXCL;
- do_dir = msg_mode & COM_DIRECTORY;
+ create = open->mode & COM_CREATE;
+ excl = open->mode & COM_EXCL;
+ do_dir = open->mode & COM_DIRECTORY;
if (!valid_inode_name(name, name_len) || (create && name_len < 2)) {
resp_rc = CLE_NAME_INVAL;
@@ -662,7 +630,7 @@ void msg_open(struct msg_params *mp)
inum = cldino_from_le(inode->inum);
/* alloc & init new handle; updates session's next_fh */
- h = cldb_handle_new(sess, inum, msg_mode, msg_events);
+ h = cldb_handle_new(sess, inum, open->mode, open->events);
if (!h) {
HAIL_CRIT(&srv_log, "cannot allocate handle");
resp_rc = CLE_OOM;
@@ -717,11 +685,12 @@ void msg_open(struct msg_params *mp)
free(raw_sess);
free(h);
- resp_copy(&resp.resp, mp->msg);
- resp.resp.code = cpu_to_le32(CLE_OK);
- resp.fh = cpu_to_le64(fh);
- sess_sendmsg(sess, &resp, sizeof(resp), NULL, NULL);
-
+ memset(&resp, 0, sizeof(resp));
+ resp.msg.xid_in = sess->msg_xid;
+ resp.msg.code = CLE_OK;
+ resp.fh = fh;
+ sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_open_resp,
+ (void *)&resp, CMO_OPEN, NULL, NULL);
return;
err_out:
@@ -729,7 +698,7 @@ err_out:
if (rc)
dbenv->err(dbenv, rc, "msg_open txn abort");
err_out_noabort:
- resp_err(mp->sess, mp->msg, resp_rc);
+ sess_sendresp_generic(sess, resp_rc);
free(parent_data);
free(parent);
free(inode);
@@ -737,45 +706,18 @@ err_out_noabort:
free(h);
}
-void msg_put(struct msg_params *mp)
+void msg_put(struct session *sess, const void *v)
{
- const struct cld_msg_put *msg = mp->msg;
- struct session *sess = mp->sess;
- uint64_t fh;
+ const struct cld_msg_put *put = v;
struct raw_handle *h = NULL;
struct raw_inode *inode = NULL;
enum cle_err_codes resp_rc = CLE_OK;
- const void *mem;
int rc;
cldino_t inum;
- uint32_t omode, data_size;
+ uint32_t omode;
DB_ENV *dbenv = cld_srv.cldb.env;
DB_TXN *txn;
- /* make sure input data as large as message header */
- if (mp->msg_len < sizeof(*msg))
- return;
-
- /* make sure additional input data as large as expected */
- data_size = le32_to_cpu(msg->data_size);
- if (data_size > CLD_MAX_PAYLOAD_SZ) {
- HAIL_ERR(&srv_log, "%s: can't PUT %d bytes of data: "
- "%d is the maximum.\n",
- __func__, data_size, CLD_MAX_PAYLOAD_SZ);
- resp_rc = CLE_BAD_PKT;
- goto err_out_noabort;
- }
- if (mp->msg_len != (data_size + sizeof(*msg))) {
- HAIL_INFO(&srv_log, "PUT len mismatch: msg len %zu, "
- "wanted %zu + %u (== %zu)",
- mp->msg_len,
- sizeof(*msg), data_size, data_size + sizeof(*msg));
- resp_rc = CLE_BAD_PKT;
- goto err_out_noabort;
- }
-
- fh = le64_to_cpu(msg->fh);
-
rc = dbenv->txn_begin(dbenv, NULL, &txn, 0);
if (rc) {
dbenv->err(dbenv, rc, "DB_ENV->txn_begin");
@@ -784,7 +726,7 @@ void msg_put(struct msg_params *mp)
}
/* read handle from db */
- rc = cldb_handle_get(txn, sess->sid, fh, &h, 0);
+ rc = cldb_handle_get(txn, sess->sid, put->fh, &h, 0);
if (rc) {
resp_rc = CLE_FH_INVAL;
goto err_out;
@@ -807,15 +749,14 @@ void msg_put(struct msg_params *mp)
}
/* store contig. data area in db */
- mem = (msg + 1);
rc = cldb_data_put(txn, inum,
- mem, data_size, 0);
+ put->data.data_val, put->data.data_len, 0);
if (rc) {
resp_rc = CLE_DB_ERR;
goto err_out;
}
- inode->size = cpu_to_le32(data_size);
+ inode->size = cpu_to_le32(put->data.data_len);
/* update inode */
rc = inode_touch(txn, inode);
@@ -831,7 +772,7 @@ void msg_put(struct msg_params *mp)
goto err_out_noabort;
}
- resp_ok(sess, mp->msg);
+ sess_sendresp_generic(sess, CLE_OK);
free(h);
free(inode);
@@ -842,31 +783,23 @@ err_out:
if (rc)
dbenv->err(dbenv, rc, "msg_put txn abort");
err_out_noabort:
- resp_err(sess, mp->msg, resp_rc);
+ sess_sendresp_generic(sess, resp_rc);
free(h);
free(inode);
}
-void msg_close(struct msg_params *mp)
+void msg_close(struct session *sess, const void *v)
{
- const struct cld_msg_close *msg = mp->msg;
- uint64_t fh;
+ const struct cld_msg_close *close = v;
int rc;
enum cle_err_codes resp_rc = CLE_OK;
struct raw_handle *h = NULL;
cldino_t lock_inum = 0;
bool waiter = false;
- struct session *sess = mp->sess;
DB_ENV *dbenv = cld_srv.cldb.env;
DB_TXN *txn;
- /* make sure input data as large as expected */
- if (mp->msg_len < sizeof(*msg))
- return;
-
- fh = le64_to_cpu(msg->fh);
-
rc = dbenv->txn_begin(dbenv, NULL, &txn, 0);
if (rc) {
dbenv->err(dbenv, rc, "DB_ENV->txn_begin");
@@ -875,7 +808,7 @@ void msg_close(struct msg_params *mp)
}
/* read handle from db */
- rc = cldb_handle_get(txn, sess->sid, fh, &h, DB_RMW);
+ rc = cldb_handle_get(txn, sess->sid, close->fh, &h, DB_RMW);
if (rc) {
if (rc == DB_NOTFOUND)
resp_rc = CLE_FH_INVAL;
@@ -888,7 +821,7 @@ void msg_close(struct msg_params *mp)
lock_inum = cldino_from_le(h->inum);
/* delete handle from db */
- rc = cldb_handle_del(txn, sess->sid, fh);
+ rc = cldb_handle_del(txn, sess->sid, close->fh);
if (rc) {
resp_rc = CLE_DB_ERR;
goto err_out;
@@ -896,7 +829,7 @@ void msg_close(struct msg_params *mp)
/* remove locks, if any */
rc = session_remove_locks(txn, sess->sid,
- fh, lock_inum, &waiter);
+ close->fh, lock_inum, &waiter);
if (rc) {
resp_rc = CLE_DB_ERR;
goto err_out;
@@ -918,7 +851,7 @@ void msg_close(struct msg_params *mp)
goto err_out_noabort;
}
- resp_ok(sess, mp->msg);
+ sess_sendresp_generic(sess, CLE_OK);
free(h);
return;
@@ -927,16 +860,15 @@ err_out:
if (rc)
dbenv->err(dbenv, rc, "msg_close txn abort");
err_out_noabort:
- resp_err(sess, mp->msg, resp_rc);
+ sess_sendresp_generic(sess, resp_rc);
free(h);
}
-void msg_del(struct msg_params *mp)
+void msg_del(struct session *sess, const void *v)
{
- const struct cld_msg_del *msg = mp->msg;
+ const struct cld_msg_del *del = v;
enum cle_err_codes resp_rc = CLE_OK;
int rc, name_len;
- const char *name;
struct pathname_info pinfo;
struct raw_inode *parent = NULL, *ino = NULL;
void *parent_data = NULL;
@@ -949,23 +881,13 @@ void msg_del(struct msg_params *mp)
DB_ENV *dbenv = cld_srv.cldb.env;
DB_TXN *txn;
- /* make sure input data as large as expected */
- if (mp->msg_len < sizeof(*msg))
- return;
-
- name_len = le16_to_cpu(msg->name_len);
-
- if (mp->msg_len < (sizeof(*msg) + name_len))
- return;
-
- name = mp->msg + sizeof(*msg);
-
- if (!valid_inode_name(name, name_len) || (name_len < 2)) {
+ name_len = strlen(del->inode_name);
+ if (!valid_inode_name(del->inode_name, name_len) || (name_len < 2)) {
resp_rc = CLE_NAME_INVAL;
goto err_out_noabort;
}
- pathname_parse(name, name_len, &pinfo);
+ pathname_parse(del->inode_name, name_len, &pinfo);
rc = dbenv->txn_begin(dbenv, NULL, &txn, 0);
if (rc) {
@@ -991,7 +913,8 @@ void msg_del(struct msg_params *mp)
}
/* read inode to be deleted */
- rc = cldb_inode_get_byname(txn, name, name_len, &ino, false, 0);
+ rc = cldb_inode_get_byname(txn, del->inode_name, name_len,
+ &ino, false, 0);
if (rc) {
if (rc == DB_NOTFOUND)
resp_rc = CLE_NAME_INVAL;
@@ -1100,7 +1023,7 @@ void msg_del(struct msg_params *mp)
goto err_out_noabort;
}
- resp_ok(mp->sess, mp->msg);
+ sess_sendresp_generic(sess, CLE_OK);
free(ino);
free(parent);
free(parent_data);
@@ -1111,31 +1034,23 @@ err_out:
if (rc)
dbenv->err(dbenv, rc, "msg_del txn abort");
err_out_noabort:
- resp_err(mp->sess, mp->msg, resp_rc);
+ sess_sendresp_generic(sess, resp_rc);
free(ino);
free(parent);
free(parent_data);
}
-void msg_unlock(struct msg_params *mp)
+void msg_unlock(struct session *sess, const void *v)
{
- const struct cld_msg_unlock *msg = mp->msg;
- uint64_t fh;
+ const struct cld_msg_unlock *unlock = v;
struct raw_handle *h = NULL;
cldino_t inum;
int rc;
enum cle_err_codes resp_rc = CLE_OK;
uint32_t omode;
- struct session *sess = mp->sess;
DB_ENV *dbenv = cld_srv.cldb.env;
DB_TXN *txn;
- /* make sure input data as large as expected */
- if (mp->msg_len < sizeof(*msg))
- return;
-
- fh = le64_to_cpu(msg->fh);
-
rc = dbenv->txn_begin(dbenv, NULL, &txn, 0);
if (rc) {
dbenv->err(dbenv, rc, "DB_ENV->txn_begin");
@@ -1144,7 +1059,7 @@ void msg_unlock(struct msg_params *mp)
}
/* read handle from db */
- rc = cldb_handle_get(txn, sess->sid, fh, &h, 0);
+ rc = cldb_handle_get(txn, sess->sid, unlock->fh, &h, 0);
if (rc) {
resp_rc = CLE_FH_INVAL;
goto err_out;
@@ -1159,7 +1074,7 @@ void msg_unlock(struct msg_params *mp)
}
/* attempt to given lock on filehandle */
- rc = cldb_lock_del(txn, sess->sid, fh, inum);
+ rc = cldb_lock_del(txn, sess->sid, unlock->fh, inum);
if (rc) {
resp_rc = CLE_LOCK_INVAL;
goto err_out;
@@ -1172,7 +1087,7 @@ void msg_unlock(struct msg_params *mp)
goto err_out_noabort;
}
- resp_ok(sess, mp->msg);
+ sess_sendresp_generic(sess, CLE_OK);
free(h);
return;
@@ -1181,30 +1096,22 @@ err_out:
if (rc)
dbenv->err(dbenv, rc, "msg_unlock txn abort");
err_out_noabort:
- resp_err(sess, mp->msg, resp_rc);
+ sess_sendresp_generic(sess, resp_rc);
free(h);
}
-void msg_lock(struct msg_params *mp, bool wait)
+void msg_lock(struct session *sess, const void *v)
{
- const struct cld_msg_lock *msg = mp->msg;
- uint64_t fh;
+ const struct cld_msg_lock *lock = v;
+ bool wait = (sess->msg_op == CMO_LOCK);
struct raw_handle *h = NULL;
cldino_t inum;
int rc;
enum cle_err_codes resp_rc = CLE_OK;
- uint32_t lock_flags, omode;
+ uint32_t omode;
bool acquired = false;
DB_ENV *dbenv = cld_srv.cldb.env;
DB_TXN *txn;
- struct session *sess = mp->sess;
-
- /* make sure input data as large as expected */
- if (mp->msg_len < sizeof(*msg))
- return;
-
- fh = le64_to_cpu(msg->fh);
- lock_flags = le32_to_cpu(msg->flags);
rc = dbenv->txn_begin(dbenv, NULL, &txn, 0);
if (rc) {
@@ -1214,7 +1121,7 @@ void msg_lock(struct msg_params *mp, boo
}
/* read handle from db */
- rc = cldb_handle_get(txn, sess->sid, fh, &h, 0);
+ rc = cldb_handle_get(txn, sess->sid, lock->fh, &h, 0);
if (rc) {
resp_rc = CLE_FH_INVAL;
goto err_out;
@@ -1229,8 +1136,8 @@ void msg_lock(struct msg_params *mp, boo
}
/* attempt to add lock */
- rc = cldb_lock_add(txn, sess->sid, fh, inum,
- lock_flags & CLF_SHARED, wait, &acquired);
+ rc = cldb_lock_add(txn, sess->sid, lock->fh, inum,
+ lock->flags & CLF_SHARED, wait, &acquired);
if (rc) {
if (rc == DB_KEYEXIST)
resp_rc = CLE_LOCK_CONFLICT;
@@ -1253,7 +1160,7 @@ void msg_lock(struct msg_params *mp, boo
}
/* lock was acquired immediately */
- resp_ok(mp->sess, mp->msg);
+ sess_sendresp_generic(sess, CLE_OK);
free(h);
return;
@@ -1262,7 +1169,7 @@ err_out:
if (rc)
dbenv->err(dbenv, rc, "msg_lock txn abort");
err_out_noabort:
- resp_err(mp->sess, mp->msg, resp_rc);
+ sess_sendresp_generic(sess, resp_rc);
free(h);
}
Binary files cld/server/msg.o and cld.rpcgen/server/msg.o differ
Binary files cld/server/pkt.o and cld.rpcgen/server/pkt.o differ
diff -X /garz/tmp/dontdiff -urNp cld/server/server.c cld.rpcgen/server/server.c
--- cld/server/server.c 2010-02-03 01:15:26.000000000 -0500
+++ cld.rpcgen/server/server.c 2010-02-03 16:49:16.000000000 -0500
@@ -37,6 +37,7 @@
#include <openssl/hmac.h>
#include <cld-private.h>
#include "cld.h"
+#include <cld_pkt.h>
#define PROGRAM_NAME "cld"
@@ -125,8 +126,6 @@ int udp_tx(int sock_fd, struct sockaddr
{
ssize_t src;
- HAIL_DEBUG(&srv_log, "%s, fd %d", __func__, sock_fd);
-
src = sendto(sock_fd, data, data_len, 0, addr, addr_len);
if (src < 0 && errno != EAGAIN)
HAIL_ERR(&srv_log, "%s sendto (fd %d, data_len %u): %s",
@@ -139,36 +138,6 @@ int udp_tx(int sock_fd, struct sockaddr
return 0;
}
-void resp_copy(struct cld_msg_resp *resp, const struct cld_msg_hdr *src)
-{
- memcpy(&resp->hdr, src, sizeof(*src));
- resp->code = 0;
- resp->rsv = 0;
- resp->xid_in = src->xid;
-}
-
-void resp_err(struct session *sess,
- const struct cld_msg_hdr *src, enum cle_err_codes errcode)
-{
- struct cld_msg_resp resp;
-
- resp_copy(&resp, src);
- __cld_rand64(&resp.hdr.xid);
- resp.code = cpu_to_le32(errcode);
-
- if (sess->sock_fd <= 0) {
- HAIL_ERR(&srv_log, "Nul sock in response");
- return;
- }
-
- sess_sendmsg(sess, &resp, sizeof(resp), NULL, NULL);
-}
-
-void resp_ok(struct session *sess, const struct cld_msg_hdr *src)
-{
- resp_err(sess, src, CLE_OK);
-}
-
const char *user_key(const char *user)
{
/* TODO: better auth scheme.
@@ -181,266 +150,426 @@ const char *user_key(const char *user)
return user; /* our secret key */
}
-static void show_msg(const struct cld_msg_hdr *msg)
+static int udp_rx_handle(struct session *sess,
+ void (*msg_handler)(struct session *sess, const void *),
+ xdrproc_t xdrproc, void *xdrdata)
+{
+ XDR xin;
+ xdrmem_create(&xin, sess->msg_buf, sess->msg_buf_len, XDR_DECODE);
+ if (!xdrproc(&xin, xdrdata)) {
+ HAIL_DEBUG(&srv_log, "%s: couldn't parse %s message",
+ __func__, __cld_opstr(sess->msg_op));
+ xdr_destroy(&xin);
+ return CLE_BAD_PKT;
+ }
+ msg_handler(sess, xdrdata);
+ xdr_free(xdrproc, xdrdata);
+ xdr_destroy(&xin);
+ return 0;
+}
+
+/** Recieve a UDP packet
+ *
+ * @param sock_fd The UDP socket we received the packet on
+ * @param cli Client address data
+ * @param info Packet information
+ * @param raw_pkt The raw packet buffer
+ * @param raw_len Length of the raw packet buffer
+ *
+ * @return An error code if we should send an error message
+ * response. CLE_OK if we are done.
+ */
+static enum cle_err_codes udp_rx(int sock_fd,
+ const struct client *cli, struct pkt_info *info,
+ const char *raw_pkt, size_t raw_len)
{
- switch (msg->op) {
- case CMO_NOP:
- case CMO_NEW_SESS:
- case CMO_OPEN:
- case CMO_GET_META:
+ struct cld_pkt_hdr *pkt = info->pkt;
+ struct session *sess = info->sess;
+
+ if (sess) {
+ size_t msg_len;
+
+ /* advance sequence id's and update last-contact timestamp */
+ sess->last_contact = current_time.tv_sec;
+ sess->sock_fd = sock_fd;
+
+ if (info->op != CMO_ACK) {
+ /* received message - update session */
+ sess->next_seqid_in++;
+ }
+
+ /* copy message fragment into reassembly buffer */
+ if (pkt->mi.order & CLD_PKT_IS_FIRST) {
+ sess->msg_op = info->op;
+ sess->msg_xid = info->xid;
+ sess->msg_buf_len = 0;
+ }
+ msg_len = raw_len - info->hdr_len - CLD_PKT_FTR_LEN;
+ if ((sess->msg_buf_len + msg_len) > CLD_MAX_MSG_SZ)
+ return CLE_BAD_PKT;
+
+ memcpy(sess->msg_buf + sess->msg_buf_len,
+ raw_pkt + info->hdr_len, msg_len);
+ sess->msg_buf_len += msg_len;
+ }
+
+ if (!(pkt->mi.order & CLD_PKT_IS_LAST)) {
+ struct cld_msg_ack_frag ack;
+ ack.seqid = info->seqid;
+
+ /* transmit ack-partial-msg response (once, without retries) */
+ simple_sendmsg(sock_fd, cli, pkt->sid,
+ pkt->user, 0xdeadbeef,
+ (xdrproc_t)xdr_cld_msg_ack_frag, (void *)&ack,
+ CMO_ACK_FRAG);
+ return CLE_OK;
+ }
+
+ /* Handle a complete message */
+ switch (info->op) {
case CMO_GET:
- case CMO_PUT:
- case CMO_CLOSE:
- case CMO_DEL:
- case CMO_LOCK:
- case CMO_UNLOCK:
+ /* fall through */
+ case CMO_GET_META: {
+ struct cld_msg_get get = {0};
+ return udp_rx_handle(sess, msg_get,
+ (xdrproc_t)xdr_cld_msg_get, &get);
+ }
+ case CMO_OPEN: {
+ struct cld_msg_open open = {0};
+ return udp_rx_handle(sess, msg_open,
+ (xdrproc_t)xdr_cld_msg_open, &open);
+ }
+ case CMO_PUT: {
+ struct cld_msg_put put = {0};
+ return udp_rx_handle(sess, msg_put,
+ (xdrproc_t)xdr_cld_msg_put, &put);
+ }
+ case CMO_CLOSE: {
+ struct cld_msg_close close = {0};
+ return udp_rx_handle(sess, msg_close,
+ (xdrproc_t)xdr_cld_msg_close, &close);
+ }
+ case CMO_DEL: {
+ struct cld_msg_del del = {0};
+ return udp_rx_handle(sess, msg_del,
+ (xdrproc_t)xdr_cld_msg_del, &del);
+ }
+ case CMO_UNLOCK: {
+ struct cld_msg_unlock unlock = {0};
+ return udp_rx_handle(sess, msg_unlock,
+ (xdrproc_t)xdr_cld_msg_unlock, &unlock);
+ }
case CMO_TRYLOCK:
+ /* fall through */
+ case CMO_LOCK: {
+ struct cld_msg_lock lock = {0};
+ return udp_rx_handle(sess, msg_lock,
+ (xdrproc_t)xdr_cld_msg_lock, &lock);
+ }
case CMO_ACK:
+ msg_ack(sess, info->seqid);
+ return 0;
+ case CMO_NOP:
+ sess_sendresp_generic(sess, CLE_OK);
+ return 0;
+ case CMO_NEW_SESS:
+ msg_new_sess(sock_fd, cli, info);
+ return 0;
case CMO_END_SESS:
- case CMO_PING:
- case CMO_NOT_MASTER:
- case CMO_EVENT:
- case CMO_ACK_FRAG:
- HAIL_DEBUG(&srv_log, "msg: op %s, xid %llu",
- __cld_opstr(msg->op),
- (unsigned long long) le64_to_cpu(msg->xid));
- break;
+ msg_end_sess(sess, info->xid);
+ return 0;
+ default:
+ HAIL_DEBUG(&srv_log, "%s: unexpected %s packet",
+ __func__, __cld_opstr(info->op));
+ /* do nothing */
+ return 0;
}
}
-static void udp_rx_msg(const struct client *cli, const struct cld_packet *pkt,
- const struct cld_msg_hdr *msg, struct msg_params *mp)
+/** Parse a packet's header. Verify that the magic number is correct.
+ *
+ * @param raw_pkt Pointer to the packet data
+ * @param raw_len Length of the raw data
+ * @param pkt (out param) the packet header
+ * @param hdr_len (out param) the length of the packet header
+ *
+ * @return true on success; false if this packet is garbage
+ */
+static bool parse_pkt_header(const char *raw_pkt, int raw_len,
+ struct cld_pkt_hdr *pkt, ssize_t *hdr_len)
{
- struct session *sess = mp->sess;
+ XDR xin;
+ static const char * const magic = CLD_PKT_MAGIC;
- if (srv_log.verbose)
- show_msg(msg);
+ if (raw_len <= CLD_PKT_FTR_LEN) {
+ HAIL_DEBUG(&srv_log, "%s: packet is too short: only "
+ "%d bytes", __func__, raw_len);
+ return false;
+ }
+ xdrmem_create(&xin, (void *)raw_pkt, raw_len - CLD_PKT_FTR_LEN,
+ XDR_DECODE);
+ memset(pkt, 0, sizeof(*pkt));
+ if (!xdr_cld_pkt_hdr(&xin, pkt)) {
+ HAIL_DEBUG(&srv_log, "%s: couldn't parse packet header",
+ __func__);
+ xdr_destroy(&xin);
+ return false;
+ }
+ *hdr_len = xdr_getpos(&xin);
+ xdr_destroy(&xin);
+
+ if (memcmp((void *)&pkt->magic, magic, sizeof(pkt->magic))) {
+ HAIL_DEBUG(&srv_log, "%s: bad magic number", __func__);
+ xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)pkt);
+ return false;
+ }
+
+ return true;
+}
+
+/** Look up some information about a packet, including its session and the
+ * type of message it carries.
+ *
+ * @param pkt The packet's header
+ * @param raw_pkt Pointer to the raw packet data
+ * @param raw_len Length of the raw packet data
+ * @param info (out param) Information about the packet
+ *
+ * @return true on success; false if this packet is garbage
+ */
+static bool get_pkt_info(struct cld_pkt_hdr *pkt,
+ const char *raw_pkt, size_t raw_len,
+ size_t hdr_len, struct pkt_info *info)
+{
+ struct cld_pkt_ftr *foot;
+ struct session *s;
+
+ memset(info, 0, sizeof(info));
+ info->pkt = pkt;
+ info->sess = s = g_hash_table_lookup(cld_srv.sessions, &pkt->sid);
+ foot = (struct cld_pkt_ftr *)
+ (raw_pkt + (raw_len - CLD_PKT_FTR_LEN));
+ info->seqid = le64_to_cpu(foot->seqid);
+
+ if (pkt->mi.order & CLD_PKT_IS_FIRST) {
+ info->xid = pkt->mi.cld_pkt_msg_info_u.mi.xid;
+ info->op = pkt->mi.cld_pkt_msg_info_u.mi.op;
+ } else {
+ if (!s) {
+ HAIL_DEBUG(&srv_log, "%s: packet is not first, "
+ "but also not part of an existing session. "
+ "Protocol error.", __func__);
+ return false;
+ }
+ info->xid = s->msg_xid;
+ info->op = s->msg_op;
+ }
+ info->hdr_len = hdr_len;
+ return true;
+}
- switch(msg->op) {
- case CMO_NOP:
- resp_ok(sess, msg);
- break;
+/** Verify that the client session matches IP and username
+ *
+ * @param info Packet information
+ * @param cli Client address data
+ *
+ * @return 0 on success; error code otherwise
+ */
+static enum cle_err_codes validate_pkt_session(const struct pkt_info *info,
+ const struct client *cli)
+{
+ struct session *sess = info->sess;
- case CMO_NEW_SESS: msg_new_sess(mp, cli); break;
- case CMO_END_SESS: msg_end_sess(mp, cli); break;
- case CMO_OPEN: msg_open(mp); break;
- case CMO_GET: msg_get(mp, false); break;
- case CMO_GET_META: msg_get(mp, true); break;
- case CMO_PUT: msg_put(mp); break;
- case CMO_CLOSE: msg_close(mp); break;
- case CMO_DEL: msg_del(mp); break;
- case CMO_UNLOCK: msg_unlock(mp); break;
- case CMO_LOCK: msg_lock(mp, true); break;
- case CMO_TRYLOCK: msg_lock(mp, false); break;
- case CMO_ACK: msg_ack(mp); break;
+ if (!sess) {
+ /* Packets that don't belong to a session must be new-session
+ * packets attempting to establish a session. */
+ if (info->op != CMO_NEW_SESS) {
+ HAIL_DEBUG(&srv_log, "%s: packet doesn't belong to a "
+ "session,but has type %d",
+ __func__, info->op);
+ return CLE_SESS_INVAL;
+ }
+ return 0;
+ }
- default:
- /* do nothing */
- break;
+ if (info->op == CMO_NEW_SESS) {
+ HAIL_DEBUG(&srv_log, "%s: Tried to create a new session, "
+ "but a session with that ID already exists.",
+ __func__);
+ return CLE_SESS_EXISTS;
}
-}
-static void pkt_ack_frag(int sock_fd,
- const struct client *cli,
- const struct cld_packet *pkt)
-{
- size_t alloc_len;
- struct cld_packet *outpkt;
- struct cld_msg_ack_frag *ack_msg;
- void *p;
- const char *secret_key;
+ /* verify that client session matches IP */
+ if ((sess->addr_len != cli->addr_len) ||
+ memcmp(&sess->addr, &cli->addr, sess->addr_len)) {
+ HAIL_DEBUG(&srv_log, "%s: sess->addr doesn't match packet "
+ "addr", __func__);
+ return CLE_SESS_INVAL;
+ }
+
+ /* verify that client session matches username */
+ if (strncmp(info->pkt->user, sess->user, CLD_MAX_USERNAME)) {
+ HAIL_DEBUG(&srv_log, "%s: session doesn't match packet's "
+ "username", __func__);
+ return CLE_SESS_INVAL;
+ }
+
+ if (sess->dead) {
+ HAIL_DEBUG(&srv_log, "%s: packet session is dead",
+ __func__);
+ return CLE_SESS_INVAL;
+ }
+
+ return 0;
+}
- alloc_len = sizeof(*outpkt) + sizeof(*ack_msg) + SHA_DIGEST_LENGTH;
- outpkt = alloca(alloc_len);
- ack_msg = (struct cld_msg_ack_frag *) (outpkt + 1);
- memset(outpkt, 0, alloc_len);
-
- pkt_init_pkt(outpkt, pkt);
-
- memcpy(ack_msg->hdr.magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ);
- __cld_rand64(&ack_msg->hdr.xid);
- ack_msg->hdr.op = CMO_ACK_FRAG;
- ack_msg->seqid = pkt->seqid;
-
- p = outpkt;
- secret_key = user_key(outpkt->user);
- __cld_authsign(&srv_log, secret_key, p, alloc_len - SHA_DIGEST_LENGTH,
- p + alloc_len - SHA_DIGEST_LENGTH);
-
- HAIL_DEBUG(&srv_log, "%s: "
- "sid " SIDFMT ", op %s, seqid %llu",
- __func__,
- SIDARG(outpkt->sid), __cld_opstr(ack_msg->hdr.op),
- (unsigned long long) le64_to_cpu(outpkt->seqid));
-
- /* transmit ack-partial-msg response (once, without retries) */
- udp_tx(sock_fd, (struct sockaddr *) &cli->addr, cli->addr_len,
- outpkt, alloc_len);
-}
-
-static void udp_rx(int sock_fd,
- const struct client *cli,
- const void *raw_pkt, size_t pkt_len)
-{
- const struct cld_packet *pkt = raw_pkt;
- struct cld_packet *outpkt;
- const struct cld_msg_hdr *msg = (struct cld_msg_hdr *) (pkt + 1);
- struct session *sess = NULL;
- enum cle_err_codes resp_rc = CLE_OK;
- struct cld_msg_resp *resp;
- struct msg_params mp;
- size_t alloc_len;
- uint32_t pkt_flags;
- bool first_frag, last_frag, have_new_sess, have_ack, have_put;
+/** Check a packet's cryptographic signature
+ *
+ * @param raw_pkt Pointer to the packet data
+ * @param raw_len Length of the raw data
+ * @param pkt the packet header
+ *
+ * @return 0 on success; error code otherwise
+ */
+static enum cle_err_codes validate_pkt_signature(const char *raw_pkt,
+ int raw_len, const struct cld_pkt_hdr *pkt)
+{
+ struct cld_pkt_ftr *foot;
const char *secret_key;
int auth_rc;
- void *p;
+ foot = (struct cld_pkt_ftr *)
+ (raw_pkt + (raw_len - CLD_PKT_FTR_LEN));
secret_key = user_key(pkt->user);
- /* verify pkt data integrity and credentials via HMAC signature */
auth_rc = __cld_authcheck(&srv_log, secret_key, raw_pkt,
- pkt_len - SHA_DIGEST_LENGTH,
- raw_pkt + pkt_len - SHA_DIGEST_LENGTH);
+ raw_len - SHA_DIGEST_LENGTH,
+ foot->sha);
if (auth_rc) {
HAIL_DEBUG(&srv_log, "auth failed, code %d", auth_rc);
- resp_rc = CLE_SIG_INVAL;
- goto err_out;
- }
-
- pkt_flags = le32_to_cpu(pkt->flags);
- first_frag = pkt_flags & CPF_FIRST;
- last_frag = pkt_flags & CPF_LAST;
- have_new_sess = first_frag && (msg->op == CMO_NEW_SESS);
- have_ack = first_frag && (msg->op == CMO_ACK);
- have_put = first_frag && (msg->op == CMO_PUT);
-
- /* look up client session, verify it matches IP and username */
- sess = g_hash_table_lookup(cld_srv.sessions, pkt->sid);
- if (sess &&
- ((sess->addr_len != cli->addr_len) ||
- memcmp(&sess->addr, &cli->addr, sess->addr_len) ||
- strncmp(pkt->user, sess->user, CLD_MAX_USERNAME) ||
- sess->dead)) {
- resp_rc = CLE_SESS_INVAL;
- goto err_out;
+ return CLE_SIG_INVAL;
}
- mp.sock_fd = sock_fd;
- mp.cli = cli;
- mp.sess = sess;
- mp.pkt = pkt;
- mp.msg = msg;
- mp.msg_len = pkt_len - sizeof(*pkt) - SHA_DIGEST_LENGTH;
-
- HAIL_DEBUG(&srv_log, "%s pkt: len %zu, seqid %llu, sid " SIDFMT ", "
- "flags %s%s, user %s",
- __func__,
- pkt_len, (unsigned long long) le64_to_cpu(pkt->seqid),
- SIDARG(pkt->sid),
- first_frag ? "F" : "", last_frag ? "L" : "",
- pkt->user);
-
- /* advance sequence id's and update last-contact timestamp */
- if (!have_new_sess) {
- if (!sess) {
- resp_rc = CLE_SESS_INVAL;
- goto err_out;
- }
-
- sess->last_contact = current_time.tv_sec;
- sess->sock_fd = sock_fd;
-
- if (!have_ack) {
- /* eliminate duplicates; do not return any response */
- if (le64_to_cpu(pkt->seqid) != sess->next_seqid_in) {
- HAIL_DEBUG(&srv_log, "%s: dropping dup", __func__);
- return;
- }
+ return 0;
+}
- /* received message - update session */
- sess->next_seqid_in++;
- }
- } else {
- if (sess) {
- /* eliminate duplicates; do not return any response */
- if (le64_to_cpu(pkt->seqid) != sess->next_seqid_in) {
- HAIL_DEBUG(&srv_log, "%s: dropping dup", __func__);
- return;
- }
+/** Check if this packet is a duplicate
+ *
+ * @param info Packet info
+ *
+ * @return true only if the packet is a duplicate
+ */
+static bool packet_is_dupe(const struct pkt_info *info)
+{
+ if (!info->sess)
+ return false;
+ if (info->op == CMO_ACK)
+ return false;
- resp_rc = CLE_SESS_EXISTS;
- goto err_out;
- }
+ /* Check sequence IDs to discover if this packet is a dupe */
+ if (info->seqid != info->sess->next_seqid_in) {
+ HAIL_DEBUG(&srv_log, "dropping dup with seqid %llu "
+ "(expected %llu).",
+ (unsigned long long) info->seqid,
+ (unsigned long long) info->sess->next_seqid_in);
+ return true;
}
- /* copy message fragment into reassembly buffer */
- if (sess) {
- if (first_frag)
- sess->msg_buf_len = 0;
-
- if ((sess->msg_buf_len + mp.msg_len) > CLD_MAX_MSG_SZ) {
- resp_rc = CLE_BAD_PKT;
- goto err_out;
- }
-
- memcpy(&sess->msg_buf[sess->msg_buf_len], msg, mp.msg_len);
- sess->msg_buf_len += mp.msg_len;
-
- if (!last_frag) {
- pkt_ack_frag(sock_fd, cli, pkt);
- return;
- }
+ return false;
+}
- mp.msg = msg = (struct cld_msg_hdr *) sess->msg_buf;
- mp.msg_len = sess->msg_buf_len;
+void simple_sendmsg(int fd, const struct client *cli,
+ uint64_t sid, const char *user, uint64_t seqid,
+ xdrproc_t xdrproc, const void *xdrdata, enum cld_msg_op op)
+{
+ XDR xhdr, xmsg;
+ struct cld_pkt_hdr pkt;
+ struct cld_pkt_msg_infos *infos;
+ struct cld_pkt_ftr *foot;
+ const char *secret_key;
+ char *buf;
+ size_t msg_len, hdr_len, buf_len;
+ int auth_rc;
- if ((srv_log.verbose > 1) && !first_frag)
- HAIL_DEBUG(&srv_log, " final message size %u",
- sess->msg_buf_len);
+ /* Set up the packet header */
+ memset(&pkt, 0, sizeof(cld_pkt_hdr));
+ memcpy(&pkt.magic, CLD_PKT_MAGIC, sizeof(pkt.magic));
+ pkt.sid = sid;
+ pkt.user = (char *)user;
+ pkt.mi.order = CLD_PKT_ORD_FIRST_LAST;
+ infos = &pkt.mi.cld_pkt_msg_info_u.mi;
+ __cld_rand64(&infos->xid);
+ infos->op = op;
+
+ /* Determine sizes */
+ msg_len = xdr_sizeof(xdrproc, (void *)xdrdata);
+ if (msg_len > CLD_MAX_MSG_SZ) {
+ HAIL_ERR(&srv_log, "%s: tried to put %d message bytes in a "
+ "single packet. Maximum message bytes per packet "
+ "is %d", __func__, msg_len, CLD_MAX_PKT_MSG_SZ);
+ return;
}
+ hdr_len = xdr_sizeof((xdrproc_t)xdr_cld_pkt_hdr, &pkt);
+ buf_len = msg_len + hdr_len + CLD_PKT_FTR_LEN;
+ buf = alloca(buf_len);
+
+ /* Serialize data */
+ xdrmem_create(&xhdr, buf, hdr_len, XDR_ENCODE);
+ if (!xdr_cld_pkt_hdr(&xhdr, &pkt)) {
+ xdr_destroy(&xhdr);
+ HAIL_ERR(&srv_log, "%s: xdr_cld_pkt_hdr failed",
+ __func__);
+ return;
+ }
+ xdr_destroy(&xhdr);
+ xdrmem_create(&xmsg, buf + hdr_len, msg_len, XDR_ENCODE);
+ if (!xdrproc(&xmsg, (void *)xdrdata)) {
+ xdr_destroy(&xmsg);
+ HAIL_ERR(&srv_log, "%s: xdrproc failed", __func__);
+ return;
+ }
+ xdr_destroy(&xmsg);
- if (last_frag)
- udp_rx_msg(cli, pkt, msg, &mp);
- return;
-
-err_out:
- /* transmit error response (once, without retries) */
- alloc_len = sizeof(*outpkt) + sizeof(*resp) + SHA_DIGEST_LENGTH;
- outpkt = alloca(alloc_len);
- resp = (struct cld_msg_resp *) (outpkt + 1);
- memset(outpkt, 0, alloc_len);
-
- pkt_init_pkt(outpkt, pkt);
-
- resp_copy(resp, msg);
- resp->code = cpu_to_le32(resp_rc);
-
- p = outpkt;
- secret_key = user_key(outpkt->user);
- __cld_authsign(&srv_log, secret_key, p, alloc_len - SHA_DIGEST_LENGTH,
- p + alloc_len - SHA_DIGEST_LENGTH);
-
- HAIL_DEBUG(&srv_log, "%s err: "
- "sid " SIDFMT ", op %s, seqid %llu, code %d",
- __func__,
- SIDARG(outpkt->sid), __cld_opstr(resp->hdr.op),
- (unsigned long long) le64_to_cpu(outpkt->seqid),
- resp_rc);
-
- udp_tx(sock_fd, (struct sockaddr *) &cli->addr, cli->addr_len,
- outpkt, alloc_len);
+ foot = (struct cld_pkt_ftr *)
+ (buf + (buf_len - SHA_DIGEST_LENGTH));
+ foot->seqid = cpu_to_le64(seqid);
+ secret_key = user_key(user);
+
+ auth_rc =__cld_authsign(&srv_log, secret_key, buf,
+ buf_len - SHA_DIGEST_LENGTH,
+ foot->sha);
+ if (auth_rc)
+ HAIL_ERR(&srv_log, "%s: authsign failed: %d",
+ __func__, auth_rc);
+
+ udp_tx(fd, (struct sockaddr *) &cli->addr, cli->addr_len,
+ buf, buf_len);
+}
+
+static void simple_sendresp(int sock_fd, const struct client *cli,
+ const struct pkt_info *info, enum cle_err_codes code)
+{
+ const struct cld_pkt_hdr *pkt = info->pkt;
+ struct cld_msg_generic_resp resp;
+ resp.code = code;
+ resp.xid_in = info->xid;
+
+ simple_sendmsg(sock_fd, cli, pkt->sid, pkt->user, info->seqid,
+ (xdrproc_t)xdr_cld_msg_generic_resp, (void *)&resp,
+ info->op);
}
static bool udp_srv_event(int fd, short events, void *userdata)
{
struct client cli;
char host[64];
- ssize_t rrc;
+ ssize_t rrc, hdr_len;
struct msghdr hdr;
struct iovec iov[2];
- uint8_t raw_pkt[CLD_RAW_MSG_SZ], ctl_msg[CLD_RAW_MSG_SZ];
- struct cld_packet *pkt = (struct cld_packet *) raw_pkt;
+ char raw_pkt[CLD_RAW_MSG_SZ], ctl_msg[CLD_RAW_MSG_SZ];
+ struct cld_pkt_hdr pkt;
+ struct pkt_info info;
+ enum cle_err_codes err;
memset(&cli, 0, sizeof(cli));
@@ -470,46 +599,52 @@ static bool udp_srv_event(int fd, short
HAIL_DEBUG(&srv_log, "client %s message (%d bytes)", host, (int) rrc);
- /* if it is complete garbage, drop immediately */
- if ((rrc < (sizeof(*pkt) + SHA_DIGEST_LENGTH)) ||
- (memcmp(pkt->magic, CLD_PKT_MAGIC, sizeof(pkt->magic)))) {
+ if (!parse_pkt_header(raw_pkt, rrc, &pkt, &hdr_len)) {
cld_srv.stats.garbage++;
- return true; /* continue main loop; do NOT terminate server */
+ return true;
}
- if (cld_srv.cldb.is_master && cld_srv.cldb.up)
- udp_rx(fd, &cli, raw_pkt, rrc);
+ if (!get_pkt_info(&pkt, raw_pkt, rrc, hdr_len, &info)) {
+ xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
+ cld_srv.stats.garbage++;
+ return true;
+ }
+
+ if (packet_is_dupe(&info)) {
+ /* silently drop dupes */
+ xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
+ return true;
+ }
- else {
- struct cld_packet *outpkt;
- struct cld_msg_hdr *msg = (struct cld_msg_hdr *) (pkt + 1);
- struct cld_msg_resp *resp;
- size_t alloc_len;
- const char *secret_key;
- void *p;
-
- alloc_len = sizeof(*outpkt) + sizeof(*resp) + SHA_DIGEST_LENGTH;
- outpkt = alloca(alloc_len);
- memset(outpkt, 0, alloc_len);
-
- pkt_init_pkt(outpkt, pkt);
-
- /* transmit not-master error msg */
- resp = (struct cld_msg_resp *) (outpkt + 1);
- resp_copy(resp, msg);
- resp->hdr.op = CMO_NOT_MASTER;
-
- p = outpkt;
- secret_key = user_key(outpkt->user);
- __cld_authsign(&srv_log, secret_key, p,
- alloc_len - SHA_DIGEST_LENGTH,
- p + alloc_len - SHA_DIGEST_LENGTH);
+ err = validate_pkt_session(&info, &cli);
+ if (err) {
+ simple_sendresp(fd, &cli, &info, err);
+ xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
+ return true;
+ }
- udp_tx(fd, (struct sockaddr *) &cli.addr, cli.addr_len,
- outpkt, alloc_len);
+ err = validate_pkt_signature(raw_pkt, rrc, &pkt);
+ if (err) {
+ simple_sendresp(fd, &cli, &info, err);
+ xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
+ return true;
}
- return true; /* continue main loop; do NOT terminate server */
+ if (!(cld_srv.cldb.is_master && cld_srv.cldb.up)) {
+ simple_sendmsg(fd, &cli, pkt.sid, pkt.user, 0xdeadbeef,
+ (xdrproc_t)xdr_void, NULL, CMO_NOT_MASTER);
+ xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
+ return true;
+ }
+
+ err = udp_rx(fd, &cli, &info, raw_pkt, rrc);
+ if (err) {
+ simple_sendresp(fd, &cli, &info, err);
+ xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
+ return true;
+ }
+ xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
+ return true;
}
static void add_chkpt_timer(void)
Binary files cld/server/server.o and cld.rpcgen/server/server.o differ
diff -X /garz/tmp/dontdiff -urNp cld/server/session.c cld.rpcgen/server/session.c
--- cld/server/session.c 2010-02-03 17:37:29.000000000 -0500
+++ cld.rpcgen/server/session.c 2010-02-03 16:49:55.000000000 -0500
@@ -29,15 +29,15 @@
#include <openssl/sha.h>
#include <cld-private.h>
#include "cld.h"
+#include <cld_pkt.h>
struct session_outpkt {
struct session *sess;
- struct cld_packet *pkt;
+ char *pkt_data;
size_t pkt_len;
uint64_t next_retry;
- uint64_t src_seqid;
unsigned int refs;
void (*done_cb)(struct session_outpkt *);
@@ -60,26 +60,6 @@ uint64_t next_seqid_le(uint64_t *seq)
return rc;
}
-void pkt_init_pkt(struct cld_packet *dest, const struct cld_packet *src)
-{
- memset(dest, 0, sizeof(*dest));
- memcpy(dest->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
- dest->seqid = cpu_to_le64(0xdeadbeef);
- memcpy(dest->sid, src->sid, CLD_SID_SZ);
- dest->flags = cpu_to_le32(CPF_FIRST | CPF_LAST);
- strncpy(dest->user, src->user, CLD_MAX_USERNAME - 1);
-}
-
-static void pkt_init_sess(struct cld_packet *dest, struct session *sess)
-{
- memset(dest, 0, sizeof(*dest));
- memcpy(dest->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
- dest->seqid = next_seqid_le(&sess->next_seqid_out);
- memcpy(dest->sid, sess->sid, CLD_SID_SZ);
- dest->flags = 0;
- strncpy(dest->user, sess->user, CLD_MAX_USERNAME - 1);
-}
-
guint sess_hash(gconstpointer v)
{
const struct session *sess = v;
@@ -397,20 +377,6 @@ static void session_ping_done(struct ses
outpkt->sess->ping_open = false;
}
-static void session_ping(struct session *sess)
-{
- struct cld_msg_hdr resp;
-
- memset(&resp, 0, sizeof(resp));
- memcpy(resp.magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ);
- __cld_rand64(&resp.xid);
- resp.op = CMO_PING;
-
- sess->ping_open = true;
-
- sess_sendmsg(sess, &resp, sizeof(resp), session_ping_done, NULL);
-}
-
static void session_timeout(struct cld_timer *timer)
{
struct session *sess = timer->userdata;
@@ -424,8 +390,12 @@ static void session_timeout(struct cld_t
if (!sess->dead && (sess_expire > now)) {
if (!sess->ping_open &&
(sess_expire > (sess->last_contact + (CLD_SESS_TIMEOUT / 2) &&
- (sess->sock_fd > 0))))
- session_ping(sess);
+ (sess->sock_fd > 0)))) {
+ sess->ping_open = true;
+ sess_sendmsg(sess,
+ (xdrproc_t)xdr_void, NULL, CMO_PING,
+ session_ping_done, NULL);
+ }
cld_timer_add(&cld_srv.timers, &sess->timer,
now + ((sess_expire - now) / 2) + 1);
@@ -519,8 +489,8 @@ static struct session_outpkt *op_alloc(s
if (!op)
return NULL;
- op->pkt = calloc(1, pkt_len);
- if (!op->pkt) {
+ op->pkt_data = calloc(1, pkt_len);
+ if (!op->pkt_data) {
free(op);
return NULL;
}
@@ -542,7 +512,7 @@ static void op_unref(struct session_outp
return;
}
- free(op->pkt);
+ free(op->pkt_data);
free(op);
}
@@ -556,18 +526,10 @@ static int sess_retry_output(struct sess
tmp = sess->out_q;
while (tmp) {
- struct cld_packet *outpkt;
- struct cld_msg_hdr *outmsg;
struct session_outpkt *op;
- GList *tmp1;
-
- tmp1 = tmp;
+ op = tmp->data;
tmp = tmp->next;
- op = tmp1->data;
- outpkt = op->pkt;
- outmsg = (struct cld_msg_hdr *) (outpkt + 1);
-
if (!next_retry || (op->next_retry < next_retry))
*next_retry_out = next_retry = op->next_retry;
@@ -575,15 +537,15 @@ static int sess_retry_output(struct sess
continue;
if (srv_log.verbose) {
- HAIL_DEBUG(&srv_log, "%s: retrying: sid " SIDFMT ", "
- "op %s, seqid %llu",
+ char scratch[PKT_HDR_TO_STR_SCRATCH_LEN];
+ HAIL_DEBUG(&srv_log, "%s: retrying %s",
__func__,
- SIDARG(outpkt->sid), __cld_opstr(outmsg->op),
- (unsigned long long) le64_to_cpu(outpkt->seqid));
+ __cld_pkt_hdr_to_str(scratch, op->pkt_data,
+ op->pkt_len));
}
rc = udp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr,
- sess->addr_len, op->pkt, op->pkt_len);
+ sess->addr_len, op->pkt_data, op->pkt_len);
if (rc)
break;
@@ -616,153 +578,161 @@ static void session_outq(struct session
sess->out_q = g_list_concat(sess->out_q, new_pkts);
}
-bool sess_sendmsg(struct session *sess, const void *msg_, size_t msglen,
- void (*done_cb)(struct session_outpkt *),
- void *done_data)
-{
- struct cld_packet *outpkt;
- unsigned int n_pkts, i;
- size_t pkt_len, msg_left = msglen;
- struct session_outpkt **pkts, *op;
- GList *tmp_root = NULL;
- const void *p;
- bool first_frag = true;
-
- if (msglen > CLD_MAX_MSG_SZ) {
- HAIL_ERR(&srv_log, "%s: message too big (%zu bytes)\n",
- __func__, msglen);
- return false;
- }
-
- n_pkts = (msglen / CLD_MAX_PKT_MSG_SZ);
- n_pkts += (msglen % CLD_MAX_PKT_MSG_SZ) ? 1 : 0;
- pkts = alloca(sizeof(struct session_outpkt *) * n_pkts);
-
- if (srv_log.verbose) {
- const struct cld_msg_hdr *hdr = msg_;
- const struct cld_msg_resp *rsp;
-
- switch (hdr->op) {
- /* This is the command set that gets to cldc_rx_generic */
- case CMO_NOP:
- case CMO_CLOSE:
- case CMO_DEL:
- case CMO_LOCK:
- case CMO_UNLOCK:
- case CMO_TRYLOCK:
- case CMO_PUT:
- case CMO_NEW_SESS:
- case CMO_END_SESS:
- case CMO_OPEN:
- case CMO_GET_META:
- case CMO_GET:
- rsp = (struct cld_msg_resp *) msg_;
- HAIL_DEBUG(&srv_log, "%s: "
- "sid " SIDFMT ", op %s, msglen %u, code %u, "
- "xid %llu, xid_in %llu",
- __func__,
- SIDARG(sess->sid),
- __cld_opstr(hdr->op),
- (unsigned int) msglen,
- le32_to_cpu(rsp->code),
- (unsigned long long) le64_to_cpu(hdr->xid),
- (unsigned long long) le64_to_cpu(rsp->xid_in));
- break;
- default:
- HAIL_DEBUG(&srv_log, "%s: "
- "sid " SIDFMT ", op %s, msglen %u",
- __func__,
- SIDARG(sess->sid),
- __cld_opstr(hdr->op),
- (unsigned int) msglen);
- }
- }
+bool sess_sendmsg(struct session *sess,
+ xdrproc_t xdrproc, const void *xdrdata, enum cld_msg_op msg_op,
+ void (*done_cb)(struct session_outpkt *), void *done_data)
+{
+ XDR xmsg;
+ size_t msg_rem, msg_len, msg_chunk_len;
+ char *msg_bytes, *msg_cur;
+ GList *tmp_list, *new_pkts = NULL;
+ int first, last;
+ const char *secret_key;
- /* pass 1: perform allocations */
- for (i = 0; i < n_pkts; i++) {
- pkts[i] = op = op_alloc(sizeof(*outpkt) +
- CLD_MAX_PKT_MSG_SZ +
- SHA_DIGEST_LENGTH);
- if (!op)
- goto err_out;
+ secret_key = user_key(sess->user);
- tmp_root = g_list_append(tmp_root, op);
+ /* Use XDR to serialize the message */
+ msg_len = xdr_sizeof(xdrproc, (void *)xdrdata);
+ if (msg_len > CLD_MAX_MSG_SZ)
+ return false;
+ msg_bytes = alloca(msg_len);
+ xdrmem_create(&xmsg, msg_bytes, msg_len, XDR_ENCODE);
+ if (!xdrproc(&xmsg, (void *)xdrdata)) {
+ xdr_destroy(&xmsg);
+ HAIL_ERR(&srv_log, "%s: xdrproc failed", __func__);
+ return false;
}
+ xdr_destroy(&xmsg);
- /* pass 2: fill packets */
- p = msg_;
- for (i = 0; i < n_pkts; i++) {
- struct cld_msg_hdr *outmsg;
- void *outmsg_mem;
- size_t copy_len;
- void *out_p;
- const char *secret_key;
-
- op = pkts[i];
+ /* Break the message into packets */
+ first = 1;
+ msg_rem = msg_len;
+ msg_cur = msg_bytes;
+ do {
+ XDR xout;
+ struct cld_pkt_hdr pkt;
+ size_t hdr_len;
+ struct session_outpkt *op;
+ if (msg_rem <= CLD_MAX_PKT_MSG_SZ) {
+ msg_chunk_len = msg_rem;
+ last = 1;
+ } else {
+ msg_chunk_len = CLD_MAX_PKT_MSG_SZ;
+ last = 0;
+ }
+
+ /* Set up packet header */
+ memset(&pkt, 0, sizeof(pkt));
+ memcpy(&pkt.magic, CLD_PKT_MAGIC, sizeof(pkt.magic));
+ memcpy(&pkt.sid, sess->sid, CLD_SID_SZ);
+ pkt.user = sess->user;
+ if (first) {
+ struct cld_pkt_msg_infos *infos =
+ &pkt.mi.cld_pkt_msg_info_u.mi;
+ if (last)
+ pkt.mi.order = CLD_PKT_ORD_FIRST_LAST;
+ else
+ pkt.mi.order = CLD_PKT_ORD_FIRST;
+ __cld_rand64(&infos->xid);
+ infos->op = msg_op;
+ } else {
+ if (last)
+ pkt.mi.order = CLD_PKT_ORD_LAST;
+ else
+ pkt.mi.order = CLD_PKT_ORD_MID;
+ }
+
+ /* Allocate space and initialize session_outpkt structure */
+ hdr_len = xdr_sizeof((xdrproc_t)xdr_cld_pkt_hdr, (void *)&pkt);
+ op = op_alloc(hdr_len + msg_chunk_len + CLD_PKT_FTR_LEN);
+ if (!op) {
+ HAIL_DEBUG(&srv_log, "%s: op_alloc failed",
+ __func__);
+ goto err_out;
+ }
op->sess = sess;
-
- outpkt = op->pkt;
- pkt_len = op->pkt_len;
-
- outmsg_mem = (outpkt + 1);
- outmsg = outmsg_mem;
-
- /* init packet header */
- pkt_init_sess(outpkt, sess);
-
- if (first_frag) {
- first_frag = false;
- outpkt->flags |= cpu_to_le32(CPF_FIRST);
+ op->next_retry = current_time.tv_sec + CLD_RETRY_START;
+ op->done_cb = done_cb;
+ op->done_data = done_data;
+ xdrmem_create(&xout, op->pkt_data, hdr_len, XDR_ENCODE);
+ if (!xdr_cld_pkt_hdr(&xout, &pkt)) {
+ xdr_destroy(&xout);
+ HAIL_ERR(&srv_log, "%s: xdr_cld_pkt_hdr failed",
+ __func__);
+ goto err_out;
}
+ xdr_destroy(&xout);
- copy_len = MIN(pkt_len - sizeof(*outpkt) - SHA_DIGEST_LENGTH,
- msg_left);
- memcpy(outmsg_mem, p, copy_len);
-
- p += copy_len;
- msg_left -= copy_len;
-
- op->pkt_len =
- pkt_len = sizeof(*outpkt) + copy_len + SHA_DIGEST_LENGTH;
-
- if (!msg_left) {
- op->done_cb = done_cb;
- op->done_data = done_data;
-
- outpkt->flags |= cpu_to_le32(CPF_LAST);
+ /* Fill in data */
+ memcpy(op->pkt_data + hdr_len, msg_cur, msg_chunk_len);
+ msg_cur += msg_chunk_len;
+ msg_rem -= msg_chunk_len;
+ first = 0;
+
+ new_pkts = g_list_prepend(new_pkts, op);
+ } while (!last);
+
+ /* add sequence IDs and SHAs */
+ new_pkts = g_list_reverse(new_pkts);
+ for (tmp_list = g_list_first(new_pkts);
+ tmp_list;
+ tmp_list = g_list_next(tmp_list)) {
+ struct session_outpkt *op =
+ (struct session_outpkt *) tmp_list->data;
+ struct cld_pkt_ftr *foot = (struct cld_pkt_ftr *)
+ (op->pkt_data + (op->pkt_len - CLD_PKT_FTR_LEN));
+ int ret;
+
+ foot->seqid = next_seqid_le(&sess->next_seqid_out);
+ ret = __cld_authsign(&srv_log, secret_key,
+ op->pkt_data, op->pkt_len - SHA_DIGEST_LENGTH,
+ foot->sha);
+ if (ret) {
+ HAIL_ERR(&srv_log, "%s: authsign failed: %d",
+ __func__, ret);
+ goto err_out;
}
+ }
- op->next_retry = current_time.tv_sec + CLD_RETRY_START;
-
- out_p = outpkt;
- secret_key = user_key(outpkt->user);
- if (__cld_authsign(&srv_log, secret_key, out_p,
- pkt_len - SHA_DIGEST_LENGTH,
- out_p + pkt_len - SHA_DIGEST_LENGTH))
- goto err_out; /* FIXME: we free all pkts -- wrong! */
-
+ /* send packets */
+ for (tmp_list = g_list_first(new_pkts);
+ tmp_list;
+ tmp_list = g_list_next(tmp_list)) {
+ struct session_outpkt *op =
+ (struct session_outpkt *) tmp_list->data;
udp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr,
- sess->addr_len, outpkt, pkt_len);
+ sess->addr_len, op->pkt_data, op->pkt_len);
}
- session_outq(sess, tmp_root);
+ session_outq(sess, new_pkts);
return true;
err_out:
- for (i = 0; i < n_pkts; i++)
- op_unref(pkts[i]);
- g_list_free(tmp_root);
+ for (tmp_list = g_list_first(new_pkts); tmp_list;
+ tmp_list = g_list_next(tmp_list)) {
+ struct session_outpkt *op;
+ op = (struct session_outpkt *)tmp_list->data;
+ op_unref(op);
+ }
+ g_list_free(new_pkts);
return false;
}
-void msg_ack(struct msg_params *mp)
+void sess_sendresp_generic(struct session *sess, enum cle_err_codes code)
+{
+ struct cld_msg_generic_resp resp;
+ resp.code = code;
+ resp.xid_in = sess->msg_xid;
+
+ sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_generic_resp,
+ (void *)&resp, sess->msg_op, NULL, NULL);
+}
+
+void msg_ack(struct session *sess, uint64_t seqid)
{
- struct cld_packet *outpkt;
- struct cld_msg_hdr *outmsg;
GList *tmp, *tmp1;
- struct session *sess = mp->sess;
struct session_outpkt *op;
if (!sess->out_q)
@@ -771,19 +741,22 @@ void msg_ack(struct msg_params *mp)
/* look through output queue */
tmp = sess->out_q;
while (tmp) {
+ uint64_t op_seqid;
+ struct cld_pkt_ftr *foot;
tmp1 = tmp;
tmp = tmp->next;
op = tmp1->data;
- outpkt = op->pkt;
- outmsg = (struct cld_msg_hdr *) (outpkt + 1);
+ foot = (struct cld_pkt_ftr *)
+ (op->pkt_data + (op->pkt_len - CLD_PKT_FTR_LEN));
+ op_seqid = le64_to_cpu(foot->seqid);
/* if matching seqid found, we ack'd a message in out_q */
- if (mp->pkt->seqid != outpkt->seqid)
+ if (seqid != op_seqid)
continue;
HAIL_DEBUG(&srv_log, " expiring seqid %llu",
- (unsigned long long) le64_to_cpu(outpkt->seqid));
+ (unsigned long long) op_seqid);
/* remove and delete the ack'd msg; call ack'd callback */
sess->out_q = g_list_delete_link(sess->out_q, tmp1);
@@ -797,19 +770,17 @@ void msg_ack(struct msg_params *mp)
cld_timer_del(&cld_srv.timers, &sess->retry_timer);
}
-void msg_new_sess(struct msg_params *mp, const struct client *cli)
+void msg_new_sess(int sock_fd, const struct client *cli,
+ const struct pkt_info *info)
{
+ const struct cld_pkt_hdr *pkt = info->pkt;
DB *db = cld_srv.cldb.sessions;
struct raw_session raw_sess;
struct session *sess = NULL;
DBT key, val;
int rc;
enum cle_err_codes resp_rc = CLE_OK;
- struct cld_msg_resp *resp;
- struct cld_packet *outpkt;
- size_t alloc_len;
- const char *secret_key;
- void *p;
+ struct cld_msg_generic_resp resp;
sess = session_new();
if (!sess) {
@@ -818,17 +789,17 @@ void msg_new_sess(struct msg_params *mp,
}
/* build raw_session database record */
- memcpy(sess->sid, mp->pkt->sid, sizeof(sess->sid));
+ memcpy(sess->sid, &pkt->sid, sizeof(sess->sid));
memcpy(&sess->addr, &cli->addr, sizeof(sess->addr));
- strncpy(sess->user, mp->pkt->user, sizeof(sess->user));
- sess->user[sizeof(sess->user) - 1] = 0;
+ snprintf(sess->user, sizeof(sess->user), "%s",
+ pkt->user);
- sess->sock_fd = mp->sock_fd;
+ sess->sock_fd = sock_fd;
sess->addr_len = cli->addr_len;
strncpy(sess->ipaddr, cli->addr_host, sizeof(sess->ipaddr));
sess->last_contact = current_time.tv_sec;
- sess->next_seqid_in = le64_to_cpu(mp->pkt->seqid) + 1;
+ sess->next_seqid_in = info->seqid + 1;
session_encode(&raw_sess, sess);
@@ -865,34 +836,26 @@ void msg_new_sess(struct msg_params *mp,
cld_timer_add(&cld_srv.timers, &sess->timer,
time(NULL) + (CLD_SESS_TIMEOUT / 2));
- resp_ok(sess, mp->msg);
+ /* send new-sess reply */
+ resp.code = CLE_OK;
+ resp.xid_in = info->xid;
+ sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_generic_resp,
+ (void *)&resp, CMO_NEW_SESS, NULL, NULL);
+
return;
err_out:
session_free(sess, true);
- alloc_len = sizeof(*outpkt) + sizeof(*resp) + SHA_DIGEST_LENGTH;
- outpkt = alloca(alloc_len);
- memset(outpkt, 0, alloc_len);
-
- pkt_init_pkt(outpkt, mp->pkt);
-
- resp = (struct cld_msg_resp *) (outpkt + 1);
- resp_copy(resp, mp->msg);
- resp->code = cpu_to_le32(resp_rc);
-
- p = outpkt;
- secret_key = user_key(outpkt->user);
- __cld_authsign(&srv_log, secret_key, p, alloc_len - SHA_DIGEST_LENGTH,
- p + alloc_len - SHA_DIGEST_LENGTH);
-
- HAIL_DEBUG(&srv_log, "%s err: sid " SIDFMT ", op %s, seqid %llu",
+ HAIL_DEBUG(&srv_log, "%s err: sid " SIDFMT ", op %s",
__func__,
- SIDARG(outpkt->sid), __cld_opstr(resp->hdr.op),
- (unsigned long long) le64_to_cpu(outpkt->seqid));
+ pkt->sid, __cld_opstr(CMO_NEW_SESS));
- udp_tx(mp->sock_fd, (struct sockaddr *) &mp->cli->addr,
- mp->cli->addr_len, outpkt, alloc_len);
+ resp.code = resp_rc;
+ resp.xid_in = info->xid;
+ simple_sendmsg(sock_fd, cli, pkt->sid, pkt->user, 0xdeadbeef,
+ (xdrproc_t)xdr_cld_msg_generic_resp, (void *)&resp,
+ CMO_NEW_SESS);
HAIL_DEBUG(&srv_log, "NEW-SESS failed: %d", resp_rc);
}
@@ -902,18 +865,18 @@ static void end_sess_done(struct session
session_trash(outpkt->sess);
}
-void msg_end_sess(struct msg_params *mp, const struct client *cli)
+void msg_end_sess(struct session *sess, uint64_t xid)
{
- struct session *sess = mp->sess;
- struct cld_msg_resp resp;
+ struct cld_msg_generic_resp resp;
/* do nothing; let message acknowledgement via
* end_sess_done mark session dead
*/
-
- memset(&resp, 0, sizeof(resp));
- resp_copy(&resp, mp->msg);
- sess_sendmsg(sess, &resp, sizeof(resp), end_sess_done, NULL);
+ resp.code = CLE_OK;
+ resp.xid_in = xid;
+ sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_generic_resp,
+ &resp, CMO_END_SESS,
+ end_sess_done, NULL);
}
/*
Binary files cld/server/session.o and cld.rpcgen/server/session.o differ
Binary files cld/server/util.o and cld.rpcgen/server/util.o differ
Binary files cld/test/it-works and cld.rpcgen/test/it-works differ
Binary files cld/test/it-works.o and cld.rpcgen/test/it-works.o differ
Binary files cld/test/libtest.a and cld.rpcgen/test/libtest.a differ
Binary files cld/test/load-file-event and cld.rpcgen/test/load-file-event differ
diff -X /garz/tmp/dontdiff -urNp cld/test/load-file-event.c cld.rpcgen/test/load-file-event.c
--- cld/test/load-file-event.c 2010-01-29 00:36:25.000000000 -0500
+++ cld.rpcgen/test/load-file-event.c 2010-02-03 16:27:33.000000000 -0500
@@ -33,6 +33,8 @@
#include <cldc.h>
#include "test.h"
+#include "cld_pkt.h"
+
struct run {
struct cldc_udp *udp;
struct cld_timer_list tlist;
@@ -146,6 +148,8 @@ static int read_1_cb(struct cldc_call_op
{
struct run *rp = coptarg->private;
struct cldc_call_opts copts;
+ char *data;
+ size_t data_len;
int rc;
if (errc != CLE_OK) {
@@ -153,12 +157,14 @@ static int read_1_cb(struct cldc_call_op
exit(1);
}
- if (coptarg->u.get.size != TESTLEN) {
- fprintf(stderr, "Bad CLD file length %d\n", coptarg->u.get.size);
+ cldc_call_opts_get_data(coptarg, &data, &data_len);
+
+ if (data_len != TESTLEN) {
+ fprintf(stderr, "Bad CLD file length %zu\n", data_len);
exit(1);
}
- if (memcmp(coptarg->u.get.buf, TESTSTR, TESTLEN)) {
+ if (memcmp(data, TESTSTR, TESTLEN)) {
fprintf(stderr, "Bad CLD file content\n");
exit(1);
}
Binary files cld/test/load-file-event.o and cld.rpcgen/test/load-file-event.o differ
Binary files cld/test/lock-file-event and cld.rpcgen/test/lock-file-event differ
Binary files cld/test/lock-file-event.o and cld.rpcgen/test/lock-file-event.o differ
diff -X /garz/tmp/dontdiff -urNp cld/test/Makefile.am cld.rpcgen/test/Makefile.am
--- cld/test/Makefile.am 2010-02-02 22:31:09.000000000 -0500
+++ cld.rpcgen/test/Makefile.am 2010-02-02 22:30:24.000000000 -0500
@@ -1,5 +1,6 @@
INCLUDES = -I$(top_srcdir)/include \
+ -I$(top_srcdir)/lib \
@GLIB_CFLAGS@
EXTRA_DIST = \
Binary files cld/test/save-file-event and cld.rpcgen/test/save-file-event differ
Binary files cld/test/save-file-event.o and cld.rpcgen/test/save-file-event.o differ
Binary files cld/test/util.o and cld.rpcgen/test/util.o differ
Binary files cld/tools/cldcli and cld.rpcgen/tools/cldcli differ
diff -X /garz/tmp/dontdiff -urNp cld/tools/cldcli.c cld.rpcgen/tools/cldcli.c
--- cld/tools/cldcli.c 2010-02-03 17:20:20.000000000 -0500
+++ cld.rpcgen/tools/cldcli.c 2010-02-03 17:23:17.000000000 -0500
@@ -217,6 +217,8 @@ static int cb_ls_2(struct cldc_call_opts
struct cldc_call_opts copts = { NULL, };
struct cld_dirent_cur dc;
int rc, i;
+ char *data;
+ size_t data_len;
bool first = true;
if (errc != CLE_OK) {
@@ -224,8 +226,9 @@ static int cb_ls_2(struct cldc_call_opts
write_from_thread(&cresp, sizeof(cresp));
return 0;
}
+ cldc_call_opts_get_data(copts_in, &data, &data_len);
- rc = cldc_dirent_count(copts_in->u.get.buf, copts_in->u.get.size);
+ rc = cldc_dirent_count(data, data_len);
if (rc < 0) {
write_from_thread(&cresp, sizeof(cresp));
return 0;
@@ -236,7 +239,7 @@ static int cb_ls_2(struct cldc_call_opts
write_from_thread(&cresp, sizeof(cresp));
- cldc_dirent_cur_init(&dc, copts_in->u.get.buf, copts_in->u.get.size);
+ cldc_dirent_cur_init(&dc, data, data_len);
for (i = 0; i < rc; i++) {
struct ls_rec lsr;
@@ -294,6 +297,8 @@ static int cb_cat_2(struct cldc_call_opt
{
struct cresp cresp = { .tcode = TC_FAILED, };
struct cldc_call_opts copts = { NULL, };
+ char *data;
+ size_t data_len;
if (errc != CLE_OK) {
errc_msg(&cresp, errc);
@@ -301,11 +306,13 @@ static int cb_cat_2(struct cldc_call_opt
return 0;
}
+ cldc_call_opts_get_data(copts_in, &data, &data_len);
+
cresp.tcode = TC_OK;
- cresp.u.file_len = copts_in->u.get.size;
+ cresp.u.file_len = data_len;
write_from_thread(&cresp, sizeof(cresp));
- write_from_thread(copts_in->u.get.buf, copts_in->u.get.size);
+ write_from_thread(data, data_len);
/* FIXME: race; should wait until close succeeds/fails before
* returning any data. 'fh' may still be in use, otherwise.
@@ -338,6 +345,8 @@ static int cb_cp_cf_2(struct cldc_call_o
{
struct cresp cresp = { .tcode = TC_FAILED, };
struct cldc_call_opts copts = { NULL, };
+ char *data;
+ size_t data_len;
if (errc != CLE_OK) {
errc_msg(&cresp, errc);
@@ -345,11 +354,12 @@ static int cb_cp_cf_2(struct cldc_call_o
return 0;
}
+ cldc_call_opts_get_data(copts_in, &data, &data_len);
cresp.tcode = TC_OK;
- cresp.u.file_len = copts_in->u.get.size;
+ cresp.u.file_len = data_len;
write_from_thread(&cresp, sizeof(cresp));
- write_from_thread(copts_in->u.get.buf, copts_in->u.get.size);
+ write_from_thread(data, data_len);
/* FIXME: race; should wait until close succeeds/fails before
* returning any data. 'fh' may still be in use, otherwise.
Binary files cld/tools/cldcli.o and cld.rpcgen/tools/cldcli.o differ
diff -X /garz/tmp/dontdiff -urNp cld/tools/Makefile.am cld.rpcgen/tools/Makefile.am
--- cld/tools/Makefile.am 2010-01-29 00:36:25.000000000 -0500
+++ cld.rpcgen/tools/Makefile.am 2010-01-22 18:28:08.000000000 -0500
@@ -1,5 +1,6 @@
INCLUDES = -I$(top_srcdir)/include \
+ -I$(top_srcdir)/lib \
@GLIB_CFLAGS@
bin_PROGRAMS = cldcli
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic