Luigi --基于Python语言的流式任务调度框架的引见

Luigi --基于Python语言的流式任务调度框架的介绍

Luigi:

 

author: vincentzhwg

date: 2014.5.6

 

### web

        https://github.com/spotify/luigi

 

### intro:

        Luigi是基于python语言的,可帮助建立复杂流式批处理任务管理系统。它主要提供了以下功能:任务依赖管理、工作流管理、任务可视化、错误故障处理机制、命令行交互等。Luigi的主要目的是为了解决需要长期运行的流式批处理任务的管理。你可以链接很多个任务,使它们自动化,并进行故障管理。上面所说的任务可以是任何类型的任务,通常来说有如下几种:Hadoop任务、从数据库导入或导出、机器学习算法训练等。

        

### API概览

        在Luigi中有两个基础类:Task, Target. 另外,Parameter类对于如何控制Task类的运行是一个重要的类。

 

        Target:

                广义地讲,Target可对应为磁盘上的文件,或HDFS上文件,或checkpoint点,或数据库等。对于Target来说,唯一需要实现的方法为exists,返回为True表示存在,否则不存在返回为False.

                在实际应用时,写一个Target子类是很少需要用到的。直接使用开箱即可用的LocalTarget及 hdfs.HdfsTarget类就够用了。Luigi提供了Gzip支持,通过参数format=format.Gzip即可。

        

        Task:

                Task是任务逻辑运行的地方,提供了一些方法来定义任务的逻辑行为,主要有run, output, requires.

                Task通过类名及参数值做为标识符进行唯一区分。实际上,在同一个worker中,两个拥有相同类名及相同参数值的task不单单只是equal,而且实际上还是同一个实例。然而,如果参数在构建声明时指定了参数 significant=False ,对于Task的标识是不起影响的。对于多个Task,它们的类名相同,只是指定了 significant=False 的参数值才不同,而未指定 significant=False 的参数值是相同的,对于这些Task来说,它们拥有相同的标识符,即 hash(taskA) == hash(taskB) 是True的,但它们来自于不同的实例。

                Task.requires:

                        requires方法用来指定依赖关系,除了可指定对其他Task的依赖,还可指定为对自身Task的依赖。requires返回值可为 dicts/lists/tuples 或其他类别的封装。

                Task.output:

                        output方法返回一个或多个的Target对象,类似于requires方法,可返回适应于实际需要的对于Target的任何封装。实际上,建议只返回一个Target,因为如果返回多个,atomicity将会被丢失,除非Task能够确保多个Target能被原子性地创建。当然,如果原子性不是非常重要的时候,那么就可以放心地返回多个Target。

                Task.run:

                        run方法包含实际真正执行的代码。注意到,Luigi将任何事情切分为两个阶段,首先它指出在tasks之间的依赖关系,然后它运行每一件事情。 input() 方法是一个内部帮助方法,用来替代在requires 中的对象的对应输出。

 

        

        Parameter:

                在Python语言中,参数通常是在constructor时提供,但Luigi要求在类级别上声明所需的参数。通过这样子的要求,Luigi通过处理这些模板规范化的代码来为constructor提供所需参数。

                Python是个无需指定类型的语言(Python是个强类型动态语言),对于参数无需指定类型。对于Luigi来说,可以简单地使用 luigi.Parameter 即可,之所以存在 DateParameter 的原因,是为了在命令行交互时,确保命令行参数的值可以转换为对应的类型。

        

        Events and callbacks:

                Luigi内置了事件系统,允许注册callback到event中,并触发它们在所定义的tasks中,可挂接到一些预定义好的事件中,或者自定义事件中。每一个event被绑定到一个Task类中,将会被该Task类或其子类所触发。

 

        Instance caching:

                对于实例,Luigi提供了Instance caching。对于同一个标识符的task,就算在代码中实例化创建了两次,但实际上只会创建一个实例,这个是有必要的,确保了task只会被执行一次。

 

 

### Execution Model

        Luigi拥有一个非常简单的运行模型,最重要的一个方面就是没有执行转移。当执行一个Luigi的工作流,worker调度所有的tasks,并在同样的这些进程内执行这些tasks。受益于这种模式,非常容易对所有执行任务进行debug。并且,开发过程也相当简单。在开发过程中,通常通过命令行来运行Luigi,而当你布署时,可以通过crontab或任何其他的调度器来调度。这种模式所带来的不好的地方,在于Luigi不能自由地进行扩展,不过Luigi认为扩展应该交给Task去实现;另外一个不好的地方就是Luigi需要依赖于外部的调度器来触发工作流,如crontab等。

        

### Lugic Patterns

        Code Reuse

                Luigi的一个好处,是非常容易依赖于其他库中所定义的tasks。在执行路径上非常容易进行分叉,其中一项任务的输出可以成为很多其他任务的输入。

                同时,Luigi任务的输出都将被无限期地保存。这点的好处就是当后面的任务失败时,在重跑失败任务时可以重用前面任务的输出,而不需要重跑前面的任务。不好的地方在于,将会有大量的中间结果保存在系统上,一个比较有用的建议就是把这些输出保存在一个特定的目录中,并进行定期地清除。

        

        Triggering Mang Tasks

                一个常见的用例是每晚要运行一个Hadoop任务,但有时因为各种原因该任务会失败。一个有用的模式就是在最后建立一个虚拟任务,仅需声明依赖于最近多天之间的实际真正的任务。

 

### Configuration

        所有的配置均可由两个配置文件进行指定,一个是在自己Home目录下的 client.cfg ,另一个则为 /etc/luigi 。

 

        配置选项有:

                default-scheduler-host : 默认的scheduler

                error-email : 当crash时会收到eamil,但在命令行下运行时则没有。

                luigi-history : 如果设置了该选项,值为一文件名,将为记录一些东西(当前仅有job id)在mapreduce任务的输出目录下。

                如果想在Python下运行Hadoop mapreduce任务,需要指定streaming jar的路径。

 

 

### 现阶段不足

        Luigi关注于批处理任务,所以对于实时流式处理及长时间一直运行的处理的帮助不大。

        Luigi假设每一个task是工作中的相当大的一块数据处理。对于调度几千个任务是可以的,但将其扩展到数万个任务的调度是不太可行的。

        Luigi对于任务调度及任务执行之间进行了严格地划分。动态的for循环及分支在Luigi中是不太容易实现的,举个例子,遍历一个数值计算任务直到它收敛是非常棘手的。

 

 

### 花边

        Luigi是世界上最有名排第二的水管工的名字,中文译名为路易奇或路奇,有着“永远的老二”的外号。至于第一是谁,想必大家都知道了吧 ^_^

 

 

### 安装

        sudo apt-get install build-essential python-dev python-daemon python-setuptools libcurl4-gnutls-dev librtmp-dev

 

        以下包的安装:

        pycares

        pycurl

        unittest2

        futures

        Monotime

        Twisted

        tornado

        backports.ssl_match_hostname

        mechanize