From: Trond Myklebust <trond.myklebust@fys.uio.no>

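RPC: patch by Chuck Lever to make the number of RPC slots a tunable
parameter.  This is typically of interest to WAN users who want the client
to be able to send more requests before it has to wait for a response from
the server, in order to ensure maximum bandwidth usage.

The slot table size is set through two new sunrpc sysctls,
udp_slot_table_entries and tcp_slot_table_entries (allowed range 2-128),
and is read when a transport is set up.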

Increase the default number of slots for TCP mounts from 16 to 64; the
default for UDP mounts remains 16.


---

 fs/nfs/inode.c               |    5 ----
 include/linux/sunrpc/debug.h |    2 +
 include/linux/sunrpc/xprt.h  |   26 ++++++++++++----------
 net/sunrpc/sunrpc_syms.c     |    2 +
 net/sunrpc/sysctl.c          |   28 +++++++++++++++++++++++-
 net/sunrpc/xprt.c            |   50 ++++++++++++++++++++++++++-----------------
 6 files changed, 77 insertions(+), 36 deletions(-)

diff -puN fs/nfs/inode.c~nfs-tunable-rpc-slot-table fs/nfs/inode.c
--- 25/fs/nfs/inode.c~nfs-tunable-rpc-slot-table	2004-02-29 15:03:28.000000000 -0800
+++ 25-akpm/fs/nfs/inode.c	2004-02-29 15:03:28.000000000 -0800
@@ -47,11 +47,8 @@
  *        their needs. People that do NFS over a slow network, might for
  *        instance want to reduce it to something closer to 1 for improved
  *        interactive response.
- *
- *        For the moment, though, we instead set it to RPC_MAXREQS, which
- *        is the maximum number of simultaneous RPC requests on the wire.
  */
-#define NFS_MAX_READAHEAD	RPC_MAXREQS
+#define NFS_MAX_READAHEAD	(RPC_DEF_SLOT_TABLE - 1)
 
 static void nfs_invalidate_inode(struct inode *);
 static int nfs_update_inode(struct inode *, struct nfs_fattr *, unsigned long);
diff -puN include/linux/sunrpc/debug.h~nfs-tunable-rpc-slot-table include/linux/sunrpc/debug.h
--- 25/include/linux/sunrpc/debug.h~nfs-tunable-rpc-slot-table	2004-02-29 15:03:28.000000000 -0800
+++ 25-akpm/include/linux/sunrpc/debug.h	2004-02-29 15:03:28.000000000 -0800
@@ -92,6 +92,8 @@ enum {
 	CTL_NFSDEBUG,
 	CTL_NFSDDEBUG,
 	CTL_NLMDEBUG,
+	CTL_SLOTTABLE_UDP,
+	CTL_SLOTTABLE_TCP,
 };
 
 #endif /* _LINUX_SUNRPC_DEBUG_H_ */
diff -puN include/linux/sunrpc/xprt.h~nfs-tunable-rpc-slot-table include/linux/sunrpc/xprt.h
--- 25/include/linux/sunrpc/xprt.h~nfs-tunable-rpc-slot-table	2004-02-29 15:03:28.000000000 -0800
+++ 25-akpm/include/linux/sunrpc/xprt.h	2004-02-29 15:03:28.000000000 -0800
@@ -28,16 +28,18 @@
  *
  * Upper procedures may check whether a request would block waiting for
  * a free RPC slot by using the RPC_CONGESTED() macro.
- *
- * Note: on machines with low memory we should probably use a smaller
- * MAXREQS value: At 32 outstanding reqs with 8 megs of RAM, fragment
- * reassembly will frequently run out of memory.
- */
-#define RPC_MAXCONG		(16)
-#define RPC_MAXREQS		RPC_MAXCONG
-#define RPC_CWNDSCALE		(256)
-#define RPC_MAXCWND		(RPC_MAXCONG * RPC_CWNDSCALE)
+ */
+extern unsigned int xprt_udp_slot_table_entries;
+extern unsigned int xprt_tcp_slot_table_entries;
+
+#define RPC_MIN_SLOT_TABLE	(2U)
+#define RPC_DEF_SLOT_TABLE	(16U)
+#define RPC_MAX_SLOT_TABLE	(128U)
+
+#define RPC_CWNDSHIFT		(8U)
+#define RPC_CWNDSCALE		(1U << RPC_CWNDSHIFT)
 #define RPC_INITCWND		RPC_CWNDSCALE
+#define RPC_MAXCWND(xprt)	((xprt)->max_reqs << RPC_CWNDSHIFT)
 #define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
 
 /* Default timeout values */
@@ -92,7 +94,6 @@ struct rpc_rqst {
 	 */
 	struct rpc_task *	rq_task;	/* RPC task data */
 	__u32			rq_xid;		/* request XID */
-	struct rpc_rqst *	rq_next;	/* free list */
 	int			rq_cong;	/* has incremented xprt->cong */
 	int			rq_received;	/* receive completed */
 	u32			rq_seqno;	/* gss seq no. used on req. */
@@ -145,8 +146,9 @@ struct rpc_xprt {
 	struct rpc_wait_queue	resend;		/* requests waiting to resend */
 	struct rpc_wait_queue	pending;	/* requests in flight */
 	struct rpc_wait_queue	backlog;	/* waiting for slot */
-	struct rpc_rqst *	free;		/* free slots */
-	struct rpc_rqst		slot[RPC_MAXREQS];
+	struct list_head	free;		/* free slots */
+	struct rpc_rqst *	slot;		/* slot table storage */
+	unsigned int		max_reqs;	/* total slots */
 	unsigned long		sockstate;	/* Socket state */
 	unsigned char		shutdown   : 1,	/* being shut down */
 				nocong	   : 1,	/* no congestion control */
diff -puN net/sunrpc/sunrpc_syms.c~nfs-tunable-rpc-slot-table net/sunrpc/sunrpc_syms.c
--- 25/net/sunrpc/sunrpc_syms.c~nfs-tunable-rpc-slot-table	2004-02-29 15:03:28.000000000 -0800
+++ 25-akpm/net/sunrpc/sunrpc_syms.c	2004-02-29 15:03:28.000000000 -0800
@@ -63,6 +63,8 @@ EXPORT_SYMBOL(rpc_mkpipe);
 EXPORT_SYMBOL(xprt_create_proto);
 EXPORT_SYMBOL(xprt_destroy);
 EXPORT_SYMBOL(xprt_set_timeout);
+EXPORT_SYMBOL(xprt_udp_slot_table_entries);
+EXPORT_SYMBOL(xprt_tcp_slot_table_entries);
 
 /* Client credential cache */
 EXPORT_SYMBOL(rpcauth_register);
diff -puN net/sunrpc/sysctl.c~nfs-tunable-rpc-slot-table net/sunrpc/sysctl.c
--- 25/net/sunrpc/sysctl.c~nfs-tunable-rpc-slot-table	2004-02-29 15:03:28.000000000 -0800
+++ 25-akpm/net/sunrpc/sysctl.c	2004-02-29 15:03:28.000000000 -0800
@@ -1,7 +1,7 @@
 /*
  * linux/net/sunrpc/sysctl.c
  *
- * Sysctl interface to sunrpc module. This is for debugging only now.
+ * Sysctl interface to sunrpc module.
  *
  * I would prefer to register the sunrpc table below sys/net, but that's
  * impossible at the moment.
@@ -19,6 +19,7 @@
 #include <linux/sunrpc/types.h>
 #include <linux/sunrpc/sched.h>
 #include <linux/sunrpc/stats.h>
+#include <linux/sunrpc/xprt.h>
 
 /*
  * Declare the debug flags here
@@ -117,6 +118,9 @@ done:
 	return 0;
 }
 
+static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
+static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
+
 static ctl_table debug_table[] = {
 	{
 		.ctl_name	= CTL_RPCDEBUG,
@@ -150,6 +154,28 @@ static ctl_table debug_table[] = {
 		.mode		= 0644,
 		.proc_handler	= &proc_dodebug
 	}, 
+	{
+		.ctl_name	= CTL_SLOTTABLE_UDP,
+		.procname	= "udp_slot_table_entries",
+		.data		= &xprt_udp_slot_table_entries,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec_minmax,
+		.strategy	= &sysctl_intvec,
+		.extra1		= &min_slot_table_size,
+		.extra2		= &max_slot_table_size
+	},
+	{
+		.ctl_name	= CTL_SLOTTABLE_TCP,
+		.procname	= "tcp_slot_table_entries",
+		.data		= &xprt_tcp_slot_table_entries,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec_minmax,
+		.strategy	= &sysctl_intvec,
+		.extra1		= &min_slot_table_size,
+		.extra2		= &max_slot_table_size
+	},
 	{ .ctl_name = 0 }
 };
 
diff -puN net/sunrpc/xprt.c~nfs-tunable-rpc-slot-table net/sunrpc/xprt.c
--- 25/net/sunrpc/xprt.c~nfs-tunable-rpc-slot-table	2004-02-29 15:03:28.000000000 -0800
+++ 25-akpm/net/sunrpc/xprt.c	2004-02-29 15:03:28.000000000 -0800
@@ -338,8 +338,8 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, 
 		/* The (cwnd >> 1) term makes sure
 		 * the result gets rounded properly. */
 		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
-		if (cwnd > RPC_MAXCWND)
-			cwnd = RPC_MAXCWND;
+		if (cwnd > RPC_MAXCWND(xprt))
+			cwnd = RPC_MAXCWND(xprt);
 		__xprt_lock_write_next(xprt);
 	} else if (result == -ETIMEDOUT) {
 		cwnd >>= 1;
@@ -1306,10 +1306,9 @@ do_xprt_reserve(struct rpc_task *task)
 	task->tk_status = 0;
 	if (task->tk_rqstp)
 		return;
-	if (xprt->free) {
-		struct rpc_rqst	*req = xprt->free;
-		xprt->free = req->rq_next;
-		req->rq_next = NULL;
+	if (!list_empty(&xprt->free)) {
+		struct rpc_rqst	*req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
+		list_del_init(&req->rq_list);
 		task->tk_rqstp = req;
 		xprt_request_init(task, xprt);
 		return;
@@ -1345,7 +1344,6 @@ xprt_request_init(struct rpc_task *task,
 	req->rq_task	= task;
 	req->rq_xprt    = xprt;
 	req->rq_xid     = xprt_alloc_xid(xprt);
-	INIT_LIST_HEAD(&req->rq_list);
 	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
 			req, req->rq_xid);
 }
@@ -1376,9 +1374,7 @@ xprt_release(struct rpc_task *task)
 	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
 
 	spin_lock(&xprt->xprt_lock);
-	req->rq_next = xprt->free;
-	xprt->free   = req;
-
+	list_add(&req->rq_list, &xprt->free);
 	xprt_clear_backlog(xprt);
 	spin_unlock(&xprt->xprt_lock);
 }
@@ -1409,6 +1405,9 @@ xprt_set_timeout(struct rpc_timeout *to,
 	to->to_exponential = 0;
 }
 
+unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
+unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE << 2;
+
 /*
  * Initialize an RPC client
  */
@@ -1416,21 +1415,33 @@ static struct rpc_xprt *
 xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
 {
 	struct rpc_xprt	*xprt;
+	unsigned int entries;
+	size_t slot_table_size;
 	struct rpc_rqst	*req;
-	int		i;
 
 	dprintk("RPC:      setting up %s transport...\n",
 				proto == IPPROTO_UDP? "UDP" : "TCP");
 
+	entries = (proto == IPPROTO_TCP)?
+		xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries;
+
 	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
 		return ERR_PTR(-ENOMEM);
 	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
+	xprt->max_reqs = entries;
+	slot_table_size = entries * sizeof(xprt->slot[0]);
+	xprt->slot = kmalloc(slot_table_size, GFP_KERNEL);
+	if (xprt->slot == NULL) {
+		kfree(xprt);
+		return ERR_PTR(-ENOMEM);
+	}
+	memset(xprt->slot, 0, slot_table_size);
 
 	xprt->addr = *ap;
 	xprt->prot = proto;
 	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
 	if (xprt->stream) {
-		xprt->cwnd = RPC_MAXCWND;
+		xprt->cwnd = RPC_MAXCWND(xprt);
 		xprt->nocong = 1;
 	} else
 		xprt->cwnd = RPC_INITCWND;
@@ -1438,6 +1449,7 @@ xprt_setup(int proto, struct sockaddr_in
 	spin_lock_init(&xprt->xprt_lock);
 	init_waitqueue_head(&xprt->cong_wait);
 
+	INIT_LIST_HEAD(&xprt->free);
 	INIT_LIST_HEAD(&xprt->recv);
 	INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);
 	INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt);
@@ -1460,17 +1472,16 @@ xprt_setup(int proto, struct sockaddr_in
 	INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");
 
 	/* initialize free list */
-	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
-		req->rq_next = req + 1;
-	req->rq_next = NULL;
-	xprt->free = xprt->slot;
+	for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--)
+		list_add(&req->rq_list, &xprt->free);
 
 	xprt_init_xid(xprt);
 
 	/* Check whether we want to use a reserved port */
 	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
 
-	dprintk("RPC:      created transport %p\n", xprt);
+	dprintk("RPC:      created transport %p with %u slots\n", xprt,
+			xprt->max_reqs);
 	
 	return xprt;
 }
@@ -1550,11 +1561,11 @@ xprt_sock_setbufsize(struct rpc_xprt *xp
 		return;
 	if (xprt->rcvsize) {
 		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
-		sk->sk_rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2;
+		sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs *  2;
 	}
 	if (xprt->sndsize) {
 		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
-		sk->sk_sndbuf = xprt->sndsize * RPC_MAXCONG * 2;
+		sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2;
 		sk->sk_write_space(sk);
 	}
 }
@@ -1644,6 +1655,7 @@ xprt_destroy(struct rpc_xprt *xprt)
 	dprintk("RPC:      destroying transport %p\n", xprt);
 	xprt_shutdown(xprt);
 	xprt_close(xprt);
+	kfree(xprt->slot);
 	kfree(xprt);
 
 	return 0;

_