Ray:一个值得关注的分布式计算框架

0. 引子

在上汽帆一尚行,我们开发了一个一站式的机器学习平台iGear,用来支撑集团的机器学习的应用开发。作为一个机器学习平台,很重要的一个功能就是进行复杂的CPUGPU任务调度,以支持不同的机器学习开发范式。

但是任务的难度很快就升级了,我们发现单个任务的调度和闭环根本不能满足客户的复杂AI开发场景的需求。实际的AI开发往往是由一系列的任务流组成的:数据的清洗、增广,不同的训练任务的叠加,训练结果的评估等等。所以往往需要能够让用户定义一个清晰的工作流(Workflow)Workflow中可以自由的定义不同的任务,以及任务和任务间的依赖关系,并且管理任务的串行和并行的执行逻辑、任务间的数据依赖、容错管理等等。这样的需求非常常见,所以现在通用的机器学习平台很多都有这样的支持,比如腾讯云的TI-ML平台。

我们的AI任务平台也是基于这样任务间的依赖管理,增加了Workflow的调度管理,可以让用户自己定义复杂的Workflow任务流。下图就是我们iGear Workflow的一个配置定义:

 

这里面引入了对数据集的处理任务和多个训练任务之间的关系,depends_on字段定义了任务间的依赖关系。Workflow本质上是基于依赖关系定义了一个有向无环图(DAG),有依赖的节点需要等到被依赖的节点顺利执行完才可以执行,没有依赖关系的节点可以并行执行,非常简单和直观,这个大部分程序员朋友应该都能理解。

可是平心而论,这种基于容器任务间依赖的Workflow是有其局限性的。我觉得最大的问题在于:

  1. 这种基于容器任务的依赖调度的粒度太粗了,不能随心所欲编织任务,配又比较复杂 

  2. 实时性不好,任务是基于容器的调度,数据的互通通常是基于特定的分布式存储

  3. 任务之间的数据依赖的格式标准很难统一又必须统一

  4. 容错处理和Debug会比较麻烦,往往只等dump出各个任务的日志去排查

所以对于实时性要求很高的机器学习场景,有没有一个很好的开源方案能够解决这些问题?大家会很容易想到MapReduceSparkStormFlink这些大数据处理方案,可是很遗憾,这些框架都和一种计算范式绑定死了,例如MapReduceSpark基于BatchMini-Batch的计算范式,FlinkStorm的基于Stream的范式。而显然这些计算范式不适合解决所有问题,比如,机器学习的训练场景往往需要使用AI框架(Tensorflow)并绑定GPU

所以我们真正需要的是一个分布式的计算框架,可以细粒度的管理计算任务和任务间依赖(比如这个计算任务可以是一个函数或对象),同时又不染指具体的计算范式,可以让这个框架和现在所有计算框架相兼容。同时它需要有非常高的性能,要像编写单体单语言程序一样自由,不用管理任务间数据依赖和持久化方案等等。

听起来很美,但是还真的有,这就是今天的主角:Ray

 

1. Ray的诞生

Ray在2017年诞生在伯克利大学的RiseLab实验室,这个实验室的前身是AMPLab,在这里诞生了SparkMesosTachyon(现在的Alluxio)等伟大的作品,所以Ray同样也是很值得期待的。

Ray的作者之一:Robert Nishihara

最开始Ray是由Robert NishiharaPhilipp Moritz这两个RiseLab的研究生创造的,本意是建造一个简化分布式计算的运行时框架。后来在Michael Jordan Ion Stoica教授的支持和指导下,从一个研究性项目开源出来,并逐渐发展成一个超过200Contributors50个公司参与的大型开源项目。这其中,就有蚂蚁金服的深度参与,而且蚂蚁金服已经基于Ray深度定制开发了一套系统用于生产,也是Ray项目最大的商业应用背书。

如果给Ray下一个一句话的定义,那就是:Ray是一个开发和运行分布式应用的高性能和简单的框架。

2. Ray的极简教程

通过Ray的使用例子来介绍Ray的定位可能是最简单有效的方式,何况RayApi也简单到极致。

2.1 Ray的安装

Ray的核心代码是基于C++Driver侧的代码是基于Python,尽管也为Java开发了DriverApi接口,但是最全面的还是Python,这一点和现在的AI框架也基本一致。所以可以像安装普通的python包一样使用包管理工具安装,如:

pip install -U ray

2.2 Ray的Task

Ray有两种任务模式,TaskActor。简单来说,Task就是一个可以远程执行的无状态的函数(Remote Function),而Actor是一个有状态的对象(Remote Class)。Task的抽象适合运行无状态的任务,可以无阻塞的调度和运行。Actor和我们常提到的类似于AkkaActor模型还是有序别的,RayActor抽象就是一个有内部状态的远程对象,可以调用这个对象的内部函数去更新状态。Actor的抽象更适合用于构建如分布式训练的参数服务器等需要更新和同步状态的场景。

这里先通过代码介绍Task的概念:

# Execute Python functions in parallel.import ray#如果是分布式Ray集群,使用 ray.init(address=<cluster-address>)ray.init()@ray.remotedef f(x):    return x * x    futures = [f.remote(i) for i in range(4)]print(ray.get(futures))

输出结果:

[0, 1, 4, 9]

Ray可以以单机和集群的方式启动,我们忽略Ray集群配置的介绍,重点放在Ray的使用范式和设计思想上。

通过给一个Python函数添加@ray.remote这个函数装饰器,我们就可以申明一个可以运行在Ray集群中的Task。之后可以通过f.remote()的方式进行调用。这里的详细的调用过程会在下文阐述,但是总的来说,这一步会将这个任务分发到集群的节点上,并且这里的调用方式完全是无阻塞的,也就是说futures = [f.remote(i) for i in range(4)]会无阻塞的执行,并不会等待结果的返回。返回的futures是这些异步无阻塞任务的id。之后使用ray.get(futures)来根据任务id来拉取远程任务的结果,这里的调用过程是阻塞的,直到任务结果的返回。如果熟悉future异步编程模式的话一定对这种使用方式不陌生。通过这种方式,可以瞬间调用成千上万个远程任务到Ray集群的各个节点,并同步任务的结果。

 

更为有趣和重要的是,可以看看这样的使用方式:

# A Ray remote function.@ray.remotedef remote_function():    return 1
@ray.remotedef remote_chain_function(value):    return value + 1
y1_id = remote_function.remote()assert ray.get(y1_id) == 1
chained_id = remote_chain_function.remote(y1_id)assert ray.get(chained_id) == 2

我们这里定义了两个remote函数, 分别是remote_functionremote_chain_function,有趣的是在调用remote_chain_function的时候参数调用的是remote_functionid。也就是说remote函数的参数不仅仅可以是普通的参数类型,也可以是另一个remote函数的id,而当参数是另一个remote函数的id的时候,remote_chain_function的执行会被block住,直到依赖的remote函数执行完成后才将依赖的执行完的结果取出放入调用函数的参数中继续执行。

这个设计真的令人拍案叫绝,它实际上将分布式任务的依赖通过参数求值顺序的依赖和函数间调用自然的结合在一起,让工程师可以自由定义细粒度的分布式任务。我在引子中提过,基于容器的的Workflow的粗粒度的任务依赖和调度,需要平台管理依赖关系和数据流动规范。而使用Ray,分布式任务可以像写单体程序一样自然:任务的依赖用函数参数的求值序来保证,任务间的数据的规范更是完全由python的数据结构来统一。简单、自然、灵活而又强大,这个设计非常具有美感。

 

2.3 Ray的Actor

 

任务可以定义和创建任意多个无状态的Task函数,并通过remote语义的方式分发到Ray集群中执行,用户的代码中通过函数调用的方式维护依赖关系。可是有的时候需要有状态的对象抽象,能够通过更新和共享对象的状态来进行协作,比如参数服务器,多个work可能需要共享一个参数服务器,定期的更新和拉取全局训练参数。鉴于此,Ray提供了Actor的抽象。通过一个简单的例子来说明

@ray.remoteclass Counter(object):    def __init__(self):        self.value = 0
    def increment(self):        self.value += 1        return self.value
