Basic Kubeflow Pipelines From Scratch

Kubeflow is a machine learning toolkit that facilitates the deployment of machine learning projects on Kubernetes. Although quite recent, Kubeflow is becoming increasingly present in tech companies’ stacks, and getting started with it can be quite overwhelming for newcomers due to the scarcity of example projects.

Even though Kubeflow’s documentation is far from lacking, it is always helpful to have a helping hand when you create a machine learning pipeline from scratch.
I will do my best to be that helping hand.

In this guide, we will go through every step that is necessary to have a functioning pipeline. You will learn how to:

  • Create a Kubernetes cluster
  • Install Kubeflow
  • Create a container registry
  • Build a container image and push it to your registry
  • Give Kubeflow access to your S3 buckets
  • Create Kubeflow components with input and output artifacts
  • Create a Kubeflow pipeline, upload it and run it

AWS — Elastic Kubernetes Service

I will focus here on making a pipeline from scratch using AWS EKS, which is a service to create Kubernetes clusters like Google’s GKE.

Create an EKS cluster and install Kubeflow

Although you can create an EKS cluster from the AWS Management Console by typing “EKS” in the search bar and selecting “Elastic Kubernetes Service”, I strongly recommend that you follow this Kubeflow guide, which includes the cluster creation step, done directly from your computer’s terminal with command-line tools. Following it will ensure that your cluster has the right settings to install Kubeflow later on.

Be sure not to miss the following prerequisites:

Do not hesitate to change your AWS region (like “eu-west-1”) and to select more powerful EC2 instances. I personally chose p2.xlarge instances.
My configuration would, for instance, be:

export AWS_CLUSTER_NAME=kubeflow-demo
export AWS_REGION=eu-west-1
export K8S_VERSION=1.18
export EC2_INSTANCE_TYPE=p2.xlarge

As of January 2022, Kubeflow does not install properly on the newest versions of Kubernetes. I strongly recommend keeping K8S_VERSION at 1.18.

If you’ve followed the guide up until the “Access Kubeflow central dashboard” chapter, the command kubectl get ingress -n istio-system should return an address that looks like this:
123-istiosystem-istio-2af2-4567.us-west-2.elb.amazonaws.com
Copy and paste this address to your browser. As specified in the guide, if you’re using basic authentication, the credentials are the ones you specified in the configuration file, or the default (admin@kubeflow.org:12341234).

You should now have access to your Kubeflow Central Dashboard:

Kubeflow Central Dashboard. Image by author

Create a container registry

Using Kubernetes means using container images, so let’s leave the dashboard and create a container registry.

Go back to your AWS Management Console, type “ECR” in the search bar, and click on “Elastic Container Registry”. From there, select “Repositories” on the left bar and click on “Create repository”.

ECR Menu. Image by author

Once created, a new repository will appear on your menu. Note that it has a URI that looks like this: <ID>.dkr.ecr.<REGION>.amazonaws.com/<REGISTRY_NAME>, where <ID> is your account ID, <REGION> is your registry’s region (for instance eu-west-1), and <REGISTRY_NAME> is the name you gave your registry when you created it.

Each repository has a URI, and so does each image that you push to it. This is the link we will need later in this article, when we take a look at component creation.
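If you prefer to script this step instead of using the console, here is a minimal sketch with boto3 (the repository name and region are placeholders for your own values):

import boto3

# Hypothetical repository name and region; adjust to your own setup
ecr = boto3.client("ecr", region_name="eu-west-1")
response = ecr.create_repository(repositoryName="kubeflow-demo")

# The URI follows the <ID>.dkr.ecr.<REGION>.amazonaws.com/<REGISTRY_NAME> pattern
print(response["repository"]["repositoryUri"])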

Check if your cluster’s worker nodes have access to ECR

Go to EKS on your AWS Management Console and select “Clusters”. Click on the cluster you previously created, go to the “Configure” tab, and then to the “Compute” sub-tab:

You should see a node group like this. Image by author

Click on your node group and, on the “Details” tab, click on “Node IAM Role ARN”. This opens your nodes’ IAM Management Console. Make sure that the “AmazonEC2ContainerRegistryReadOnly” policy is attached. If it is not, add it by clicking on “Attach policies”:

Image by author
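If you would rather do this check and fix from code, a rough boto3 equivalent looks like this (the role name is a placeholder; take it from the last segment of the Node IAM Role ARN):

import boto3

iam = boto3.client("iam")
node_role = "<your-node-instance-role-name>"  # last segment of the Node IAM Role ARN

# Attach the read-only ECR policy to the worker nodes' role
iam.attach_role_policy(
    RoleName=node_role,
    PolicyArn="arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly",
)

# List the attached policies to confirm it is there
attached = iam.list_attached_role_policies(RoleName=node_role)["AttachedPolicies"]
print([p["PolicyName"] for p in attached])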

Install Docker

If you already have Docker installed, skip to the next part. If not, go to the Docker main page and install Docker Desktop.

On macOS, the easiest way is to install it with Homebrew by typing brew install --cask docker in your terminal; on Linux, follow Docker’s installation instructions for your distribution.

Create a container

On your computer, create a folder in which you will work from now on. It can be a GitHub repository folder if you want to version your work.

Create a file and name it “Dockerfile”. On Linux and macOS, you can use the touch Dockerfile command for that. Open it with any IDE you want, or even Notepad (Windows) or TextEdit (Mac).

In the Dockerfile, we’ll declare a parent image (here I’ll take ubuntu:20.04), install pip, then install some packages we’ll need to run our scripts. Our Dockerfile will look like this:

FROM ubuntu:20.04
RUN set -xe \
 && apt-get update -y \
 && apt-get install -y python3-pip \
 && apt install -y python-is-python3
RUN pip3 install --no-cache-dir numpy scikit-learn pandas boto3

Now build an image with this command, changing <image_name> to any name you want to give your image. Don’t forget the dot at the end!

docker build -t <image_name> .

Push your image to your container registry

Retrieve an authentication token and authenticate your Docker client to your registry with the AWS CLI. For a private registry:

aws ecr get-login-password --region <REGION> | docker login --username AWS --password-stdin <ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com

For a public registry (public registries always authenticate against us-east-1):

aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin public.ecr.aws

Change <REGION> to your repository’s region (e.g. eu-west-1). In the commands below, <ECR_URI> is:
<ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com for private registries and
public.ecr.aws/<REGISTRY_ID> for public registries

Then, tag your image so you can push the image to this repository:

docker tag <image_name>:latest <ECR_URI>/<REGISTRY_NAME>:latest

Run the following command to push this image to your newly created AWS repository:

docker push <ECR_URI>/<REGISTRY_NAME>:latest

If you have any issues, you can find all of these commands on your registry’s menu by clicking on “View push commands”.

Image by author

Setting up a test dataset on S3

Our basic pipeline will start by downloading a zipped dataset from S3.
We’ll use this Kaggle dataset as an example, but any zipped CSV file will do.

On the AWS Management Console, type “S3” in the search bar to access the service and click on “Create bucket”. Once created, click on your bucket and upload your zip file to it.
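The same step can be scripted with boto3 if you prefer; a small sketch, assuming a hypothetical bucket name and key (bucket names must be globally unique):

import boto3

s3 = boto3.client("s3", region_name="eu-west-1")

# Create the bucket (skip this if you already created it from the console)
s3.create_bucket(
    Bucket="my-kubeflow-demo-bucket",
    CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
)

# Upload the zipped dataset
s3.upload_file("archive.zip", "my-kubeflow-demo-bucket", "raw/archive.zip")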

Give Kubeflow access to S3

Go back to your Kubeflow Central Dashboard, click on “Notebook Servers”, and then click on “New Server”. This might take several minutes.

Once created, click on “Connect” to open JupyterLab and open a Terminal instance.
Type aws configure and enter your credentials as you did during step 1’s prerequisites to log in to AWS from your Kubeflow instance. This will give your scripts access to your S3 buckets.

Test if you have access to S3

In JupyterLab, open a new notebook and run the following cell:

import boto3

bucket_name = "<bucket_name>"  # the name of your bucket
conn = boto3.client('s3')
contents = conn.list_objects(Bucket=bucket_name)['Contents']
for f in contents:
    print(f['Key'])

In this code snippet, <bucket_name> is the name of your bucket.

If you run this cell, you should see your zip file, as well as any other file in any other subfolder. If you want to explore a specific subfolder, change the list_objects call to:

contents = conn.list_objects(Bucket=bucket_name,
                             Prefix="<subfolder_name>")['Contents']

Create the Kubeflow pipeline

We’ll build a simple pipeline that downloads our zip file from our S3 bucket, uploads the unzipped CSV files back to the bucket, and reads one of the datasets with pandas.

Step 1. Create the python functions

We’ll create the unzipping function:

import zipfile
import os
import boto3
def unzip_func(bucket_zipfile_path):
    """Unzips a file. 
    Parameters:
    bucket_zipfile_path : str, path to the zipfile in the bucket
    """
    # Set up connection
    s3_resource = boto3.client('s3')
    path_bucket = "bucket_name" # Change it to your bucket name
    path_to_move_file = "destination_subfolder/" # Change it to your 
                                                 # destination subfolder
    
    # Create local targets
    os.makedirs("./data", exist_ok=True)
    os.makedirs("./unzipped_data", exist_ok=True)
    
    # Download zipfile
    boto3.resource('s3').Object(path_bucket, bucket_zipfile_path).\
                        download_file(Filename="./data/zipfile.zip")
    
    # Unzip
    for zip in os.listdir("./data"):
        with zipfile.ZipFile(os.path.join("./data", zip), 'r') as file:
            file.extractall("./unzipped_data")
            
    # Upload
    for file in os.listdir("./unzipped_data"):
        output_path = path_to_move_file + file
        s3_resource.upload_file(os.path.join("./unzipped_data", file), 
                                path_bucket, output_path)
        
    # Return S3 path to last extracted csv file
    return output_path

Then the reader function:

import pandas as pd
import boto3
from io import StringIO
def csv_s3_reader(bucket_name, 
                  csv_path, 
                  sep=";", 
                  decimal=",", 
                  encoding="utf-8"):
  """Reads a csv file from S3.
  Parameters:
  bucket_name : str, S3 bucket name,
  csv_path : str, path to your csv file in your bucket
  sep : str, parameter for pd.read_csv(),
  decimal : str, parameter for pd.read_csv(),
  encoding : str, parameter for pd.read_csv()
  
  Change sep, decimal and encoding depending on your csv's formatting
  """
  
  # Set up connection and download file
  csv_obj = boto3.client('s3').get_object(Bucket=bucket_name, Key=csv_path)
  body = csv_obj['Body']
  csv_string = body.read().decode(encoding)
  
  # Read csv
  df = pd.read_csv(StringIO(csv_string), sep=sep, decimal=decimal, error_bad_lines=False, encoding=encoding)
  
  return df

In our pipeline, the “csv_path” parameter of “csv_s3_reader()” will be the output string of “unzip_func()”.
You can now test if your functions work.
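For instance, a quick local sanity check could look like this (the bucket name, zip file key, and CSV settings are placeholders for your own values, and your AWS credentials must be configured):

# "raw/archive.zip" is a placeholder key; "bucket_name" must match the bucket set inside unzip_func
csv_key = unzip_func("raw/archive.zip")   # returns the S3 key of the last extracted csv file

df = csv_s3_reader("bucket_name", csv_key, sep=",", decimal=".", encoding="utf-8")
print(df.shape)
print(df.head())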

Step 2. Reformat python functions to component functions

To make them understandable by Kubeflow, we rewrite the functions like this:

import kfp
import kfp.components as comp
# First function
def unzip_files(bucket_zipfile_path : str, 
                output_string : comp.OutputPath(str)) :
    
    import boto3
    import os
    from io import BytesIO
    import zipfile
 
    # Set up connection
    s3_resource = boto3.client('s3')
    path_bucket = "bucket_name" # Change it to your bucket name
    path_to_move_file = "destination_subfolder/" # Change it to your 
                                                 # destination subfolder
    
    # Create local targets
    os.makedirs("./data", exist_ok=True)
    os.makedirs("./unzipped_data", exist_ok=True)
    
    # Download zipfile
    boto3.resource('s3').Object(path_bucket, bucket_zipfile_path).\
                        download_file(Filename="./data/zipfile.zip")
    
    # Unzip
    for zip in os.listdir("./data"):
        with zipfile.ZipFile(os.path.join("./data", zip), 'r') as file:
            file.extractall("./unzipped_data")
    
    # Upload
    for file in os.listdir("./unzipped_data"):
        output_path = path_to_move_file + file
        s3_resource.upload_file(os.path.join("./unzipped_data", file), 
                                path_bucket, output_path)
    
    # Create Artifact : S3 path to last extracted csv file    
    with open(output_string, 'w') as writer:
        writer.write(output_path)
        
# 2nd function
def csv_s3_reader(bucket_name : str, 
                  csv_path : comp.InputPath(str), 
                  sep : str, 
                  decimal : str, 
                  encoding : str, 
                  output_csv:comp.OutputPath('CSV')):
    
    from io import StringIO
    import pandas as pd
    import boto3
    import os
    
    # Set up connection and download file
    with open(csv_path, 'r') as reader:
        line = reader.readline()
        csv_obj = boto3.client('s3').get_object(Bucket=bucket_name, Key=line)
    body = csv_obj['Body']
    csv_string = body.read().decode(encoding)
    
    # Read csv
    df = pd.read_csv(StringIO(csv_string), sep=sep, decimal=decimal, 
                     error_bad_lines=False, encoding=encoding)
    
    df.to_csv(output_csv, index=True, header=True)

As you can see, you need to declare the type of every parameter of your functions. Since Kubeflow pipelines deal with artifacts, instead of returning an object we write it out as if it were going to disk, except that instead of a path we are given a comp.OutputPath() object.

For instance, if you want to write a pandas dataframe to your computer, you would write df.to_csv(path_name). In Kubeflow, this becomes df.to_csv(output_path_object), where output_path_object is defined in the function’s parameters as comp.OutputPath('CSV').

You handle Kubeflow inputs the same way, by reading comp.InputPath() objects as if they were written on your disk.
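To make the convention concrete, here is a toy (hypothetical) component function that reads one text artifact and writes another; at run time, Kubeflow replaces both path parameters with real file paths:

import kfp.components as comp

def increment_numbers(numbers_in : comp.InputPath(str),
                      numbers_out : comp.OutputPath(str)):
    # Read the input artifact as if it were a local file
    with open(numbers_in, 'r') as reader:
        values = [int(x) for x in reader.read().split()]

    # Write the output artifact to the path Kubeflow provides
    with open(numbers_out, 'w') as writer:
        writer.write(" ".join(str(v + 1) for v in values))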

You might have noticed that unzip_files and csv_s3_reader have their imports inside them, and we’ll get to that soon when we talk about components.

Step 3. Convert functions to components

You can create a component from a function with kfp.components.create_component_from_func, and we’ll pass it two parameters: the function to convert and the base image it should run on when the component is called:

# Link to your container image
base_img = "<REGISTRY_URI>:latest" # Change to your registry's URI
# Create first OP
unzip_files_op = kfp.components.create_component_from_func(unzip_files, base_image=base_img) 
# Create second OP
csv_s3_reader_op = kfp.components.create_component_from_func(csv_s3_reader, base_image=base_img)

This is why some packages are imported inside the functions. boto3, for instance, is not natively installed in Kubeflow’s environment the way kfp is. Importing it at the top of the script would therefore raise an error; it should instead be imported inside the function, since the function will run on a container image that has boto3 installed.
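Note that create_component_from_func also accepts a packages_to_install argument, which pip-installs the listed packages when the component starts. A sketch, in case you would rather keep a lighter base image (this trades image size for start-up time; the python:3.8 image is just an example):

# Alternative: let kfp pip-install the dependencies at component start-up
csv_s3_reader_op = kfp.components.create_component_from_func(
    csv_s3_reader,
    base_image="python:3.8",          # hypothetical lighter base image
    packages_to_install=["boto3", "pandas"],
)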

Step 4. Creating the pipeline

A pipeline describes the succession of components to call and the parameters to pass to them. It takes the form of a function decorated with kfp.dsl.pipeline().

@kfp.dsl.pipeline(
    name='Give a name to your pipeline',
    description='Give it a description too'
)
def unzip_and_read_pipeline(
    bucket_zipfile_path, 
    bucket_name,  
    sep, 
    decimal, 
    encoding
):  
    # Call the first OP
    first_task = unzip_files_op(bucket_zipfile_path)
    
    # Call the second OP and pass it the first task's outputs
    second_task = csv_s3_reader_op(bucket_name, 
                                   first_task.outputs['output_string'], 
                                   sep, 
                                   decimal, 
                                   encoding)

Step 5. Full pipeline

The full pipeline should look like this. Copy and paste it into a Jupyter notebook cell, and don’t forget to change some lines depending on your bucket name and container image URI:

%%writefile ./test_pipeline.py
import kfp
import kfp.components as comp
# First function
def unzip_files(bucket_zipfile_path : str, 
                output_string : comp.OutputPath(str)) :
    
    import boto3
    import os
    from io import BytesIO
    import zipfile
 
    # Set up connection
    s3_resource = boto3.client('s3')
    path_bucket = "bucket_name" # Change it to your bucket name
    path_to_move_file = "destination_subfolder/" # Change it to your 
                                                 # destination subfolder
    
    # Create local targets
    os.makedirs("./data", exist_ok=True)
    os.makedirs("./unzipped_data", exist_ok=True)
    
    # Download zipfile
    boto3.resource('s3').Object(path_bucket, bucket_zipfile_path).\
                        download_file(Filename="./data/zipfile.zip")
    
    # Unzip
    for zip in os.listdir("./data"):
        with zipfile.ZipFile(os.path.join("./data", zip), 'r') as file:
            file.extractall("./unzipped_data")
    
    # Upload
    for file in os.listdir("./unzipped_data"):
        output_path = path_to_move_file + file
        s3_resource.upload_file(os.path.join("./unzipped_data", file), 
                                path_bucket, output_path)
    
    # Create Artifact : S3 path to last extracted csv file    
    with open(output_string, 'w') as writer:
        writer.write(output_path)
        
# 2nd function
def csv_s3_reader(bucket_name : str, 
                  csv_path : comp.InputPath(str), 
                  sep : str, 
                  decimal : str, 
                  encoding : str, 
                  output_csv:comp.OutputPath('CSV')):
    
    from io import StringIO
    import pandas as pd
    import boto3
    import os
    
    # Set up connection and download file
    with open(csv_path, 'r') as reader:
        line = reader.readline()
        csv_obj = boto3.client('s3').get_object(Bucket=bucket_name, Key=line)
    body = csv_obj['Body']
    csv_string = body.read().decode(encoding)
    
    # Read csv
    df = pd.read_csv(StringIO(csv_string), sep=sep, decimal=decimal, 
                     error_bad_lines=False, encoding=encoding)
    
    df.to_csv(output_csv, index=True, header=True)
    
# Link to your container image
base_img = "public.ecr.aws/f6t4n1w1/public_test:latest" # Change to your registry's URI
# Create first OP
unzip_files_op = kfp.components.create_component_from_func(unzip_files, base_image=base_img) 
# Create second OP
csv_s3_reader_op = kfp.components.create_component_from_func(csv_s3_reader, base_image=base_img) 
@kfp.dsl.pipeline(
    name='Give a name to your pipeline',
    description='Give it a description too'
)
def unzip_and_read_pipeline(
    bucket_zipfile_path, 
    bucket_name,  
    sep, 
    decimal, 
    encoding
):  
    # Call the first OP
    first_task = unzip_files_op(bucket_zipfile_path)
    
    # Call the second OP and pass it the first task's outputs
    second_task = csv_s3_reader_op(bucket_name, 
                                   first_task.outputs['output_string'], 
                                   sep, 
                                   decimal, 
                                   encoding)

The %%writefile ./test_pipeline.py line means that running the cell saves this script as test_pipeline.py in your current directory.

You then compile the Python pipeline to YAML with this command in your Jupyter notebook:

%%sh
dsl-compile --py test_pipeline.py --output test_pipeline.yaml
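If you prefer to stay in Python, the kfp SDK can do the same compilation from a notebook cell, and it can also upload the result for you; a sketch, assuming the pipeline function is defined in the current session:

import kfp

# Compile the decorated pipeline function into a YAML package
kfp.compiler.Compiler().compile(unzip_and_read_pipeline, "test_pipeline.yaml")

# Optionally, upload it directly instead of going through the dashboard
# (the client may need extra authentication settings outside the notebook server)
# client = kfp.Client()
# client.upload_pipeline("test_pipeline.yaml", pipeline_name="unzip-and-read")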

Upload Pipeline to Kubeflow

On Kubeflow’s Central Dashboard, go to “Pipelines” and click on “Upload Pipeline”.

Pipeline creation menu. Image by author

Give your pipeline a name and a description, select “Upload a file”, and upload your newly created YAML file. Click on “Create”. You should see your pipeline in your list of pipelines. Select it, and you should see a diagram like this:

Pipeline Summary. Image by author

Running the pipeline

Click on “Create Run”. You will need to choose (or create) an experiment in which to run your pipeline. You will also need to specify your pipeline’s parameters, which are the ones you defined in your unzip_and_read_pipeline function.

Note that string parameters have to be specified without quotes: my_bucket_name will be processed correctly, whereas 'my_bucket_name' won’t!

Launch menu. Image by author

Start your experiment.
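You can also launch runs from code with the kfp client; a minimal sketch, assuming it runs from the notebook server inside the cluster (multi-user deployments may require extra namespace or authentication settings) and using placeholder parameter values:

import kfp

client = kfp.Client()
experiment = client.create_experiment("kubeflow-demo")

run = client.run_pipeline(
    experiment.id,
    job_name="unzip-and-read-run",
    pipeline_package_path="test_pipeline.yaml",
    params={
        "bucket_zipfile_path": "raw/archive.zip",      # placeholder values
        "bucket_name": "my-kubeflow-demo-bucket",
        "sep": ",",
        "decimal": ".",
        "encoding": "utf-8",
    },
)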

If all goes well, your run will succeed and you will see the resulting screen:

Experiment Summary. Image by author

If you click on any of your tasks, you will be able to see its logs, input parameters, as well as input and output artifacts.

Here, the second task isn’t really useful, but you could add some data pre-processing instructions to return a cleaned csv file.

And there you have it! A basic Kubeflow pipeline!

Usually, you would want to avoid writing all your functions in the Jupyter notebook, and instead keep them in a GitHub repository.

GitHub Actions lets you build and push to ECR a container image that contains all your scripts, which you can then import when you build your pipeline.

Article originally posted here by Antoine Villatte. Reposted with permission.

