
List:       intermezzo-cvs
Subject:    CVS: intermezzo/lento/Lento/InterMezzo ReqHandler.pm,1.143,1.144 UpcallHandler.pm,1.65,1.66
From:       Phil Schwan <pschwan () users ! sourceforge ! net>
Date:       2001-09-20 18:21:51

Update of /cvsroot/intermezzo/intermezzo/lento/Lento/InterMezzo
In directory usw-pr-cvs1:/tmp/cvs-serv23877/Lento/InterMezzo

Modified Files:
	ReqHandler.pm UpcallHandler.pm 
Log Message:
(cvs bailed without saving the commit message, so my incredibly detailed
one is gone.  sigh.  subversion is self-hosting, you know)

Summary:
Enable data-on-demand (DOD) by adding something like ``data_on_demand="1"''
to a fileset configuration.  During reintegration, instead of initiating a
backfetch, we just create a sparse file.  When a sparse file is opened in a
fileset doing DOD, presto does a file_open upcall, which takes care of the
fetch at that time.

(I apologize for some of the more spurious whitespace changes, mostly of
the tabs-to-spaces variety.  I blame emacs, but I applaud its results. :)

lento:
- changed version number to 1.0.5.3

Lento/Fileset.pm:
- added data_on_demand flag
- if enabled, tell Presto about it at startup
 
Lento/Fsetdb.pm:
- abort if DOD is enabled, but fetchtype isn't rsync

Lento/Psdev.pm:
- added FSET_DATA_ON_DEMAND flag, a-la linux/intermezzo_fs.h
- added functions to set that flag
 
Lento/Bulk/Desc.pm:
- made a debug message slightly more informative

Lento/Filter/Upcall.pm:
- handle the new arguments to the file open upcall

Lento/InterMezzo/ReqHandler.pm:
- _start: store the request type and session, if any, on the heap for use in
completion
- finish: send a completion event to the session that initiated the request, if
desired
- BackFetch: renamed inodeinfo to pathinfo; pathinfo can now be either a scalar
with a relative pathname (in the upcall case) or a hash with the inode data
(in the backfetch case)
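
The two shapes of pathinfo can be illustrated with a small standalone sketch.
This is not the code in ReqHandler.pm: the helper name resolve_fetch_path and
the mount point /mnt/izo are made up for the example, and it tests for a hash
reference directly rather than using the ref(\$pathinfo) check in the diff.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Lento: build the source path from either
# a relative pathname (plain scalar) or inode data (hash reference).
sub resolve_fetch_path {
    my ($mtpt, $pathinfo) = @_;

    if (ref($pathinfo) eq 'HASH') {
        # BackFetch case: address the file by inode number and generation.
        return sprintf("%s/...ino:%lu:%u",
                       $mtpt, $pathinfo->{ino}, $pathinfo->{generation});
    }

    # FetchFile (upcall) case: a relative pathname under the mount point.
    return "$mtpt/$pathinfo";
}

# /mnt/izo is an assumed mount point used only for illustration.
print resolve_fetch_path("/mnt/izo", "docs/notes.txt"), "\n";
print resolve_fetch_path("/mnt/izo", { ino => 1234, generation => 7 }), "\n";
```

Checking for 'HASH' on the value itself rejects other reference types
explicitly, at the cost of treating them as pathnames; a stricter version
could die on anything that is neither a hash reference nor a plain string.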

Lento/InterMezzo/UpcallHandler.pm:
- when we get a file open upcall, create a session to handle the fetchfile and
complete the upcall.  There's a fair amount of code duplication here, and it
makes me sad, but I have faith that this whole tangled reintegration mess will
get cleaned up very soon as we move to DAFS packets.

Lento/KML/ReintLento.pm:
- reint_close: if this is a DOD fileset, instead of downloading the new file,
truncate at least one block to create a sparse file that will be handled at
open() time
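
For readers unfamiliar with sparse files, here is a minimal sketch of creating
one from Perl by extending an empty file with truncate().  It is only an
illustration of the general technique, not what reint_close does internally;
the helper name make_sparse_placeholder and the 1 MB size are assumptions, and
whether the blocks really stay unallocated depends on the filesystem.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# Hypothetical helper: make $path a placeholder of $size bytes without
# writing any data, so the filesystem can leave the blocks unallocated.
sub make_sparse_placeholder {
    my ($path, $size) = @_;
    open(my $fh, '+>', $path) or die "open $path: $!";     # create or empty the file
    truncate($fh, $size)      or die "truncate $path: $!"; # extend without writing
    close($fh)                or die "close $path: $!";
    return;
}

my ($tmpfh, $tmpname) = tempfile(UNLINK => 1);
close($tmpfh);
make_sparse_placeholder($tmpname, 1024 * 1024);

# stat field 7 is the apparent size; field 12 is the allocated block count
# (it may be empty on platforms that do not report it).
my @st = stat($tmpname);
printf "size=%d bytes, allocated blocks=%s\n", $st[7], $st[12];
```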


Index: ReqHandler.pm
===================================================================
RCS file: /cvsroot/intermezzo/intermezzo/lento/Lento/InterMezzo/ReqHandler.pm,v
retrieving revision 1.143
retrieving revision 1.144
diff -U2 -r1.143 -r1.144
--- ReqHandler.pm	2001/08/07 02:26:05	1.143
+++ ReqHandler.pm	2001/09/20 18:21:49	1.144
@@ -33,4 +33,6 @@
     Lento::Debuggable::set_option($_[SESSION], trace => 1, default => 1);
     $_[HEAP]->{req_packet} = $_[ARG0];
+    $_[HEAP]->{req_type} = $_[ARG1];
+    $_[HEAP]->{req_session} = $_[ARG2];
     $_[HEAP]->{connection} = $_[HEAP]->{req_packet}->[PKT_CONNECTION];
     $_[HEAP]->{state} = 'idle';
@@ -43,5 +45,4 @@
 
 sub finish {
-
     # This should be enough to break all connection related circular references
     $_[HEAP]->{connection}->put() if $_[HEAP]->{connection};
@@ -49,4 +50,10 @@
     # See comment before alias_set in _start
     $_[KERNEL]->alias_remove($_[SESSION]);
+
+    if (defined($_[HEAP]->{req_session})) {
+        $_[KERNEL]->post($_[HEAP]->{req_session},
+                         $_[HEAP]->{req_type} . "_finish");
+        delete $_[HEAP]->{req_session};
+    }
 }
 
@@ -641,17 +648,17 @@
 }
 
-# 
 sub BackFetch {
     my $packet = shift;
+    my $caller_session = shift;
     &MarkLentoBusy;
     new POE::Session
-        ([$packet, "BackFetch"], # two arguments to _start
-         'Lento::EventSupport' , ['alias_set', 'alias_remove', '_stop', 
-                                  '_child', '_signal'], 
+        ([$packet, "BackFetch", $caller_session], # three arguments to _start
+         'Lento::EventSupport', ['alias_set', 'alias_remove', '_stop',
+                                 '_child', '_signal'],
          'Lento::InterMezzo::ReqHandler', ['_start', 'finish', 'got_error'],
          REQ => sub {
              my $packet = $_[HEAP]->{req_packet};
              my $fsetname = @{$packet->[PKT_PARMS]}[0];
-             my $inodeinfo = @{$packet->[PKT_PARMS]}[1];
+             my $pathinfo = @{$packet->[PKT_PARMS]}[1];
              my $fetchtype = @{$packet->[PKT_PARMS]}[2] || "Desc";
              my $remote_mtime = @{$packet->[PKT_PARMS]}[3];
@@ -666,5 +673,5 @@
              # compatibility
              $fetchtype="Desc" unless defined $fetchtype;
-             
+
              $_[HEAP]->{session_level} = LVL_RECOVER;
              $_[HEAP]->{connection} = $_[HEAP]->{req_packet}->[PKT_CONNECTION];
@@ -675,11 +682,26 @@
                  $_[KERNEL]->yield('finish');
              }
+
              # set up source desc
-             my $path = $_[HEAP]->{fset}->mtpt();
-             DEBUG "*** fsetname $fsetname, inode $inodeinfo->{ino} mtpt $path\n";
-             
-             $path .= sprintf("/...ino:%lu:%u", $inodeinfo->{ino}, $inodeinfo->{generation});
-             DEBUG "**** path $path, fetchtype $fetchtype\n";
-             DEBUG "****direction $Lento::Bulk::SERVER_TO_CLIENT,$Lento::Bulk::SOURCE\n";
+             #
+             # XXX This is a hack for data-on-demand until we convert to DAFS
+             # RPCS and clean up this reintegration mess.
+             #
+             # If $pathinfo is a hash, it should have {ino} and {generation} for
+             #   the file being backfetched.
+             # If it's a scalar, it should be the relative path to the file being
+             #   fetched.
+             my $path;
+             if (ref(\$pathinfo) eq 'SCALAR') {
+                 # This is a FetchFile
+                 $path = $_[HEAP]->{fset}->mtpt() . "/$pathinfo";
+                 DEBUG "*** fsetname $fsetname, mtpt $path, file $pathinfo\n";
+             } else {
+                 # This is a BackFetch
+                 $path = sprintf("%s/...ino:%lu:%u",
+                                 $_[HEAP]->{fset}->mtpt(),
+                                 $pathinfo->{ino}, $pathinfo->{generation});
+                 DEBUG "*** fsetname $fsetname, inode $pathinfo->{ino} mtpt $path\n";
+             }
 
              # instantiate a descriptor of the correct type $fetchtype
@@ -698,15 +720,15 @@
                     remote_mtime     => $remote_mtime,
                     remote_ctime     => $remote_ctime,
-                    remote_size      => $remote_size } 
+                    remote_size      => $remote_size }
                  );
 
              if (!$desc) {
                 $packet->[PKT_CONNECTION]->set_error
-                    ("descriptor instantiation",ENOENT, "No descriptor");
+                    ("descriptor instantiation", ENOENT, "No descriptor");
                 return;
              }
-             
+
              DEBUG $desc->printme();
-             
+
              my ($rc, $bulk_session) = $desc->setup();
              if ($rc) {
@@ -716,9 +738,9 @@
                      return;
                  } elsif ($rc == ENOTCONN) {
-                     $_[KERNEL]->yield('got_error',"backfetch",ENOTCONN,
+                     $_[KERNEL]->yield('got_error', 'backfetch', ENOTCONN,
                                        "No connection");
                      return;
                  } else {
-                     $!=$rc;
+                     $! = $rc;
                      confess "Error setting up bulk transfer ($!)\n";
                  }
@@ -1004,6 +1026,6 @@
              $_[HEAP]->{recno_incr} = $recno_incr;
              
-             DEBUG "did reint: last_local: $_[HEAP]->{last_local_offset}";
-             DEBUG " offs_incr: $_[HEAP]->{offs_incr}";
+             DEBUG "did reint: last_local: $_[HEAP]->{last_local_offset}\n";
+             DEBUG " offs_incr: $_[HEAP]->{offs_incr}\n";
              DEBUG " kmllen: $_[HEAP]->{segment_length}\n";
              DEBUG "last recno from presto is ".

Index: UpcallHandler.pm
===================================================================
RCS file: /cvsroot/intermezzo/intermezzo/lento/Lento/InterMezzo/UpcallHandler.pm,v
retrieving revision 1.65
retrieving revision 1.66
diff -U2 -r1.65 -r1.66
--- UpcallHandler.pm	2001/08/04 01:55:02	1.65
+++ UpcallHandler.pm	2001/09/20 18:21:49	1.66
@@ -19,20 +19,94 @@
 sub OpenFile ($) {
     my $upcall = shift;
-    
-    # XXX: Remove for DOD
-    $upcall->complete(0);
-    
-    $upcall->{fset} = $::fsetdb->find_by_path($upcall->{path}) ||
-    die "no fileset for " . $upcall->{path};
-    
+
+    $upcall->{fset} = $::fsetdb->find_by_name($upcall->{fileset});
+    if (!$upcall->{fset}) {
+        confess "No such fileset $upcall->{fileset}";
+    }
+
     if ($upcall->{fset}->serversysid() eq $::mysysid) {
-    DEBUG "Not performing OpenFile for local fileset\n";
-    $upcall->complete(0);
+        DEBUG "Not performing OpenFile for local fileset\n";
+        $upcall->complete(0);
         return;
     }
-    
+
+    new POE::Session
+      ( [$upcall],
+        # Waited for by queued sessions
+        'Lento::EventSupport', ['AddWaiter','alias_set', 'alias_remove',
+                                '_child', '_stop'],
+        _start => sub {
+            my $upcall = $_[ARG0];
+            $_[HEAP]->{session_name} = $_[SESSION]->ID() .
+              ": OpenFile UPC ($upcall->{path}) $_[SESSION]";
+
+            $_[HEAP]->{upcall} = $upcall;
+            my $connection = $upcall->{fset}->server()->{connection_obj};
+
+            Lento::Debuggable::set_option($_[SESSION],
+                                          trace => 1, default => 1);
+
+            $_[KERNEL]->alias_set("Upcall - $_[SESSION]");
+
+            $_[HEAP]->{session_level} = LVL_FULL;
+
+            $_[HEAP]->{closeinfo} =
+              { path        => $upcall->{fset}->{mtpt} . $upcall->{path},
+                psdev       => $::psdev }; # XXX is this correct?
+
+            my $fetchtype = $upcall->{fset}->{fetchtype};
+            my $desc = "Lento::Bulk::$fetchtype"->new
+              ( { sink_type => $Lento::Bulk::SINK_FILE,
+                  sink_object => $upcall->{fset}->{mtpt} . $upcall->{path},
+                  role => $Lento::Bulk::SINK,
+                  direction => $Lento::Bulk::SERVER_TO_CLIENT,
+                  connection => $connection,
+                  callback_event => 'got_fetch_file',
+                  callback_session => $_[SESSION],
+                  #close_callback => 'Lento::KML::ReintLento::reint_close_callback',
+                  debug => 1 } );
+
+            DEBUG $desc->printme();
+
+            my ($rc, $bulk_session) = $desc->setup();
+            if ($rc) {
+                if ($rc == ENOENT) {
+                    DEBUG "Skipping backfetch for file $path\n";
+
+                    # fake the backfetch completion
+                    $poe_kernel->post($session,'got_backfetch_file', 1);
+                    return;
+                }
+
+                if ($rc == ENOTCONN) {
+                    DEBUG "Disconnected while trying to create sink desc\n";
+
+                    $poe_kernel->post($session,'got_backfetch_file', ENOTCONN);
+                    return;
+                }
+
+                $! = $rc;
+                confess "Error setting up bulk transfer ($!)\n";
+            }
+
+            # XXX: Should we test for return values from this function?
+            $desc->start();
+
+            DEBUG("SENDING backfetch request packet for $upcall->{path}\n");
+            $connection->sendpacket
+              ('REQ', ['BackFetch', $upcall->{fset}->{name}, $upcall->{path},
+                       $fetchtype, 0, 0, 0]);
+        },
+        got_fetch_file => sub {
+            my $error = $_[ARG0];
+
+            $_[HEAP]->{upcall}->complete($error);
+            $_[KERNEL]->alias_remove("Upcall - $_[SESSION]");
+        }
+      );
+
     return;
 }
-          
+
 sub OpenDir {
     my $upcall = shift;
@@ -176,5 +250,5 @@
                   DEBUG "GetPermit returns error $rc, need to retry\n";
                   DEBUG "Unlocking, but will retry in $delay seconds!\n";
-                  # Be a good samaritan and unlokc to let others fail as well!
+                  # Be a good samaritan and unlock to let others fail as well!
                   # This will prevent normal permit sessions from hanging around
                   # and will also allow us to pool any additional upcalls during


_______________________________________________
intermezzo-commit mailing list
intermezzo-commit@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/intermezzo-commit
