
List:       hadoop-commits
Subject:    svn commit: r500410 - in /lucene/hadoop/trunk: ./
From:       cutting@apache.org
Date:       2007-01-26 23:37:48
Message-ID: 20070126233748.F1D691A981A@eris.apache.org

Author: cutting
Date: Fri Jan 26 15:37:46 2007
New Revision: 500410

URL: http://svn.apache.org/viewvc?view=rev&rev=500410
Log:
HADOOP-248.  Optimize location of map outputs to no longer use random probes.  Contributed by Devaraj.
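
In outline: instead of reduce tasks probing the JobTracker with random subsets of needed map ids (locateMapOutputs), each reduce now keeps a cursor into the job's ordered list of task completion events and polls it in bounded pages. A minimal sketch of the new pattern, reusing names from the patch below; the fetchMapOutput helper and the surrounding control flow are illustrative assumptions, not code from this commit:

    // Poll for completion events, advancing a cursor instead of probing
    // random map ids; probe_sample_size bounds each response.
    TaskCompletionEvent[] events = jobClient.getTaskCompletionEvents(
        jobId, fromEventId, probeSampleSize);
    for (int i = 0; i < events.length; i++) {
      if (events[i].isMapTask() &&
          events[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
        // the serving tracker's host:port rides in the event's HTTP URL
        URI u = URI.create(events[i].getTaskTrackerHttp());
        fetchMapOutput(events[i].getTaskId(), events[i].idWithinJob(),
                       u.getHost(), u.getPort());   // hypothetical helper
      }
    }
    fromEventId += events.length;   // next poll starts where this one ended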

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=500410&r1=500409&r2=500410
 ==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 26 15:37:46 2007
@@ -87,6 +87,9 @@
     with different versions of Lucene without worrying about CLASSPATH
     order.  (Milind Bhandarkar via cutting)
 
+27. HADOOP-248.  Optimize location of map outputs to not use random
+    probes.  (Devaraj Das via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?view=diff&rev=500410&r1=500409&r2=500410
 ==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Jan 26 15:37:46 2007
@@ -28,11 +28,10 @@
  */ 
 interface InterTrackerProtocol extends VersionedProtocol {
   /**
-   * version 3 introduced to replace 
-   * emitHearbeat/pollForNewTask/pollForTaskWithClosedJob with
-   * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, short)}
+   * version 4 introduced to remove locateMapOutputs; clients now use
+   * getTaskCompletionEvents to discover finished maps and fetch their outputs
    */
-  public static final long versionID = 3L;
+  public static final long versionID = 4L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
@@ -62,18 +61,6 @@
           boolean initialContact, boolean acceptNewTasks, short responseId)
   throws IOException;
 
-  /** Called by a reduce task to find which map tasks are completed.
-   *
-   * @param jobId the job id
-   * @param mapTasksNeeded an array of the mapIds that we need
-   * @param partition the reduce's id
-   * @return an array of MapOutputLocation
-   */
-  MapOutputLocation[] locateMapOutputs(String jobId, 
-                                       int[] mapTasksNeeded,
-                                       int partition
-                                       ) throws IOException;
-
   /**
    * The task tracker calls this once, to discern where it can find
    * files referred to by the JobTracker
@@ -96,11 +83,12 @@
   * Returns an empty array if no events are available. 
    * @param jobid job id 
    * @param fromEventId event id to start from. 
+   * @param maxEvents the max number of events we want to look at
    * @return array of task completion events. 
    * @throws IOException
    */
   TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId) throws IOException; 
+      String jobid, int fromEventId, int maxEvents) throws IOException;
   
 }
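
Bumping versionID from 3L to 4L makes an old TaskTracker fail fast at connection time instead of failing later on a missing RPC method. A sketch of the guard this enables, assuming the standard VersionedProtocol handshake (the explicit check is shown inline for illustration; in practice it lives in the RPC layer):

    long serverVersion = jobTracker.getProtocolVersion(
        InterTrackerProtocol.class.getName(),
        InterTrackerProtocol.versionID);           // client advertises 4L
    if (serverVersion != InterTrackerProtocol.versionID) {
      throw new IOException("InterTrackerProtocol version mismatch: client "
          + InterTrackerProtocol.versionID + ", server " + serverVersion);
    }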
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=500410&r1=500409&r2=500410
 ==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Fri Jan 26 15:37:46 2007
@@ -157,7 +157,7 @@
         public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
             int startFrom) throws IOException{
           return jobSubmitClient.getTaskCompletionEvents(
-              getJobID(), startFrom); 
+              getJobID(), startFrom, 10); 
         }
 
         /**
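
The wrapper above pins the window to 10 events per RPC, so a caller pages by advancing startFrom with each batch. A usage sketch against the RunningJob handle that declares this method (breaking on an empty batch is a simplification that holds once the job is complete; a live job may simply have no new events yet):

    int startFrom = 0;
    while (true) {
      TaskCompletionEvent[] batch = job.getTaskCompletionEvents(startFrom);
      if (batch.length == 0) break;        // caught up (job assumed done)
      for (int i = 0; i < batch.length; i++) {
        System.out.println(batch[i].getTaskId());
      }
      startFrom += batch.length;           // advance past consumed events
    }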

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=500410&r1=500409&r2=500410
 ==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Jan 26 15:37:46 2007
@@ -291,24 +291,40 @@
           
           if (state == TaskStatus.State.SUCCEEDED) {
             this.taskCompletionEvents.add( new TaskCompletionEvent(
-                taskCompletionEventTracker++, 
-                status.getTaskId(), 
+                taskCompletionEventTracker, 
+                status.getTaskId(),
+                tip.idWithinJob(),
+                status.getIsMap(),
                 TaskCompletionEvent.Status.SUCCEEDED,
                 httpTaskLogLocation ));
+            tip.setSuccessEventNumber(taskCompletionEventTracker);
             completedTask(tip, status, metrics);
           } else if (state == TaskStatus.State.FAILED ||
                      state == TaskStatus.State.KILLED) {
             this.taskCompletionEvents.add( new TaskCompletionEvent(
-                taskCompletionEventTracker++, 
-                status.getTaskId(), 
+                taskCompletionEventTracker, 
+                status.getTaskId(),
+                tip.idWithinJob(),
+                status.getIsMap(),
                 TaskCompletionEvent.Status.FAILED, 
                 httpTaskLogLocation ));
+            // Get the event number for the (possibly) previously successful
+            // task. If there exists one, then set that status to OBSOLETE 
+            int eventNumber;
+            if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
+              TaskCompletionEvent t = 
+                this.taskCompletionEvents.get(eventNumber);
+              if (t.getTaskId().equals(status.getTaskId()))
+                t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
+            }
             // Tell the job to fail the relevant task
             failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
                        wasRunning, wasComplete);
           }          
         }
 
+        taskCompletionEventTracker++;
+        
         //
         // Update JobInProgress status
         //
@@ -778,12 +794,14 @@
        return null;
     }
     
-    public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId) {
+    public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId, 
+        int maxEvents) {
       TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
       if( taskCompletionEvents.size() > fromEventId) {
+        int actualMax = Math.min(maxEvents, 
+            (taskCompletionEvents.size() - fromEventId));
         events = (TaskCompletionEvent[])taskCompletionEvents.subList(
-            fromEventId, taskCompletionEvents.size()).
-            toArray(events);        
+            fromEventId, actualMax + fromEventId).toArray(events);        
       }
       return events; 
     }
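
The Math.min clamp keeps one RPC from dragging the whole event tail across the wire. A self-contained worked example of the arithmetic, with illustrative values:

    import java.util.ArrayList;
    import java.util.List;

    public class WindowDemo {
      public static void main(String[] args) {
        List<Integer> events = new ArrayList<Integer>();
        for (int i = 0; i < 12; i++) events.add(new Integer(i)); // ids 0..11

        int fromEventId = 10, maxEvents = 10;
        int actualMax = Math.min(maxEvents, events.size() - fromEventId); // 2
        // subList covers [fromEventId, fromEventId + actualMax) == [10, 12)
        System.out.println(events.subList(fromEventId, fromEventId + actualMax));
        // prints [10, 11]: only the events the caller has not seen yet
      }
    }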

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?view=diff&rev=500410&r1=500409&r2=500410
 ==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Jan 26 15:37:46 2007
@@ -85,11 +85,12 @@
      * Get task completion events for the jobid, starting from fromEventId. 
     * Returns an empty array if no events are available. 
      * @param jobid job id 
-     * @param fromEventId event id to start from. 
+     * @param fromEventId event id to start from.
+     * @param maxEvents the max number of events we want to look at 
      * @return array of task completion events. 
      * @throws IOException
      */
     public TaskCompletionEvent[] getTaskCompletionEvents(
-                String jobid, int fromEventId) throws IOException;
+              String jobid, int fromEventId, int maxEvents) throws IOException;
     
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=500410&r1=500409&r2=500410
 ==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Jan 26 15:37:46 2007
@@ -1263,48 +1263,6 @@
     }
 
     /**
-     * A TaskTracker wants to know the physical locations of completed, but not
-     * yet closed, tasks.  This exists so the reduce task thread can locate
-     * map task outputs.
-     */
-    public synchronized MapOutputLocation[] 
-             locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) 
-    throws IOException {
-        // Check to make sure that the job hasn't 'completed'.
-        JobInProgress job = getJob(jobId);
-        if (job.status.getRunState() != JobStatus.RUNNING) {
-          return new MapOutputLocation[0];
-        }
-        
-        ArrayList result = new ArrayList(mapTasksNeeded.length);
-        for (int i = 0; i < mapTasksNeeded.length; i++) {
-          TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
-          if (status != null) {
-             String trackerId = 
-               (String) taskidToTrackerMap.get(status.getTaskId());
-             // Safety check, if we can't find the taskid in 
-             // taskidToTrackerMap and job isn't 'running', then just
-             // return an empty array
-             if (trackerId == null && 
-                     job.status.getRunState() != JobStatus.RUNNING) {
-               return new MapOutputLocation[0];
-             }
-             
-             TaskTrackerStatus tracker;
-             synchronized (taskTrackers) {
-               tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
-             }
-             result.add(new MapOutputLocation(status.getTaskId(), 
-                                              mapTasksNeeded[i],
-                                              tracker.getHost(), 
-                                              tracker.getHttpPort()));
-          }
-        }
-        return (MapOutputLocation[]) 
-               result.toArray(new MapOutputLocation[result.size()]);
-    }
-
-    /**
      * Grab the local fs name
      */
     public synchronized String getFilesystemName() throws IOException {
@@ -1435,14 +1393,14 @@
     /* 
      * Returns a list of TaskCompletionEvent for the given job, 
      * starting from fromEventId.
-     * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int)
+     * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
      */
     public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
-        String jobid, int fromEventId) throws IOException{
+        String jobid, int fromEventId, int maxEvents) throws IOException{
       TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
       JobInProgress job = (JobInProgress)this.jobs.get(jobid);
       if (null != job) {
-        events = job.getTaskCompletionEvents(fromEventId);
+        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
       }
       return events;
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=500410&r1=500409&r2=500410
 ==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Jan 26 15:37:46 2007
@@ -244,7 +244,7 @@
 
   public JobStatus[] jobsToComplete() {return null;}
   public TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId) throws IOException{
+      String jobid, int fromEventId, int maxEvents) throws IOException{
     return TaskCompletionEvent.EMPTY_ARRAY;
   }
   

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=500410&r1=500409&r2=500410
 ==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Jan 26 15:37:46 2007
@@ -122,9 +122,19 @@
   
   /**
    * the number of map output locations to poll for at one time
+   */  
+  private int probe_sample_size = 50;
+  
+  /**
+   * a Random used during the map output fetching
    */
-  private static final int PROBE_SAMPLE_SIZE = 50;
-
+  private Random randForProbing;
+  
+  /**
+   * a hashmap from mapId to MapOutputLocation for retrials
+   */
+  private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
+  
   /** Represents the result of an attempt to copy a map output */
   private class CopyResult {
     
@@ -368,6 +378,10 @@
     Random         backoff = new Random();
     final Progress copyPhase = getTask().getProgress().phase();
     
+    //tweak the probe sample size (make it a function of numCopiers)
+    probe_sample_size = Math.max(numCopiers*5, 50);
+    randForProbing = new Random(reduceTask.getPartition() * 100);
+    
     for (int i = 0; i < numOutputs; i++) {
       neededOutputs.add(new Integer(i));
       copyPhase.addPhase();       // add sub-phase per file
@@ -385,6 +399,8 @@
     // start the clock for bandwidth measurement
     long startTime = System.currentTimeMillis();
     long currentTime = startTime;
+    int fromEventId = 0;
+    
     PingTimer pingTimer = new PingTimer();
     pingTimer.setName("Map output copy reporter for task " + 
                       reduceTask.getTaskId());
@@ -401,17 +417,36 @@
         LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
                  " map output location(s)");
         try {
-          MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient);
+          // the call to queryJobTracker will modify fromEventId to a value
+          // that it should be for the next call to queryJobTracker
+          MapOutputLocation[] locs = queryJobTracker(fromEventId, jobClient);
           
           // remove discovered outputs from needed list
           // and put them on the known list
+          int gotLocs = (locs == null ? 0 : locs.length);
           for (int i=0; i < locs.length; i++) {
-            neededOutputs.remove(new Integer(locs[i].getMapId()));
-            knownOutputs.add(locs[i]);
+            // check whether we actually need an output. It could happen
+            // that a map task that successfully ran earlier got lost, but
+            // if we already have copied the output of that unfortunate task
+            // we need not copy it again from the new TT (we will ignore 
+            // the event for the new rescheduled execution)
+            if(neededOutputs.remove(new Integer(locs[i].getMapId()))) {
+              // remove the mapId from the retryFetches hashmap since we now
+              // prefer the new location instead of what we saved earlier
+              retryFetches.remove(new Integer(locs[i].getMapId()));
+              knownOutputs.add(locs[i]);
+              gotLocs--;
+            }
+            
           }
+          // now put the remaining hash entries for the failed fetches 
+          // and clear the hashmap
+          knownOutputs.addAll(retryFetches.values());
           LOG.info(reduceTask.getTaskId() +
-                   " Got " + (locs == null ? 0 : locs.length) + 
-                   " map outputs from jobtracker");
+                " Got " + gotLocs + 
+                " new map outputs from jobtracker and " + retryFetches.size() +
+                " map outputs from previous failures");
+          retryFetches.clear();
         }
         catch (IOException ie) {
           LOG.warn(reduceTask.getTaskId() +
@@ -485,6 +520,7 @@
           } else {
             // this copy failed, put it back onto neededOutputs
             neededOutputs.add(new Integer(cr.getMapId()));
+            retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
           
             // wait a random amount of time for next contact
             currentTime = System.currentTimeMillis();
@@ -504,6 +540,7 @@
             while (locIt.hasNext()) {
               MapOutputLocation loc = (MapOutputLocation)locIt.next();
               if (cr.getHost().equals(loc.getHost())) {
+                retryFetches.put(new Integer(loc.getMapId()), loc);
                 locIt.remove();
                 neededOutputs.add(new Integer(loc.getMapId()));
               }
@@ -514,7 +551,7 @@
         }
         
         // ensure we have enough to keep us busy
-        if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
+        if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
          break;
         }
       }
@@ -608,28 +645,16 @@
   }
   
   /** Queries the job tracker for a set of outputs ready to be copied
-   * @param neededOutputs the list of currently unknown outputs
+   * @param fromEventId the first event ID we want to start from, this will be
+   * modified by the call to this method
    * @param jobClient the job tracker
    * @return a set of locations to copy outputs from
    * @throws IOException
    */  
-  private MapOutputLocation[] queryJobTracker(List neededOutputs,
+  private MapOutputLocation[] queryJobTracker(int fromEventId, 
                                               InterTrackerProtocol jobClient)
   throws IOException {
     
-    // query for a just a random subset of needed segments so that we don't
-    // overwhelm jobtracker.  ideally perhaps we could send a more compact
-    // representation of all needed, i.e., a bit-vector
-    int     checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
-    int     neededIds[] = new int[checkSize];
-      
-    Collections.shuffle(neededOutputs);
-      
-    ListIterator itr = neededOutputs.listIterator();
-    for (int i=0; i < checkSize; i++) {
-      neededIds[i] = ((Integer)itr.next()).intValue();
-    }
-
     long currentTime = System.currentTimeMillis();    
     long pollTime = lastPollTime + MIN_POLL_INTERVAL;
     while (currentTime < pollTime) {
@@ -640,9 +665,28 @@
     }
     lastPollTime = currentTime;
 
-    return jobClient.locateMapOutputs(reduceTask.getJobId().toString(), 
-                                      neededIds,
-                                      reduceTask.getPartition());
+    TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
+                                      reduceTask.getJobId().toString(),
+                                      fromEventId,
+                                      probe_sample_size);
+    
+    List <MapOutputLocation> mapOutputsList = new ArrayList();
+    for (int i = 0; i < t.length; i++) {
+      if (t[i].isMap && 
+          t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
+        URI u = URI.create(t[i].getTaskTrackerHttp());
+        String host = u.getHost();
+        int port = u.getPort();
+        String taskId = t[i].getTaskId();
+        int mId = t[i].idWithinJob();
+        mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
+      }
+    }
+    Collections.shuffle(mapOutputsList, randForProbing);
+    MapOutputLocation[] locations =
+                        new MapOutputLocation[mapOutputsList.size()];
+    fromEventId += t.length;
+    return mapOutputsList.toArray(locations);
   }
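
Two notes on the hunks above. First, the retry bookkeeping condenses to the helper sketched below (the standalone method shape is editorial; the field names and types match the patch). Second, fromEventId is a plain int passed by value, so the fromEventId += t.length at the end of queryJobTracker updates only the local copy, despite what the caller's comment promises; the cursor needs a mutable holder to actually advance between polls.

    void mergeNewLocations(MapOutputLocation[] locs,
                           List neededOutputs,
                           List knownOutputs,
                           Map<Integer, MapOutputLocation> retryFetches) {
      for (int i = 0; i < locs.length; i++) {
        Integer mapId = new Integer(locs[i].getMapId());
        // take only outputs still needed; an event from a rescheduled run
        // of a map we already copied can be ignored
        if (neededOutputs.remove(mapId)) {
          retryFetches.remove(mapId);  // a fresh location beats a saved one
          knownOutputs.add(locs[i]);
        }
      }
      // re-queue the locations whose copies failed last round, then reset
      knownOutputs.addAll(retryFetches.values());
      retryFetches.clear();
    }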
 
   

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?view=diff&rev=500410&r1=500409&r2=500410
 ==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Fri Jan 26 15:37:46 2007
@@ -12,12 +12,14 @@
  *
  */
 public class TaskCompletionEvent implements Writable{
-    static public enum Status {FAILED, SUCCEEDED};
+    static public enum Status {FAILED, SUCCEEDED, OBSOLETE};
     
     private int eventId ; 
     private String taskTrackerHttp ;
     private String taskId ;
     Status status ; 
+    boolean isMap = false ;
+    private int idWithinJob;
     public static final TaskCompletionEvent[] EMPTY_ARRAY = 
         new TaskCompletionEvent[0];
     /**
@@ -35,11 +37,15 @@
      * @param taskTrackerHttp task tracker's host:port for http. 
      */
     public TaskCompletionEvent(int eventId, 
-        String taskId, 
+        String taskId,
+        int idWithinJob,
+        boolean isMap,
         Status status, 
         String taskTrackerHttp){
       
-        this.taskId = taskId ; 
+        this.taskId = taskId ;
+        this.idWithinJob = idWithinJob ;
+        this.isMap = isMap ;
         this.eventId = eventId ; 
         this.status =status ; 
         this.taskTrackerHttp = taskTrackerHttp ;
@@ -114,17 +120,28 @@
         return buf.toString();
     }
     
+    public boolean isMapTask() {
+        return isMap;
+    }
+    
+    public int idWithinJob() {
+      return idWithinJob;
+    }
     //////////////////////////////////////////////
     // Writable
     //////////////////////////////////////////////
     public void write(DataOutput out) throws IOException {
         WritableUtils.writeString(out, taskId); 
+        WritableUtils.writeVInt(out, idWithinJob);
+        out.writeBoolean(isMap);
         WritableUtils.writeEnum(out, status); 
         WritableUtils.writeString(out, taskTrackerHttp);
     }
   
     public void readFields(DataInput in) throws IOException {
         this.taskId = WritableUtils.readString(in) ; 
+        this.idWithinJob = WritableUtils.readVInt(in);
+        this.isMap = in.readBoolean();
         this.status = WritableUtils.readEnum(in, Status.class);
         this.taskTrackerHttp = WritableUtils.readString(in);
     }
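
write() and readFields() must stay field-for-field in step; note also that eventId is never written, so it does not survive a round trip. A fragment sketching the round trip (assumes java.io imports and the usual no-argument Writable constructor):

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    TaskCompletionEvent sent = new TaskCompletionEvent(7, "task_m_000012_0",
        12, true, TaskCompletionEvent.Status.SUCCEEDED,
        "http://tracker:50060/");
    sent.write(new DataOutputStream(buf));

    TaskCompletionEvent got = new TaskCompletionEvent(); // assumed no-arg ctor
    got.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
    // got.isMapTask() == true and got.idWithinJob() == 12, but the
    // event id is not restored because write() never emits it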

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=500410&r1=500409&r2=500410
 ==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri Jan 26 15:37:46 2007
@@ -60,6 +60,7 @@
     private JobInProgress job;
 
     // Status of the TIP
+    private int successEventNumber = -1;
     private int numTaskFailures = 0;
     private double progress = 0;
     private String state = "";
@@ -139,6 +140,15 @@
     }
     
     /**
+     * Return the index of the tip within the job, so "tip_0002_m_012345"
+     * would return 12345;
+     * @return int the tip index
+     */
+     public int idWithinJob() {
+       return partition;
+     }    
+
+    /**
      * Initialization common to Map and Reduce
      */
     void init(String jobUniqueString) {
@@ -567,5 +577,19 @@
      */
     public int getIdWithinJob() {
       return partition;
+    }
+    
+    /**
+     * Set the event number that was raised for this tip
+     */
+    public void setSuccessEventNumber(int eventNumber) {
+      successEventNumber = eventNumber;
+    }
+       
+    /**
+     * Get the event number that was raised for this tip
+     */
+    public int getSuccessEventNumber() {
+      return successEventNumber;
     }
 }
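
The successEventNumber back-pointer closes the loop with the JobInProgress change earlier in this commit: when a formerly successful task later fails (for example, its tracker is lost), the job can find the stale SUCCEEDED event and retract it. Condensed from that hunk, variable names as in JobInProgress:

    int eventNumber = tip.getSuccessEventNumber();
    if (eventNumber != -1) {
      TaskCompletionEvent t = taskCompletionEvents.get(eventNumber);
      if (t.getTaskId().equals(status.getTaskId())) {
        // reduces that see OBSOLETE skip this location rather than
        // fetching from a tracker that is gone
        t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
      }
    }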