counter = Counter.remote()print(ray.get(counter.increment.remote()))print(ray.get(counter.increment.remote()))

输出结果:

12

Task相似,只要在普通的Python类上面加上@ray.remote装饰器,就可以定义一个ActorActor类中可以定义任意数量和形式的状态,通过通过实例化后的对象函数的调用可以处理和更新这些状态。Actor中的函数都可以理解为一个task,只不过这个task共享着对象内部的状态,所以上面的例子中可以通过counter.increment.remote()多次调用,并且更新counter对象内部的value状态。当然,实例化后的对象可以被多个Task或处理逻辑调用,构造复杂的任务和对象流。特别值得注意的是,在Ray的内部会管理好时序关系,多个线程或进程调用Actor内部的remote函数时,Ray内部会维护一个队列,Actor的函数会依次执行。

2.4 Ray的资源管理

对于remote函数或者Actor,还可以指定调度的资源配额,如下所示:

@ray.remote(num_cpus=2, num_gpus=0.5)class Actor(object):    pass

 

这个Actor在调度的时候会在集群中选择合适的节点,并被分配2CPU核和0.5GPU。注意一下,这里的GPU的配额可以为非整数,但这里并不是什么GPU虚拟化的方案,设置了就可以高枕无忧。Ray会管理GPU卡的分配,但是具体在使用的时候是需要Actor自己去负责管理GPU显存的分数,否则有可能会多个任务占用同一张卡,都设置了0.5,但是业务代码不做设置的话还是会导致显存OOM。好在现在很多框架(Tensorflow)都有显存的配额管理。

2.5 所以Ray到底是什么?

 

看到这里的朋友们,可能还在疑惑,Ray到底实际上解决了什么问题,它的抽象的层次又到底在哪一层,它又真正填补了哪一块。大家可以看出,Ray其实是将函数和对象的分布式抽象和调度做好了,这是要重点突出一下的。它将Remote FunctionRemote Actor 为粒度拼起了一个或多个动态的函数粒度的分布式计算图。说它是动态的,是指它可以在Runtime去生成和创建计算任务,所以非常灵活。而说它是FunctionActor粒度的,而相比之下,TensorflowPytorch这样的AI框架,是通过框架提供机器学习算子的DSL语义拼出了计算图(区别是TF是静态图,Pytorch是动态图,所以一般认为Pytorch也更灵活一点);而基于容器调度的方案则是粗粒度的任务调度。所以使用Ray,完全可以将基于TFPytorch框架写的训练任务包装成一个Remote Function调度出去。所以Ray这个分布式计算框架并不指定计算范式或者计算引擎,而是可以和这样的框架相融合。

我们对逻辑上的使用就简单介绍到这里,实际上RayAPI非常简单,有兴趣的朋友可以去官网看,相信很快就能掌握。使用RayRemote ActorRemote Function可以构建很多有趣的应用,比如,有描述怎么用15行代码实现一个参数服务器。有兴趣的朋友可以去看一看。

但是相信很多朋友会有这样的疑惑,尽管在编程范式上非常理想,作为一个分布式计算框架,它的性能是不是能够保证。要得到这方面的答案,我们需要打开Ray的设计架构进去看一看。

3. Ray的架构设计

3.1 分布式实时系统的挑战

 

Ray作者的发表的论文[1,2]中详细阐释了Ray的设计上的思考。在构建机器学习或者强化学习的实时性分布式中,下面这七个需求通常非常重要:

 

性能上的需求:

R1: 实时性和低延迟:通常情况下,机器学习任务需要细粒度的任务调度和毫秒级的端到端延迟。

R2高吞吐:例如在强化学习的仿真场景中,既包含实时训练、推理,又包含交互仿真,系统的吞吐会非常大。

执行模型的需求:

R3动态任务创建比如在模特卡罗树搜索的场景中,需要在运行时动态创建成千上万的任务。

R4异构任务的创建:现在的训练任务很多都需要CPUGPU等多种异构计算介质上进行。

R5任意任务流依赖:机器学习场景往往都需要提供细粒度的任务依赖控制管理。

实用性上的需求

