

XGBoost – Frictionless Training on Datasets Too Big for Memory
Posted by ODSC Community on February 24, 2021

Bursting XGBoost training from your laptop to a Dask cluster allows training on out-of-core data, and saves hours of engineering work.
We demo an accessible workflow to run your training on a temporary, Python-native cluster straight from your own notebook or script. Check it out and run it on your own data!
What we’ll cover:
1. Loading and transforming data on a distributed cluster
2. Training XGBoost on Dask
3. How the distributed XGBoost implementation works
4. Bursting to the cloud as and when needed
5. Visualizing performance of computations on a cluster with Dask Dashboards
Feel free to skip ahead to the part you’re most interested in!
Here is the code we will use, if you’d like to jump right in:
Questions about the code? Join our Community Slack channel.
```python
import coiled
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Create a local Dask cluster (all the power of our local machine)...
cluster = LocalCluster()

# ...or a Coiled cloud cluster
cluster = coiled.Cluster(
    n_workers=4,
    worker_cpu=4,
    worker_memory='24GiB',                      # As much as we need
    software='michal-mucha/xgboost-on-coiled'   # Public docker image
)

# Connect to the cluster
client = Client(cluster)

# Load the example dataset sample - specify columns
columns = [
    "interest_rate", "loan_age", "num_borrowers",
    "borrower_credit_score", "num_units"
]
categorical = [
    "orig_channel", "occupancy_status", "property_state",
    "first_home_buyer", "loan_purpose", "property_type", "zip",
    "relocation_mortgage_indicator", "delinquency_12"
]

# Download data from S3
mortgage_data = dd.read_parquet(
    "s3://coiled-data/mortgage-2000.parq/*",
    compression="gzip",
    columns=columns + categorical,
    storage_options={"anon": True}
)

# Cache the data on cluster workers
mortgage_data = mortgage_data.persist()

# Cast categorical columns to the correct type
from dask_ml.preprocessing import Categorizer

ce = Categorizer(columns=categorical)
mortgage_data = ce.fit_transform(mortgage_data)
for col in categorical:
    mortgage_data[col] = mortgage_data[col].cat.codes

# Create the train-test split
from dask_ml.model_selection import train_test_split

X, y = mortgage_data.iloc[:, :-1], mortgage_data["delinquency_12"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.1, shuffle=True, random_state=2
)

# Create the XGBoost DMatrix
import xgboost as xgb

dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)

# Set parameters
params = {
    "max_depth": 8,
    "max_leaves": 2 ** 8,
    "gamma": 0.1,
    "eta": 0.1,
    "min_child_weight": 30,
    "objective": "binary:logistic",
    "grow_policy": "lossguide"
}

# Train the model (in the notebook, this cell is wrapped in the %%time magic)
output = xgb.dask.train(
    client,
    params,
    dtrain,
    num_boost_round=5,
    evals=[(dtrain, 'train')]
)
booster = output['booster']  # The trained model
history = output['history']  # A dictionary containing the evaluation results

# Shut down the cluster
client.close()
```
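The script stops right after training. If you also want to score the held-out set, a minimal sketch (reusing the `client`, `booster`, and `X_test` names from above, and running before `client.close()`) could use `xgboost.dask.predict` to keep the work on the cluster:

```python
# Hedged sketch, not part of the original script: distributed prediction on the test split.
# Run this before client.close() so the workers are still available.
import xgboost as xgb

preds = xgb.dask.predict(client, booster, X_test)  # lazy Dask collection of probabilities
print(preds.compute())                             # materialize the predictions locally
```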
When in doubt, use XGBoost
– Owen Zhang, Kaggle Grandmaster
XGBoost reliably wins Kaggle competitions, and it is fast and easy to use. It also wins in business applications, which makes it a very popular model with a great community and many exciting deployment stories.
All you really need to get some machine learning goodness flowing is the library, a computer to run it on, and a tabular dataset.
We’re all set if our data fits in memory. But what if it doesn’t? This is probably why you’re reading this post!
XGBoost works best with the entire dataset available during training.
The usual workarounds have very limited appeal: using swap memory, limiting our data, buying bigger hardware, or spending days on a data streaming harness.
Friction, delay, complexity.
There must be a better way!
Swap memory is slow!
Enter Dask – Distributed XGBoost on a Cluster
Why a cluster? With a cluster, you can pool the memory of many machines, which is usually cheaper (or less impossible) than getting a single machine big enough for the job.
Companies like Walmart and institutions like Harvard Medical School use Dask to work on very large problems with quick iteration cycles. Dask scales native Python code – we can use it to easily run our XGBoost work on larger data!
Official integration
A year ago, in February 2020, the XGBoost team released an official integration with Dask. An official integration means faster access to bleeding-edge features, as well as recognition and support for the community of Dask users.
This incredibly useful combo was already available through extensions such as dask-xgboost and the scikit-learn API – both very familiar to many Python data practitioners. In this post, we will use the new, official integration. If you prefer the sklearn API, we have you covered with a notebook example over here!
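As a quick taste of that route, here is a minimal, hedged sketch of the scikit-learn-style estimator that ships with the official integration, `xgboost.dask.DaskXGBClassifier`. This is an illustration, not the linked notebook, and it reuses the `client`, `X_train`, `y_train`, and `X_test` names from the full script above:

```python
import xgboost as xgb

# Illustrative sketch: the sklearn-style Dask estimator from the official integration
clf = xgb.dask.DaskXGBClassifier(
    objective="binary:logistic",
    max_depth=8,
    n_estimators=5,
)
clf.client = client                # attach the Dask client connected to the cluster
clf.fit(X_train, y_train)          # training runs on the cluster's workers
probs = clf.predict_proba(X_test)  # lazy Dask collection of class probabilities
```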
Predicting loan delinquency
In this post, we will use `xgboost.dask` to train a model to predict loan delinquency on the Fannie Mae Single-Family Loan Performance Data prepared by the team at NVIDIA RAPIDS.
The entire dataset consists of 17 years (2000-2016) of monthly records, with the total uncompressed CSV size reaching close to 200GB. We will be using an 8GB sample – one year of data – to build out code that can successfully run on the entire history. The dataset sample is saved in the efficient parquet format. The original CSVs are available here.
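If you prefer to start from those original CSVs instead of the prepared parquet sample, one possible approach (a sketch with hypothetical paths and parsing options, not the RAPIDS preparation script) is to let Dask convert them to parquet once, up front:

```python
import dask.dataframe as dd

# Hypothetical paths and options; adjust them to match the raw files you downloaded
raw = dd.read_csv("raw-mortgage-data/*.csv", dtype=str)
raw.to_parquet("mortgage-2000.parq", compression="gzip")
```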
From local to distributed
We will start locally with a sample, then scale out to use the full power of a local machine, and finally expand to a distributed Dask cluster in the cloud.
We would love to see you apply distributed XGBoost to a dataset that’s useful to you.
If you’d like to try, check out our scaling-xgboost example notebook, swap in your dataset, and see how well it does!
Let’s start by launching a local Dask cluster:
```python
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=8)
client = Client(cluster)
```
When working with big datasets, I like to start out on a small sample in JupyterLab on my local machine. Using Dask, I can design the entire workflow using a local cluster.
When I need more power, all it takes is to swap out my local cluster for a bigger one.
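For example, going from the local cluster above to the Coiled cluster used in the full script only changes the cluster constructor; the arguments below mirror the script at the top of the post:

```python
import coiled
from dask.distributed import Client

# Swap the LocalCluster for a hosted Coiled cluster; the rest of the workflow is unchanged
cluster = coiled.Cluster(
    n_workers=4,
    worker_cpu=4,
    worker_memory="24GiB",
    software="michal-mucha/xgboost-on-coiled",  # Public docker image
)
client = Client(cluster)
```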
Loading the dataset from the cloud, I pin it to cluster memory with `persist`, since I will need it for the next steps:
```python
mortgage_data = dd.read_parquet(
    "s3://coiled-data/mortgage-2000.parq/",
    compression="gzip",
    columns=columns,
    storage_options={"anon": True}
)
mortgage_data = mortgage_data.persist()
```
When building the workflow, I need to account for the dynamics of running code on a distributed cluster. For example, network transfer cost between the scheduler, workers, and the client is not a factor in local development. Things change when I move out of my computer.
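One way to keep an eye on those dynamics is the Dask dashboard (more on that later in the post); the connected client exposes its URL directly:

```python
# Print the URL of the Dask dashboard for the cluster this client is connected to
print(client.dashboard_link)
```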
Distributed Preprocessing with Dask-ML
The dataset in this demo comes with pre-built features, but the model I want to use has a data type restriction – only float, integer, and boolean features are accepted.
As some of the columns contain categorical data provided as strings, I need to transform them.
Dask-ML comes in very handy, with a friendly API that will make any scikit-learn user feel at home. For my use case, the `Categorizer` object is an excellent match.
```python
from dask_ml.preprocessing import Categorizer

ce = Categorizer(columns=categorical)
```
I initialize the object with a prepared list of categorical columns. Then, I am ready to transform my data.
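The transform itself, as in the full script at the top of the post, fits the `Categorizer` and then replaces each string column with integer category codes that XGBoost can consume:

```python
# Fit the Categorizer, then convert each categorical column to its integer codes
mortgage_data = ce.fit_transform(mortgage_data)
for col in categorical:
    mortgage_data[col] = mortgage_data[col].cat.codes
```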
At this point, we could add more features, using Dask to parallelize pandas-style work across all the cores of the local machine or the cluster.
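As a small illustration (the derived column below is hypothetical and not part of the original feature set), pandas-style expressions on the Dask dataframe are scheduled lazily across the workers just like the rest of the pipeline:

```python
# Hypothetical derived feature, for illustration only: interest rate per borrower.
# The division is evaluated lazily, partition by partition, on the cluster's workers.
mortgage_data["rate_per_borrower"] = (
    mortgage_data["interest_rate"] / mortgage_data["num_borrowers"]
)
```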
Before jumping into model training, I will use Dask-ML to split my distributed dataframe into training and validation sets.
```python
from dask_ml.model_selection import train_test_split

X, y = mortgage_data.iloc[:, :-1], mortgage_data["delinquency_12"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.1, shuffle=True, random_state=2
)
```
From small data to big data in one line of code
The official integration with Dask released by the XGBoost team makes distributed training on a Dask cluster as simple as typing `xgboost.dask.train` instead of the local `xgboost.train`.
It also provides a distributed variant of its internal data structure, the DMatrix, which can be built directly from a distributed Dask dataframe.
This operation will trigger materialization of the lazily scheduled Dask dataframe operations:
```python
import xgboost as xgb

dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
```
Let’s train XGBoost to predict loan performance: