[prev in list] [next in list] [prev in thread] [next in thread] 

List:       nutch-cvs
Subject:    [Nutch-cvs] nutch/src/java/net/nutch/mapReduce InterTrackerProtocol.java,1.1,1.2 JobTracker.java,1.2
From:       Michael Cafarella <mike_cafarella () users ! sourceforge ! net>
Date:       2005-01-24 18:32:15
Message-ID: E1Ct90n-0001nb-CD () sc8-pr-cvs1 ! sourceforge ! net
[Download RAW message or body]

Update of /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv6600

Modified Files:
	InterTrackerProtocol.java JobTracker.java Task.java 
	TaskTracker.java 
Removed Files:
	TaskProfile.java TaskRunner.java 
Log Message:

  Modify InterTrackerProtocol so that emitHeartbeat() returns a single Task
object instead of an array of them.  Replace "TaskProfile" with "Task" and its
subclasses.  Remove the obsolete "TaskRunner".




--- TaskProfile.java DELETED ---

Index: TaskTracker.java
===================================================================
RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce/TaskTracker.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -C2 -d -r1.2 -r1.3
*** TaskTracker.java	24 Jan 2005 10:07:06 -0000	1.2
--- TaskTracker.java	24 Jan 2005 18:32:13 -0000	1.3
***************
*** 26,33 ****
      LogFormatter.getLogger("net.nutch.mapReduce.TaskTracker");
  
!     UTF8 localName;
      InetSocketAddress jobTrackAddr;
  
-     TreeMap tasks = new TreeMap();
      TreeMap taskProfiles = new TreeMap();
      TreeMap taskStatus = new TreeMap();
--- 26,32 ----
      LogFormatter.getLogger("net.nutch.mapReduce.TaskTracker");
  
!     UTF8 taskTrackerName;
      InetSocketAddress jobTrackAddr;
  
      TreeMap taskProfiles = new TreeMap();
      TreeMap taskStatus = new TreeMap();
***************
*** 41,46 ****
       * Start with the local machine name, and the addr of the target JobTracker
       */
!     public TaskTracker(String machineName, InetSocketAddress jobTrackAddr) throws \
                IOException {
!         this.localName = new UTF8(machineName);
          this.jobTrackAddr = jobTrackAddr;
          this.taskReportServer = RPC.getServer(this, TASKREPORT_PORT);
--- 40,45 ----
       * Start with the local machine name, and the addr of the target JobTracker
       */
!     public TaskTracker(String taskTrackerName, InetSocketAddress jobTrackAddr) \
                throws IOException {
!         this.taskTrackerName = new UTF8(taskTrackerName);
          this.jobTrackAddr = jobTrackAddr;
          this.taskReportServer = RPC.getServer(this, TASKREPORT_PORT);
***************
*** 79,84 ****
              // Emit standard hearbeat message to check in with JobTracker
              //
!             TaskProfile toRun[] = (TaskProfile[]) ((ArrayWritable) \
                jobClient.emitHeartbeat(createStatusReport())).toArray();
!             launchTasks(toRun);
              lastHeartbeat = now;
          }
--- 78,85 ----
              // Emit standard hearbeat message to check in with JobTracker
              //
!             Task toRun = jobClient.emitHeartbeat(createStatusReport());
!             if (toRun != null) {
!                 launchTask(toRun);
!             }
              lastHeartbeat = now;
          }
***************
*** 102,106 ****
                      (ts.getRunState() == TaskStatus.FAILED)) {
                      it.remove();
-                     tasks.remove(taskid);
                      taskStatus.remove(taskid);
                  }
--- 103,106 ----
***************
*** 108,135 ****
              }
          }
!         return new TaskTrackerStatus(localName, taskReports);
      }
  
      /**
!      * launchTasks(TaskProfile tasks[]) receives a set of Tasks to execute.
!      * Each TaskProfile includes info on how, exactly, the Task should be run.
       * We will include status info in the heartbeat message sent to the JobTracker.
       */
!     void launchTasks(TaskProfile toLaunch[]) {
!         for (int i = 0; i < toLaunch.length; i++) {
!             UTF8 curId = new UTF8(toLaunch[i].getTaskId());
!             Task t = (Task) tasks.get(curId);
  
!             if (t == null) {
!                 if (toLaunch[i].isMapTask()) {
!                     t = new MapTask(toLaunch[i].getJarPath().toString(), \
                curId.toString(), toLaunch[i].getSplit());
!                 } else {
!                     t = new ReduceTask(toLaunch[i].getJarPath().toString(), \
                curId.toString());
!                 }
!                 tasks.put(curId, t);
!                 taskProfiles.put(curId, toLaunch[i]);
!                 taskStatus.put(curId, new TaskStatus(curId, 0.0f, \
                TaskStatus.RUNNING));
!                 new LaunchTask(t).start();
!             }
          }
      }
--- 108,126 ----
              }
          }
!         return new TaskTrackerStatus(taskTrackerName, taskReports);
      }
  
      /**
!      * launchTask(Task) receives a Task to execute.
!      * Each Task includes info on how, exactly, the Task should be run.
       * We will include status info in the heartbeat message sent to the JobTracker.
       */
!     void launchTask(Task t) {
!         UTF8 curId = new UTF8(t.getTaskId());
  
!         if (taskProfiles.get(curId) == null) {
!             taskProfiles.put(curId, t);
!             taskStatus.put(curId, new TaskStatus(curId, 0.0f, TaskStatus.RUNNING));
!             new LaunchTask(t).start();
          }
      }
***************
*** 170,174 ****
       */
      public Task getTask(UTF8 taskid) throws IOException {
!         return (Task) tasks.get(taskid);
      }
  
--- 161,165 ----
       */
      public Task getTask(UTF8 taskid) throws IOException {
!         return (Task) taskProfiles.get(taskid);
      }
  
***************
*** 198,204 ****
       ***********************************************************/
      class LaunchTask extends Thread {
!         Task task;
!         public LaunchTask(Task task) {
!             this.task = task;
          }
  
--- 189,195 ----
       ***********************************************************/
      class LaunchTask extends Thread {
!         Task t;
!         public LaunchTask(Task t) {
!             this.t = t;
          }
  
***************
*** 209,216 ****
                      //"-verbose",
                      "-cp", 
!                     task.getJarPath(),                          // classpath is \
                task's jar
                      Child.class.getName(),                      // main is Child
                      TASKREPORT_PORT+"",                         // pass umbilical \
                port
!                     task.getTaskId().toString()                 // pass task \
identifier  });
              } catch (IOException ie) {
--- 200,207 ----
                      //"-verbose",
                      "-cp", 
!                     t.getJarPath(),                             // classpath is \
                task's jar
                      Child.class.getName(),                      // main is Child
                      TASKREPORT_PORT+"",                         // pass umbilical \
                port
!                     t.getTaskId().toString()                    // pass task \
identifier  });
              } catch (IOException ie) {
***************
*** 219,223 ****
                  // Perhaps attach them to the error log for the task
              } finally {
!                 reportTaskFinished(new UTF8(task.getTaskId()));
              }
          }
--- 210,214 ----
                  // Perhaps attach them to the error log for the task
              } finally {
!                 reportTaskFinished(new UTF8(t.getTaskId()));
              }
          }
***************
*** 275,283 ****
      public static void main(String argv[]) throws IOException {
          if (argv.length < 2) {
!             System.out.println("TaskTracker <localMachine> <jobTrackerAddr:port>");
              System.exit(-1);
          }
  
!         String localMachine = argv[0];
          String jobTrackerStr = argv[1];
  
--- 266,274 ----
      public static void main(String argv[]) throws IOException {
          if (argv.length < 2) {
!             System.out.println("TaskTracker <taskTrackerName> \
<jobTrackerAddr:port>");  System.exit(-1);
          }
  
!         String taskTrackerName = argv[0];
          String jobTrackerStr = argv[1];
  
***************
*** 292,296 ****
          InetSocketAddress jobTrackerAddr = new InetSocketAddress(jobTrackerName, \
jobTrackerPort);  
!         TaskTracker tt = new TaskTracker(localMachine, jobTrackerAddr);
          try {
              while (true) {
--- 283,287 ----
          InetSocketAddress jobTrackerAddr = new InetSocketAddress(jobTrackerName, \
jobTrackerPort);  
!         TaskTracker tt = new TaskTracker(taskTrackerName, jobTrackerAddr);
          try {
              while (true) {

--- TaskRunner.java DELETED ---

Index: JobTracker.java
===================================================================
RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce/JobTracker.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -C2 -d -r1.2 -r1.3
*** JobTracker.java	24 Jan 2005 10:07:06 -0000	1.2
--- JobTracker.java	24 Jan 2005 18:32:12 -0000	1.3
***************
*** 111,115 ****
      Vector unassignedTasks = new Vector();
  
!     // taskid->TaskProfile map
      TreeMap taskProfiles = new TreeMap();
  
--- 111,115 ----
      Vector unassignedTasks = new Vector();
  
!     // taskid->Task map
      TreeMap taskProfiles = new TreeMap();
  
***************
*** 203,207 ****
      // InterTrackerProtocol
      ////////////////////////////////////////////////////
!     public ArrayWritable emitHeartbeat(TaskTrackerStatus status) {
          status.setLastSeen(System.currentTimeMillis());
          synchronized (taskTrackers) {
--- 203,207 ----
      // InterTrackerProtocol
      ////////////////////////////////////////////////////
!     public Task emitHeartbeat(TaskTrackerStatus status) {
          status.setLastSeen(System.currentTimeMillis());
          synchronized (taskTrackers) {
***************
*** 216,221 ****
          LOG.info("Got heartbeat.  Unassigned tasks: " + unassignedTasks.size());
  
!         // Allocate some pending task(s) to this TaskTracker
!         return new ArrayWritable(TaskProfile.class, \
getTaskAssignments(status.getTrackerName()));  }
  
--- 216,221 ----
          LOG.info("Got heartbeat.  Unassigned tasks: " + unassignedTasks.size());
  
!         // Allocate a pending task to this TaskTracker
!         return getTaskAssignment(status.getTrackerName());
      }
  
***************
*** 282,286 ****
      void completedTask(UTF8 taskid) {
          LOG.info("Task " + taskid + " has finished successfully.");
!         // REMIND - mjc - possibly gc the TaskProfile here, because we're all done.
          // (though remember a map task may have to be re-executed)
          // Also, decrement the number of Tasks that a Job is waiting on.
--- 282,286 ----
      void completedTask(UTF8 taskid) {
          LOG.info("Task " + taskid + " has finished successfully.");
!         // REMIND - mjc - possibly gc the Task here, because we're all done.
          // (though remember a map task may have to be re-executed)
          // Also, decrement the number of Tasks that a Job is waiting on.
***************
*** 303,308 ****
      
      /**
!      * TaskProfile[] getTaskAssignments(UTF8 taskTracker) returns
!      * a list of tasks we'd like the taskTracker to execute right now.
       *
       * Eventually this function should compute load on the various TaskTrackers,
--- 303,308 ----
      
      /**
!      * Task getTaskAssignment(UTF8 taskTracker) returns
!      * a task we'd like the taskTracker to execute right now.
       *
       * Eventually this function should compute load on the various TaskTrackers,
***************
*** 310,326 ****
       * just grabs a single item out of the pending task list and hands it back.
       */
!     TaskProfile[] getTaskAssignments(UTF8 taskTracker) {
          if (unassignedTasks.size() > 0) {
              UTF8 taskid = (UTF8) unassignedTasks.elementAt(0);
!             TaskProfile task = (TaskProfile) taskProfiles.get(taskid);
              unassignedTasks.remove(taskid);
  
              // REMIND - mjc - assign the Task to the taskTracker here
  
!             TaskProfile assignment[] = new TaskProfile[1];
!             assignment[0] = task;
!             return assignment;
          } else {
!             return new TaskProfile[0];
          }
      }
--- 310,324 ----
       * just grabs a single item out of the pending task list and hands it back.
       */
!     Task getTaskAssignment(UTF8 taskTracker) {
          if (unassignedTasks.size() > 0) {
              UTF8 taskid = (UTF8) unassignedTasks.elementAt(0);
!             Task task = (Task) taskProfiles.get(taskid);
              unassignedTasks.remove(taskid);
  
              // REMIND - mjc - assign the Task to the taskTracker here
  
!             return task;
          } else {
!             return null;
          }
      }
***************
*** 349,353 ****
          // Right now, we do a simple, single Map task.
          File f = new File(filePath.toString());
!         TaskProfile tp = new TaskProfile(jarPath.toString(), createTaskId(), \
TaskProfile.MAP, 0, new FileSplit(f, 0, f.length()));  
          taskProfiles.put(new UTF8(tp.getTaskId()), tp);
--- 347,351 ----
          // Right now, we do a simple, single Map task.
          File f = new File(filePath.toString());
!         Task tp = new MapTask(jarPath.toString(), createTaskId(), new FileSplit(f, \
0, f.length()));  
          taskProfiles.put(new UTF8(tp.getTaskId()), tp);

Index: Task.java
===================================================================
RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce/Task.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** Task.java	21 Jan 2005 00:09:10 -0000	1.1
--- Task.java	24 Jan 2005 18:32:12 -0000	1.2
***************
*** 24,30 ****
    ////////////////////////////////////////////
  
!   protected Task() {}
  
!   protected Task(String jarPath, String taskId) {
      this.jarPath = jarPath;
      this.taskId = taskId;
--- 24,30 ----
    ////////////////////////////////////////////
  
!   public Task() {}
  
!   public Task(String jarPath, String taskId) {
      this.jarPath = jarPath;
      this.taskId = taskId;

Index: InterTrackerProtocol.java
===================================================================
RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/mapReduce/InterTrackerProtocol.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** InterTrackerProtocol.java	24 Jan 2005 10:07:06 -0000	1.1
--- InterTrackerProtocol.java	24 Jan 2005 18:32:10 -0000	1.2
***************
*** 19,23 ****
       * possibly of zero length.
       */
!     ArrayWritable emitHeartbeat(TaskTrackerStatus status);
  }
  
--- 19,23 ----
       * possibly of zero length.
       */
!     Task emitHeartbeat(TaskTrackerStatus status);
  }
  



-------------------------------------------------------
This SF.Net email is sponsored by: IntelliVIEW -- Interactive Reporting
Tool for open source databases. Create drag-&-drop reports. Save time
by over 75%! Publish reports on the web. Export to DOC, XLS, RTF, etc.
Download a FREE copy at http://www.intelliview.com/go/osdn_nl
_______________________________________________
Nutch-cvs mailing list
Nutch-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nutch-cvs


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic