patch-1.3.32 linux/fs/nfs/rpcsock.c

Next file: linux/fs/nfs/sock.c
Previous file: linux/fs/nfs/inode.c
Back to the patch index
Back to the overall index

diff -u --recursive --new-file v1.3.31/linux/fs/nfs/rpcsock.c linux/fs/nfs/rpcsock.c
@@ -0,0 +1,391 @@
+/*
+ *  linux/fs/nfs/rpcsock.c
+ *
+ *  This is a generic RPC call interface for datagram sockets that is able
+ *  to place several concurrent RPC requests at the same time. It works like
+ *  this:
+ *
+ *  -	When a process places a call, it allocates a request slot if
+ *	one is available. Otherwise, it sleeps on the backlog queue.
+ *  -	The first process on the receive queue waits for the next RPC reply,
+ *	and peeks at the XID. If it finds a matching request, it receives
+ *	the datagram on behalf of that process and wakes it up. Otherwise,
+ *	the datagram is discarded.
+ *  -	If the process having received the datagram was the first one on
+ *	the receive queue, it wakes up the next one to listen for replies.
+ *  -	It then removes itself from the request queue. If there are more
+ *	callers waiting on the backlog queue, they are woken up, too.
+ *
+ *  Copyright (C) 1995, Olaf Kirch <okir@monad.swb.de>
+ */
+
+#ifdef MODULE
+#include <linux/module.h>
+#endif
+
+#include <linux/types.h>
+#include <linux/malloc.h>
+#include <linux/sched.h>
+#include <linux/nfs_fs.h>
+#include <linux/errno.h>
+#include <linux/socket.h>
+#include <linux/fcntl.h>
+#include <asm/segment.h>
+#include <linux/in.h>
+#include <linux/net.h>
+#include <linux/mm.h>
+#include <linux/rpcsock.h>
+
+#define msleep(sec)	{ current->timeout = sec * HZ / 1000; \
+			  current->state = TAKS_INTERRUPTIBLE; \
+			  schedule(); \
+			}
+#define dprintk		if (0) printk
+
+static inline void
+rpc_insque(struct rpc_sock *rsock, struct rpc_wait *slot)
+{
+	struct rpc_wait	*tmp;
+
+	if ((tmp = rsock->tail) != NULL) {
+		tmp->next = slot;
+	} else {
+		rsock->head = slot;
+	}
+	rsock->tail = slot;
+	slot->prev = tmp;
+	slot->next = NULL;
+	dprintk("RPC: inserted %08lx into queue.\n", (long)slot);
+	dprintk("RPC: head = %08lx, tail = %08lx.\n",
+			(long) rsock->head, (long) rsock->tail);
+}
+
+static inline void
+rpc_remque(struct rpc_sock *rsock, struct rpc_wait *slot)
+{
+	struct rpc_wait	*prev = slot->prev,
+			*next = slot->next;
+
+	if (prev != NULL)
+		prev->next = next;
+	else
+		rsock->head = next;
+	if (next != NULL)
+		next->prev = prev;
+	else
+		rsock->tail = prev;
+	dprintk("RPC: removed %08lx from queue.\n", (long)slot);
+	dprintk("RPC: head = %08lx, tail = %08lx.\n",
+			(long) rsock->head, (long) rsock->tail);
+}
+
+static inline int
+rpc_sendto(struct rpc_sock *rsock, const int *buf, int len,
+			struct sockaddr *sap, int salen)
+{
+	struct socket	*sock = rsock->sock;
+	unsigned long	oldfs;
+	int		result;
+
+	dprintk("RPC: sending %d bytes (buf %08lx)\n", len, (long) buf);
+	oldfs = get_fs();
+	set_fs(get_ds());
+	result = sock->ops->sendto(sock, buf, len, 0, 0, sap, salen);
+	set_fs(oldfs);
+	dprintk("RPC: result = %d\n", result);
+
+	return result;
+}
+
+/*
+ * This code is slightly complicated. Since the networking code does not
+ * honor the current->timeout value, we have to select on the socket.
+ */
+static inline int
+rpc_select(struct rpc_sock *rsock)
+{
+	struct select_table_entry entry;
+	struct file	*file = rsock->file;
+	select_table	wait_table;
+
+	dprintk("RPC: selecting on socket...\n");
+	wait_table.nr = 0;
+	wait_table.entry = &entry;
+	current->state = TASK_INTERRUPTIBLE;
+	if (!file->f_op->select(file->f_inode, file, SEL_IN, &wait_table)
+	 && !file->f_op->select(file->f_inode, file, SEL_IN, NULL)) {
+		schedule();
+		remove_wait_queue(entry.wait_address, &entry.wait);
+		current->state = TASK_RUNNING;
+		if (current->signal & ~current->blocked)
+			return -ERESTARTSYS;
+		if (current->timeout == 0)
+			return -ETIMEDOUT;
+	} else if (wait_table.nr)
+		remove_wait_queue(entry.wait_address, &entry.wait);
+	current->state = TASK_RUNNING;
+	dprintk("RPC: ...Okay, there appears to be some data.\n");
+	return 0;
+}
+
+static inline int
+rpc_recvfrom(struct rpc_sock *rsock, int *buf, int len,
+			struct sockaddr *sap, int salen, int flags)
+{
+	struct socket	*sock = rsock->sock;
+	struct sockaddr	sa;
+	int		alen = sizeof(sa);
+	unsigned long	oldfs;
+	int		result;
+
+	dprintk("RPC: receiving %d bytes max (buf %08lx)\n", len, (long) buf);
+	oldfs = get_fs();
+	set_fs(get_ds());
+	result = sock->ops->recvfrom(sock, buf, len, 1, flags, &sa, &alen);
+	set_fs(oldfs);
+	dprintk("RPC: result = %d\n", result);
+
+#if 0
+	if (alen != salen || memcmp(&sa, sap, alen)) {
+		dprintk("RPC: reply address mismatch... rejected.\n");
+		result = -EAGAIN;
+	}
+#endif
+
+	return result;
+}
+
+/*
+ * Place the actual RPC call.
+ */
+static int
+rpc_call_one(struct rpc_sock *rsock, struct rpc_wait *slot,
+		struct sockaddr *sap, int salen,
+		const int *sndbuf, int slen, int *rcvbuf, int rlen)
+{
+	struct rpc_wait	*rovr = NULL;
+	int		result;
+	u32		xid;
+	int		safe;
+
+	dprintk("RPC: placing one call, rsock = %08lx, slot = %08lx, "
+		"sap = %08lx, salen = %d, "
+		"sndbuf = %08lx, slen = %d, rcvbuf = %08lx, rlen = %d\n",
+		(long) rsock, (long) slot, (long) sap, 
+		salen, (long) sndbuf, slen, (long) rcvbuf, rlen);
+
+	result = rpc_sendto(rsock, sndbuf, slen, sap, salen);
+	if (result < 0)
+		return result;
+
+	do {
+		/* We are not the receiver. Wait on the side lines. */
+		if (rsock->head != slot) {
+			slot->wait = NULL;
+			interruptible_sleep_on(&slot->wait);
+			if (slot->gotit)
+				break;
+			if (current->timeout != 0)
+				continue;
+			if (rsock->shutdown) {
+				printk("RPC: aborting call due to shutdown.\n");
+				return -EIO;
+			}
+			return -ETIMEDOUT;
+		}
+		
+		/* wait for data to arrive */
+		result = rpc_select(rsock);
+		if (result < 0) {
+			dprintk("RPC: select error = %d\n", result);
+			break;
+		}
+
+		result = rpc_recvfrom(rsock, (int *)&xid, sizeof(xid),
+						sap, salen, MSG_PEEK);
+		if (result < 0) {
+			switch (-result) {
+			case EAGAIN: case ECONNREFUSED:
+				continue;
+			default:
+				dprintk("rpc_call: recv error = %d\n", result);
+			case ERESTARTSYS:
+				return result;
+			}
+		}
+
+		/* Look for the caller */
+		safe = 0;
+		for (rovr = rsock->head; rovr; rovr = rovr->next) {
+			if (safe++ > NRREQS) {
+				printk("RPC: loop in request Q!!\n");
+				rovr = NULL;
+				break;
+			}
+			if (rovr->xid == xid)
+				break;
+		}
+
+		if (!rovr || rovr->gotit) {
+			/* bad XID or duplicate reply, discard dgram */
+			dprintk("RPC: bad XID or duplicate reply.\n");
+			rpc_recvfrom(rsock, (int *)&xid, sizeof(xid),
+						sap, salen, 0);
+			continue;
+		}
+		rovr->gotit = 1;
+
+		/* Now receive the reply */
+		result = rpc_recvfrom(rsock, rovr->buf, rovr->len,
+						sap, salen, 0);
+
+		/* If this is not for ourselves, wake up the caller */
+		if (rovr != slot)
+			wake_up(&rovr->wait);
+	} while (rovr != slot);
+
+	/* This is somewhat tricky. We rely on the fact that we are able to
+	 * remove ourselves from the queues before the next reader is scheduled,
+	 * otherwise it would find that we're still at the head of the queue
+	 * and go to sleep again.
+	 */
+	if (rsock->head == slot && slot->next != NULL)
+		wake_up(&slot->next->wait);
+
+	return result;
+}
+
+/*
+ * Generic RPC call routine. This handles retries and timeouts etc pp
+ */
+int
+rpc_call(struct rpc_sock *rsock, struct sockaddr *sap, int addrlen,
+		const int *sndbuf, int slen, int *rcvbuf, int rlen,
+		struct rpc_timeout *strategy, int flag)
+{
+	struct rpc_wait		*slot;
+	int			result, retries;
+	unsigned long		timeout;
+
+	timeout = strategy->init_timeout;
+	retries = 0;
+	slot = NULL;
+
+	do {
+		dprintk("RPC call TP1\n");
+		current->timeout = jiffies + timeout;
+		if (slot == NULL) {
+			while ((slot = rsock->free) == NULL) {
+				if (!flag) {
+					current->timeout = 0;
+					return -ENOBUFS;
+				}
+				interruptible_sleep_on(&rsock->backlog);
+				if (current->timeout == 0) {
+					result = -ETIMEDOUT;
+					goto timedout;
+				}
+				if (rsock->shutdown) {
+					printk("RPC: aborting call due to shutdown.\n");
+					current->timeout = 0;
+					return -EIO;
+				}
+			}
+			dprintk("RPC call TP2\n");
+			slot->gotit = 0;
+			slot->xid = *(u32 *)sndbuf;
+			slot->buf = rcvbuf;
+			slot->len = rlen;
+			rsock->free = slot->next;
+			rpc_insque(rsock, slot);
+		}
+
+		dprintk("RPC call TP3\n");
+		result = rpc_call_one(rsock, slot, sap, addrlen,
+					sndbuf, slen, rcvbuf, rlen);
+		if (result != -ETIMEDOUT)
+			break;
+
+timedout:
+		dprintk("RPC call TP4\n");
+		dprintk("RPC: rpc_call_one returned timeout.\n");
+		if (strategy->exponential)
+			timeout <<= 1;
+		else
+			timeout += strategy->increment;
+		if (strategy->max_timeout && timeout >= strategy->max_timeout)
+			timeout = strategy->max_timeout;
+		if (strategy->retries && ++retries >= strategy->retries)
+			break;
+	} while (1);
+
+	dprintk("RPC call TP5\n");
+	current->timeout = 0;
+	if (slot != NULL) {
+		dprintk("RPC call TP6\n");
+		rpc_remque(rsock, slot);
+		slot->next = rsock->free;
+		rsock->free = slot;
+
+		/* wake up tasks that haven't sent anything yet. (Waking
+		 * up the first one the wait queue would be enough) */
+		if (rsock->backlog)
+			wake_up(&rsock->backlog);
+	}
+
+	if (rsock->shutdown)
+		wake_up(&rsock->shutwait);
+
+	return result;
+}
+
+struct rpc_sock *
+rpc_makesock(struct file *file)
+{
+	struct rpc_sock	*rsock;
+	struct rpc_wait	*slot;
+	int		i;
+
+	dprintk("RPC: make RPC socket...\n");
+	if ((rsock = kmalloc(sizeof(struct rpc_sock), GFP_KERNEL)) == NULL)
+		return NULL;
+
+	rsock->sock = &file->f_inode->u.socket_i;
+	rsock->file = file;
+
+	rsock->free = rsock->waiting;
+	for (i = 0, slot = rsock->waiting; i < NRREQS-1; i++, slot++)
+		slot->next = slot + 1;
+	slot->next = NULL;
+
+	rsock->backlog = NULL;
+	rsock->head = rsock->tail = NULL;
+
+	rsock->shutwait = NULL;
+	rsock->shutdown = 0;
+
+	dprintk("RPC: made socket %08lx", (long) rsock);
+	return rsock;
+}
+
+int
+rpc_closesock(struct rpc_sock *rsock)
+{
+	unsigned long	t0 = jiffies;
+
+	rsock->shutdown = 1;
+	while (rsock->head) {
+		interruptible_sleep_on(&rsock->shutwait);
+		if (current->signal & ~current->blocked)
+			return -EINTR;
+#if 1
+		if (t0 && t0 - jiffies > 60 * HZ) {
+			printk("RPC: hanging in rpc_closesock.\n");
+			t0 = 0;
+		}
+#endif
+	}
+
+	kfree(rsock);
+	return 0;
+}

FUNET's LINUX-ADM group, linux-adm@nic.funet.fi
TCL-scripts by Sam Shen, slshen@lbl.gov with Sam's (original) version
of this