[prev in list] [next in list] [prev in thread] [next in thread] 

List:       hadoop-commits
Subject:    svn commit: r449844 - in /lucene/hadoop/trunk: CHANGES.txt
From:       cutting () apache ! org
Date:       2006-09-25 22:50:01
Message-ID: 20060925225001.AB76F1A981A () eris ! apache ! org
[Download RAW message or body]

Author: cutting
Date: Mon Sep 25 15:50:00 2006
New Revision: 449844

URL: http://svn.apache.org/viewvc?view=rev&rev=449844
Log:
HADOOP-556.  Contrib/streaming: send keep-alive reports to the tasktracker every 10 seconds rather than every 100 records.  Contributed by Michel.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java


Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=449844&r1=449843&r2=449844
 ==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Sep 25 15:50:00 2006
@@ -61,6 +61,11 @@
     extend ObjectWritable to handle enums, so that they can be passed
     as RPC parameters.  (Sanjay Dahiya via cutting)
 
+16. HADOOP-556.  Contrib/streaming: send keep-alive reports to task
+    tracker every 10 seconds rather than every 100 records, to avoid
+    task timeouts.  (Michel Tourn via cutting)
+
+
 Release 0.6.2 (unreleased)
 
 1. HADOOP-532.  Fix a bug reading value-compressed sequence files,

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=449844&r1=449843&r2=449844
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Mon Sep 25 15:50:00 2006
@@ -497,8 +497,12 @@
             output.collect(key, val);
           }
           numRecWritten_++;
-          if (numRecWritten_ % 100 == 0) {
-            logprintln(numRecRead_ + "/" + numRecWritten_);
+          long now = System.currentTimeMillis();
+          if (now-lastStdoutReport > reporterOutDelay_) {
+            lastStdoutReport = now;
+            String hline = "Records R/W=" + numRecRead_ + "/" + numRecWritten_;
+            reporter.setStatus(hline);
+            logprintln(hline);
             logflush();
           }
         }
@@ -511,6 +515,8 @@
     OutputCollector output;
     Reporter reporter;
     byte[] answer;
+    long lastStdoutReport = 0;
+    
   }
 
   class MRErrorThread extends Thread {
@@ -529,11 +535,11 @@
           String lineStr = new String(line, "UTF-8");
           logprintln(lineStr);
           long now = System.currentTimeMillis(); 
-          if (num < 10 || (now-lastStderrReport > 10*1000)) {
+          if (num < 20 || (now-lastStderrReport > reporterErrDelay_)) {
+            lastStderrReport = now;
             String hline = "MRErr: " + lineStr;
             System.err.println(hline);
             reporter.setStatus(hline);
-            lastStderrReport = now;
           }
         }
       } catch (IOException io) {
@@ -671,11 +677,14 @@
   long numRecSkipped_ = 0;
   long nextRecReadLog_ = 1;
 
+  
   long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
 
   int keyCols_;
   final static int ALL_COLS = Integer.MAX_VALUE;
 
+  long reporterOutDelay_ = 10*1000L; 
+  long reporterErrDelay_ = 10*1000L; 
   long joinDelay_;
   JobConf job_;
   FileSystem fs_;

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=449844&r1=449843&r2=449844
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Sep 25 15:50:00 2006
@@ -46,7 +46,6 @@
 
   protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
   final static String REDUCE_NONE = "NONE";
-
   private boolean reducerNone_;
 
   public StreamJob(String[] argv, boolean mayExit) {
@@ -107,12 +106,13 @@
     redCmd_ = unqualifyIfLocalPath(redCmd_);
   }
 
-  void validateNameEqValue(String neqv) {
+  String[] parseNameEqValue(String neqv) {
     String[] nv = neqv.split("=", 2);
     if (nv.length < 2) {
       fail("Invalid name=value spec: " + neqv);
     }
     msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
+    return nv;
   }
 
   String unqualifyIfLocalPath(String cmd) throws IOException {
@@ -199,17 +199,17 @@
         configPath_.add(s);
       } else if ((s = optionArg(argv_, i, "-dfs", false)) != null) {
         i++;
-        userJobConfProps_.add("fs.default.name=" + s);
+        userJobConfProps_.put("fs.default.name", s);
       } else if ((s = optionArg(argv_, i, "-jt", false)) != null) {
         i++;
-        userJobConfProps_.add("mapred.job.tracker=" + s);
+        userJobConfProps_.put("mapred.job.tracker", s);
       } else if ((s = optionArg(argv_, i, "-jobconf", false)) != null) {
         i++;
-        validateNameEqValue(s);
-        userJobConfProps_.add(s);
+        String[] nv = parseNameEqValue(s);
+        userJobConfProps_.put(nv[0], nv[1]);
       } else if ((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
         i++;
-        validateNameEqValue(s);
+        parseNameEqValue(s);
         if (addTaskEnvironment_.length() > 0) {
           addTaskEnvironment_ += " ";
         }
@@ -389,8 +389,9 @@
     // First try an explicit spec: it's too hard to find our own location in this case:
     // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
     // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
-    String runtimeClasses = jobConf_.get("stream.shipped.hadoopstreaming"); // jar or class dir
-
+    String runtimeClasses = userJobConfProps_.get("stream.shipped.hadoopstreaming"); // jar or class dir
+System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
+    
     if (runtimeClasses == null) {
       runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
     }
@@ -433,13 +434,15 @@
   }
 
   protected void setUserJobConfProps(boolean doEarlyProps) {
-    Iterator it = userJobConfProps_.iterator();
+    Iterator it = userJobConfProps_.keySet().iterator();
     while (it.hasNext()) {
-      String prop = (String) it.next();
-      String[] nv = prop.split("=", 2);
-      if (doEarlyProps == nv[0].equals("fs.default.name")) {
-        msg("xxxJobConf: set(" + nv[0] + ", " + nv[1] + ") early=" + doEarlyProps);
-        jobConf_.set(nv[0], nv[1]);
+      String key = (String) it.next();
+      String val = (String)userJobConfProps_.get(key);
+      boolean earlyName = key.equals("fs.default.name");
+      earlyName |= key.equals("stream.shipped.hadoopstreaming");
+      if (doEarlyProps == earlyName) {
+        msg("xxxJobConf: set(" + key + ", " + val + ") early=" + doEarlyProps);
+        jobConf_.set(key, val);
       }
     }
   }
@@ -752,7 +755,8 @@
   protected boolean hasSimpleInputSpecs_;
   protected ArrayList packageFiles_ = new ArrayList(); // <String>
   protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
-  protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
+  //protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
+  protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>();
   protected String output_;
   protected String mapsideoutURI_;
   protected String mapCmd_;


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic