[prev in list] [next in list] [prev in thread] [next in thread]
List: nutch-cvs
Subject: [Nutch-cvs] nutch/src/java/net/nutch/mapReduce MapTask.java,1.3,1.4
From: Doug Cutting <cutting () users ! sourceforge ! net>
Date: 2005-01-24 22:17:03
Message-ID: E1CtCWS-0006uS-Ob () sc8-pr-cvs1 ! sourceforge ! net
[Download RAW message or body]
Update of /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv26458
Modified Files:
MapTask.java
Log Message:
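Close input & output in finally blocks, so they are closed even when mapping fails.
Also fix the operator precedence in the progress computation and report progress
every 0.1% instead of every 0.5%.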
Index: MapTask.java
===================================================================
RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce/MapTask.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** MapTask.java 24 Jan 2005 20:07:19 -0000 1.3
--- MapTask.java 24 Jan 2005 22:16:51 -0000 1.4
***************
*** 43,101 ****
final int partitions = job.getNumReduceTasks();
final SequenceFile.Writer[] outs = new SequenceFile.Writer[partitions];
- String tmpDir = NutchConf.get("mapred.temp.dir", "/tmp/mapred");
- String outDir = tmpDir+"/"+this.getTaskId();
- for (int i = 0; i < partitions; i++) {
- outs[i] =
- new SequenceFile.Writer(NutchFileSystem.getNamed("local"),
- new File(outDir, "out-"+i).toString(),
- job.getOutputKeyClass(),
- job.getOutputValueClass());
- }
-
- Mapper mapper; // make mapper & partitioner
- final Partitioner partitioner;
try {
! mapper = (Mapper)job.getMapperClass().newInstance();
! partitioner = (Partitioner)job.getPartitionerClass().newInstance();
! } catch (Exception e) {
! throw new RuntimeException(e);
! }
! OutputCollector collector = new OutputCollector() { // make collector
! public void collect(WritableComparable key, Writable value)
! throws IOException {
! outs[partitioner.getPartition(key, partitions)].append(key, value);
! }
! };
! float lastProgress = 0.0f;
! RecordReader in = // open the input
! job.getInputFormat().getRecordReader(job.getFileSystem(), split);
! WritableComparable key;
! Writable value;
! try {
! key = (WritableComparable)job.getInputKeyClass().newInstance();
! value = (Writable)job.getInputValueClass().newInstance();
! } catch (Exception e) {
! throw new IOException(e.toString());
! }
! while (in.next(key, value)) { // map input to collector
! mapper.map(key, value, collector);
! float progress = // compute progress
! (float)in.getPos()-split.getStart()/(float)split.getLength();
!
! if ((progress - lastProgress) > 0.005f) // report every 1/2%
! umbilical.progress(new UTF8(getTaskId()), new FloatWritable(progress));
! }
! for (int i = 0; i < partitions; i++) { // close output
! outs[i].close();
}
}
!
}
--- 43,109 ----
final int partitions = job.getNumReduceTasks();
final SequenceFile.Writer[] outs = new SequenceFile.Writer[partitions];
try {
! String tmpDir = NutchConf.get("mapred.temp.dir", "/tmp/mapred");
! String outDir = tmpDir+"/"+this.getTaskId();
! for (int i = 0; i < partitions; i++) {
! outs[i] =
! new SequenceFile.Writer(NutchFileSystem.getNamed("local"),
! new File(outDir, "out-"+i).toString(),
! job.getOutputKeyClass(),
! job.getOutputValueClass());
! }
! Mapper mapper; // make mapper & partitioner
! final Partitioner partitioner;
! try {
! mapper = (Mapper)job.getMapperClass().newInstance();
! partitioner = (Partitioner)job.getPartitionerClass().newInstance();
! } catch (Exception e) {
! throw new RuntimeException(e);
! }
! OutputCollector collector = new OutputCollector() { // make collector
! public void collect(WritableComparable key, Writable value)
! throws IOException {
! outs[partitioner.getPartition(key, partitions)].append(key, value);
! }
! };
! float lastProgress = 0.0f;
! WritableComparable key;
! Writable value;
! try {
! key = (WritableComparable)job.getInputKeyClass().newInstance();
! value = (Writable)job.getInputValueClass().newInstance();
! } catch (Exception e) {
! throw new IOException(e.toString());
! }
! RecordReader in = // open the input
! job.getInputFormat().getRecordReader(job.getFileSystem(), split);
! try {
! while (in.next(key, value)) { // map input to collector
! mapper.map(key, value, collector);
!
! float progress = // compute progress
! (float)(in.getPos()-split.getStart())/(float)split.getLength();
!
! if ((progress - lastProgress) > 0.001f) // 1000 progress reports
! umbilical.progress(new UTF8(getTaskId()),
! new FloatWritable(progress));
! }
! } finally {
! in.close(); // close input
! }
! } finally {
! for (int i = 0; i < partitions; i++) { // close output
! outs[i].close();
! }
}
}
!
}
-------------------------------------------------------
This SF.Net email is sponsored by: IntelliVIEW -- Interactive Reporting
Tool for open source databases. Create drag-&-drop reports. Save time
by over 75%! Publish reports on the web. Export to DOC, XLS, RTF, etc.
Download a FREE copy at http://www.intelliview.com/go/osdn_nl
_______________________________________________
Nutch-cvs mailing list
Nutch-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nutch-cvs
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic