[prev in list] [next in list] [prev in thread] [next in thread]
List: hadoop-commits
Subject: svn commit: r449844 - in /lucene/hadoop/trunk: CHANGES.txt
From: cutting () apache ! org
Date: 2006-09-25 22:50:01
Message-ID: 20060925225001.AB76F1A981A () eris ! apache ! org
[Download RAW message or body]
Author: cutting
Date: Mon Sep 25 15:50:00 2006
New Revision: 449844
URL: http://svn.apache.org/viewvc?view=rev&rev=449844
Log:
HADOOP-556. Contrib/streaming: send keep-alive reports to the tasktracker every 10 \
seconds rather than every 100 records. Contributed by Michel.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=449844&r1=449843&r2=449844
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Sep 25 15:50:00 2006
@@ -61,6 +61,11 @@
extend ObjectWritable to handle enums, so that they can be passed
as RPC parameters. (Sanjay Dahiya via cutting)
+16. HADOOP-556. Contrib/streaming: send keep-alive reports to task
+ tracker every 10 seconds rather than every 100 records, to avoid
+ task timeouts. (Michel Tourn via cutting)
+
+
Release 0.6.2 (unreleased)
1. HADOOP-532. Fix a bug reading value-compressed sequence files,
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/o \
rg/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=449844&r1=449843&r2=449844 \
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java \
(original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java \
Mon Sep 25 15:50:00 2006
@@ -497,8 +497,12 @@
output.collect(key, val);
}
numRecWritten_++;
- if (numRecWritten_ % 100 == 0) {
- logprintln(numRecRead_ + "/" + numRecWritten_);
+ long now = System.currentTimeMillis();
+ if (now-lastStdoutReport > reporterOutDelay_) {
+ lastStdoutReport = now;
+ String hline = "Records R/W=" + numRecRead_ + "/" + numRecWritten_;
+ reporter.setStatus(hline);
+ logprintln(hline);
logflush();
}
}
@@ -511,6 +515,8 @@
OutputCollector output;
Reporter reporter;
byte[] answer;
+ long lastStdoutReport = 0;
+
}
class MRErrorThread extends Thread {
@@ -529,11 +535,11 @@
String lineStr = new String(line, "UTF-8");
logprintln(lineStr);
long now = System.currentTimeMillis();
- if (num < 10 || (now-lastStderrReport > 10*1000)) {
+ if (num < 20 || (now-lastStderrReport > reporterErrDelay_)) {
+ lastStderrReport = now;
String hline = "MRErr: " + lineStr;
System.err.println(hline);
reporter.setStatus(hline);
- lastStderrReport = now;
}
}
} catch (IOException io) {
@@ -671,11 +677,14 @@
long numRecSkipped_ = 0;
long nextRecReadLog_ = 1;
+
long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
int keyCols_;
final static int ALL_COLS = Integer.MAX_VALUE;
+ long reporterOutDelay_ = 10*1000L;
+ long reporterErrDelay_ = 10*1000L;
long joinDelay_;
JobConf job_;
FileSystem fs_;
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/o \
rg/apache/hadoop/streaming/StreamJob.java?view=diff&rev=449844&r1=449843&r2=449844 \
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java \
(original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java \
Mon Sep 25 15:50:00 2006
@@ -46,7 -46,6 @@
protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
final static String REDUCE_NONE = "NONE";
-
private boolean reducerNone_;
public StreamJob(String[] argv, boolean mayExit) {
@@ -107,12 +106,13 @@
redCmd_ = unqualifyIfLocalPath(redCmd_);
}
- void validateNameEqValue(String neqv) {
+ String[] parseNameEqValue(String neqv) {
String[] nv = neqv.split("=", 2);
if (nv.length < 2) {
fail("Invalid name=value spec: " + neqv);
}
msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
+ return nv;
}
String unqualifyIfLocalPath(String cmd) throws IOException {
@@ -199,17 +199,17 @@
configPath_.add(s);
} else if ((s = optionArg(argv_, i, "-dfs", false)) != null) {
i++;
- userJobConfProps_.add("fs.default.name=" + s);
+ userJobConfProps_.put("fs.default.name", s);
} else if ((s = optionArg(argv_, i, "-jt", false)) != null) {
i++;
- userJobConfProps_.add("mapred.job.tracker=" + s);
+ userJobConfProps_.put("mapred.job.tracker", s);
} else if ((s = optionArg(argv_, i, "-jobconf", false)) != null) {
i++;
- validateNameEqValue(s);
- userJobConfProps_.add(s);
+ String[] nv = parseNameEqValue(s);
+ userJobConfProps_.put(nv[0], nv[1]);
} else if ((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
i++;
- validateNameEqValue(s);
+ parseNameEqValue(s);
if (addTaskEnvironment_.length() > 0) {
addTaskEnvironment_ += " ";
}
@@ -389,8 +389,9 @@
// First try an explicit spec: it's too hard to find our own location in this \
case:
// $HADOOP_HOME/bin/hadoop jar \
/not/first/on/classpath/custom-hadoop-streaming.jar
// where findInClasspath() would find the version of hadoop-streaming.jar in \
$HADOOP_HOME
- String runtimeClasses = jobConf_.get("stream.shipped.hadoopstreaming"); // jar \
or class dir
-
+    String runtimeClasses = userJobConfProps_.get("stream.shipped.hadoopstreaming"); // jar or class dir
+System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
+
if (runtimeClasses == null) {
runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
}
@@ -433,13 +434,15 @@
}
protected void setUserJobConfProps(boolean doEarlyProps) {
- Iterator it = userJobConfProps_.iterator();
+ Iterator it = userJobConfProps_.keySet().iterator();
while (it.hasNext()) {
- String prop = (String) it.next();
- String[] nv = prop.split("=", 2);
- if (doEarlyProps == nv[0].equals("fs.default.name")) {
- msg("xxxJobConf: set(" + nv[0] + ", " + nv[1] + ") early=" + doEarlyProps);
- jobConf_.set(nv[0], nv[1]);
+ String key = (String) it.next();
+ String val = (String)userJobConfProps_.get(key);
+ boolean earlyName = key.equals("fs.default.name");
+ earlyName |= key.equals("stream.shipped.hadoopstreaming");
+ if (doEarlyProps == earlyName) {
+ msg("xxxJobConf: set(" + key + ", " + val + ") early=" + doEarlyProps);
+ jobConf_.set(key, val);
}
}
}
@@ -752,7 +755,8 @@
protected boolean hasSimpleInputSpecs_;
protected ArrayList packageFiles_ = new ArrayList(); // <String>
protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
- protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
+ //protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
+  protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>();
 protected String output_;
protected String mapsideoutURI_;
protected String mapCmd_;
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic