[prev in list] [next in list] [prev in thread] [next in thread]
List: nutch-cvs
Subject: [Nutch-cvs] nutch/src/java/net/nutch/mapReduce InterTrackerProtocol.java,1.1,1.2 JobTracker.java,1.2
From: Michael Cafarella <mike_cafarella () users ! sourceforge ! net>
Date: 2005-01-24 18:32:15
Message-ID: E1Ct90n-0001nb-CD () sc8-pr-cvs1 ! sourceforge ! net
[Download RAW message or body]
Update of /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv6600
Modified Files:
InterTrackerProtocol.java JobTracker.java Task.java
TaskTracker.java
Removed Files:
TaskProfile.java TaskRunner.java
Log Message:
Modify the InterTrackerProtocol to use a single Task object, not an
array of them. Replace "TaskProfile" with "Task" and its subclasses.
Remove obsolete "TaskRunner".
--- TaskProfile.java DELETED ---
Index: TaskTracker.java
===================================================================
RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce/TaskTracker.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -C2 -d -r1.2 -r1.3
*** TaskTracker.java 24 Jan 2005 10:07:06 -0000 1.2
--- TaskTracker.java 24 Jan 2005 18:32:13 -0000 1.3
***************
*** 26,33 ****
LogFormatter.getLogger("net.nutch.mapReduce.TaskTracker");
! UTF8 localName;
InetSocketAddress jobTrackAddr;
- TreeMap tasks = new TreeMap();
TreeMap taskProfiles = new TreeMap();
TreeMap taskStatus = new TreeMap();
--- 26,32 ----
LogFormatter.getLogger("net.nutch.mapReduce.TaskTracker");
! UTF8 taskTrackerName;
InetSocketAddress jobTrackAddr;
TreeMap taskProfiles = new TreeMap();
TreeMap taskStatus = new TreeMap();
***************
*** 41,46 ****
* Start with the local machine name, and the addr of the target JobTracker
*/
! public TaskTracker(String machineName, InetSocketAddress jobTrackAddr) throws \
IOException {
! this.localName = new UTF8(machineName);
this.jobTrackAddr = jobTrackAddr;
this.taskReportServer = RPC.getServer(this, TASKREPORT_PORT);
--- 40,45 ----
* Start with the local machine name, and the addr of the target JobTracker
*/
! public TaskTracker(String taskTrackerName, InetSocketAddress jobTrackAddr) \
throws IOException {
! this.taskTrackerName = new UTF8(taskTrackerName);
this.jobTrackAddr = jobTrackAddr;
this.taskReportServer = RPC.getServer(this, TASKREPORT_PORT);
***************
*** 79,84 ****
// Emit standard hearbeat message to check in with JobTracker
//
! TaskProfile toRun[] = (TaskProfile[]) ((ArrayWritable) \
jobClient.emitHeartbeat(createStatusReport())).toArray();
! launchTasks(toRun);
lastHeartbeat = now;
}
--- 78,85 ----
// Emit standard hearbeat message to check in with JobTracker
//
! Task toRun = jobClient.emitHeartbeat(createStatusReport());
! if (toRun != null) {
! launchTask(toRun);
! }
lastHeartbeat = now;
}
***************
*** 102,106 ****
(ts.getRunState() == TaskStatus.FAILED)) {
it.remove();
- tasks.remove(taskid);
taskStatus.remove(taskid);
}
--- 103,106 ----
***************
*** 108,135 ****
}
}
! return new TaskTrackerStatus(localName, taskReports);
}
/**
! * launchTasks(TaskProfile tasks[]) receives a set of Tasks to execute.
! * Each TaskProfile includes info on how, exactly, the Task should be run.
* We will include status info in the heartbeat message sent to the JobTracker.
*/
! void launchTasks(TaskProfile toLaunch[]) {
! for (int i = 0; i < toLaunch.length; i++) {
! UTF8 curId = new UTF8(toLaunch[i].getTaskId());
! Task t = (Task) tasks.get(curId);
! if (t == null) {
! if (toLaunch[i].isMapTask()) {
! t = new MapTask(toLaunch[i].getJarPath().toString(), \
curId.toString(), toLaunch[i].getSplit());
! } else {
! t = new ReduceTask(toLaunch[i].getJarPath().toString(), \
curId.toString());
! }
! tasks.put(curId, t);
! taskProfiles.put(curId, toLaunch[i]);
! taskStatus.put(curId, new TaskStatus(curId, 0.0f, \
TaskStatus.RUNNING));
! new LaunchTask(t).start();
! }
}
}
--- 108,126 ----
}
}
! return new TaskTrackerStatus(taskTrackerName, taskReports);
}
/**
! * launchTask(Task) receives a Task to execute.
! * Each Task includes info on how, exactly, the Task should be run.
* We will include status info in the heartbeat message sent to the JobTracker.
*/
! void launchTask(Task t) {
! UTF8 curId = new UTF8(t.getTaskId());
! if (taskProfiles.get(curId) == null) {
! taskProfiles.put(curId, t);
! taskStatus.put(curId, new TaskStatus(curId, 0.0f, TaskStatus.RUNNING));
! new LaunchTask(t).start();
}
}
***************
*** 170,174 ****
*/
public Task getTask(UTF8 taskid) throws IOException {
! return (Task) tasks.get(taskid);
}
--- 161,165 ----
*/
public Task getTask(UTF8 taskid) throws IOException {
! return (Task) taskProfiles.get(taskid);
}
***************
*** 198,204 ****
***********************************************************/
class LaunchTask extends Thread {
! Task task;
! public LaunchTask(Task task) {
! this.task = task;
}
--- 189,195 ----
***********************************************************/
class LaunchTask extends Thread {
! Task t;
! public LaunchTask(Task t) {
! this.t = t;
}
***************
*** 209,216 ****
//"-verbose",
"-cp",
! task.getJarPath(), // classpath is \
task's jar
Child.class.getName(), // main is Child
TASKREPORT_PORT+"", // pass umbilical \
port
! task.getTaskId().toString() // pass task \
identifier });
} catch (IOException ie) {
--- 200,207 ----
//"-verbose",
"-cp",
! t.getJarPath(), // classpath is \
task's jar
Child.class.getName(), // main is Child
TASKREPORT_PORT+"", // pass umbilical \
port
! t.getTaskId().toString() // pass task \
identifier });
} catch (IOException ie) {
***************
*** 219,223 ****
// Perhaps attach them to the error log for the task
} finally {
! reportTaskFinished(new UTF8(task.getTaskId()));
}
}
--- 210,214 ----
// Perhaps attach them to the error log for the task
} finally {
! reportTaskFinished(new UTF8(t.getTaskId()));
}
}
***************
*** 275,283 ****
public static void main(String argv[]) throws IOException {
if (argv.length < 2) {
! System.out.println("TaskTracker <localMachine> <jobTrackerAddr:port>");
System.exit(-1);
}
! String localMachine = argv[0];
String jobTrackerStr = argv[1];
--- 266,274 ----
public static void main(String argv[]) throws IOException {
if (argv.length < 2) {
! System.out.println("TaskTracker <taskTrackerName> \
<jobTrackerAddr:port>"); System.exit(-1);
}
! String taskTrackerName = argv[0];
String jobTrackerStr = argv[1];
***************
*** 292,296 ****
InetSocketAddress jobTrackerAddr = new InetSocketAddress(jobTrackerName, \
jobTrackerPort);
! TaskTracker tt = new TaskTracker(localMachine, jobTrackerAddr);
try {
while (true) {
--- 283,287 ----
InetSocketAddress jobTrackerAddr = new InetSocketAddress(jobTrackerName, \
jobTrackerPort);
! TaskTracker tt = new TaskTracker(taskTrackerName, jobTrackerAddr);
try {
while (true) {
--- TaskRunner.java DELETED ---
Index: JobTracker.java
===================================================================
RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce/JobTracker.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -C2 -d -r1.2 -r1.3
*** JobTracker.java 24 Jan 2005 10:07:06 -0000 1.2
--- JobTracker.java 24 Jan 2005 18:32:12 -0000 1.3
***************
*** 111,115 ****
Vector unassignedTasks = new Vector();
! // taskid->TaskProfile map
TreeMap taskProfiles = new TreeMap();
--- 111,115 ----
Vector unassignedTasks = new Vector();
! // taskid->Task map
TreeMap taskProfiles = new TreeMap();
***************
*** 203,207 ****
// InterTrackerProtocol
////////////////////////////////////////////////////
! public ArrayWritable emitHeartbeat(TaskTrackerStatus status) {
status.setLastSeen(System.currentTimeMillis());
synchronized (taskTrackers) {
--- 203,207 ----
// InterTrackerProtocol
////////////////////////////////////////////////////
! public Task emitHeartbeat(TaskTrackerStatus status) {
status.setLastSeen(System.currentTimeMillis());
synchronized (taskTrackers) {
***************
*** 216,221 ****
LOG.info("Got heartbeat. Unassigned tasks: " + unassignedTasks.size());
! // Allocate some pending task(s) to this TaskTracker
! return new ArrayWritable(TaskProfile.class, \
getTaskAssignments(status.getTrackerName())); }
--- 216,221 ----
LOG.info("Got heartbeat. Unassigned tasks: " + unassignedTasks.size());
! // Allocate a pending task to this TaskTracker
! return getTaskAssignment(status.getTrackerName());
}
***************
*** 282,286 ****
void completedTask(UTF8 taskid) {
LOG.info("Task " + taskid + " has finished successfully.");
! // REMIND - mjc - possibly gc the TaskProfile here, because we're all done.
// (though remember a map task may have to be re-executed)
// Also, decrement the number of Tasks that a Job is waiting on.
--- 282,286 ----
void completedTask(UTF8 taskid) {
LOG.info("Task " + taskid + " has finished successfully.");
! // REMIND - mjc - possibly gc the Task here, because we're all done.
// (though remember a map task may have to be re-executed)
// Also, decrement the number of Tasks that a Job is waiting on.
***************
*** 303,308 ****
/**
! * TaskProfile[] getTaskAssignments(UTF8 taskTracker) returns
! * a list of tasks we'd like the taskTracker to execute right now.
*
* Eventually this function should compute load on the various TaskTrackers,
--- 303,308 ----
/**
! * Task getTaskAssignment(UTF8 taskTracker) returns
! * a task we'd like the taskTracker to execute right now.
*
* Eventually this function should compute load on the various TaskTrackers,
***************
*** 310,326 ****
* just grabs a single item out of the pending task list and hands it back.
*/
! TaskProfile[] getTaskAssignments(UTF8 taskTracker) {
if (unassignedTasks.size() > 0) {
UTF8 taskid = (UTF8) unassignedTasks.elementAt(0);
! TaskProfile task = (TaskProfile) taskProfiles.get(taskid);
unassignedTasks.remove(taskid);
// REMIND - mjc - assign the Task to the taskTracker here
! TaskProfile assignment[] = new TaskProfile[1];
! assignment[0] = task;
! return assignment;
} else {
! return new TaskProfile[0];
}
}
--- 310,324 ----
* just grabs a single item out of the pending task list and hands it back.
*/
! Task getTaskAssignment(UTF8 taskTracker) {
if (unassignedTasks.size() > 0) {
UTF8 taskid = (UTF8) unassignedTasks.elementAt(0);
! Task task = (Task) taskProfiles.get(taskid);
unassignedTasks.remove(taskid);
// REMIND - mjc - assign the Task to the taskTracker here
! return task;
} else {
! return null;
}
}
***************
*** 349,353 ****
// Right now, we do a simple, single Map task.
File f = new File(filePath.toString());
! TaskProfile tp = new TaskProfile(jarPath.toString(), createTaskId(), \
TaskProfile.MAP, 0, new FileSplit(f, 0, f.length()));
taskProfiles.put(new UTF8(tp.getTaskId()), tp);
--- 347,351 ----
// Right now, we do a simple, single Map task.
File f = new File(filePath.toString());
! Task tp = new MapTask(jarPath.toString(), createTaskId(), new FileSplit(f, \
0, f.length()));
taskProfiles.put(new UTF8(tp.getTaskId()), tp);
Index: Task.java
===================================================================
RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce/Task.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** Task.java 21 Jan 2005 00:09:10 -0000 1.1
--- Task.java 24 Jan 2005 18:32:12 -0000 1.2
***************
*** 24,30 ****
////////////////////////////////////////////
! protected Task() {}
! protected Task(String jarPath, String taskId) {
this.jarPath = jarPath;
this.taskId = taskId;
--- 24,30 ----
////////////////////////////////////////////
! public Task() {}
! public Task(String jarPath, String taskId) {
this.jarPath = jarPath;
this.taskId = taskId;
Index: InterTrackerProtocol.java
===================================================================
RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce/InterTrackerProtocol.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** InterTrackerProtocol.java 24 Jan 2005 10:07:06 -0000 1.1
--- InterTrackerProtocol.java 24 Jan 2005 18:32:10 -0000 1.2
***************
*** 19,23 ****
* possibly of zero length.
*/
! ArrayWritable emitHeartbeat(TaskTrackerStatus status);
}
--- 19,23 ----
* possibly of zero length.
*/
! Task emitHeartbeat(TaskTrackerStatus status);
}
-------------------------------------------------------
This SF.Net email is sponsored by: IntelliVIEW -- Interactive Reporting
Tool for open source databases. Create drag-&-drop reports. Save time
by over 75%! Publish reports on the web. Export to DOC, XLS, RTF, etc.
Download a FREE copy at http://www.intelliview.com/go/osdn_nl
_______________________________________________
Nutch-cvs mailing list
Nutch-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nutch-cvs
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic