patch-2.4.20 linux-2.4.20/fs/intermezzo/presto.c

Next file: linux-2.4.20/fs/intermezzo/psdev.c
Previous file: linux-2.4.20/fs/intermezzo/methods.c
Back to the patch index
Back to the overall index

diff -urN linux-2.4.19/fs/intermezzo/presto.c linux-2.4.20/fs/intermezzo/presto.c
@@ -1,12 +1,26 @@
-/*
- * intermezzo.c
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * This file implements basic routines supporting the semantics
+ *  Author: Peter J. Braam <braam@clusterfs.com>
+ *  Copyright (C) 1998 Stelias Computing Inc
+ *  Copyright (C) 1999 Red Hat Inc.
+ *
+ *   This file is part of InterMezzo, http://www.inter-mezzo.org.
+ *
+ *   InterMezzo is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   InterMezzo is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
  *
- * Author: Peter J. Braam  <braam@cs.cmu.edu>
- * Copyright (C) 1998 Stelias Computing Inc
- * Copyright (C) 1999 Red Hat Inc.
+ *   You should have received a copy of the GNU General Public License
+ *   along with InterMezzo; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  *
+ * This file implements basic routines supporting InterMezzo semantics.
  */
 #include <linux/types.h>
 #include <linux/kernel.h>
@@ -23,13 +37,7 @@
 #include <linux/smp_lock.h>
 
 #include <linux/intermezzo_fs.h>
-#include <linux/intermezzo_upcall.h>
 #include <linux/intermezzo_psdev.h>
-#include <linux/intermezzo_kml.h>
-
-extern int presto_init_last_rcvd_file(struct presto_file_set *);
-extern int presto_init_lml_file(struct presto_file_set *);
-extern int presto_init_kml_file(struct presto_file_set *);
 
 int presto_walk(const char *name, struct nameidata *nd)
 {
@@ -37,7 +45,7 @@
         /* we do not follow symlinks to support symlink operations 
            correctly. The vfs should always hand us resolved dentries
            so we should not be required to use LOOKUP_FOLLOW. At the
-	   reintegrating end, lento again should be working with the 
+           reintegrating end, lento again should be working with the 
            resolved pathname and not the symlink. SHP
            XXX: This code implies that direct symlinks do not work. SHP
         */
@@ -51,17 +59,6 @@
 }
 
 
-static inline int presto_dentry_is_fsetroot(struct dentry *dentry)
-{
-        return ((long) dentry->d_fsdata) & PRESTO_FSETROOT;
-}
-
-static inline struct presto_file_set *presto_dentry2fset(struct dentry *dentry)
-{
-        return (struct presto_file_set *)
-                (((long) dentry->d_fsdata) - PRESTO_FSETROOT);
-}
-
 /* find the presto minor device for this inode */
 int presto_i2m(struct inode *inode)
 {
@@ -70,7 +67,7 @@
         cache = presto_get_cache(inode);
         CDEBUG(D_PSDEV, "\n");
         if ( !cache ) {
-                printk("PRESTO: BAD: cannot find cache for dev %d, ino %ld\n",
+                CERROR("PRESTO: BAD: cannot find cache for dev %d, ino %ld\n",
                        inode->i_dev, inode->i_ino);
                 EXIT;
                 return -1;
@@ -91,48 +88,6 @@
 
 }
 
-int presto_has_all_data(struct inode *inode)
-{
-        ENTRY;
-
-        if ( (inode->i_size >> inode->i_sb->s_blocksize_bits) >
-             inode->i_blocks) {
-                EXIT;
-                return 0;
-        }
-        EXIT;
-        return 1;
-
-}
-
-/* find the fileset dentry for this dentry */
-struct presto_file_set *presto_fset(struct dentry *de)
-{
-        struct dentry *fsde;
-        ENTRY;
-        fsde = de;
-        for ( ; ; ) {
-                if ( presto_dentry_is_fsetroot(fsde) ) {
-                        EXIT;
-                        return presto_dentry2fset(fsde);
-                }
-                /* are we at the cache "/" ?? */
-                if ( fsde->d_parent == fsde ) {
-                        if ( !de->d_inode ) {
-                                printk("Warning %*s has no fileset inode.\n",
-                                       de->d_name.len, de->d_name.name);
-                        }
-                        /* better to return a BAD thing */
-                        EXIT;
-                        return NULL;
-                }
-                fsde = fsde->d_parent;
-        }
-        /* not reached */
-        EXIT;
-        return NULL;
-}
-
 /* XXX check this out */
 struct presto_file_set *presto_path2fileset(const char *name)
 {
@@ -168,7 +123,7 @@
 
         ENTRY;
         minor = presto_i2m(dentry->d_inode);
-        if ( upc_comms[minor].uc_no_filter ) {
+        if ( izo_channels[minor].uc_no_filter ) {
                 EXIT;
                 return ~0;
         }
@@ -178,41 +133,30 @@
              (flag == PRESTO_ATTR || flag == PRESTO_DATA) &&
              (fset->fset_flags & FSET_INSYNC) ) {
                 CDEBUG(D_INODE, "fset in sync (ino %ld)!\n",
-                       fset->fset_mtpt->d_inode->i_ino);
+                       fset->fset_dentry->d_inode->i_ino);
                 EXIT;
                 return 1;
         }
 
-        /* if it is a fsetroot, it's stored in the fset_flags */
-        if ( fset && presto_dentry_is_fsetroot(dentry) ) {
-                EXIT;
-                return fset->fset_data & flag;
-        }
-
         EXIT;
-        return ((int)(long)dentry->d_fsdata & flag);
+        return (presto_d2d(dentry)->dd_flags & flag);
 }
 
 /* set a bit in the dentry flags */
 void presto_set(struct dentry *dentry, int flag)
 {
-
+        ENTRY;
         if ( dentry->d_inode ) {
                 CDEBUG(D_INODE, "SET ino %ld, flag %x\n",
                        dentry->d_inode->i_ino, flag);
         }
-
-        if ( presto_dentry_is_fsetroot(dentry)) {
-                struct presto_file_set *fset = presto_dentry2fset(dentry);
-                if (fset) {
-                        fset->fset_data |= flag;
-                        CDEBUG(D_INODE, "Setting fset->fset_data: now %x\n",
-                               fset->fset_data);
-                }
-        } else {
-                CDEBUG(D_INODE, "Setting dentry->d_fsdata\n");
-                ((int)(long)dentry->d_fsdata) |= flag;
+        if ( presto_d2d(dentry) == NULL) {
+                CERROR("dentry without d_fsdata in presto_set: %p: %*s", dentry,
+                                dentry->d_name.len, dentry->d_name.name);
+                BUG();
         }
+        presto_d2d(dentry)->dd_flags |= flag;
+        EXIT;
 }
 
 /* given a path: complete the closes on the fset */
@@ -224,7 +168,6 @@
         struct presto_file_set *fset;
         ENTRY;
 
-
         error = presto_walk(path, &nd);
         if (error) {
                 EXIT;
@@ -242,7 +185,7 @@
         fset = presto_fset(dentry);
         error = -EINVAL;
         if ( !fset ) {
-                printk("No fileset!\n");
+                CERROR("No fileset!\n");
                 EXIT;
                 goto out_complete;
         }
@@ -256,108 +199,7 @@
         return error;
 }       
 
-/* set the fset recno and offset to a given value */ 
-int lento_reset_fset(char *path, __u64 offset, __u32 recno)
-{
-        struct nameidata nd;
-        struct dentry *dentry;
-        int error;
-        struct presto_file_set *fset;
-        ENTRY;
-
-
-        error = presto_walk(path, &nd);
-        if (error)
-                return error;
-
-        dentry = nd.dentry;
-
-        error = -ENXIO;
-        if ( !presto_ispresto(dentry->d_inode) ) {
-                EXIT;
-                goto out_complete;
-        }
-        
-        fset = presto_fset(dentry);
-        error = -EINVAL;
-        if ( !fset ) {
-                printk("No fileset!\n");
-                EXIT;
-                goto out_complete;
-        }
-
-        write_lock(&fset->fset_kml.fd_lock);
-        fset->fset_kml.fd_recno = recno;
-        fset->fset_kml.fd_offset = offset;
-        read_lock(&fset->fset_kml.fd_lock);
-        
-        EXIT;
- out_complete:
-        path_release(&nd);
-        return error;
-}       
-
-
-
-/* given a path, write an LML record for it - thus must have root's 
-   group array settings, since lento is doing this 
-*/ 
-int lento_write_lml(char *path,
-                     __u64 remote_ino, 
-                     __u32 remote_generation,
-                     __u32 remote_version,
-                     struct presto_version *remote_file_version)
-{
-        struct nameidata nd; 
-        struct rec_info rec;
-        struct dentry *dentry;
-        struct file file;
-        int error;
-        struct presto_file_set *fset;
-        ENTRY;
-
-        error = presto_walk(path, &nd);
-        if (error) {
-                EXIT;
-                return error;
-        }
-        dentry = nd.dentry;
-
-        file.f_dentry = dentry;
-        file.private_data = NULL;
-
-        error = -ENXIO;
-        if ( !presto_ispresto(dentry->d_inode) ) {
-                EXIT;
-                goto out_lml;
-        }
-        
-        fset = presto_fset(dentry);
-        error = -EINVAL;
-        if ( !fset ) {
-                printk("No fileset!\n");
-                EXIT;
-                goto out_lml;
-        }
-
-        
-        /* setting offset to -1 appends */
-        rec.offset = -1;
-        /* this only requires a transaction below which is automatic */
-        error = presto_write_lml_close(&rec, 
-                                       fset,
-                                       &file, 
-                                       remote_ino,
-                                       remote_generation,
-                                       remote_version,
-                                       remote_file_version);
-        
-        EXIT;
- out_lml:
-        path_release(&nd);
-        return error;
-}       
-
+#if 0
 /* given a path: write a close record and cancel an LML record, finally
    call truncate LML.  Lento is doing this so it goes in with uid/gid's 
    root. 
@@ -396,14 +238,14 @@
 
         error=-EINVAL;
         if (fset==NULL) {
-                printk("No fileset!\n");
+                CERROR("No fileset!\n");
                 EXIT;
                 goto out_cancel_lml;
         }
         
         /* this only requires a transaction below which is automatic */
         handle = presto_trans_start(fset, dentry->d_inode, PRESTO_OP_RELEASE); 
-        if ( !handle ) {
+        if ( IS_ERR(handle) ) {
                 error = -ENOMEM; 
                 EXIT; 
                 goto out_cancel_lml; 
@@ -435,7 +277,7 @@
 
         if (info->flags & LENTO_FL_WRITE_EXPECT) {
                 error = presto_write_last_rcvd(&rec, fset, info); 
-                if ( error ) {
+                if ( error < 0 ) {
                         EXIT; 
                         presto_trans_commit(fset, handle);
                         goto out_cancel_lml;
@@ -454,148 +296,101 @@
         path_release(&nd); 
         return error;
 }       
+#endif 
 
-
-/* given a path, operate on the flags in its dentry.  Used by downcalls */
-int presto_mark_dentry(const char *name, int and_flag, int or_flag, 
+/* given a dentry, operate on its dd_flags.  Used by downcalls */
+int izo_mark_dentry(struct dentry *dentry, int and_flag, int or_flag, 
                        int *res)
 {
-        struct nameidata nd;
-        struct dentry *dentry;
-        int error;
-
-        CDEBUG(D_INODE, "name: %s, and flag %x, or flag %x\n",
-               name, and_flag, or_flag);
-
-        error = presto_walk(name, &nd);
-        if (error)
-                return error;
-        dentry = nd.dentry;
-        CDEBUG(D_INODE, "dentry at %p, d_fsdata %p\n", dentry, dentry->d_fsdata);
+        int error = 0;
 
-        error = -ENXIO;
-        if ( !presto_ispresto(dentry->d_inode) )
-                goto out;
-
-        error = 0;
-        if ( presto_dentry_is_fsetroot(dentry) ) {
-                struct presto_file_set *fset = presto_dentry2fset(dentry);
-                CDEBUG(D_INODE, "Setting fset fset_data: fset %p\n", fset);
-                if ( fset ) {
-                        fset->fset_data &= and_flag;
-                        fset->fset_data |= or_flag;
-                        if (res) {
-                                *res = fset->fset_data;
-                        }
-                }
-                CDEBUG(D_INODE, "fset %p, flags %x data %x\n", 
-                       fset, fset->fset_flags, fset->fset_data);
-        } else {
-                ((int)(long)dentry->d_fsdata) &= and_flag;
-                ((int)(long)dentry->d_fsdata) |= or_flag;
-                if (res) 
-                        *res = (int)(long)dentry->d_fsdata;
+        if (presto_d2d(dentry) == NULL) {
+                CERROR("InterMezzo: no ddata for inode %ld in %s\n",
+                       dentry->d_inode->i_ino, __FUNCTION__);
+                return -EINVAL;
         }
 
-        /* indicate if we were the only users while changing the flag */
-        if ( atomic_read(&dentry->d_count) > 1 )
-                error = -EBUSY;
+        CDEBUG(D_INODE, "inode: %ld, and flag %x, or flag %x, dd_flags %x\n",
+               dentry->d_inode->i_ino, and_flag, or_flag,
+               presto_d2d(dentry)->dd_flags);
+
+        presto_d2d(dentry)->dd_flags &= and_flag;
+        presto_d2d(dentry)->dd_flags |= or_flag;
+        if (res) 
+                *res = presto_d2d(dentry)->dd_flags;
 
-out:
-        path_release(&nd);
         return error;
 }
 
 /* given a path, operate on the flags in its cache.  Used by mark_ioctl */
-int presto_mark_cache(const char *name, int and_flag, int or_flag, 
-                      int *res)
+int izo_mark_cache(struct dentry *dentry, int and_flag, int or_flag, 
+                   int *res)
 {
-        struct nameidata nd;
-        struct dentry *dentry;
         struct presto_cache *cache;
-        int error;
-
-        CDEBUG(D_INODE,
-               "presto_mark_cache :: name: %s, and flag %x, or flag %x\n",
-               name, and_flag, or_flag);
 
-        error = presto_walk(name, &nd);
-        if (error)
-                return error;
+        if (presto_d2d(dentry) == NULL) {
+                CERROR("InterMezzo: no ddata for inode %ld in %s\n",
+                       dentry->d_inode->i_ino, __FUNCTION__);
+                return -EINVAL;
+        }
 
-        dentry = nd.dentry;
-        error = -ENXIO;
-        if ( !presto_ispresto(dentry->d_inode) )
-                goto out;
+        CDEBUG(D_INODE, "inode: %ld, and flag %x, or flag %x, dd_flags %x\n",
+               dentry->d_inode->i_ino, and_flag, or_flag,
+               presto_d2d(dentry)->dd_flags);
 
-        error = -EBADF;
         cache = presto_get_cache(dentry->d_inode);
         if ( !cache ) {
-                printk("PRESTO: BAD: cannot find cache in presto_mark_cache\n");
-                make_bad_inode(dentry->d_inode);
-                goto out;
+                CERROR("PRESTO: BAD: cannot find cache in izo_mark_cache\n");
+                return -EBADF;
         }
-        error = 0;
+
         ((int)cache->cache_flags) &= and_flag;
         ((int)cache->cache_flags) |= or_flag;
-        if (res) {
+        if (res)
                 *res = (int)cache->cache_flags;
-        }
 
-out:
-        path_release(&nd);
-        return error;
+        return 0;
 }
 
-int presto_mark_fset_dentry(struct dentry *dentry, int and_flag, int or_flag, 
-                     int * res)
+int presto_set_max_kml_size(const char *path, unsigned long max_size)
 {
-        int error;
         struct presto_file_set *fset;
 
-        error = -ENXIO;
-        if ( !presto_ispresto(dentry->d_inode) )
-                return error;
+        ENTRY;
+
+        fset = presto_path2fileset(path);
+        if (IS_ERR(fset)) {
+                EXIT;
+                return PTR_ERR(fset);
+        }
+
+        fset->kml_truncate_size = max_size;
+        CDEBUG(D_CACHE, "KML truncate size set to %lu bytes for fset %s.\n",
+               max_size, path);
+
+        EXIT;
+        return 0;
+}
 
-        error = -EBADF;
+int izo_mark_fset(struct dentry *dentry, int and_flag, int or_flag, 
+                  int * res)
+{
+        struct presto_file_set *fset;
+        
         fset = presto_fset(dentry);
         if ( !fset ) {
-                printk("PRESTO: BAD: cannot find cache in presto_mark_cache\n");
+                CERROR("PRESTO: BAD: cannot find fileset in izo_mark_fset\n");
                 make_bad_inode(dentry->d_inode);
-                return error;
+                return -EBADF;
         }
-        error = 0;
         ((int)fset->fset_flags) &= and_flag;
         ((int)fset->fset_flags) |= or_flag;
-        if (res) { 
+        if (res)
                 *res = (int)fset->fset_flags;
-        }
-
-        return error;
-}
-
-/* given a path, operate on the flags in its cache.  Used by mark_ioctl */
-inline int presto_mark_fset(const char *name, int and_flag, int or_flag, 
-                     int * res)
-{
-        struct nameidata nd;
-        struct dentry *dentry;
-        int error;
-        ENTRY;
 
-        error = presto_walk(name, &nd);
-        if (error)
-                return error;
-
-
-        dentry = nd.dentry;
-        error = presto_mark_fset_dentry(dentry, and_flag, or_flag, res);
-
-        path_release(&nd);
-        return error;
+        return 0;
 }
 
-
 /* talk to Lento about the permit */
 static int presto_permit_upcall(struct dentry *dentry)
 {
@@ -606,8 +401,12 @@
         int fsetnamelen;
         struct presto_file_set *fset = NULL;
 
-        if ( (minor = presto_i2m(dentry->d_inode)) < 0)
+        ENTRY;
+
+        if ( (minor = presto_i2m(dentry->d_inode)) < 0) {
+                EXIT;
                 return -EINVAL;
+        }
 
         fset = presto_fset(dentry);
         if (!fset) {
@@ -617,22 +416,26 @@
         
         if ( !presto_lento_up(minor) ) {
                 if ( fset->fset_flags & FSET_STEAL_PERMIT ) {
+                        EXIT;
                         return 0;
                 } else {
+                        EXIT;
                         return -ENOTCONN;
                 }
         }
 
-        PRESTO_ALLOC(buffer, char *, PAGE_SIZE);
+        PRESTO_ALLOC(buffer, PAGE_SIZE);
         if ( !buffer ) {
-                printk("PRESTO: out of memory!\n");
+                CERROR("PRESTO: out of memory!\n");
+                EXIT;
                 return -ENOMEM;
         }
-        path = presto_path(dentry, fset->fset_mtpt, buffer, PAGE_SIZE);
+        path = presto_path(dentry, fset->fset_dentry, buffer, PAGE_SIZE);
         pathlen = MYPATHLEN(buffer, path);
         fsetnamelen = strlen(fset->fset_name); 
-        rc = lento_permit(minor, pathlen, fsetnamelen, path, fset->fset_name);
+        rc = izo_upc_permit(minor, dentry, pathlen, path, fset->fset_name);
         PRESTO_FREE(buffer, PAGE_SIZE);
+        EXIT;
         return rc;
 }
 
@@ -641,13 +444,16 @@
  *  - if 0 is returned the permit was already in the kernel -- or --
  *    Lento gave us the permit without reintegration
  *  - lento returns the number of records it reintegrated 
+ *
+ * Note that if this fileset has branches, a permit will -never- be granted
+ * to a normal process for writing in the data area (i.e., outside of .intermezzo)
  */
 int presto_get_permit(struct inode * inode)
 {
         struct dentry *de;
         struct presto_file_set *fset;
         int minor = presto_i2m(inode);
-        int rc;
+        int rc = 0;
 
         ENTRY;
         if (minor < 0) {
@@ -657,47 +463,101 @@
 
         if ( ISLENTO(minor) ) {
                 EXIT;
-                return -EINVAL;
+                return 0;
         }
 
         if (list_empty(&inode->i_dentry)) {
-                printk("No alias for inode %d\n", (int) inode->i_ino);
+                CERROR("No alias for inode %d\n", (int) inode->i_ino);
                 EXIT;
                 return -EINVAL;
         }
 
         de = list_entry(inode->i_dentry.next, struct dentry, d_alias);
 
+        if (presto_chk(de, PRESTO_DONT_JOURNAL)) {
+                EXIT;
+                return 0;
+        }
+
         fset = presto_fset(de);
         if ( !fset ) {
-                printk("Presto: no fileset in presto_get_permit!\n");
+                CERROR("Presto: no fileset in presto_get_permit!\n");
                 EXIT;
                 return -EINVAL;
         }
 
+        if (fset->fset_flags & FSET_HAS_BRANCHES) {
+                EXIT;
+                return -EROFS;
+        }
+
+        spin_lock(&fset->fset_permit_lock);
         if (fset->fset_flags & FSET_HASPERMIT) {
-                lock_kernel();
                 fset->fset_permit_count++;
                 CDEBUG(D_INODE, "permit count now %d, inode %lx\n", 
                        fset->fset_permit_count, inode->i_ino);
-                unlock_kernel();
+                spin_unlock(&fset->fset_permit_lock);
                 EXIT;
                 return 0;
+        }
+
+        /* Allow reintegration to proceed without locks -SHP */
+        fset->fset_permit_upcall_count++;
+        if (fset->fset_permit_upcall_count == 1) {
+                spin_unlock(&fset->fset_permit_lock);
+                rc = presto_permit_upcall(fset->fset_dentry);
+                spin_lock(&fset->fset_permit_lock);
+                fset->fset_permit_upcall_count--;
+                if (rc == 0) {
+                        izo_mark_fset(fset->fset_dentry, ~0, FSET_HASPERMIT,
+                                      NULL);
+                        fset->fset_permit_count++;
+                } else if (rc == ENOTCONN) {
+                        CERROR("InterMezzo: disconnected operation. stealing permit.\n");
+                        izo_mark_fset(fset->fset_dentry, ~0, FSET_HASPERMIT,
+                                      NULL);
+                        fset->fset_permit_count++;
+                        /* set a disconnected flag here to stop upcalls */
+                        rc = 0;
+                } else {
+                        CERROR("InterMezzo: presto_permit_upcall failed: %d\n", rc);
+                        rc = -EROFS;
+                        /* go to sleep here and try again? */
+                }
+                wake_up_interruptible(&fset->fset_permit_queue);
         } else {
-		/* Allow reintegration to proceed without locks -SHP */
-                rc = presto_permit_upcall(fset->fset_mtpt);
-                lock_kernel();
-                if ( !rc ) { 
-                	presto_mark_fset_dentry
-				(fset->fset_mtpt, ~0, FSET_HASPERMIT, NULL);
-                	fset->fset_permit_count++;
+                /* Someone is already doing an upcall; go to sleep. */
+                DECLARE_WAITQUEUE(wait, current);
+
+                spin_unlock(&fset->fset_permit_lock);
+                add_wait_queue(&fset->fset_permit_queue, &wait);
+                while (1) {
+                        set_current_state(TASK_INTERRUPTIBLE);
+
+                        spin_lock(&fset->fset_permit_lock);
+                        if (fset->fset_permit_upcall_count == 0)
+                                break;
+                        spin_unlock(&fset->fset_permit_lock);
+
+                        if (signal_pending(current)) {
+                                remove_wait_queue(&fset->fset_permit_queue,
+                                                  &wait);
+                                return -ERESTARTSYS;
+                        }
+                        schedule();
                 }
-                CDEBUG(D_INODE, "permit count now %d, ino %lx (likely 1), rc %d\n", 
-        		fset->fset_permit_count, inode->i_ino, rc);
-                unlock_kernel();
-                EXIT;
-                return rc;
+                remove_wait_queue(&fset->fset_permit_queue, &wait);
+                /* We've been woken up: do we have the permit? */
+                if (fset->fset_flags & FSET_HASPERMIT)
+                        /* FIXME: Is this the right thing? */
+                        rc = -EAGAIN;
         }
+
+        CDEBUG(D_INODE, "permit count now %d, ino %ld (likely 1), "
+               "rc %d\n", fset->fset_permit_count, inode->i_ino, rc);
+        spin_unlock(&fset->fset_permit_lock);
+        EXIT;
+        return rc;
 }
 
 int presto_put_permit(struct inode * inode)
@@ -714,11 +574,11 @@
 
         if ( ISLENTO(minor) ) {
                 EXIT;
-                return -1;
+                return 0;
         }
 
         if (list_empty(&inode->i_dentry)) {
-                printk("No alias for inode %d\n", (int) inode->i_ino);
+                CERROR("No alias for inode %d\n", (int) inode->i_ino);
                 EXIT;
                 return -1;
         }
@@ -727,436 +587,143 @@
 
         fset = presto_fset(de);
         if ( !fset ) {
-                printk("Presto: no fileset in presto_get_permit!\n");
+                CERROR("InterMezzo: no fileset in %s!\n", __FUNCTION__);
                 EXIT;
                 return -1;
         }
 
-        lock_kernel();
+        if (presto_chk(de, PRESTO_DONT_JOURNAL)) {
+                EXIT;
+                return 0;
+        }
+
+        spin_lock(&fset->fset_permit_lock);
         if (fset->fset_flags & FSET_HASPERMIT) {
-                if (fset->fset_permit_count > 0) fset->fset_permit_count--;
-                else printk("Put permit while permit count is 0, inode %lx!\n",
-                                inode->i_ino); 
+                if (fset->fset_permit_count > 0)
+                        fset->fset_permit_count--;
+                else
+                        CERROR("Put permit while permit count is 0, "
+                               "inode %ld!\n", inode->i_ino); 
         } else {
-        	fset->fset_permit_count=0;
-        	printk("Put permit while no permit, inode %lx, flags %x!\n", 
-               		inode->i_ino, fset->fset_flags);
+                fset->fset_permit_count = 0;
+                CERROR("InterMezzo: put permit while no permit, inode %ld, "
+                       "flags %x!\n", inode->i_ino, fset->fset_flags);
         }
 
-        CDEBUG(D_INODE, "permit count now %d, inode %lx\n", 
-        		fset->fset_permit_count, inode->i_ino);
+        CDEBUG(D_INODE, "permit count now %d, inode %ld\n",
+               fset->fset_permit_count, inode->i_ino);
 
         if (fset->fset_flags & FSET_PERMIT_WAITING &&
-                    fset->fset_permit_count == 0) {
-                CDEBUG(D_INODE, "permit count now 0, ino %lx, notify Lento\n", 
+            fset->fset_permit_count == 0) {
+                CDEBUG(D_INODE, "permit count now 0, ino %ld, wake sleepers\n",
                        inode->i_ino);
-                presto_mark_fset_dentry(fset->fset_mtpt, ~FSET_PERMIT_WAITING, 0, NULL);
-                presto_mark_fset_dentry(fset->fset_mtpt, ~FSET_HASPERMIT, 0, NULL);
-                lento_release_permit(fset->fset_cache->cache_psdev->uc_minor,
-                                     fset->fset_permit_cookie);
-                fset->fset_permit_cookie = 0; 
+                wake_up_interruptible(&fset->fset_permit_queue);
         }
-        unlock_kernel();
+        spin_unlock(&fset->fset_permit_lock);
 
         EXIT;
         return 0;
 }
 
-
 void presto_getversion(struct presto_version * presto_version,
                        struct inode * inode)
 {
-        presto_version->pv_mtime = cpu_to_le64((__u64)inode->i_mtime);
-        presto_version->pv_ctime = cpu_to_le64((__u64)inode->i_ctime);
-        presto_version->pv_size = cpu_to_le64((__u64)inode->i_size);
-}
-
-/*
- *  note: this routine "pins" a dentry for a fileset root
- */
-int presto_set_fsetroot(char *path, char *fsetname, unsigned int fsetid,
-                        unsigned int flags)
-{
-        struct presto_file_set *fset;
-        struct presto_file_set *fset2;
-        struct dentry *dentry;
-        struct presto_cache *cache;
-        int error;
-
-        ENTRY;
-
-        PRESTO_ALLOC(fset, struct presto_file_set *, sizeof(*fset));
-        error = -ENOMEM;
-        if ( !fset ) {
-                printk(KERN_ERR "No memory allocating fset for %s\n", fsetname);
-                EXIT;
-                return -ENOMEM;
-        }
-        CDEBUG(D_INODE, "fset at %p\n", fset);
-
-        printk("presto: fsetroot: path %s, fileset name %s\n", path, fsetname);
-        error = presto_walk(path, &fset->fset_nd);
-        CDEBUG(D_INODE, "\n");
-        if (error) {
-                EXIT;
-                goto out_free;
-        }
-        dentry = fset->fset_nd.dentry;
-        CDEBUG(D_INODE, "\n");
-
-        error = -ENXIO;
-        if ( !presto_ispresto(dentry->d_inode) ) {
-                EXIT;
-                goto out_dput;
-        }
-
-        CDEBUG(D_INODE, "\n");
-        cache = presto_get_cache(dentry->d_inode);
-        if (!cache) {
-                printk(KERN_ERR "No cache found for %s\n", path);
-                EXIT;
-                goto out_dput;
-        }
-
-        CDEBUG(D_INODE, "\n");
-        error = -EINVAL;
-        if ( !cache->cache_mtpt) {
-                printk(KERN_ERR "Presto - no mountpoint: fsetroot fails!\n");
-                EXIT;
-                goto out_dput;
-        }
-        CDEBUG(D_INODE, "\n");
-
-        if (!cache->cache_root_fileset)  {
-                printk(KERN_ERR "Presto - no file set: fsetroot fails!\n");
-                EXIT;
-                goto out_dput;
-        }
-
-        error = -EEXIST;
-        CDEBUG(D_INODE, "\n");
-        fset2 = presto_fset(dentry);
-        if (fset2 && (fset2->fset_mtpt == dentry) ) { 
-                printk(KERN_ERR "Fsetroot already set (path %s)\n", path);
-                EXIT;
-                goto out_dput;
-        }
-
-        fset->fset_cache = cache;
-        fset->fset_mtpt = dentry;
-        fset->fset_name = fsetname;
-        fset->fset_chunkbits = CHUNK_BITS;
-        fset->fset_flags = flags;
-	fset->fset_file_maxio = FSET_DEFAULT_MAX_FILEIO; 
-
-        dentry->d_fsdata = (void *) ( ((long)fset) + PRESTO_FSETROOT );
-        list_add(&fset->fset_list, &cache->cache_fset_list);
-
-        error = presto_init_kml_file(fset);
-        if ( error ) {
-                EXIT;
-                CDEBUG(D_JOURNAL, "Error init_kml %d\n", error);
-                goto out_list_del;
-        }
-
-        error = presto_init_last_rcvd_file(fset);
-        if ( error ) {
-                int rc;
-                EXIT;
-                rc = presto_close_journal_file(fset);
-                CDEBUG(D_JOURNAL, "Error init_lastrcvd %d, cleanup %d\n", error, rc);
-                goto out_list_del;
-        }
-
-        error = presto_init_lml_file(fset);
-        if ( error ) {
-                int rc;
-                EXIT;
-                rc = presto_close_journal_file(fset);
-                CDEBUG(D_JOURNAL, "Error init_lml %d, cleanup %d\n", error, rc);
-                goto out_list_del;
-        }
-
-#ifdef  CONFIG_KREINT
-        /* initialize kml reint buffer */
-        error = kml_init (fset); 
-        if ( error ) {
-                int rc;
-                EXIT;
-                rc = presto_close_journal_file(fset);
-                CDEBUG(D_JOURNAL, "Error init kml reint %d, cleanup %d\n", 
-                                error, rc);
-                goto out_list_del;
-        }
-#endif
-        if ( dentry->d_inode == dentry->d_inode->i_sb->s_root->d_inode) {
-                cache->cache_flags |= CACHE_FSETROOT_SET;
-        }
-
-        CDEBUG(D_PIOCTL, "-------> fset at %p, dentry at %p, mtpt %p, fset %s, cache %p, d_fsdata %p\n",
-               fset, dentry, fset->fset_mtpt, fset->fset_name, cache, dentry->d_fsdata);
-
-        EXIT;
-        return 0;
-
- out_list_del:
-        list_del(&fset->fset_list);
-        dentry->d_fsdata = 0;
- out_dput:
-        path_release(&fset->fset_nd); 
- out_free:
-        PRESTO_FREE(fset, sizeof(*fset));
-        return error;
+        presto_version->pv_mtime = (__u64)inode->i_mtime;
+        presto_version->pv_ctime = (__u64)inode->i_ctime;
+        presto_version->pv_size  = (__u64)inode->i_size;
 }
 
-int presto_get_kmlsize(char *path, size_t *size)
-{
-        struct nameidata nd;
-        struct presto_file_set *fset;
-        struct dentry *dentry;
-        int error;
-
-        ENTRY;
-        error = presto_walk(path, &nd);
-        if (error) {
-                EXIT;
-                return error;
-        }
-        dentry = nd.dentry;
-
-        error = -ENXIO;
-        if ( !presto_ispresto(dentry->d_inode) ) {
-                EXIT;
-                goto kml_out;
-        }
-
-        error = -EINVAL;
-        if ( ! presto_dentry_is_fsetroot(dentry)) {
-                EXIT;
-                goto kml_out;
-        }
-
-        fset = presto_dentry2fset(dentry);
-        if (!fset) {
-                EXIT;
-                goto kml_out;
-        }
-        error = 0;
-        *size = fset->fset_kml.fd_offset;
 
- kml_out:
-        path_release(&nd);
-        return error;
-}
-
-int presto_clear_fsetroot(char *path)
+/* If uuid is non-null, it is the uuid of the peer that's making the revocation
+ * request.  If it is null, this request was made locally, without external
+ * pressure to give up the permit.  This most often occurs when a client
+ * starts up.
+ *
+ * FIXME: this function needs to be refactored slightly once we start handling
+ * multiple clients.
+ */
+int izo_revoke_permit(struct dentry *dentry, __u8 uuid[16])
 {
-        struct nameidata nd;
-        struct presto_file_set *fset;
-        struct dentry *dentry;
-        struct presto_cache *cache;
-        int error;
+        struct presto_file_set *fset; 
+        DECLARE_WAITQUEUE(wait, current);
+        int minor, rc;
 
         ENTRY;
-        error = presto_walk(path, &nd);
-        if (error) {
-                EXIT;
-                return error;
-        }
-        dentry = nd.dentry;
-
-        error = -ENXIO;
-        if ( !presto_ispresto(dentry->d_inode) ) {
-                EXIT;
-                goto put_out;
-        }
-
-        error = -EINVAL;
-        if ( ! presto_dentry_is_fsetroot(dentry)) {
-                EXIT;
-                goto put_out;
-        }
 
-        fset = presto_dentry2fset(dentry);
-        if (!fset) {
+        minor = presto_i2m(dentry->d_inode);
+        if (minor < 0) {
                 EXIT;
-                goto put_out;
+                return -ENODEV;
         }
 
-#ifdef  CONFIG_KREINT
-        error = kml_cleanup (fset);
-        if ( error ) {
-                printk("InterMezzo: Closing kml for fset %s: %d\n",
-                       fset->fset_name, error);
-        }
-#endif
-
-        error = presto_close_journal_file(fset);
-        if ( error ) {
-                printk("InterMezzo: Closing journal for fset %s: %d\n",
-                       fset->fset_name, error);
-        }
-        cache = fset->fset_cache;
-        cache->cache_flags &= ~CACHE_FSETROOT_SET;
-
-        list_del(&fset->fset_list);
-        dentry->d_fsdata = 0;
-        path_release(&fset->fset_nd);
-        fset->fset_mtpt = NULL;
-        PRESTO_FREE(fset->fset_name, strlen(fset->fset_name) + 1);
-        PRESTO_FREE(fset, sizeof(*fset));
-        EXIT;
-
-put_out:
-        path_release(&nd); /* for our lookup */
-        return error;
-}
-
-int presto_clear_all_fsetroots(char *path)
-{
-        struct nameidata nd;
-        struct presto_file_set *fset;
-        struct dentry *dentry;
-        struct presto_cache *cache;
-        int error;
-        struct list_head *tmp,*tmpnext;
-
-
-        ENTRY;
-        error = presto_walk(path, &nd);
-        if (error) {
+        fset = presto_fset(dentry);
+        if (fset == NULL) {
                 EXIT;
-                return error;
+                return -ENODEV;
         }
-        dentry = nd.dentry;
 
-        error = -ENXIO;
-        if ( !presto_ispresto(dentry->d_inode) ) {
+        spin_lock(&fset->fset_permit_lock);
+        if (fset->fset_flags & FSET_PERMIT_WAITING) {
+                CERROR("InterMezzo: Two processes are waiting on the same permit--this not yet supported!  Aborting this particular permit request...\n");
                 EXIT;
-                goto put_out;
+                spin_unlock(&fset->fset_permit_lock);
+                return -EINVAL;
         }
 
-        error = -EINVAL;
-        if ( ! presto_dentry_is_fsetroot(dentry)) {
-                EXIT;
-                goto put_out;
-        }
+        if (fset->fset_permit_count == 0)
+                goto got_permit;
 
-        fset = presto_dentry2fset(dentry);
-        if (!fset) {
+        /* Something is still using this permit.  Mark that we're waiting for it
+         * and go to sleep. */
+        rc = izo_mark_fset(dentry, ~0, FSET_PERMIT_WAITING, NULL);
+        spin_unlock(&fset->fset_permit_lock);
+        if (rc < 0) {
                 EXIT;
-                goto put_out;
+                return rc;
         }
 
-        cache = fset->fset_cache;
-        cache = fset->fset_cache;
-        cache->cache_flags &= ~CACHE_FSETROOT_SET;
-
-        tmp = &cache->cache_fset_list;
-        tmpnext = tmp->next;
-        while ( tmpnext != &cache->cache_fset_list) {
-		tmp=tmpnext;
-                tmpnext=tmp->next;
-                fset = list_entry(tmp, struct presto_file_set, fset_list);
-
-                
-                error = presto_close_journal_file(fset);
-                if ( error ) {
-                        printk("InterMezzo: Closing journal for fset %s: %d\n",
-                               fset->fset_name, error);
+        add_wait_queue(&fset->fset_permit_queue, &wait);
+        while (1) {
+                set_current_state(TASK_INTERRUPTIBLE);
+
+                spin_lock(&fset->fset_permit_lock);
+                if (fset->fset_permit_count == 0)
+                        break;
+                spin_unlock(&fset->fset_permit_lock);
+
+                if (signal_pending(current)) {
+                        /* FIXME: better rc?  FSET_PERMIT_WAITING stays set. */
+                        remove_wait_queue(&fset->fset_permit_queue, &wait);
+                        EXIT;
+                        return -ERESTARTSYS;
                 }
-                list_del(&fset->fset_list);
-                fset->fset_mtpt->d_fsdata = 0;
-                path_release(&fset->fset_nd);
-                fset->fset_mtpt = NULL;
-                PRESTO_FREE(fset->fset_name, strlen(fset->fset_name) +1);
-                PRESTO_FREE(fset, sizeof(*fset));
-        }
-
-        EXIT;
- put_out:
-        path_release(&nd); /* for our lookup */
-        return error;
-}
-
-
-int presto_get_lastrecno(char *path, off_t *recno)
-{
-        struct nameidata nd; 
-        struct presto_file_set *fset;
-        struct dentry *dentry;
-        int error;
-        ENTRY;
-
-        error = presto_walk(path, &nd);
-        if (error) {
-                EXIT;
-                return error;
-        }
-
-        dentry = nd.dentry;
-
-        error = -ENXIO;
-        if ( !presto_ispresto(dentry->d_inode) ) {
-                EXIT;
-                goto kml_out;
-        }
 
-        error = -EINVAL;
-        if ( ! presto_dentry_is_fsetroot(dentry)) {
-                EXIT;
-                goto kml_out;
-        }
+                /* FIXME: maybe there should be a timeout here. */
 
-        fset = presto_dentry2fset(dentry);
-        if (!fset) {
-                EXIT;
-                goto kml_out;
+                schedule();
         }
-        error = 0;
-        *recno = fset->fset_kml.fd_recno;
-
- kml_out:
-        path_release(&nd);
-        return error;
-}
-
-/* 
-   if *cookie != 0, lento must wait for this cookie
-   before releasing the permit, operations are in progress. 
-*/ 
-int presto_permit_downcall( const char * path, int *cookie )
-{
-        int result;
-        struct presto_file_set *fset; 
 
-        fset = presto_path2fileset(path);
-        if (IS_ERR(fset)) { 
-                EXIT;
-                return PTR_ERR(fset);
-        }
+        remove_wait_queue(&fset->fset_permit_queue, &wait);
+ got_permit:
+        /* fset_permit_count is now zero and fset_permit_lock is held, so the
+         * upcall below runs under that spinlock (TODO: confirm it is safe). */
+        CDEBUG(D_CACHE, "InterMezzo: releasing permit inode %ld\n",
+               dentry->d_inode->i_ino);
 
-	lock_kernel();
-        if (fset->fset_permit_count != 0) {
-                /* is there are previous cookie? */
-                if (fset->fset_permit_cookie == 0) {
-                        CDEBUG(D_CACHE, "presto installing cookie 0x%x, %s\n",
-                               *cookie, path);
-                        fset->fset_permit_cookie = *cookie;
-                } else {
-                        *cookie = fset->fset_permit_cookie;
-                        CDEBUG(D_CACHE, "presto has cookie 0x%x, %s\n",
-                               *cookie, path);
+        if (uuid != NULL) {
+                rc = izo_upc_revoke_permit(minor, fset->fset_name, uuid);
+                if (rc < 0) {
+                        spin_unlock(&fset->fset_permit_lock);
+                        EXIT;
+                        return rc;
                 }
-                result = presto_mark_fset(path, 0, FSET_PERMIT_WAITING, NULL);
-        } else {
-                *cookie = 0;
-                CDEBUG(D_CACHE, "presto releasing permit %s\n", path);
-                result = presto_mark_fset(path, ~FSET_HASPERMIT, 0, NULL);
         }
-	unlock_kernel();
 
-        return result;
+        izo_mark_fset(fset->fset_dentry, ~FSET_PERMIT_WAITING, 0, NULL);
+        izo_mark_fset(fset->fset_dentry, ~FSET_HASPERMIT, 0, NULL);
+        spin_unlock(&fset->fset_permit_lock);
+        EXIT;
+        return 0;
 }
 
 inline int presto_is_read_only(struct presto_file_set * fset)
@@ -1171,4 +738,3 @@
         mask= (ISLENTO(minor)? CACHE_LENTO_RO : CACHE_CLIENT_RO);
         return  ((cache->cache_flags & mask)? 1 : 0);
 }
-

FUNET's LINUX-ADM group, linux-adm@nic.funet.fi
TCL-scripts by Sam Shen (who was at: slshen@lbl.gov)