Autodesk and Outerbounds Partner to Open Source Ray and HPC Integration in Metaflow

riley_hun
Autodesk

We are positioning Metaflow as a foundational part of our AI infrastructure on AWS.

Hailing from Netflix, Metaflow emerged as a compelling ML platform that was released as open source in 2019. Metaflow sets itself apart from many other ML tools by placing a stronger emphasis on human-centric engineering design and optimizing for user experience across the entire ML stack.

Although Metaflow can be leveraged by Data Scientists and Machine Learning Engineers across the entire ML stack, we will focus strictly on its compute layer. As of August 2023, Metaflow offers two distinct options for its compute layer: AWS Batch, a managed AWS service that facilitates autoscaling through ECS and container-based jobs, and Kubernetes. Metaflow makes it trivial for us to access an incredibly large amount of compute on AWS Batch for our ML workloads, allowing our data scientists to be fully self-served. We have been very pleased with how simple yet robust Metaflow has proven to be. A key consideration for us has always been Metaflow's remarkable adaptability, especially in handling the challenges associated with distributed training of large-scale models. As such, we wanted to ensure it was sufficiently future-proof for our AI needs by integrating it with other frameworks in the space.

 

Enter Ray

 

Ray is an open source distributed computing framework that has gained a lot of traction in recent years, particularly due to its substantial role in training ChatGPT. Its primary utility lies in accelerating and streamlining ML workloads and training tasks, and it provides a scalable and efficient platform for large-scale data processing. Ray is a massive framework with a lot of bells and whistles that cover many facets of the end-to-end ML workflow. Some of its capabilities include:

  • Ray Core: serves as the foundation of the entire Ray ecosystem. It provides the building blocks for developing distributed computing applications and functions (a minimal example follows this list).
  • Ray Train: a module dedicated to distributed training of ML models, enabling users to leverage distributed data parallelism and other distributed training techniques. It supports popular ML frameworks such as TensorFlow, PyTorch, and HuggingFace.
  • Ray Tune: a library geared towards streamlining and distributing the process of hyper-parameter tuning for ML models. It offers many search algorithms, such as HyperOpt and Optuna.
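
To make the Ray Core programming model concrete, here is a minimal, self-contained example (not taken from our workloads) that turns an ordinary Python function into distributed tasks:

import ray

ray.init()  # starts a local Ray instance; on a cluster, this connects to it

@ray.remote
def square(x):
    # Each invocation runs as a Ray task, potentially on a different node.
    return x * x

# Launch 8 tasks in parallel and gather the results.
futures = [square.remote(i) for i in range(8)]
print(ray.get(futures))  # [0, 1, 4, 9, 16, 25, 36, 49]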

We immediately recognized Ray's potential as a pivotal element of our training infrastructure for distributed computing. We discovered that Ray is designed for extensibility and that Metaflow and Ray greatly complement each other: AWS Batch operates at the infrastructure level, setting up the driver and workers, while Ray functions at the framework level, making efficient use of the underlying hardware. Armed with this understanding and the realization that Ray could fundamentally be installed on top of AWS Batch, we endeavoured to integrate Ray with Metaflow. This integration allows users to seamlessly run Ray applications within their Metaflow flows, combining the strengths of both frameworks.

 

Distributed Computing Using Ray Orchestrated By Metaflow

 

Metaflow already lays the groundwork for enabling distributed computing through gang-scheduled clusters. The parallel decorator already exists in Metaflow and can be used to run remote Metaflow tasks on an AWS Batch multi-node parallel job. We collaborated with Outerbounds, the team that developed Metaflow, and worked from their existing abstractions to introduce the ray_parallel decorator to run distributed Ray workloads.

 

Using ray_parallel, Metaflow provisions the underlying hardware for the cluster components using AWS Batch multi-node. Users can then embed their Ray code within a Metaflow step. At runtime, Metaflow will automatically launch a transient Ray cluster, run the Ray application specified in the ray_parallel task, and then tear down the cluster automatically upon completion of the task.

 

Our implementation of the ray_parallel decorator does all the underlying setup for installing Ray on an on-premise cluster discussed in the docs here. A setup_distributed_env method extends the base parallel decorator’s functionality and ensures that a ray start command is executed for the head node on the ray_parallel control task. Correspondingly, each of the worker nodes runs the ray start command to connect to the head node, which forms the Ray cluster.
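
For illustration, a stripped-down sketch of that logic might look like the following. The function name and arguments mirror the description above but are not the actual metaflow-ray internals, and we assume the head node's IP address is available from the AWS Batch multi-node environment:

import subprocess

def setup_distributed_env(is_control: bool, main_ip: str, port: int = 6379):
    if is_control:
        # Control task (head node): start Ray and wait for workers to join.
        subprocess.run(["ray", "start", "--head", f"--port={port}"], check=True)
    else:
        # Worker task: connect to the head node to form the Ray cluster.
        subprocess.run(["ray", "start", f"--address={main_ip}:{port}"], check=True)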

 

[Diagram: Ray cluster formation on an AWS Batch multi-node parallel job]

Deeper Dive into the ray_parallel decorator

 

We made some other considerations while developing ray_parallel, particularly to enhance the user experience. Most prominently, one challenge we discovered was that the worker nodes created by AWS Batch multi-node parallel jobs would terminate prematurely, without coordinating with the head node, once a Ray application was submitted for execution. For instance, when a user submits their training code to the control task within the Metaflow step, the AWS Batch worker nodes themselves have nothing to execute, so they complete their tasks prematurely and exit the Ray cluster. To address this challenge, we needed to ensure that the worker nodes stayed alive and could communicate with the head node to determine when to complete their respective tasks.

 

To tackle this issue, we configure the workers to perform a time.sleep and continuously poll for a signal indicating completion of the Ray job. We leverage S3 to exchange information between the worker tasks and the control task. For example, we configure the control task to write the job-completion signal to S3. All of the worker nodes continue to poll S3 until this task-completion variable is retrieved, whereupon the worker tasks conclude their execution. This approach ensures the worker nodes stay active and coordinate with the control task within the distributed environment.
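
A rough sketch of this coordination is shown below, using Metaflow's built-in S3 client scoped to the current run; the key name ray_job_done is a hypothetical placeholder, not the key used by metaflow-ray:

import time
from metaflow import S3

FINISHED_KEY = "ray_job_done"  # hypothetical key name

def signal_job_complete(flow):
    # Control task: announce that the Ray application has finished.
    with S3(run=flow) as s3:
        s3.put(FINISHED_KEY, "done")

def wait_for_job_complete(flow, poll_seconds=10):
    # Worker task: sleep, then poll S3 until the control task writes the signal.
    while True:
        with S3(run=flow) as s3:
            if s3.get(FINISHED_KEY, return_missing=True).exists:
                return
        time.sleep(poll_seconds)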

 

[Diagram: worker tasks polling S3 for the job-completion signal written by the control task]

We also wanted to ensure that the expected number of nodes has formed the Ray cluster before the Metaflow step executes ray.init(). We accomplish this in the control task, which continuously polls until all workers have joined the Ray cluster. When the workers complete their respective ray start commands and are connected to the head node, they write a signal to S3 indicating that the node has started. The control task repeatedly reads these signals from S3 until there is confirmation that all nodes have been successfully initiated.
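
A simplified version of that start-up barrier might look as follows; the node_started/ key prefix is again a hypothetical name rather than the one used by metaflow-ray:

import time
from metaflow import S3

def wait_for_all_nodes(flow, expected_nodes, poll_seconds=5):
    # Control task: block until every worker has reported "node started" to S3.
    while True:
        with S3(run=flow) as s3:
            started = len(s3.list_paths(["node_started/"]))
        if started >= expected_nodes:
            return
        time.sleep(poll_seconds)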

[Diagram: the control task polling S3 for node-started signals before calling ray.init()]

One critical choice we had to make was deciding whether or not to terminate the entire Metaflow run if a worker left the Ray cluster or crashed due to some unforeseen error. In the end, we decided that if a node encountered an exception, the Metaflow run should exit in response. As such, we implemented a NodeParticipationWatcher class, run in the control task, which monitors node participation. It starts a thread that continuously polls for the total number of nodes in the Ray cluster and ensures that it equals the expected number of nodes. If the number of participating nodes is less than the expected number of nodes, the entire Metaflow run terminates.
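
The sketch below conveys the idea, though it is not the exact class we contributed; it counts live nodes with ray.nodes() and force-exits the control task if the cluster shrinks, which in turn fails the Metaflow run:

import os
import threading
import time
import ray

class NodeParticipationWatcher:
    def __init__(self, expected_nodes, poll_seconds=10):
        self.expected_nodes = expected_nodes
        self.poll_seconds = poll_seconds
        self._thread = threading.Thread(target=self._watch, daemon=True)

    def start(self):
        self._thread.start()

    def _watch(self):
        while True:
            # ray.nodes() returns metadata for every node that has ever joined;
            # keep only the ones that are currently alive.
            alive = [n for n in ray.nodes() if n["Alive"]]
            if len(alive) < self.expected_nodes:
                print(f"Only {len(alive)}/{self.expected_nodes} nodes alive; terminating the run")
                os._exit(1)  # fail the control task so the whole Metaflow run errors out
            time.sleep(self.poll_seconds)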

[Diagram: the NodeParticipationWatcher monitoring node participation in the Ray cluster]

 

Battle Testing ray_parallel

 

We made sure to test the ray_parallel decorator rigorously across a wide variety of Ray applications, including distributed training of an XGBoost model, fine-tuning the GPT-J 6-billion-parameter model, and hyper-parameter tuning with Ray Tune. For fine-tuning an LLM, we reproduced this example from the Ray docs. Here is what we did to set this example up on Metaflow.

 

We install metaflow-ray via pip, which gives us access to the ray_parallel decorator:

 

pip install metaflow-ray

 

We import ray_parallel:

 

from metaflow import ray_parallel

 

We then use the num_parallel argument to self.next to specify the number of Ray workers we intend to have in our Ray cluster, and we decorate our train step with ray_parallel. The example in the Ray docs used 16 nodes, so we do the same.

 

from metaflow import FlowSpec, Parameter, step, batch
from metaflow import ray_parallel

RESOURCES = {"gpu": 1, "cpu": 8, "memory": 60000}

class RayDeepspeedFlow(FlowSpec):
    num_workers = Parameter(
        "num_workers", help="Number of nodes in cluster", default=16
    )

    @step
    def start(self):
        self.next(self.train, num_parallel=self.num_workers)

    @ray_parallel
    @batch(**RESOURCES)
    @step
    def train(self):
        import ray
        ray.init()

        # insert Transformers Trainer code here
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    RayDeepspeedFlow()

 

 

One neat thing is that we can also leverage Metaflow's mechanism for handling external dependencies, the conda decorator, and these packages will propagate to the Ray workers as well, in addition to the rest of the feature set that Metaflow offers.
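
For example, dependencies could be declared roughly like this (the library versions below are illustrative placeholders, not the ones we used; conda is imported from metaflow alongside the other decorators):

# within the RayDeepspeedFlow defined above
@conda(libraries={"pytorch": "2.0.1", "transformers": "4.31.0"})
@ray_parallel
@batch(**RESOURCES)
@step
def train(self):
    import ray
    ray.init()
    # the conda environment declared above is replicated on every Ray worker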

 

Upon executing this flow, 16 g5.4xlarge instances were provisioned by Metaflow to form the Ray cluster. Our test used a per-worker batch size of 16, so the effective batch size was 256 across all 16 nodes. The training job completed in just under 50 minutes, not including initialization time and checkpoint synchronization. Ray makes checkpointing a breeze, so all we needed to do was set an S3 URI as a Metaflow parameter and feed it into Ray's HuggingFace Trainer. We used the Metaflow GUI to monitor the Ray training job and track execution time. We found the UI very useful for displaying and collating the logs emitted by the Ray workers. We could even take advantage of Metaflow's card decorator to render GPU profiling visualizations in the UI.

[Screenshot: Metaflow UI displaying Ray worker logs and GPU profiling visualizations]

 

For a complete example of this flow, please visit this link. There are tons of other examples of using ray_parallel that you can peruse here, such as an end-to-end batch prediction workflow that forms a model registry using Metaflow's built-in artifact management to version and access Ray Checkpoints ready to use with Ray Serve.

 

High Performance Computing

 

Once we achieved interoperability between Ray and Metaflow, a new hurdle emerged: ensuring that the Ray cluster could appropriately scale to hundreds or even thousands of workers. When a Ray cluster comprises numerous worker nodes that may be dispersed across various network locations, one major bottleneck we needed to prepare for is inter-node communication. Aside from the fact that these nodes need to synchronize their updates to the model when performing distributed training, the communication between nodes involves sending data back and forth over a network. If high latency is present, it leads to delays in data transmission, which ultimately impedes the overall training progress. Additionally, in the context of large-scale distributed training, the volume of data being exchanged between nodes can be substantial. Without a network optimized to handle such loads, the efficiency of the training cycle suffers further. In order to address these challenges, we set out to fortify Metaflow with high-performance computing (HPC) capabilities.

 

Recognizing that the Ray cluster spun up by the ray_parallel decorator would inherit the properties of the AWS Batch gang-scheduled cluster, we looked into leveraging AWS-managed HPC infrastructure to complement Metaflow. One contribution we made was the incorporation of Elastic Fabric Adapter (EFA), a networking feature that can be activated on EC2 instances to improve inter-instance communication. We made improvements to Metaflow's batch decorator, enabling flows to efficiently utilize EFA when running their training applications with the NVIDIA Collective Communications Library (NCCL). Users can now activate EFA within their flows by specifying the number of EFA network devices to attach to the AWS Batch container in the batch decorator, and in doing so, scale their cluster to numerous nodes with low-latency, high-throughput inter-node communication.

 

@batch(cpu=X, gpu=Y, memory=Z, efa=N)

 

Our training infrastructure leverages the Deep Learning AMI provided by AWS, which comes with the EFA driver installed. We then configure a cluster placement group so that the nodes are physically close together, and add the EFA network interface to the EC2 launch template. Users can attach up to four EFA network devices when using a p4d.24xlarge (A100 GPU) instance. After completing this infrastructure set-up, we are ready to run a training job on Metaflow that can tap into EFA. We specify the requisite environment variables and provide a Docker image that is EFA-compatible. When the training job is initiated, Metaflow mounts the EFA device(s) to the container.
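
As an illustration of the infrastructure plumbing involved, the boto3 sketch below creates a cluster placement group and a launch template with an EFA network interface. The resource names, AMI ID, subnet, and security group are placeholders rather than our actual configuration:

import boto3

ec2 = boto3.client("ec2")

# Keep instances physically close together for low-latency inter-node traffic.
ec2.create_placement_group(GroupName="ray-hpc-pg", Strategy="cluster")

ec2.create_launch_template(
    LaunchTemplateName="ray-efa-template",
    LaunchTemplateData={
        "ImageId": "ami-xxxxxxxx",        # Deep Learning AMI with the EFA driver
        "InstanceType": "p4d.24xlarge",
        "Placement": {"GroupName": "ray-hpc-pg"},
        "NetworkInterfaces": [
            {
                "DeviceIndex": 0,
                "InterfaceType": "efa",   # attach an EFA device
                "Groups": ["sg-xxxxxxxx"],
                "SubnetId": "subnet-xxxxxxxx",
                "DeleteOnTermination": True,
            }
        ],
    },
)

The flow-level EFA configuration, shown next, then consists of those environment variables plus the efa argument to the batch decorator.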

 

@environment(
    vars={
        "NVIDIA_DRIVER_CAPABILITIES": "compute,utility",
        "FI_PROVIDER": "efa",
        "FI_EFA_USE_DEVICE_RDMA": "1",
    }
)
@ray_parallel
@batch(gpu=X, cpu=Y, memory=Z, efa=4)
@step
def train(self):
    # insert training code here

 

 

To validate that EFA is indeed working, we can check the logs from the Metaflow UI to confirm that the selected provider is EFA.

 

[Screenshot: Metaflow UI logs confirming that the selected provider is EFA]

 

To complete our HPC cluster integration with Metaflow, we adopt FSx for Lustre, a high-performance file system service designed for high-speed, low-latency access to data, reaching throughput levels of hundreds of gigabytes per second. It is worth noting that Metaflow already boasts a high-throughput optimized S3 client that can load data from S3 to memory very quickly, at tens of gigabits per second. However, we sought an alternative scalable solution for retrieving data from S3 without downloading it to the EBS volume attached to the training instances at the start of the training job. FSx for Lustre offers a parallel file system that is perfectly suited for handling simultaneous access from multiple nodes within an HPC cluster. Metaflow streamlines the process of connecting to the FSx for Lustre file system: once the file system is configured, linked to an S3 bucket containing the data, and mounted to the EC2 instance via the launch template, we can use Metaflow's host volume mounting feature to access the data.

 

@batch(gpu=X, cpu=Y, memory=Z, host_volumes="/fsx/data")
@ray_parallel
@step
def train(self):
    import os
    print(os.system("ls /fsx/data/"))

 

 

Final Thoughts

 

The Autodesk Machine Learning Platform team is excited about contributing to Metaflow, as we aim to make distributed training and high-performance computing more readily available to the machine learning community. More specifically, the integration between Metaflow and Ray has long been on the wish lists of many Metaflow users. The release of this experimental extension marks a significant milestone, unlocking boundless possibilities for users to run their own Ray applications orchestrated by Metaflow. Furthermore, with the inclusion of EFA, we've also elevated Metaflow's capacity to handle massive-scale AI workloads. This achievement is a testament to the collaborative effort among our team, the team at Outerbounds, and the team at Anyscale.

2 Comments
tonyszhang
Autodesk

Awesome work and clear write-up, @riley_hun!

neal_harrington84892
Community Visitor

What a wonderful exposition of this work. Thank you Riley!