Dask in the Cloud

In data science and machine learning, it is increasingly common to need to scale your analyses up to larger datasets.

When working in Python and the PyData ecosystem, Dask is a popular tool for doing so. There are many reasons for this, one being that Dask composes well with all of the PyData tools. It’s a simple system designed to parallelize any PyData library.

When beginning to work with a larger dataset, you’ll first want to scale up your analysis to leverage all the cores of a single workstation.

After this, you may need to scale out your computation to leverage a cluster on the cloud (AWS, Azure, or Google Cloud Platform, for example).

In this post, we

  • demonstrate a common pattern in data science workflows using pandas,
  • show how we can scale it up using Dask to harness the cores of a single workstation, and
  • show how we can scale it out to the cloud using Coiled Cloud.

You can also find all the code here on GitHub.

Note: you should always try to reason about whether you actually need to scale out your computation. For example, before doing so, perhaps you could make your pandas code more efficient. If you’re doing machine learning, plot learning curves to make sure that including more data will actually result in improving your model.
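
As one illustration of making pandas code leaner before reaching for more hardware, the sketch below reads only the columns an analysis needs. The tiny CSV is made up for the example; `usecols` and `dtype` are standard `pandas.read_csv` parameters, and the column names mirror the ones used later in this post.

```python
import io
import pandas as pd

# A tiny, made-up stand-in for one month of taxi data.
csv_text = """passenger_count,tip_amount,trip_distance,store_and_fwd_flag
1,2.0,1.5,N
1,4.0,3.0,N
2,1.0,0.8,Y
2,3.0,2.2,N
"""

# Read only the two columns the analysis needs, with explicit dtypes,
# so pandas does not keep columns we never touch.
small = pd.read_csv(
    io.StringIO(csv_text),
    usecols=["passenger_count", "tip_amount"],
    dtype={"passenger_count": "float64", "tip_amount": "float64"},
)

# Same groupby as in the rest of the post, on the slimmed-down frame.
mean_tip = small.groupby("passenger_count").tip_amount.mean()
print(mean_tip)
```

On a real file you would pass the file path instead of the in-memory buffer; whether this alone is enough depends on how many columns the analysis actually needs.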

PANDAS: A COMMON PATTERN IN DATA SCIENCE

Here, we introduce a common pattern in data science and show how to perform it using pandas on an in-memory dataset. We’ll check out a 700 MB subset of the NYC taxi dataset (which is about 10 GB in total).

We read in the data and use the groupby DataFrame method to check out the average tip amount, as a function of passenger count:

# Import pandas and read in the first file (January 2019)
import pandas as pd
df = pd.read_csv("data_taxi/yellow_tripdata_2019-01.csv")

# Compute average tip as a function of the number of passengers
df.groupby("passenger_count").tip_amount.mean()

This took around 15 seconds on my laptop, which is about as long as I’m willing to wait for an analysis like this.

Next I want to perform exactly the same analysis on the entire dataset.

DASK: SCALING UP YOUR DATA SCIENCE

Recall that the entire dataset is around 10 GB, which is more than the available RAM on my laptop, meaning I can’t store it in memory.

I could write a for loop:

from glob import glob

for filename in glob("data_taxi/yellow_tripdata_2019-*.csv"):
    df = pd.read_csv(filename)
    print(df.groupby("passenger_count").tip_amount.mean())

This doesn’t take advantage of the multiple cores on my laptop, nor is it particularly elegant. It also produces a separate result for each file rather than one answer for the whole dataset. Enter Dask for single-machine parallelism.
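
A caveat with a loop like this is that averaging the per-file averages would be wrong whenever the files differ in size. A minimal sketch of one way to combine them correctly, by accumulating per-group sums and counts, is shown here. The two small DataFrames stand in for per-file data and are invented for the example.

```python
import pandas as pd

# Two made-up "files" of different sizes, standing in for monthly CSVs.
chunks = [
    pd.DataFrame({"passenger_count": [1, 1, 2],
                  "tip_amount": [2.0, 4.0, 1.0]}),
    pd.DataFrame({"passenger_count": [1, 2, 2, 2],
                  "tip_amount": [6.0, 3.0, 3.0, 5.0]}),
]

# Keep only a small summary (sum and count per group) from each chunk.
partials = []
for chunk in chunks:
    grouped = chunk.groupby("passenger_count").tip_amount
    partials.append(grouped.agg(["sum", "count"]))

# Add up the partial sums and counts across chunks, then divide.
combined = pd.concat(partials).groupby(level=0).sum()
overall_mean = combined["sum"] / combined["count"]
print(overall_mean)
```

For a passenger count of 1, the per-file means here are 3.0 and 6.0, whose average is 4.5, while the true overall mean is 4.0. Only the small per-chunk summaries stay in memory, which is the same basic idea a parallel groupby relies on.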

We import several parts of Dask, spin up a local cluster and instantiate a Dask client:

from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=4)
client = Client(cluster)
client

We then import Dask DataFrame, read in all the data (lazily), and compute the same groupby as we did with pandas above:

import dask.dataframe as dd

df = dd.read_csv(
    "data_taxi/yellow_tripdata_2019-*.csv",
    dtype={
        'RatecodeID': 'float64',
        'VendorID': 'float64',
        'passenger_count': 'float64',
        'payment_type': 'float64'
    }
)

mean_amount = df.groupby("passenger_count").tip_amount.mean().compute()

This takes around 3.5 minutes on my laptop, which is bearable. I suppose. However, if you wanted to do anything even slightly more sophisticated (hint: you usually do), this time would blow up pretty quickly.

So if I had access to a cluster on the cloud, it would be a good idea to use it!

 

Before doing so, let’s note several aspects of what we’ve just done:

  • We’ve used a Dask DataFrame, which is essentially a large, virtual dataframe divided along the index into multiple pandas DataFrames.
  • We’re working on a local cluster, consisting of
    • a scheduler, which manages the work and sends tasks to the workers, and
    • workers, which compute the tasks.
  • We’ve instantiated a Dask client, “the user-facing entry point for cluster users.”

What this means is that the client lives wherever you are writing your Python code and the client talks to the scheduler, passing it the tasks.

Figure: A common Dask / Jupyter workspace, including the Dask Task Stream and Cluster Map.

COILED: SCALING OUT YOUR DATA SCIENCE

It’s now time to burst to the cloud. If you had access to cloud resources (such as AWS) and knew how to configure Kubernetes and Docker containers, you could get a Dask cluster up and running in the cloud. This would still take a significant amount of time, however.

An alternative is to use Coiled, which we’ll do here. To do so, I’ve signed into Coiled Cloud, pip installed coiled, and authenticated. You can do this yourself in a terminal if you want to follow along:

pip install coiled --upgrade
coiled login  # redirects you to authenticate with GitHub or Google

We then perform our necessary imports, spin up a cluster (which takes about a minute), and instantiate our client:

import coiled
from dask.distributed import Client
cluster = coiled.Cluster(n_workers=10)
client = Client(cluster)

We can then import our data (this time from S3), and perform our groupby:

import dask.dataframe as dd

# Read data into a Dask DataFrame
df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    dtype={
        'RatecodeID': 'float64',
        'VendorID': 'float64',
        'passenger_count': 'float64',
        'payment_type': 'float64'
    },
    storage_options={"anon": True}
)

# Compute average tip as a function of the number of passengers
mean_amount = df.groupby("passenger_count").tip_amount.mean().compute()

This all took under 30 seconds on Coiled Cloud, at least seven times faster than the 3.5 minutes it took on my laptop, even for this relatively straightforward analysis.

Note the power of being able to do this set of analyses in a single workflow. There was no need to switch contexts or environments. On top of this, it is straightforward to go from Coiled back to Dask on a local workstation, or to pandas, when you’re done. Cloud compute is great when it’s necessary, but can be a burden when it’s not.
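
One way to keep that single workflow explicit is a small helper that chooses where the cluster runs. The function name `make_cluster` and its `backend` argument are hypothetical, introduced only for this sketch; the `LocalCluster` and `coiled.Cluster` calls mirror the ones used earlier in this post. The imports sit inside the function so that Coiled is only required when it is actually requested.

```python
def make_cluster(backend="local", n_workers=4):
    """Return a Dask cluster on this machine or on Coiled.

    Hypothetical helper for illustration, not part of Dask or Coiled.
    """
    if backend == "local":
        # Single-machine parallelism, as in the Dask section above.
        from dask.distributed import LocalCluster
        return LocalCluster(n_workers=n_workers)
    if backend == "coiled":
        # Requires `pip install coiled` and `coiled login` beforehand.
        import coiled
        return coiled.Cluster(n_workers=n_workers)
    raise ValueError(f"unknown backend: {backend!r}")
```

Calling `Client(make_cluster("coiled", n_workers=10))` would then reproduce the cloud setup above, while `Client(make_cluster("local"))` keeps the work on the workstation; the analysis code itself stays unchanged.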

DO YOU NEED FASTER DATA SCIENCE?

You too can get started on a Coiled cluster immediately. Coiled also handles security, conda/docker environments, and team management, so you can get back to doing data science. Get started for free today on Coiled Cloud.

Originally posted here.

ODSC Community

The Open Data Science community is passionate and diverse, and we always welcome contributions from data science professionals! All of the articles under this profile are from our community, with individual authors mentioned in the text itself.