R6透明的容错管理:机器学习往往是一个复杂的计算图,执行中分布式任务出现错误在所难免,如何透明的容灾变得非常重要。

R7调试和Profiling:对于复杂的分布式任务而言,这个需求不言而喻。

3.2 Ray的总体架构

Ray的设计围绕着解决这七个需求展开。如果忽略掉大部分细节,Ray最突出的三个设计就是全局控制状态(Global-Control-State,GCS)混合任务调度模式(hybrid scheduling)分布式内存对象存储(In-Memory DistributedObject Store)。下图是Ray的总体的设计示意图:

Ray的架构设计图[2]

3.2.1 全局状态控制(GCS)

GCS是就是上图中蓝色标识的部分。Ray的架构依赖于一个逻辑上中心化的控制面板,它存储了所有的集群系统的控制状态信息和组件间的发布订阅依赖和交互信息,实际上GCS是目前通过Redis分布式内存数据库实现。理论上,GCS存储了所有的状态,除了GCS之外,而其它的组件或者执行任务都可以理解为是无状态的,这样就非常方便的进行拓展和容灾恢复。比如,如果某个组件失败,只需要重启就可以;而如果任务失败,GCS也会存储好任务的依赖关系,通过重启依赖任务簇,就可以重建和恢复数据和任务。

极端场景下,通过数据分片,GCS可以达到很高的吞吐和次毫秒级的时延,具有非常好的性能。另外,有了全信息图谱的GCS,系统的监控、DebugProfiling也更加容易实现。

 

3.2.2 混合调度机制

一般来说,分布式的调度系统如果仅仅依赖全局的调度器,那么很容易出现性能瓶颈,这一点早期MapReduceJobTracker全局调度器就是前车之鉴,所以后来才有了Yarn项目。那么对于Ray而言,需要做到超低时延,摆在它面前的就是调度器的设计。而Ray用混合调度方案交出了满意的答案。

Ray有两类调度器:全局调度器(GlobalScheduler)和本地调度器(Local Scheduler)。并且使用一种被他们称为Bottom-Up的混合调度机制。简单来说,由于动态图特性,Worker线程会在执行的时候创建新的worker任务,worker会将新的worker任务提交到local scheduler,由local scheduler来决定是不是将新提交来的worker任务就在这个节点运行。一般来说,如果本地节点的资源足够,优先会在本地节点启动,这样会有更高的调度性能和资源利用率。如果本地资源不匹配,local scheduler会将worker任务上交给global scheduler,由global scheduler根据节点和任务信息决定调度向哪个节点,然后将任务提交给相应节点的local scheduler进行实际的调度。

大家能从这种方案看出,混合调度模式可以让大部分任务在本地被调度起,极大提高了调度效率,减小了节点间的交互,也减小了global scheduler的工作负载。

3.2.3 分布式内存对象存储

数据的存储和共享在分布式系统中非常重要,而在我们讨论的高吞吐、低时延的机器学习系统中,高效的数据存储和共享则显得更加重要。

