[prev in list] [next in list] [prev in thread] [next in thread] 

List:       intermezzo-cvs
Subject:    CVS: intermezzo/lento/Lento/InterMezzo ReqHandler.pm,1.133,1.134 UpcallHandler.pm,1.61,1.62
From:       Gordon Matzigkeit <gord-fig () users ! sourceforge ! net>
Date:       2001-04-25 17:55:34
[Download RAW message or body]

Update of /cvsroot/intermezzo/intermezzo/lento/Lento/InterMezzo
In directory usw-pr-cvs1:/tmp/cvs-serv15423/lento/Lento/InterMezzo

Modified Files:
	ReqHandler.pm UpcallHandler.pm 
Log Message:
Excise Lento::Packet from the sources.  Now everything is
XDR-based.  Lento::Pinger-managed connections work, but replication does not.

Index: ReqHandler.pm
===================================================================
RCS file: /cvsroot/intermezzo/intermezzo/lento/Lento/InterMezzo/ReqHandler.pm,v
retrieving revision 1.133
retrieving revision 1.134
diff -U2 -r1.133 -r1.134
--- ReqHandler.pm	2001/04/23 20:50:53	1.133
+++ ReqHandler.pm	2001/04/25 17:55:32	1.134
@@ -9,5 +9,4 @@
 use Lento::Connection;
 use Lento::Debuggable;
-use Lento::Packet;
 use Lento::ReqDispatcher;
 use Lento::Replicator;
@@ -42,8 +41,4 @@
     Lento::Debuggable::set_option($_[SESSION], trace => 1, default => 1);
     $_[HEAP]->{req_packet} = $_[ARG0];
-    if (UNIVERSAL::isa ($_[ARG0], 'Lento::Packet'))
-    {
-        confess "old packet";
-    }
 
     $_[HEAP]->{connection} = $_[ARG0]->connection;
