

Basic Kubeflow Pipelines From Scratch
Posted by ODSC Community, June 15, 2022

Kubeflow is a machine learning toolkit that facilitates the deployment of machine learning projects on Kubernetes. Although quite recent, Kubeflow is becoming increasingly present in tech companies' stacks, and getting started with it can be quite overwhelming for newcomers due to the scarcity of example projects.
Even though Kubeflow’s documentation is far from lacking, it is always helpful to have a helping hand when you create a machine learning pipeline from scratch.
I will do my best to be that helping hand.
In this guide, we will go through every step that is necessary to have a functioning pipeline. You will learn how to:
- Create a Kubernetes cluster
- Install Kubeflow
- Create a container registry
- Build a container image and push it to your registry
- Give Kubeflow access to your S3 buckets
- Create Kubeflow components with input and output artifacts
- Create a Kubeflow pipeline, upload it and run it
AWS — Elastic Kubernetes Service
I will focus here on making a pipeline from scratch using AWS EKS, which is a service to create Kubernetes clusters like Google’s GKE.
Create an EKS cluster and install Kubeflow
Although you can create an EKS cluster from the AWS Management Console by typing "EKS" in the search bar and selecting "Elastic Kubernetes Service", I strongly recommend you follow this Kubeflow guide, which includes the cluster creation step, done directly from the command line in your computer's terminal. Following it will ensure that your cluster has the right settings to install Kubeflow later on.
Be sure not to miss the following prerequisites:
- Install kubectl
- Install the AWS Command Line Interface (CLI)
- Configure the AWS CLI by running the following command:
aws configure
- Enter your Access Keys (Access Key ID and Secret Access Key).
- Enter your preferred AWS Region and default output options.
- Install eksctl and the aws-iam-authenticator.
Do not hesitate to change your AWS region (like “eu-west-1”) and to select more powerful EC2 instances. I personally chose p2.xlarge instances.
My configuration would, for instance, be:
export AWS_CLUSTER_NAME=kubeflow-demo
export AWS_REGION=eu-west-1
export K8S_VERSION=1.18
export EC2_INSTANCE_TYPE=p2.xlarge
As of January 2022, Kubeflow does not install properly on the newest versions of Kubernetes. I strongly recommend keeping K8S_VERSION at 1.18.
If you've followed the guide up until the "Access Kubeflow central dashboard" chapter, the command
kubectl get ingress -n istio-system
should return an address that looks like this:
123-istiosystem-istio-2af2-4567.us-west-2.elb.amazonaws.com
Copy and paste this address into your browser. As specified in the guide, if you're using basic authentication, the credentials are the ones you specified in the configuration file, or the default (admin@kubeflow.org / 12341234).
You should now have access to your Kubeflow Central Dashboard:
Kubeflow Central Dashboard. Image by author
Create a container registry
Using Kubernetes means using Container images, so let’s leave the dashboard and create a container registry.
Go back to your AWS Management Console and type “ECR” in the search bar and click on “Elastic Container Registry”. From there, select “Repositories” on the left bar, and click on “Create repository”.
ECR Menu. Image by author
Once created, a new repository will appear on your menu. Note that it has a URI that looks like this: <ID>.dkr.ecr.<REGION>.amazonaws.com/<REGISTRY_NAME>
Where <ID> is your account ID, <REGION> is your registry's region (for instance eu-west-1), and <REGISTRY_NAME> is the name you gave your registry when you created it.
Each repository has a URI and so does each image that you push on it. This is the link we will need later on in this article when we take a look at component creation.
Check if your cluster’s worker nodes have access to ECR
Go to EKS on your AWS Management Console and select "Clusters". Click on the cluster you previously created, go to the "Configure" tab, and then to the "Compute" sub-tab:
You should see a node group like this. Image by author
Click on your node group, and on the "Details" tab, click on "Node IAM Role ARN". This opens your nodes' IAM Management Console. Make sure that the "AmazonEC2ContainerRegistryReadOnly" policy is attached. If it is not, add it by clicking on "Attach policies":
Image by author
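If you prefer checking this from code, a quick boto3 query can list the policies attached to the node role. A minimal sketch, assuming the role name below, which is a placeholder; use the Node IAM Role name shown in your node group's details:
import boto3

iam = boto3.client('iam')
role_name = "eksctl-kubeflow-demo-nodegroup-NodeInstanceRole"  # placeholder role name
attached = iam.list_attached_role_policies(RoleName=role_name)['AttachedPolicies']
# Print the names of the attached policies; AmazonEC2ContainerRegistryReadOnly should be listed
print([p['PolicyName'] for p in attached])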
Install Docker
If you already have Docker installed, go to the next part. If not, go to the Docker main page and install Docker Desktop.
The easiest way to do so on Linux or Mac is to run the command
brew install docker
in your terminal.
Create a container
On your computer, create a folder on which you will work from now on. It can be a GitHub repository folder if you want to version your work.
Create a file and name it "Dockerfile". On Linux and Mac, you can run the command
touch Dockerfile
for that. Open it with any IDE you want, or even Notepad (Windows) or TextEdit (Mac).
In the Dockerfile, we'll declare a parent image (here I'll take ubuntu:20.04), install pip, then install some packages we'll need to run our scripts. Our Dockerfile will look like this:
FROM ubuntu:20.04
RUN set -xe \
&& apt-get update -y \
&& apt-get install -y python3-pip \
&& apt install -y python-is-python3
RUN pip3 --no-cache-dir install numpy scikit-learn pandas boto3
Now build an image with this command, and change <image_name> to any name you want to give your image. Don't forget the dot at the end!
docker build -t <image_name> .
Push your image to your container registry
Retrieve an authentication token and authenticate your Docker client to your registry using the AWS CLI. For a private registry:
aws ecr get-login-password --region <REGION> | docker login --username AWS --password-stdin <ECR_URI>
For a public registry, use the ecr-public subcommand instead (public ECR authentication goes through us-east-1):
aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin <ECR_URI>
Change <REGION> to your repository's region (e.g. eu-west-1). <ECR_URI> should look like this:
<ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com
for private registries and
public.ecr.aws/<REGISTRY_ID>/
for public registries.
Then, tag your image so you can push the image to this repository:
docker tag <image_name>:latest <ECR_URI>/<REGISTRY_NAME>:latest
Run the following command to push this image to your newly created AWS repository:
docker push <ECR_URI>/<REGISTRY_NAME>:latest
If you have any issue, you can get all the relevant commands from your registry's menu by clicking on "View push commands".
Image by author
Setting up a test dataset on S3
Our basic pipeline will start with downloading a zipped dataset from S3.
We'll use this Kaggle dataset as an example, but any zipped csv file will do.
On the AWS Management Console, type "S3" in the search bar to access the service, and click on "Create bucket". Once created, click on your bucket and upload your zipfile to it.
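If you'd rather upload from code than from the console, a short boto3 sketch looks like this; the local file name, bucket name and destination key below are placeholders:
import boto3

s3 = boto3.client('s3')
# Upload a local zip file to s3://my-bucket-name/raw/archive.zip (placeholder names)
s3.upload_file("archive.zip", "my-bucket-name", "raw/archive.zip")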
Give Kubeflow access to S3
Go back to your Kubeflow Central Dashboard, click on "Notebook Servers", and then click on "New Server". This might take several minutes.
Once created, click on “Connect” to open Jupyterlab and open a Terminal instance.
Type
aws configure
and enter your credentials as you did during step 1's prerequisites to log in to AWS from your Kubeflow instance. This will give you access to your S3 buckets from your scripts.
Test if you have access to S3
On Jupyterlab, open a new Notebook.
import boto3
conn = boto3.client('s3')
contents = conn.list_objects(Bucket=<bucket_name>)['Contents']
for f in contents:
    print(f['Key'])
In this code snippet, <bucket_name> is the name of your bucket.
If you run this cell, you should see your zipfile, as well as any other file in any other subfolder. If you want to explore a specific subfolder, change the third line to:
contents = conn.list_objects(Bucket=<bucket_name>, Prefix=<subfolder_name>)['Contents']
Create the Kubeflow pipeline
We'll do a simple pipeline that downloads our zipfile from our S3 bucket, uploads the unzipped csv files to the bucket, and reads one of the datasets with pandas.
Step 1. Create the Python functions
We'll create the unzipping function:
import zipfile
import os
import boto3

def unzip_func(bucket_zipfile_path):
    """Unzips a file.

    Parameters:
    bucket_zipfile_path : str, path to the zipfile in the bucket
    """
    # Set up connection
    s3_resource = boto3.client('s3')
    path_bucket = "bucket_name"                   # Change it to your bucket name
    path_to_move_file = "destination_subfolder/"  # Change it to your destination subfolder

    # Create local targets
    os.makedirs("./data", exist_ok=True)
    os.makedirs("./unzipped_data", exist_ok=True)

    # Download zipfile
    boto3.resource('s3').Object(path_bucket, bucket_zipfile_path).\
        download_file(Filename="./data/zipfile.zip")

    # Unzip
    for zip in os.listdir("./data"):
        with zipfile.ZipFile(os.path.join("./data", zip), 'r') as file:
            file.extractall("./unzipped_data")

    # Upload
    for file in os.listdir("./unzipped_data"):
        output_path = path_to_move_file + file
        s3_resource.upload_file(os.path.join("./unzipped_data", file),
                                path_bucket, output_path)

    # Return S3 path to last extracted csv file
    return output_path
Then the reader function:
import pandas as pd
import boto3
from io import StringIO

def csv_s3_reader(bucket_name, csv_path, sep=";", decimal=",", encoding="utf-8"):
    """Reads a csv file from S3.

    Parameters:
    bucket_name : str, S3 bucket name
    csv_path : str, path to your csv file in your bucket
    sep : str, parameter for pd.read_csv()
    decimal : str, parameter for pd.read_csv()
    encoding : str, parameter for pd.read_csv()

    Change sep, decimal and encoding depending on your csv's formatting
    """
    # Set up connection and download file
    csv_obj = boto3.client('s3').get_object(Bucket=bucket_name, Key=csv_path)
    body = csv_obj['Body']
    csv_string = body.read().decode(encoding)

    # Read csv (note: error_bad_lines was deprecated in pandas 1.3
    # and removed in 2.0, where on_bad_lines="skip" replaces it)
    df = pd.read_csv(StringIO(csv_string), sep=sep, decimal=decimal,
                     error_bad_lines=False, encoding=encoding)
    return df
In our pipeline, the “csv_path” parameter of “csv_s3_reader()” will be the output string of “unzip_func()”.
You can now test if your functions work.
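For instance, a quick test in a notebook cell could look like this; the zip key below is a placeholder for wherever you uploaded your archive, and sep/decimal depend on your csv:
# Hypothetical test values -- adapt them to your own bucket and file
csv_key = unzip_func("raw/archive.zip")             # S3 key of the zipfile you uploaded
df = csv_s3_reader("bucket_name", csv_key, sep=",", decimal=".")
df.head()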
Step 2. Reformat python functions to component functions
To make them understandable by Kubeflow, we rewrite the functions like this:
import kfp
import kfp.components as comp

# First function
def unzip_files(bucket_zipfile_path : str,
                output_string : comp.OutputPath(str)):
    import boto3
    import os
    from io import BytesIO
    import zipfile

    # Set up connection
    s3_resource = boto3.client('s3')
    path_bucket = "bucket_name"                   # Change it to your bucket name
    path_to_move_file = "destination_subfolder/"  # Change it to your destination subfolder

    # Create local targets
    os.makedirs("./data", exist_ok=True)
    os.makedirs("./unzipped_data", exist_ok=True)

    # Download zipfile
    boto3.resource('s3').Object(path_bucket, bucket_zipfile_path).\
        download_file(Filename="./data/zipfile.zip")

    # Unzip
    for zip in os.listdir("./data"):
        with zipfile.ZipFile(os.path.join("./data", zip), 'r') as file:
            file.extractall("./unzipped_data")

    # Upload
    for file in os.listdir("./unzipped_data"):
        output_path = path_to_move_file + file
        s3_resource.upload_file(os.path.join("./unzipped_data", file),
                                path_bucket, output_path)

    # Create Artifact : S3 path to last extracted csv file
    with open(output_string, 'w') as writer:
        writer.write(output_path)

# 2nd function
def csv_s3_reader(bucket_name : str,
                  csv_path : comp.InputPath(str),
                  sep : str,
                  decimal : str,
                  encoding : str,
                  output_csv : comp.OutputPath('CSV')):
    from io import StringIO
    import pandas as pd
    import boto3
    import os

    # Set up connection and download file
    with open(csv_path, 'r') as reader:
        line = reader.readline()
    csv_obj = boto3.client('s3').get_object(Bucket=bucket_name, Key=line)
    body = csv_obj['Body']
    csv_string = body.read().decode(encoding)

    # Read csv
    df = pd.read_csv(StringIO(csv_string), sep=sep, decimal=decimal,
                     error_bad_lines=False, encoding=encoding)
    df.to_csv(output_csv, index=True, header=True)
As you can see, you need to declare the type of every parameter of your functions. Since Kubeflow pipelines deal with artifacts, instead of returning an object, we act as if we were writing it to disk, except that instead of a path, we give a comp.OutputPath() object.
For instance, if you want to write a pandas dataframe to your computer, you would write df.to_csv(path_name). In Kubeflow, this becomes df.to_csv(output_path_object), where output_path_object is defined in the function's parameters as comp.OutputPath('CSV').
You handle Kubeflow inputs the same way, by reading comp.InputPath() objects as if they were written on your disk.
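As a minimal illustration of this pattern, here are two toy component functions (hypothetical examples, not part of our pipeline):
import kfp.components as comp

def producer(message : str, output_text : comp.OutputPath(str)):
    # Write the output artifact to the local path Kubeflow provides
    with open(output_text, 'w') as f:
        f.write(message)

def consumer(input_text : comp.InputPath(str)):
    # Read the upstream artifact as if it were a local file
    with open(input_text, 'r') as f:
        print(f.read())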
You might have noticed that the rewritten unzip_files and csv_s3_reader functions have their imports inside them; we'll get to that soon when we talk about components.
Step 3. Convert functions to components
You can create a component from a function with kfp.components.create_component_from_func, to which we'll pass two parameters: the function to convert and the base image it should run on when the component is called:
# Link to your container image
base_img = "<REGISTRY_URI>:latest"  # Change to your registry's URI

# Create first OP
unzip_files_op = kfp.components.create_component_from_func(unzip_files,
                                                           base_image=base_img)

# Create second OP
csv_s3_reader_op = kfp.components.create_component_from_func(csv_s3_reader,
                                                             base_image=base_img)
This is why some packages were imported inside the functions. boto3, for instance, is not natively installed in Kubeflow's environment the way kfp is. Importing it at the top of the script would therefore raise an error; it should instead be imported inside the function, which will run on a container image that has boto3 installed.
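As a side note, recent versions of the kfp v1 SDK also accept a packages_to_install argument, which pip-installs extra packages when the component starts. A sketch of that alternative, where the base image and package list are just examples:
# Alternative to baking everything into a custom image
csv_s3_reader_op_alt = kfp.components.create_component_from_func(
    csv_s3_reader,
    base_image="python:3.8",                  # any image with Python and pip
    packages_to_install=["pandas", "boto3"],  # installed at component start-up
)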
Step 4. Creating the pipeline
A pipeline describes the succession of components to call and the parameters to pass them. It takes the form of a function decorated with kfp.dsl.pipeline().
@kfp.dsl.pipeline(
    name='Give a name to your pipeline',
    description='Give it a description too'
)
def unzip_and_read_pipeline(
    bucket_zipfile_path,
    bucket_name,
    sep,
    decimal,
    encoding
):
    # Call the first OP
    first_task = unzip_files_op(bucket_zipfile_path)

    # Call the second OP and pass it the first task's outputs
    second_task = csv_s3_reader_op(bucket_name,
                                   first_task.outputs['output_string'],
                                   sep, decimal, encoding)
Step 5. Full pipeline
The full pipeline should look like this. Copy and paste it into a Jupyter notebook cell, and don't forget to change some lines depending on your bucket name and container image URI:
%%writefile ./test_pipeline.py
import kfp
import kfp.components as comp

# First function
def unzip_files(bucket_zipfile_path : str,
                output_string : comp.OutputPath(str)):
    import boto3
    import os
    from io import BytesIO
    import zipfile

    # Set up connection
    s3_resource = boto3.client('s3')
    path_bucket = "bucket_name"                   # Change it to your bucket name
    path_to_move_file = "destination_subfolder/"  # Change it to your destination subfolder

    # Create local targets
    os.makedirs("./data", exist_ok=True)
    os.makedirs("./unzipped_data", exist_ok=True)

    # Download zipfile
    boto3.resource('s3').Object(path_bucket, bucket_zipfile_path).\
        download_file(Filename="./data/zipfile.zip")

    # Unzip
    for zip in os.listdir("./data"):
        with zipfile.ZipFile(os.path.join("./data", zip), 'r') as file:
            file.extractall("./unzipped_data")

    # Upload
    for file in os.listdir("./unzipped_data"):
        output_path = path_to_move_file + file
        s3_resource.upload_file(os.path.join("./unzipped_data", file),
                                path_bucket, output_path)

    # Create Artifact : S3 path to last extracted csv file
    with open(output_string, 'w') as writer:
        writer.write(output_path)

# 2nd function
def csv_s3_reader(bucket_name : str,
                  csv_path : comp.InputPath(str),
                  sep : str,
                  decimal : str,
                  encoding : str,
                  output_csv : comp.OutputPath('CSV')):
    from io import StringIO
    import pandas as pd
    import boto3
    import os

    # Set up connection and download file
    with open(csv_path, 'r') as reader:
        line = reader.readline()
    csv_obj = boto3.client('s3').get_object(Bucket=bucket_name, Key=line)
    body = csv_obj['Body']
    csv_string = body.read().decode(encoding)

    # Read csv
    df = pd.read_csv(StringIO(csv_string), sep=sep, decimal=decimal,
                     error_bad_lines=False, encoding=encoding)
    df.to_csv(output_csv, index=True, header=True)

# Link to your container image
base_img = "public.ecr.aws/f6t4n1w1/public_test:latest"  # Change to your registry's URI

# Create first OP
unzip_files_op = kfp.components.create_component_from_func(unzip_files,
                                                           base_image=base_img)

# Create second OP
csv_s3_reader_op = kfp.components.create_component_from_func(csv_s3_reader,
                                                             base_image=base_img)

@kfp.dsl.pipeline(
    name='Give a name to your pipeline',
    description='Give it a description too'
)
def unzip_and_read_pipeline(
    bucket_zipfile_path,
    bucket_name,
    sep,
    decimal,
    encoding
):
    # Call the first OP
    first_task = unzip_files_op(bucket_zipfile_path)

    # Call the second OP and pass it the first task's outputs
    second_task = csv_s3_reader_op(bucket_name,
                                   first_task.outputs['output_string'],
                                   sep, decimal, encoding)
The %%writefile ./test_pipeline.py line means that running the cell saves this script as test_pipeline.py in your current directory.
You then convert the Python pipeline to YAML with this command in your Jupyter notebook:
%%sh
dsl-compile --py test_pipeline.py --output test_pipeline.yaml
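Alternatively, you can compile from Python with the kfp SDK instead of the dsl-compile command. Since %%writefile only writes the cell without executing it, the pipeline function has to be imported from the file first. A sketch, assuming test_pipeline.py sits in the current directory:
import kfp
from test_pipeline import unzip_and_read_pipeline  # the file written by %%writefile

# Compile the pipeline function into a YAML package
kfp.compiler.Compiler().compile(unzip_and_read_pipeline, 'test_pipeline.yaml')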
Upload Pipeline to Kubeflow
On Kubeflow’s Central Dashboard, go to “Pipelines” and click on “Upload Pipeline”
Pipeline creation menu. Image by author
Give your pipeline a name and a description, select "Upload a file", and upload your newly created YAML file. Click on "Create". You should see your pipeline in your list of pipelines. Select it, and you should see a diagram like this:
Pipeline Summary. Image by author
Running the pipeline
Click on "Create Run". You will need to choose, or create, an experiment to run your pipeline on. You will also need to specify your pipeline's parameters, which are those you defined in your unzip_and_read_pipeline function.
Note that string parameters have to be specified without quotation marks: my_bucket_name will be correctly processed, whereas 'my_bucket_name' won't!
Launch menu. Image by author
Start your experiment.
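If you prefer launching runs from the notebook instead of the UI, the kfp client can do it too. A sketch, assuming you run it from a Kubeflow notebook server (where kfp.Client() usually needs no host argument); the argument values below are placeholders:
import kfp
from test_pipeline import unzip_and_read_pipeline

client = kfp.Client()
# Create a run directly from the pipeline function
client.create_run_from_pipeline_func(
    unzip_and_read_pipeline,
    arguments={
        "bucket_zipfile_path": "raw/archive.zip",  # placeholder values
        "bucket_name": "my-bucket-name",
        "sep": ";",
        "decimal": ",",
        "encoding": "utf-8",
    },
    experiment_name="kubeflow-demo",
)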
If all goes well, your run will succeed and you will see the resulting screen:

Experiment Summary. Image by author
If you click on any of your tasks, you will be able to see its logs, input parameters, as well as input and output artifacts.
Here, the second task isn’t really useful, but you could add some data pre-processing instructions to return a cleaned csv file.
And there you have it! A basic Kubeflow pipeline!
Usually, you would like to avoid writing all your functions in the Jupyter notebook, and rather have them in a GitHub repository.
GitHub Actions allows you to build and push to ECR a container image containing all your scripts, which you can then import when you build your pipeline.
Article originally posted here by Antoine Villatte. Reposted with permission.