为了减小时延,Ray实现了一个基于内存的分布式存储系统,底层是基于Apache Arrow(https://arrow.apache.org/),这是一个跨语言的in-memory的数据共享协议组件,可以实现zero-copy读,并且可以避免数据序列化和反序列化,减小数据加载overhead

 

Ray利用Arrow,在每个节点上通过共享内存方案构建了一个这个节点上所有worker共享的Object Store,这样节点上的worker间共享数据避免了序列化和拷贝,性能非常高。当然,因为是基于奢侈的内存的存储,免不了会有swap策略。当内存不足时,热的对象数据会被一直存放在内存里,但冷的数据会按照LRU策略swap到硬盘上。

值得注意的是,这里并不是真正意义上的分布式内存对象存储,因为数据是隔离存储在各个节点上的,如果需要获取别的节点上的数据,是需要发起一个grpc请求,将远端节点上的数据拷贝到本地的object store。总的来说,因为大部分数据交互其实是在同一个节点内,因为混合调度策略的机制就是让亲和性高的任务优先调度在同一个节点了,所以这种数据共享机制仍然是非常高效的。

3.3 Ray的执行和值返回过程

 

前面介绍了Ray的整体架构,下面可以通过一个执行流的说明让大家对Ray的架构和执行机制有更加深入和具体的认识。

3.3.1 Ray的执行过程

下图是Ray详细的任务执行流程图:

Ray任务执行流程[1]

假设有N1N2两个节点,driver程序执行在N1节点上,假设add这个remote函数的参数ab也是remote function。那么任务的执行流程大体上是这样的:

  • 0步:通过driver侧的程序,add这个remote function会自动在GCS注册存储下来,并且分发到集群中的每一个节点。大家有可能会对python程序如何分发很感兴趣,因为一般而言,python程序的分发不会像java程序这么轻松和自然。Ray主要还是基于Pickle的代码持久化方案,有兴趣的可以阅读它的cloudpickle部分代码。但我个人觉得基于Dill可能更简单些,不知道作者为什么没有用。
  • 1步:当在N1节点上执行add.remote(a, b)的时候,任务会被提交给N1节点上的Local Scheduler
  • 2步:Local Scheduler觉得本地节点的资源不适合调度该任务,便将该任务发送给Global scheduler
  • 3步:Global Scheduler检查GCS里面的信息,发现依赖的参数a任务结果存储在N1节点,参数b任务结果存储在N2上。Global Scheduler决定将任务调度到N2节点上。
  • 4步:Global Scheduler将任务提交给N2节点上的Local Scheduler
  • 5步:N2节点查看参数ab是否已经满足,发现b参数已经存在且满足,但是a参数还不满足。
  • 6步:于是N2节点查看GCS中的信息,查看到a存储在N1节点上(如果这个时候a还没有计算成功,也会触发a参数相关的remote function的计算)
  • 7步:N2通过grpc调用将存储在N1节点上的a拉取过来并存储在object store中。
  • 8步:add函数的两个参数依赖都已经完成,trigger N2节点上的worker进程进行计算。
  • 9步:N2上相应的worker开始执行计算。

3.3.1 Ray的值返回过程

下图是Ray详细的任务值返回流程图:

Ray任务值返回流程[1]

 

因为add.remote()函数是非阻塞的,driver程序的ray.get()会被执行,而ray.get()会被阻塞,它的阻塞实际上以下的流程:

  • 1步:ray会根据cfuture id来查询,它首先会查看N1节点上的object store,发现没有。
  • 2步:紧接着N1通过GCS查看c的状态,发现也没有,说明c还没有被创建,于是在GCSObject Table中注册一个回调,一旦c的条目被创建,这个回调就会被执行而通知到client
  • 3步:N2节点上的worker执行完毕,会将结果写入到N2Object Store
  • 4步:同时N2会在GCSObject Table中写入c的结果。
  • 5步:GCS中因为写入了c,之前注册了的回调函数会被执行。
  • 6步:回调的逻辑之一就是将存储在N2节点上的c的值拷贝到N1节点上。
  • 7步:ray.get的阻塞结束,c的值被返回。

3.4 Ray的性能简介

 

Ray作者详细阐释架构设计理念的论文中有很多Ray的性能统计指标结果,其中稍微介绍下很有趣的两个指标:

上图a部分阐述了task当采用data locality,具有locality awareness的任务,通常会调度到数据节点,所以latency会非常小,并且基本不受数据大小的影响;而没有locality awareness的任务(通常是Actor的函数,因为Actor一旦分配就固定在一个节点上了),通常会有2-3倍的时延增加,并且随着data size的增加而增加。

另外从b途中可以看到Ray具有非常好的线性拓展性,成比例的增加节点就可以成比例的增加执行任务,这是每个分布式系统都苦苦追寻的理想。

 

Ray的架构就简单介绍到这里,基本上描述完整了Ray的执行流和组件,也串起了Ray的主要架构设计和哲学。

4. 结束语

Ray是一个值得关注的分布式框架。因为AI系统不可避免的在走向实时性,会变得更加复杂和苛刻。传统的大数据框架和AI框架都不是能够完整解决这些问题的工具。而Ray却有点机会。

 

未经允许不得转载:大自然的搬运工 » Ray:一个值得关注的分布式计算框架

赞 (1)

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址