@@ -94,14 +89,14 @@
 
 sub SysId {
-    my $packet = shift;
+    my $rpc = shift;
     
     new POE::Session
-        ([$packet, "SysId"],
+        ([$rpc, "SysId"],
          'Lento::EventSupport', ['alias_set', 'alias_remove', '_stop',
                                  '_child', '_signal'],
          'Lento::InterMezzo::ReqHandler', ['_start', 'finish', 'got_error'],
          REQ => sub {
-             my $packet = $_[HEAP]->{req_packet};
-             my $sysid = @{$_[HEAP]->{req_packet}->[PKT_PARMS]}[0];
+             my $rpc = $_[HEAP]->{req_packet};
+             my $sysid = $rpc->arg (0);
              
              # XXX check security here!!
@@ -113,6 +108,6 @@
                  # this used to be in the reqhandler. 
                  # this binds the connection
-                 $packet->[PKT_CONNECTION]->setsysid($sysid);
-                 $packet->[PKT_CONNECTION]->{sysid} = $sysid;
+                 $rpc->connection->setsysid($sysid);
+                 $rpc->connection->{sysid} = $sysid;
                  LOG "New client instantiated, sysid $sysid\n";
                  $client = new Lento::Client($sysid);
@@ -121,6 +116,6 @@
              
              
-             $packet->[PKT_CONNECTION]->sendpacket('REP', $packet, [0]);
-             $poe_kernel->yield('finish', [$packet, 0]);
+             $rpc->connection->sendrep ($rpc, [0]);
+             $poe_kernel->yield('finish');
          }
         );
@@ -132,8 +127,8 @@
 # the replicator. 
 sub ReplicatorStatus {
-    my $packet = shift;
+    my $rpc = shift;
     
     new POE::Session
-        ([$packet, "ReplicatorStatus"], # two arguments to _start
+        ([$rpc, "ReplicatorStatus"], # two arguments to _start
          'Lento::EventSupport', ['alias_set', 'alias_remove', 
                                  'get_cookie', '_stop',
@@ -142,8 +137,8 @@
          'Lento::InterMezzo::ReqHandler', ['_start', 'finish'],
          REQ => sub {
-             my $packet = $_[HEAP]->{req_packet};
-             my $connection = $packet->[PKT_CONNECTION];
-             my $fsetname = @{$packet->[PKT_PARMS]}[0];
-             my $fsetinfo = @{$packet->[PKT_PARMS]}[1];
+             my $rpc = $_[HEAP]->{req_packet};
+             my $connection = $rpc->connection;
+             my $fsetname = $rpc->arg (0);
+             my $fsetinfo = $rpc->arg (1);
              my $replicator =
                  Lento::Replicator::find($connection->{sysid}, $fsetname);
@@ -228,5 +223,5 @@
              # get here when $fset->{permit} has reintegrated
              my $status = $_[ARG0];
-             my $packet = $_[HEAP]->{req_packet};
+             my $rpc = $_[HEAP]->{req_packet};
              my $connection = $_[HEAP]->{connection};
              my $replicator = $_[HEAP]->{replicator}; 
@@ -245,5 +240,5 @@
              
              $_[HEAP]->{state}="finished";
-             $connection->sendpacket('REP', $packet, [$status,$fsetstatus]);
+             $connection->sendrep($rpc, [$status,$fsetstatus]);
              $_[KERNEL]->yield('release_lock');
          },
@@ -283,8 +278,8 @@
 
 sub Permit {
-    my $packet = shift;
+    my $rpc = shift;
     
     new POE::Session
-        ([$packet, "Permit"],   # two arguments to _start
+        ([$rpc, "Permit"],   # two arguments to _start
          'Lento::EventSupport', ['alias_set', 'alias_remove', 
                                  'get_cookie', '_stop',
@@ -293,7 +288,7 @@
          'Lento::InterMezzo::ReqHandler', ['_start', 'finish'],
          REQ => sub {
-             my $packet = $_[HEAP]->{req_packet};
-             my $connection = $packet->[PKT_CONNECTION];
-             my $fsetname = @{$packet->[PKT_PARMS]}[0];
+             my $rpc = $_[HEAP]->{req_packet};
+             my $connection = $rpc->connection;
+             my $fsetname = $rpc->arg (0);
              my $fset = $::fsetdb->find_by_name($fsetname);
              my $replicator =
@@ -423,8 +418,8 @@
                                                 $_[SESSION]);
                  die "no getpermit session"
-                                        unless defined $_[HEAP]->{wait_session};
+		     unless defined $_[HEAP]->{wait_session};
              } else {
-                 # We set this greedily here in case we get a got_error right after
-                 # this subroutine finishes.
+                 # We set this greedily here in case we get a got_error
+                 # right after this subroutine finishes.
                  $_[HEAP]->{state}='have_permit';
                  $_[KERNEL]->yield('permit_acquired',0);
@@ -449,5 +444,4 @@
          },
          revoke_permit => sub {
-             my $packet = $_[HEAP]->{req_packet};
              my $fset = $_[HEAP]->{fset};
              my $cookie= $_[KERNEL]->call($_[SESSION],'get_cookie');
@@ -472,6 +466,6 @@
                  $_[KERNEL]->call($_[SESSION], 'reint_done');
              } else {   
-                 my $packet = $_[HEAP]->{req_packet};
-                 my $connection = $packet->[PKT_CONNECTION];
+                 my $rpc = $_[HEAP]->{req_packet};
+                 my $connection = $rpc->connection;
                  my $replicator = $_[HEAP]->{replicator}; 
                  
@@ -496,5 +490,6 @@
              
              # Reintegration complete; issue the permit.
-             my $connection = $packet->[PKT_CONNECTION];
+             my $rpc = $_[HEAP]->{req_packet};
+             my $connection = $rpc->connection;
              
              DEBUG "giving permit to $connection->{sysid}\n";
@@ -512,6 +507,6 @@
              LOG("  PERMIT REP : holder= $_[HEAP]->{fset}->{permitholder}\n");
              my $status = $_[ARG0];
-             my $packet = $_[HEAP]->{req_packet};
-             my $connection = $packet->[PKT_CONNECTION];
+             my $rpc = $_[HEAP]->{req_packet};
+             my $connection = $rpc->connection;
              my $replicator = $_[HEAP]->{replicator}; 
              my $fsetstatus=undef;
@@ -539,5 +534,5 @@
                  # be certain that the recipient did not receive our reply.
                  $_[HEAP]->{state}="finished";
-                 $connection->sendpacket('REP', $packet, [$status]);
+                 $connection->sendrep($rpc, [$status]);
                  $_[KERNEL]->yield('release_lock');
              }
@@ -548,6 +543,6 @@
              
              my $errno = $_[ARG1];
-             my $packet = $_[HEAP]->{req_packet};
-             my $connection = $packet->[PKT_CONNECTION];
+             my $rpc = $_[HEAP]->{req_packet};
+             my $connection = $rpc->connection;
              
              $errno=EAGAIN unless (defined $errno and $errno != 0);
@@ -566,8 +561,8 @@
              } else {
                  # could not acquire permit from permit holder...tell remote
-                 # This might also cause a get error which will be trapped 
+                 # This might also cause a get error which will be trapped
                  $_[HEAP]->{state}='error';
                  
-                 $connection->sendpacket('REP', $packet, [$errno]);
+                 $connection->sendrep($rpc, [$errno]);
              }
              
@@ -602,8 +597,8 @@
 
 sub BackFetch {
-    my $packet = shift;
+    my $rpc = shift;
     
     new POE::Session
-        ([$packet, "BackFetch"], # two arguments to _start
+        ([$rpc, "BackFetch"], # two arguments to _start
          'Lento::EventSupport' , ['alias_set', 'alias_remove', '_stop', 
                                   '_child', '_signal'], 
@@ -613,7 +608,7 @@
          
          REQ => sub {
-             my $packet = $_[HEAP]->{req_packet};
-             my $fsetname = @{$packet->[PKT_PARMS]}[0];
-             my $inodeinfo = @{$packet->[PKT_PARMS]}[1];
+             my $rpc = $_[HEAP]->{req_packet};
+             my $fsetname = $rpc->arg (0);
+             my $inodeinfo = $rpc->arg (1);
              
              $_[HEAP]->{session_level} = LVL_RECOVER;
@@ -621,5 +616,5 @@
              if (!defined($_[HEAP]->{fset} = $::fsetdb->find_by_name($fsetname))) {
                  # Invalid fileset name sent by client
-                 #$_[HEAP]->{connection}->sendpacket('REP', $packet, [EINVAL]);
+                 #$_[HEAP]->{connection}->sendrep($rpc, [EINVAL]);
                  $_[KERNEL]->yield('finish');
              }
@@ -635,10 +630,9 @@
                     role             => $Lento::Bulk::Desc::SOURCE,
                     direction        => $Lento::Bulk::Desc::SERVER_TO_CLIENT,
-                    connection       => $packet->[PKT_CONNECTION],
+                    connection       => $rpc->connection,
                     debug            => 1,
                     callback_event   => 'finished_sending',
                     callback_session => $_[SESSION],
-                    source_act => $_[HEAP]->{req_packet}->[PKT_CTOKEN],
-                    sink_act => $_[HEAP]->{req_packet}->[PKT_STOKEN] } 
+                    sink_xid => $rpc->xid } 
                  );
              
@@ -648,5 +642,5 @@
              if ($rc) {
                  if ($rc == ENOENT) {
-                     #$_[HEAP]->{connection}->sendpacket('REP', $packet, [ENOENT]);
+                     #$_[HEAP]->{connection}->sendrep($rpc, [ENOENT]);
                      $_[KERNEL]->yield('finish');
                  } else {
@@ -657,7 +651,6 @@
          finished_sending => sub {
              DEBUG "Got finished_sending in BackFetch\n";
-             #     $_[HEAP]->{connection}->sendpacket('REP', $packet, [0]);
-             #   DEBUG "Ping delta (server): " . (time() - $time) . "\n";
-             $poe_kernel->yield('finish', [$packet, 0]);
+             # $_[HEAP]->{connection}->sendrep($rpc, [0]);
+             $poe_kernel->yield('finish');
          }
         );
@@ -667,8 +660,8 @@
 # Receiver of KML
 sub Reintegrate_KML {
-    my $packet = shift;
+    my $rpc = shift;
     
     new POE::Session
-        ([$packet, "Reintegrate_KML"], # two arguments to _start
+        ([$rpc, "Reintegrate_KML"], # two arguments to _start
          'Lento::EventSupport' , ['alias_set', 'alias_remove', '_stop', 
                                   '_child', '_signal'], 
@@ -676,18 +669,17 @@
          'Lento::Bulk::Sink' , ['done_sinking', 'EOD', 'DAT' ], 
          REQ => sub {
-             my $fsetname = shift @{$_[HEAP]->{req_packet}->[PKT_PARMS]};
+	     my $rpc = $_[HEAP]->{req_packet};
+             my $fsetname = $rpc->arg (0);
              # use the naming used by the sender of the KML
-             $_[HEAP]->{next_to_send} = 
-                 shift @{$_[HEAP]->{req_packet}->[PKT_PARMS]};
-             my $fsetinfo= $_[HEAP]->{fsetinfo} = 
-                 shift @{$_[HEAP]->{req_packet}->[PKT_PARMS]};
-             my $segment_length = shift @{$_[HEAP]->{req_packet}->[PKT_PARMS]};
+             $_[HEAP]->{next_to_send} = $rpc->arg (1);
+             my $fsetinfo = $rpc->arg (2);
+             my $segment_length = $rpc->arg (3);
              $_[HEAP]->{segment_length} = $segment_length;
-             $_[HEAP]->{connection} = $_[HEAP]->{req_packet}->[PKT_CONNECTION];
+             $_[HEAP]->{connection} = $rpc->connection;
              
              # offsets/recnos in remote's nomenclature, so remote's remote is us!
              ($_[HEAP]->{remote_kmlsize}, $_[HEAP]->{last_remote_recno},
               $_[HEAP]->{last_remote_offset}, $_[HEAP]->{last_local_recno},
-              $_[HEAP]->{last_local_offset}) =  unpack("NNNNN", $fsetinfo);
+              $_[HEAP]->{last_local_offset}) =  @$fsetinfo;
              
              $_[HEAP]->{kmlbuf} = "";
@@ -697,5 +689,5 @@
              if (!defined($_[HEAP]->{fset} = $::fsetdb->find_by_name($fsetname))) {
                  # Invalid fileset name sent by client
-                 $_[HEAP]->{connection}->sendpacket('REP', $packet, [EINVAL]);
+                 $_[HEAP]->{connection}->sendrep($rpc, [EINVAL]);
                  $_[KERNEL]->yield('done');
                  return;
@@ -708,5 +700,5 @@
                  LOG ("No replicator for client $_[HEAP]->{connection}->{sysid}".
                       "fileset $fsetname\n");
-                 $_[HEAP]->{connection}->sendpacket('REP', $packet, [EINVAL]);
+                 $_[HEAP]->{connection}->sendrep($rpc, [EINVAL]);
                  $_[KERNEL]->yield('done');
                  return;
@@ -723,5 +715,5 @@
                      warn "-=- permitholder: $_[HEAP]->{fset}->{permitholder}\n";
                      warn "-=- connectsysid: $_[HEAP]->{connection}->{sysid}\n";
-                     $_[HEAP]->{connection}->sendpacket('REP', $packet,[EPERM]);
+		     $_[HEAP]->{connection}->sendrep($rpc, [EPERM]);
                      $_[KERNEL]->yield('complete');
                      return 0;
@@ -735,5 +727,5 @@
              DEBUG("last_remote_offset %d, last_local_offset %d,".
                    " last_remote_recno %d,  last_local_recno %d\n".
-                   " replicator %s\n, length %d, S-ACT %d\n",
+                   " replicator %s\n, length %d, S-XID %d\n",
                    $_[HEAP]->{last_remote_offset}, 
                    $_[HEAP]->{last_local_offset}, 
@@ -742,5 +734,5 @@
                    $_[HEAP]->{replicator}->printme(), 
                    $segment_length, 
-                   $_[HEAP]->{req_packet}->[PKT_STOKEN]);
+                   $_[HEAP]->{req_packet}->xid);
              
              my $fset = $_[HEAP]->{fset};
@@ -775,6 +767,6 @@
                      #} else {
                      DEBUG("Resetting next_to_send\n");
-                     $_[HEAP]->{replicator}->next_to_send(
-                                                          \
$_[HEAP]->{last_remote_offset}); +                     \
$_[HEAP]->{replicator}->next_to_send +			 ($_[HEAP]->{last_remote_offset});
                      #}
                  }
@@ -803,11 +795,11 @@
                          print " RECORDS WERE LOST...MANUAL RESYNC MAY BE NEEDED\n";
                          DEBUG "******lost records...disconnecting*****\n"; 
-                         $_[KERNEL]->yield('got_error',"reintegrate kml",ESTALE,
-                                           "bad record sequence");
+                         $_[KERNEL]->yield('got_error', "reintegrate kml",
+					   ESTALE, "bad record sequence");
                          
                          # and disconnect
                          $_[HEAP]->{connection}->set_error
-                                ('got_error',"reintegrate kml",ESTALE,
-                                          "bad record sequence");
+                                ('got_error', "reintegrate kml",
+				 ESTALE, "bad record sequence");
                          return;
                      }
@@ -825,8 +817,15 @@
              # do we have non-reintegrated data? 
              if ($::psdev->get_kml_size($fset->mtpt()) - $max_local_offset) {
-                 DEBUG($replicator->printme()." kml \
size=".$::psdev->get_kml_size($fset->mtpt())." local offs= $max_local_offset\n"); +   \
DEBUG($replicator->printme()." kml size=". +		       \
$::psdev->get_kml_size($fset->mtpt()). +		       " local offs= $max_local_offset\n");
                  die "Possible conflict; resync manually\n";
              }
              
+	     # Forge an XID for the current session.
+	     $_[HEAP]->{session_xid} = $rpc->connection->{rpcsubsys}->cookie
+		 (undef, $_[KERNEL]->ID_session_to_id ($_[SESSION]));
+
+	     # create the sink bulk descriptor
              my $desc = Lento::Bulk::Desc->new
                  ({ sink_type => $Lento::Bulk::Sink::SINK_BUFFER, 
@@ -835,11 +834,13 @@
                     role =>  $Lento::Bulk::Desc::SINK,
                     direction => $Lento::Bulk::Desc::CLIENT_TO_SERVER,
-                    connection => $_[HEAP]->{req_packet}->[PKT_CONNECTION],
+                    connection => $_[HEAP]->{connection},
                     callback_session => $_[SESSION],
                     callback_event => 'got_kml',
-                    source_act => $_[HEAP]->{req_packet}->[PKT_CTOKEN],
-                    sink_act => $_[HEAP]->{req_packet}->[PKT_STOKEN],
+                    source_xid => $rpc->xid,
+                    sink_xid => $_[HEAP]->{session_xid},
                     debug => 1 } );
-             
+
+	     delete $desc->{bytesize} if ($desc->{bytesize} <= 0);
+
              DEBUG("Created sink descriptor for receiving KML:\n");
              DEBUG $desc->printme();
@@ -857,4 +858,8 @@
              my $error = $_[ARG0];
              
+	     # Release our session_xid.
+	     $rpc->connection->{rpcsubsys}->cookie
+		 ($_[HEAP]->{session_xid}, undef);
+
              if ($error) { 
                  # This is draconian but needed!
@@ -924,9 +929,12 @@
                  # If there are backfetches pending then got_backfetch_file will
                  #take care of sending the confirmation.
-                 $_[KERNEL]->yield('send_confirmation', 'done') 
-                     unless ($_[HEAP]->{backfetches_active}>0);
-                 
-                 DEBUG ("Not sending confirmation\n") 
-                     if ($_[HEAP]->{backfetches_active}>0);
+		 if ($_[HEAP]->{backfetches_active}>0)
+		 {
+		     DEBUG ("Not sending confirmation\n");
+		 }
+		 else
+		 {
+		     $_[KERNEL]->yield('send_confirmation', 'done');
+		 }
              } else {
                  # We continue with reintegration...if the last operation was a 
@@ -976,6 +984,6 @@
                  # Schedule a new round of reintegrations if do_reints are waiting
                  if ($_[HEAP]->{reint_waiting}) {
-                     # We set this to zero here to ensure that any additional got
-                     # backfetch events do not trigger of more do_reints
+                     # We set this to zero here to ensure that any additional
+                     # got backfetch events do not trigger of more do_reints
                      $_[HEAP]->{reint_waiting}=0;
                      $_[KERNEL]->yield('do_reint');
@@ -990,12 +998,12 @@
              DEBUG("CONFIRMATION: offs_incr: $_[HEAP]->{offs_incr}\n");
              if ($status eq 'done') { 
-                 $_[HEAP]->{connection}->sendpacket
-                     ('REP', $packet, 
-                      [0, $_[HEAP]->{offs_incr}, $_[HEAP]->{recno_incr}]);
+                 $_[HEAP]->{connection}->sendrep
+                     ($rpc,
+                      [0, [$_[HEAP]->{offs_incr}, $_[HEAP]->{recno_incr}]]);
              }
              $_[KERNEL]->yield('complete');
          },
          complete => sub {
-             my $packet = $_[HEAP]->{req_packet};
+             my $rpc = $_[HEAP]->{req_packet};
              $_[KERNEL]->yield('finish');
              

Index: UpcallHandler.pm
===================================================================
RCS file: /cvsroot/intermezzo/intermezzo/lento/Lento/InterMezzo/UpcallHandler.pm,v
retrieving revision 1.61
retrieving revision 1.62
diff -U2 -r1.61 -r1.62
--- UpcallHandler.pm	2001/02/07 11:14:36	1.61
+++ UpcallHandler.pm	2001/04/25 17:55:32	1.62
@@ -5,5 +5,4 @@
 use Lento::Connection;
 use Lento::Debuggable;
-use Lento::Packet;
 use Lento::Filter::Upcall;
 use Lento::ReqDispatcher;


_______________________________________________
intermezzo-commit mailing list
intermezzo-commit@lists.sourceforge.net
http://lists.sourceforge.net/lists/listinfo/intermezzo-commit


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic