[prev in list] [next in list] [prev in thread] [next in thread]
List: intermezzo-cvs
Subject: CVS: intermezzo/lento/Lento/InterMezzo ReqHandler.pm,1.133,1.134 UpcallHandler.pm,1.61,1.62
From: Gordon Matzigkeit <gord-fig () users ! sourceforge ! net>
Date: 2001-04-25 17:55:34
[Download RAW message or body]
Update of /cvsroot/intermezzo/intermezzo/lento/Lento/InterMezzo
In directory usw-pr-cvs1:/tmp/cvs-serv15423/lento/Lento/InterMezzo
Modified Files:
ReqHandler.pm UpcallHandler.pm
Log Message:
Excise Lento::Packet from the sources. Now everything is
XDR-based. Lento::Pinger-managed connections work, but replication does not.
Index: ReqHandler.pm
===================================================================
RCS file: /cvsroot/intermezzo/intermezzo/lento/Lento/InterMezzo/ReqHandler.pm,v
retrieving revision 1.133
retrieving revision 1.134
diff -U2 -r1.133 -r1.134
--- ReqHandler.pm 2001/04/23 20:50:53 1.133
+++ ReqHandler.pm 2001/04/25 17:55:32 1.134
@@ -9,5 +9,4 @@
use Lento::Connection;
use Lento::Debuggable;
-use Lento::Packet;
use Lento::ReqDispatcher;
use Lento::Replicator;
@@ -42,8 +41,4 @@
Lento::Debuggable::set_option($_[SESSION], trace => 1, default => 1);
$_[HEAP]->{req_packet} = $_[ARG0];
- if (UNIVERSAL::isa ($_[ARG0], 'Lento::Packet'))
- {
- confess "old packet";
- }
$_[HEAP]->{connection} = $_[ARG0]->connection;
@@ -94,14 +89,14 @@
sub SysId {
- my $packet = shift;
+ my $rpc = shift;
new POE::Session
- ([$packet, "SysId"],
+ ([$rpc, "SysId"],
'Lento::EventSupport', ['alias_set', 'alias_remove', '_stop',
'_child', '_signal'],
'Lento::InterMezzo::ReqHandler', ['_start', 'finish', 'got_error'],
REQ => sub {
- my $packet = $_[HEAP]->{req_packet};
- my $sysid = @{$_[HEAP]->{req_packet}->[PKT_PARMS]}[0];
+ my $rpc = $_[HEAP]->{req_packet};
+ my $sysid = $rpc->arg (0);
# XXX check security here!!
@@ -113,6 +108,6 @@
# this used to be in the reqhandler.
# this binds the connection
- $packet->[PKT_CONNECTION]->setsysid($sysid);
- $packet->[PKT_CONNECTION]->{sysid} = $sysid;
+ $rpc->connection->setsysid($sysid);
+ $rpc->connection->{sysid} = $sysid;
LOG "New client instantiated, sysid $sysid\n";
$client = new Lento::Client($sysid);
@@ -121,6 +116,6 @@
- $packet->[PKT_CONNECTION]->sendpacket('REP', $packet, [0]);
- $poe_kernel->yield('finish', [$packet, 0]);
+ $rpc->connection->sendrep ($rpc, [0]);
+ $poe_kernel->yield('finish');
}
);
@@ -132,8 +127,8 @@
# the replicator.
sub ReplicatorStatus {
- my $packet = shift;
+ my $rpc = shift;
new POE::Session
- ([$packet, "ReplicatorStatus"], # two arguments to _start
+ ([$rpc, "ReplicatorStatus"], # two arguments to _start
'Lento::EventSupport', ['alias_set', 'alias_remove',
'get_cookie', '_stop',
@@ -142,8 +137,8 @@
'Lento::InterMezzo::ReqHandler', ['_start', 'finish'],
REQ => sub {
- my $packet = $_[HEAP]->{req_packet};
- my $connection = $packet->[PKT_CONNECTION];
- my $fsetname = @{$packet->[PKT_PARMS]}[0];
- my $fsetinfo = @{$packet->[PKT_PARMS]}[1];
+ my $rpc = $_[HEAP]->{req_packet};
+ my $connection = $rpc->connection;
+ my $fsetname = $rpc->arg (0);
+ my $fsetinfo = $rpc->arg (1);
my $replicator =
Lento::Replicator::find($connection->{sysid}, $fsetname);
@@ -228,5 +223,5 @@
# get here when $fset->{permit} has reintegrated
my $status = $_[ARG0];
- my $packet = $_[HEAP]->{req_packet};
+ my $rpc = $_[HEAP]->{req_packet};
my $connection = $_[HEAP]->{connection};
my $replicator = $_[HEAP]->{replicator};
@@ -245,5 +240,5 @@
$_[HEAP]->{state}="finished";
- $connection->sendpacket('REP', $packet, [$status,$fsetstatus]);
+ $connection->sendrep($rpc, [$status,$fsetstatus]);
$_[KERNEL]->yield('release_lock');
},
@@ -283,8 +278,8 @@
sub Permit {
- my $packet = shift;
+ my $rpc = shift;
new POE::Session
- ([$packet, "Permit"], # two arguments to _start
+ ([$rpc, "Permit"], # two arguments to _start
'Lento::EventSupport', ['alias_set', 'alias_remove',
'get_cookie', '_stop',
@@ -293,7 +288,7 @@
'Lento::InterMezzo::ReqHandler', ['_start', 'finish'],
REQ => sub {
- my $packet = $_[HEAP]->{req_packet};
- my $connection = $packet->[PKT_CONNECTION];
- my $fsetname = @{$packet->[PKT_PARMS]}[0];
+ my $rpc = $_[HEAP]->{req_packet};
+ my $connection = $rpc->connection;
+ my $fsetname = $rpc->arg (0);
my $fset = $::fsetdb->find_by_name($fsetname);
my $replicator =
@@ -423,8 +418,8 @@
$_[SESSION]);
die "no getpermit session"
- unless defined $_[HEAP]->{wait_session};
+ unless defined $_[HEAP]->{wait_session};
} else {
- # We set this greedily here in case we get a got_error right after
- # this subroutine finishes.
+ # We set this greedily here in case we get a got_error
+ # right after this subroutine finishes.
$_[HEAP]->{state}='have_permit';
$_[KERNEL]->yield('permit_acquired',0);
@@ -449,5 +444,4 @@
},
revoke_permit => sub {
- my $packet = $_[HEAP]->{req_packet};
my $fset = $_[HEAP]->{fset};
my $cookie= $_[KERNEL]->call($_[SESSION],'get_cookie');
@@ -472,6 +466,6 @@
$_[KERNEL]->call($_[SESSION], 'reint_done');
} else {
- my $packet = $_[HEAP]->{req_packet};
- my $connection = $packet->[PKT_CONNECTION];
+ my $rpc = $_[HEAP]->{req_packet};
+ my $connection = $rpc->connection;
my $replicator = $_[HEAP]->{replicator};
@@ -496,5 +490,6 @@
# Reintegration complete; issue the permit.
- my $connection = $packet->[PKT_CONNECTION];
+ my $rpc = $_[HEAP]->{req_packet};
+ my $connection = $rpc->connection;
DEBUG "giving permit to $connection->{sysid}\n";
@@ -512,6 +507,6 @@
LOG(" PERMIT REP : holder= $_[HEAP]->{fset}->{permitholder}\n");
my $status = $_[ARG0];
- my $packet = $_[HEAP]->{req_packet};
- my $connection = $packet->[PKT_CONNECTION];
+ my $rpc = $_[HEAP]->{req_packet};
+ my $connection = $rpc->connection;
my $replicator = $_[HEAP]->{replicator};
my $fsetstatus=undef;
@@ -539,5 +534,5 @@
# be certain that the recipient did not receive our reply.
$_[HEAP]->{state}="finished";
- $connection->sendpacket('REP', $packet, [$status]);
+ $connection->sendrep($rpc, [$status]);
$_[KERNEL]->yield('release_lock');
}
@@ -548,6 +543,6 @@
my $errno = $_[ARG1];
- my $packet = $_[HEAP]->{req_packet};
- my $connection = $packet->[PKT_CONNECTION];
+ my $rpc = $_[HEAP]->{req_packet};
+ my $connection = $rpc->connection;
$errno=EAGAIN unless (defined $errno and $errno != 0);
@@ -566,8 +561,8 @@
} else {
# could not acquire permit from permit holder...tell remote
- # This might also cause a get error which will be trapped
+ # This might also cause a get error which will be trapped
$_[HEAP]->{state}='error';
- $connection->sendpacket('REP', $packet, [$errno]);
+ $connection->sendrep($rpc, [$errno]);
}
@@ -602,8 +597,8 @@
sub BackFetch {
- my $packet = shift;
+ my $rpc = shift;
new POE::Session
- ([$packet, "BackFetch"], # two arguments to _start
+ ([$rpc, "BackFetch"], # two arguments to _start
'Lento::EventSupport' , ['alias_set', 'alias_remove', '_stop',
'_child', '_signal'],
@@ -613,7 +608,7 @@
REQ => sub {
- my $packet = $_[HEAP]->{req_packet};
- my $fsetname = @{$packet->[PKT_PARMS]}[0];
- my $inodeinfo = @{$packet->[PKT_PARMS]}[1];
+ my $rpc = $_[HEAP]->{req_packet};
+ my $fsetname = $rpc->arg (0);
+ my $inodeinfo = $rpc->arg (1);
$_[HEAP]->{session_level} = LVL_RECOVER;
@@ -621,5 +616,5 @@
if (!defined($_[HEAP]->{fset} = $::fsetdb->find_by_name($fsetname))) {
# Invalid fileset name sent by client
- #$_[HEAP]->{connection}->sendpacket('REP', $packet, [EINVAL]);
+ #$_[HEAP]->{connection}->sendrep($rpc, [EINVAL]);
$_[KERNEL]->yield('finish');
}
@@ -635,10 +630,9 @@
role => $Lento::Bulk::Desc::SOURCE,
direction => $Lento::Bulk::Desc::SERVER_TO_CLIENT,
- connection => $packet->[PKT_CONNECTION],
+ connection => $rpc->connection,
debug => 1,
callback_event => 'finished_sending',
callback_session => $_[SESSION],
- source_act => $_[HEAP]->{req_packet}->[PKT_CTOKEN],
- sink_act => $_[HEAP]->{req_packet}->[PKT_STOKEN] }
+ sink_xid => $rpc->xid }
);
@@ -648,5 +642,5 @@
if ($rc) {
if ($rc == ENOENT) {
- #$_[HEAP]->{connection}->sendpacket('REP', $packet, [ENOENT]);
+ #$_[HEAP]->{connection}->sendrep($rpc, [ENOENT]);
$_[KERNEL]->yield('finish');
} else {
@@ -657,7 +651,6 @@
finished_sending => sub {
DEBUG "Got finished_sending in BackFetch\n";
- # $_[HEAP]->{connection}->sendpacket('REP', $packet, [0]);
- # DEBUG "Ping delta (server): " . (time() - $time) . "\n";
- $poe_kernel->yield('finish', [$packet, 0]);
+ # $_[HEAP]->{connection}->sendrep($rpc, [0]);
+ $poe_kernel->yield('finish');
}
);
@@ -667,8 +660,8 @@
# Receiver of KML
sub Reintegrate_KML {
- my $packet = shift;
+ my $rpc = shift;
new POE::Session
- ([$packet, "Reintegrate_KML"], # two arguments to _start
+ ([$rpc, "Reintegrate_KML"], # two arguments to _start
'Lento::EventSupport' , ['alias_set', 'alias_remove', '_stop',
'_child', '_signal'],
@@ -676,18 +669,17 @@
'Lento::Bulk::Sink' , ['done_sinking', 'EOD', 'DAT' ],
REQ => sub {
- my $fsetname = shift @{$_[HEAP]->{req_packet}->[PKT_PARMS]};
+ my $rpc = $_[HEAP]->{req_packet};
+ my $fsetname = $rpc->arg (0);
# use the naming used by the sender of the KML
- $_[HEAP]->{next_to_send} =
- shift @{$_[HEAP]->{req_packet}->[PKT_PARMS]};
- my $fsetinfo= $_[HEAP]->{fsetinfo} =
- shift @{$_[HEAP]->{req_packet}->[PKT_PARMS]};
- my $segment_length = shift @{$_[HEAP]->{req_packet}->[PKT_PARMS]};
+ $_[HEAP]->{next_to_send} = $rpc->arg (1);
+ my $fsetinfo = $rpc->arg (2);
+ my $segment_length = $rpc->arg (3);
$_[HEAP]->{segment_length} = $segment_length;
- $_[HEAP]->{connection} = $_[HEAP]->{req_packet}->[PKT_CONNECTION];
+ $_[HEAP]->{connection} = $rpc->connection;
# offsets/recnos in remote's nomenclature, so remote's remote is us!
($_[HEAP]->{remote_kmlsize}, $_[HEAP]->{last_remote_recno},
$_[HEAP]->{last_remote_offset}, $_[HEAP]->{last_local_recno},
- $_[HEAP]->{last_local_offset}) = unpack("NNNNN", $fsetinfo);
+ $_[HEAP]->{last_local_offset}) = @$fsetinfo;
$_[HEAP]->{kmlbuf} = "";
@@ -697,5 +689,5 @@
if (!defined($_[HEAP]->{fset} = $::fsetdb->find_by_name($fsetname))) {
# Invalid fileset name sent by client
- $_[HEAP]->{connection}->sendpacket('REP', $packet, [EINVAL]);
+ $_[HEAP]->{connection}->sendrep($rpc, [EINVAL]);
$_[KERNEL]->yield('done');
return;
@@ -708,5 +700,5 @@
LOG ("No replicator for client $_[HEAP]->{connection}->{sysid}".
"fileset $fsetname\n");
- $_[HEAP]->{connection}->sendpacket('REP', $packet, [EINVAL]);
+ $_[HEAP]->{connection}->sendrep($rpc, [EINVAL]);
$_[KERNEL]->yield('done');
return;
@@ -723,5 +715,5 @@
warn "-=- permitholder: $_[HEAP]->{fset}->{permitholder}\n";
warn "-=- connectsysid: $_[HEAP]->{connection}->{sysid}\n";
- $_[HEAP]->{connection}->sendpacket('REP', $packet,[EPERM]);
+ $_[HEAP]->{connection}->sendrep($rpc, [EPERM]);
$_[KERNEL]->yield('complete');
return 0;
@@ -735,5 +727,5 @@
DEBUG("last_remote_offset %d, last_local_offset %d,".
" last_remote_recno %d, last_local_recno %d\n".
- " replicator %s\n, length %d, S-ACT %d\n",
+ " replicator %s\n, length %d, S-XID %d\n",
$_[HEAP]->{last_remote_offset},
$_[HEAP]->{last_local_offset},
@@ -742,5 +734,5 @@
$_[HEAP]->{replicator}->printme(),
$segment_length,
- $_[HEAP]->{req_packet}->[PKT_STOKEN]);
+ $_[HEAP]->{req_packet}->xid);
my $fset = $_[HEAP]->{fset};
@@ -775,6 +767,6 @@
#} else {
DEBUG("Resetting next_to_send\n");
- $_[HEAP]->{replicator}->next_to_send(
- $_[HEAP]->{last_remote_offset});
+ $_[HEAP]->{replicator}->next_to_send
+ ($_[HEAP]->{last_remote_offset});
#}
}
@@ -803,11 +795,11 @@
print " RECORDS WERE LOST...MANUAL RESYNC MAY BE NEEDED\n";
DEBUG "******lost records...disconnecting*****\n";
- $_[KERNEL]->yield('got_error',"reintegrate kml",ESTALE,
- "bad record sequence");
+ $_[KERNEL]->yield('got_error', "reintegrate kml",
+ ESTALE, "bad record sequence");
# and disconnect
$_[HEAP]->{connection}->set_error
- ('got_error',"reintegrate kml",ESTALE,
- "bad record sequence");
+ ('got_error', "reintegrate kml",
+ ESTALE, "bad record sequence");
return;
}
@@ -825,8 +817,15 @@
# do we have non-reintegrated data?
if ($::psdev->get_kml_size($fset->mtpt()) - $max_local_offset) {
- DEBUG($replicator->printme()." kml size=".$::psdev->get_kml_size($fset->mtpt())." local offs= $max_local_offset\n");
+ DEBUG($replicator->printme()." kml size=".
+ $::psdev->get_kml_size($fset->mtpt()).
+ " local offs= $max_local_offset\n");
die "Possible conflict; resync manually\n";
}
+ # Forge an XID for the current session.
+ $_[HEAP]->{session_xid} = $rpc->connection->{rpcsubsys}->cookie
+ (undef, $_[KERNEL]->ID_session_to_id ($_[SESSION]));
+
+ # create the sink bulk descriptor
my $desc = Lento::Bulk::Desc->new
({ sink_type => $Lento::Bulk::Sink::SINK_BUFFER,
@@ -835,11 +834,13 @@
role => $Lento::Bulk::Desc::SINK,
direction => $Lento::Bulk::Desc::CLIENT_TO_SERVER,
- connection => $_[HEAP]->{req_packet}->[PKT_CONNECTION],
+ connection => $_[HEAP]->{connection},
callback_session => $_[SESSION],
callback_event => 'got_kml',
- source_act => $_[HEAP]->{req_packet}->[PKT_CTOKEN],
- sink_act => $_[HEAP]->{req_packet}->[PKT_STOKEN],
+ source_xid => $rpc->xid,
+ sink_xid => $_[HEAP]->{session_xid},
debug => 1 } );
-
+
+ delete $desc->{bytesize} if ($desc->{bytesize} <= 0);
+
DEBUG("Created sink descriptor for receiving KML:\n");
DEBUG $desc->printme();
@@ -857,4 +858,8 @@
my $error = $_[ARG0];
+ # Release our session_xid.
+ $rpc->connection->{rpcsubsys}->cookie
+ ($_[HEAP]->{session_xid}, undef);
+
if ($error) {
# This is draconian but needed!
@@ -924,9 +929,12 @@
# If there are backfetches pending then got_backfetch_file will
#take care of sending the confirmation.
- $_[KERNEL]->yield('send_confirmation', 'done')
- unless ($_[HEAP]->{backfetches_active}>0);
-
- DEBUG ("Not sending confirmation\n")
- if ($_[HEAP]->{backfetches_active}>0);
+ if ($_[HEAP]->{backfetches_active}>0)
+ {
+ DEBUG ("Not sending confirmation\n");
+ }
+ else
+ {
+ $_[KERNEL]->yield('send_confirmation', 'done');
+ }
} else {
# We continue with reintegration...if the last operation was a
@@ -976,6 +984,6 @@
# Schedule a new round of reintegrations if do_reints are waiting
if ($_[HEAP]->{reint_waiting}) {
- # We set this to zero here to ensure that any additional got
- # backfetch events do not trigger of more do_reints
+ # We set this to zero here to ensure that any additional
+ # got backfetch events do not trigger of more do_reints
$_[HEAP]->{reint_waiting}=0;
$_[KERNEL]->yield('do_reint');
@@ -990,12 +998,12 @@
DEBUG("CONFIRMATION: offs_incr: $_[HEAP]->{offs_incr}\n");
if ($status eq 'done') {
- $_[HEAP]->{connection}->sendpacket
- ('REP', $packet,
- [0, $_[HEAP]->{offs_incr}, $_[HEAP]->{recno_incr}]);
+ $_[HEAP]->{connection}->sendrep
+ ($rpc,
+ [0, [$_[HEAP]->{offs_incr}, $_[HEAP]->{recno_incr}]]);
}
$_[KERNEL]->yield('complete');
},
complete => sub {
- my $packet = $_[HEAP]->{req_packet};
+ my $rpc = $_[HEAP]->{req_packet};
$_[KERNEL]->yield('finish');
Index: UpcallHandler.pm
===================================================================
RCS file: /cvsroot/intermezzo/intermezzo/lento/Lento/InterMezzo/UpcallHandler.pm,v
retrieving revision 1.61
retrieving revision 1.62
diff -U2 -r1.61 -r1.62
--- UpcallHandler.pm 2001/02/07 11:14:36 1.61
+++ UpcallHandler.pm 2001/04/25 17:55:32 1.62
@@ -5,5 +5,4 @@
use Lento::Connection;
use Lento::Debuggable;
-use Lento::Packet;
use Lento::Filter::Upcall;
use Lento::ReqDispatcher;
_______________________________________________
intermezzo-commit mailing list
intermezzo-commit@lists.sourceforge.net
http://lists.sourceforge.net/lists/listinfo/intermezzo-commit
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic