jobtracker自身启动与FIFO 调度器的工作过程

  1. jobtracker自身的main函数如下:
  2.  1 public static void main(String argv[]
     2                           ) throws IOException, InterruptedException {
     3     StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
     4     
     5     try {
     6       if(argv.length == 0) {
     7         JobTracker tracker = startTracker(new JobConf());
     8         tracker.offerService();
     9       }
    10       else {
    11         if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
    12           dumpConfiguration(new PrintWriter(System.out));
    13         }
    14         else {
    15           System.out.println("usage: JobTracker [-dumpConfiguration]");
    16           System.exit(-1);
    17         }
    18       }
    19     } catch (Throwable e) {
    20       LOG.fatal(StringUtils.stringifyException(e));
    21       System.exit(-1);
    22     }
    23   }
    View Code
  • 主要代码:
  • JobTracker tracker = startTracker(new JobConf());
            tracker.offerService();
  • startTracker()中主要代码如下://创建一个jobTracker对象,并设置调度器
  • result = new JobTracker(conf, identifier);
            result.taskScheduler.setTaskTrackerManager(result);
  • offerService()函数中主要的代码:taskScheduler.start();//通过taskScheduler的start方法来启动任务调度器,之后任务调度器就开始调度任务来交给jobTracker执行。当jobClient提交一个job后,jobtracker就通过jobadded(job)方法将提交的job加入job队列中,再进一步初始化,具体参照:jobtracker对提交作业的初始化
  • 其中taskScheduler.start()方法中FIFO 调度器代码如下:
  •  public synchronized void start() throws IOException {
        super.start();
        taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);//将JobQueueJobInProgressListener加入到jobTracker中的jobInProgressListeners,taskTrackerManager在JobTracker的刚创建的时候就已经设置成jobTracker了,该行就是把实现了JobInProgressListener类的jobQueueJobInProgressListener加入到jobInProgressListeners中,而JobInProgressListener中有一个Map<JobSchedulingInfo, JobInProgress> jobQueue变量,用以存储所有的作业。因为jobQueueJobInProgressListener是被实例化了的,所有存在,实例化是在新建jobTracker对象时设置taskScheduler中完成。语句在jobQueueTaskScheduler的构造函数中:
  • public JobQueueTaskScheduler() {
        this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
      }

  •     eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
        eagerTaskInitializationListener.start();
        taskTrackerManager.addJobInProgressListener(
            eagerTaskInitializationListener);  }
    • //eagerTaskInitializationListener实例化,在QueueTaskScheduler中的Setconf()对象中
      1 public synchronized void setConf(Configuration conf) {
      2     super.setConf(conf);
      3     padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 
      4                                  0.01f);
      5     this.eagerTaskInitializationListener =
      6       new EagerTaskInitializationListener(conf);
      7   }
      View Code

    而Setconf()函数是在JobTracker实例化中设置Taskscheduler的时候执行的。代码如下:

  • 1 Class<? extends TaskScheduler> schedulerClass
    2       = conf.getClass("mapred.jobtracker.taskScheduler",
    3           JobQueueTaskScheduler.class, TaskScheduler.class);
    4     taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
    View Code
  • ReflectionUtils.newInstance(schedulerClass, conf)中的代码如下:

  •  1 public static <T> T newInstance(Class<T> theClass, Configuration conf) {
     2     T result;
     3     try {
     4       Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
     5       if (meth == null) {
     6         meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
     7         meth.setAccessible(true);
     8         CONSTRUCTOR_CACHE.put(theClass, meth);
     9       }
    10       result = meth.newInstance();//产生一个TaskScheduler实例
    11     } catch (Exception e) {
    12       throw new RuntimeException(e);
    13     }
    14     setConf(result, conf);//执行setConf()方法
    15     return result;
    16   }
    View Code
  • 而ReflectionUtils类中的SetConf()方法如下:

  • 1 public static void setConf(Object theObject, Configuration conf) {
    2     if (conf != null) {
    3       if (theObject instanceof Configurable) {
    4         ((Configurable) theObject).setConf(conf);//TaskScheduler实现类Configurable接口,所以其执行的SetConf()方法是JobQueueTaskScheduler中的SetConf()方法
    5       }
    6       setJobConf(theObject, conf);
    7     }
    8   }
    View Code
  • (1)则eagerTaskInitializationListener.start()方法完成的就是:public void start() throws IOException {
        this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");//产生一个JobInitManager类的线程。
        jobInitManagerThread.setDaemon(true);//将该线程设置为守护线程
        this.jobInitManagerThread.start();
      }
  • JobInitManager的Run()方法如下: class JobInitManager implements Runnable {
        public void run() {
          JobInProgress job = null;
          while (true) {
            try {
              synchronized (jobInitQueue) {
                while (jobInitQueue.isEmpty()) {
                  jobInitQueue.wait();//当初始化队列为空时,一直等待
                }
                job = jobInitQueue.remove(0);//依次移出队列中的对象
              }
              threadPool.execute(new InitJob(job));//对Job进行初始化,而InitJob()为一个线程class InitJob implements Runnable {
        private JobInProgress job; public InitJob(JobInProgress job) {   this.job = job; } public void run() { ttm.initJob(job); } }其最后调用的是JobTracker中的InitJob(Job)方法,初始化该Job的所有Tasks等。
            } catch (InterruptedException t) {
              LOG.info("JobInitManagerThread interrupted.");
              break;
            }
          }
          LOG.info("Shutting down thread pool");
          threadPool.shutdownNow();
        }
      }
  • (2)则其对象就会加入到JobTracker中的JobInProgressListeners中,此时JobInProgressListeners有两个对象,第一个是JobQueueInProgressListener当JobTracker中的Addjob()方法时,其JobAdded(Job)方法是JobQueueJobInProgressListener中的方法,将Job加入到调度队列中:public void jobAdded(JobInProgress job) {
        jobQueue.put(new JobSchedulingInfo(job.getStatus()), job);
      }
    第二个对象是eagerTaskInitializationListener,则其调用的是EagerTaskInitializationListener类中Jobadded(Job)方法:public void jobAdded(JobInProgress job) {
        synchronized (jobInitQueue) {
          jobInitQueue.add(job);
          resortInitQueue();
          jobInitQueue.notifyAll();
        }

      }
  • (3)之前所说的FairScheduler,CapacityScheduler没有对应的eagerTaskInitializationListener类,则在其对应的jobListener,JobQueuesManager类中必包含了初始化的过程,即哪里一定调用了JobTracker中的InitJob方法
  • jobtracker自身启动与FIFO 调度器的工作过程
  • 相关推荐