Kubeflow MLOps : Automatic Pipeline Deployment with CI/CD/CT

If you already have a functioning Kubernetes cluster with Kubeflow installed on it, you can directly follow this guide. If you don’t, I strongly recommend checking my previous article.

This time, we’ll go a step further and:

  • Make an advanced pipeline that contains pre-processing, model building, inference and performance evaluation
  • Use GitHub Actions to perform CI and CD
  • Connect GitHub Actions to the Kubeflow endpoint and launch the updated pipeline

The Kubeflow pipeline you will build with this article. Image by author

Source dataset and GitHub Repo

In this article, we’ll use the Seattle Building Energy Benchmarking data, which can be found on this Kaggle page, and build a model to predict total greenhouse gas emissions, given by the TotalGHGEmissions column.

This dataset is made of two tables, which are observations from 2015 and 2016. We’ll store them in an S3 bucket from which our pipeline will read its data. If you followed my previous article, your Kubeflow server should have access to your bucket.

All of the code shown in this guide is available in this public repository.

Advanced Pipeline — Step 1 : Components

If you already know how to create Kubeflow components and pipelines, you can directly go to the “Upload the Pipeline” chapter of this article.

Our pipeline will have the following steps :

  • Merge the data from 2015 and 2016 and split the result into a training set and testing set
  • Data cleaning, NaN filling and feature engineering
  • Data preprocessing and preparation
  • Model building : SVM, RandomForest and XGBoost
  • Evaluate results, select best model, train it and save it
  • Predict

Once again, the code for all of these steps can be found in the kf_utils folder of my repo, but let’s detail one of them so we can have a better understanding of Kubeflow’s “grammar”:

import kfp.components as comp
from typing import NamedTuple
def evaluate_models(bucket : str, 
                     subfolder : str,
                     svm_mse : float, 
                     svm_r2 : float, 
                     svm_hyperparams : dict, 
                     xgb_mse : float, 
                     xgb_r2 : float, 
                     xgb_hyperparams : dict, 
                     rf_mse : float, 
                     rf_r2 : float, 
                     rf_hyperparams : dict
                     )-> NamedTuple('Outputs', 
                                    [
                                      ('best_model', str), 
                                      ('hyperparams', dict)
                                    ]
                                   ):
    
    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns
    import numpy as np
    import boto3
    
    def easy_bar_plot(x, y, data, figname, order=None, xlab=None, ylab=None, 
                      title=None, grid=True, values_over_bars=True, vob_round=0, 
                      vob_offset=None, vob_rot=None, x_tick_rot=None):
    
        fig, ax = plt.subplots(figsize = (18, 8))
        if order is None:
            order = np.sort(data[x].unique()) 
        sns.barplot(x=x, y=y, data=data, ax=ax, order=order)
        if xlab is not None:
            ax.set_xlabel(xlab, fontsize = 16, fontweight = "bold")
        if ylab is not None:
            ax.set_ylabel(ylab, fontsize = 16, fontweight = "bold")
        if title is not None:
            plt.suptitle(title, fontsize = 18, fontweight = "bold")
        if grid :
            plt.grid(True, which='major', axis='both', alpha = 0.3)
        if values_over_bars:
            if vob_offset is None:
                vob_offset = 0.015
            if vob_rot is None:
                vob_rot = 0
            if vob_rot > 0:
                ha="left"
            else:
                ha="center"
            pos=0
            for i, (q, val) in data.iterrows():
                if val != 0:
                    ax.text(pos, 
                            val + vob_offset*data[y].max(), 
                            "{}".format(round(val,vob_round)), 
                            ha=ha, 
                            fontsize = 12, 
                            fontweight = "bold", 
                            rotation=vob_rot, 
                            rotation_mode="anchor")
                pos += 1
        if x_tick_rot is not None:
            plt.xticks(rotation = x_tick_rot, ha="right")
        plt.savefig(figname)
        plt.show()
        
    
    performance_report = {}
    performance_report["SVM"] = {"MSE" : svm_mse, "R2" : svm_r2}
    performance_report["XGB"] = {"MSE" : xgb_mse, "R2" : xgb_r2}
    performance_report["RandomForest"] = {"MSE" : rf_mse, "R2" : rf_r2}
    
    performance_df = pd.DataFrame.from_dict(performance_report, orient="index")
    performance_df = performance_df.reset_index().rename(columns={"index" : "Model"})
    
    hyperparams_dict = {"SVM" : svm_hyperparams, 
                        "XGB" : xgb_hyperparams, 
                        "RandomForest" : rf_hyperparams}
    
    easy_bar_plot(x="Model", y="R2", data=performance_df[["Model", "R2"]],
                  figname = "./model_performance_R2.png",
                  order=performance_df["Model"], 
                  xlab="Model", 
                  ylab="R2 score", 
                  title="R2 score by Model", 
                  grid=True, 
                  values_over_bars=True, 
                  vob_round=3, 
                  vob_offset=None, 
                  vob_rot=None, 
                  x_tick_rot=None)
    
    easy_bar_plot(x="Model", y="MSE", data=performance_df[["Model", "MSE"]], 
                  figname = "./model_performance_MSE.png",
                  order=performance_df["Model"], 
                  xlab="Model", 
                  ylab="MSE", 
                  title="MSE by Model", 
                  grid=True, 
                  values_over_bars=True, 
                  vob_round=3, 
                  vob_offset=None, 
                  vob_rot=None, 
                  x_tick_rot=None)
    
    best_model = performance_df.loc[performance_df["R2"]==performance_df["R2"].max(), 
                                    "Model"].values[0]
    best_models_hyperparams = hyperparams_dict[best_model]
    
    s3_resource = boto3.client('s3')
    s3_resource.upload_file("./model_performance_R2.png", 
                            bucket, 
                            subfolder + "/model_performance_R2.png")
    s3_resource.upload_file("./model_performance_MSE.png", 
                            bucket, 
                            subfolder + "/model_performance_MSE.png")
    
    return (best_model, best_models_hyperparams)
if __name__ == '__main__':
    from kfp.components import create_component_from_func
    base_img = "public.ecr.aws/f6t4n1w1/poc_kf_pipeline:latest"
    evaluate_models_op = create_component_from_func(
        evaluate_models,
        output_component_file='evaluate_models_op.yaml',
        base_image=base_img,
        annotations={
            "author": "Antoine Villatte"
        }
    )

So what’s happening here?

The first thing we do is to import the packages we need to describe the op, and define the evaluate_models() function that will become the op.
The structure is :

def function(input_1 : type, input_2 : type, …) -> Output

Here, the output is a Named Tuple, in which you can declare the several elements that constitute the op’s output. These are the names you will later have to use in your code to save the artifacts.

Note that for classic outputs like csv files, this simplified structure is enough :

import kfp.components as comp
def function(input_1 : type, input_2 : type, …, 
             csv_output : comp.OutputPath('csv')):
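
As a rough illustration of what the body of such a component does with that parameter, here is a minimal sketch that runs without Kubeflow. It assumes that, at run time, the OutputPath parameter is simply a file path the function writes to; the annotation is replaced by a plain str so the snippet is self-contained, and the write_rows helper, the building_id column and the values are hypothetical placeholders.

```python
import csv
import os
import tempfile

def write_rows(rows, csv_output: str):
    """Write rows to the path that would be injected for an OutputPath parameter."""
    with open(csv_output, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["building_id", "TotalGHGEmissions"])  # hypothetical header
        writer.writerows(rows)

# Stand-in for the path Kubeflow would provide; dummy values for illustration
tmp_path = os.path.join(tempfile.mkdtemp(), "output.csv")
write_rows([(1, 10.0), (2, 20.5)], tmp_path)

with open(tmp_path, newline="") as f:
    written = list(csv.reader(f))
print(written)
```

The function returns nothing: the file written at csv_output is the output.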

After that, we import the packages that will be necessary for this component, such as matplotlib. The reason why the imports are made inside the function and not before it is explained in my first article.
I also define my easy_bar_plot function; note that a good practice would be to define it in a module inside the repo, but here I put it directly in the function to make the snippet easier to read.

The performances and best hyperparameters, which are artifact outputs of the previous Kubeflow components, are stored in dictionaries and plotted. The plots are saved and the best model is selected based on the highest R2 score.
Since Kubeflow doesn’t yet accept any kind of plot as an artifact output, a workaround is to upload the plots to the S3 bucket, which is what the two upload_file calls near the end of the function do.

Now, when I defined the function, I specified the output as the Named Tuple: NamedTuple('Outputs', [('best_model', str), ('hyperparams', dict)]).
Kubeflow will understand that my artifact output is made of 2 elements, and therefore expects the op function to return 2 objects.
The last line of the function does indeed return 2 objects:
return (best_model, best_models_hyperparams)
Since best_model is the first object returned, it will be matched to the first element of the NamedTuple and automatically considered a string by Kubeflow.
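
To make this positional matching concrete, here is a minimal sketch in plain Python that runs without Kubeflow. The pick_best function and its best_r2 field are hypothetical simplifications of evaluate_models, and the scores are made-up illustration values. Unpacking the returned tuple into the annotated NamedTuple by hand only imitates what Kubeflow does with the op's outputs.

```python
from typing import NamedTuple

# Hypothetical, simplified stand-in for evaluate_models (not the repo's code)
def pick_best(svm_r2: float, xgb_r2: float, rf_r2: float) -> NamedTuple(
        'Outputs', [('best_model', str), ('best_r2', float)]):
    scores = {"SVM": svm_r2, "XGB": xgb_r2, "RandomForest": rf_r2}
    best_model = max(scores, key=scores.get)  # the highest R2 wins
    # Plain tuple: position 0 -> 'best_model', position 1 -> 'best_r2'
    return (best_model, scores[best_model])

# Imitate the positional matching by hand
Outputs = pick_best.__annotations__['return']
result = Outputs(*pick_best(0.61, 0.74, 0.70))  # made-up scores
print(result.best_model, result.best_r2)
```

Returning the values in a different order than the one declared in the NamedTuple would silently swap the outputs, so the two need to be kept in sync.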

The last block of code converts the Python function to the actual Kubeflow op, in the form of a yaml file, with the create_component_from_func function, which takes as parameters the function to convert, the output path, and a base Docker image. Here, I provided a light public image I created, and if you read my previous article, you should know how to make one by now.

In my repo, all of my Kubeflow components (their .py files and the resulting .yaml files) are stored in the kf_utils directory.

Advanced Pipeline — Step 2: from components to pipeline

Once all the components for the steps listed above are created, you can create the following pipeline. I won’t go into too much detail here since I already talked about pipeline creation in my last article.

import kfp
from kfp.components import load_component_from_file
merge_and_split_op = load_component_from_file("./kf_utils/merge_and_split_op.yaml")
preprocess_dataset_op = load_component_from_file("./kf_utils/preprocess_dataset_op.yaml")
prepare_data_op = load_component_from_file("./kf_utils/prepare_data_op.yaml")
train_svm_op = load_component_from_file("./kf_utils/train_svm_op.yaml")
train_randomforest_op = load_component_from_file("./kf_utils/train_randomforest_op.yaml")
train_xgb_op = load_component_from_file("./kf_utils/train_xgb_op.yaml")
evaluate_models_op = load_component_from_file("./kf_utils/evaluate_models_op.yaml")
train_best_model_op = load_component_from_file("./kf_utils/train_best_model_op.yaml")
model_predict_op = load_component_from_file("./kf_utils/model_predict_op.yaml")
@kfp.dsl.pipeline(
   name='Emission prediction pipeline',
   description='An example pipeline.'
)
def emission_pipeline(
    bucket,
    data_2015,
    data_2016,
    hyperopt_iterations,
    subfolder
):
    merge_and_split_task = merge_and_split_op(bucket, data_2015, data_2016)
    preprocess_task = preprocess_dataset_op(merge_and_split_task.outputs['output_edfcsv'])
    preparation_task = prepare_data_op(preprocess_task.outputs['output_cleandatacsv'])
    
    rf_train_task = train_randomforest_op(preparation_task.outputs['output_xtraincsv'],
                                         preparation_task.outputs['output_ytraincsv'],
                                         preparation_task.outputs['output_xtestcsv'],
                                         preparation_task.outputs['output_ytestcsv'],
                                         hyperopt_iterations)
    
    xgb_train_task = train_xgb_op(preparation_task.outputs['output_xtraincsv'],
                                 preparation_task.outputs['output_ytraincsv'],
                                 preparation_task.outputs['output_xtestcsv'],
                                 preparation_task.outputs['output_ytestcsv'],
                                 hyperopt_iterations)
    
    svm_train_task = train_svm_op(preparation_task.outputs['output_xtraincsv'],
                                 preparation_task.outputs['output_ytraincsv'],
                                 preparation_task.outputs['output_xtestcsv'],
                                 preparation_task.outputs['output_ytestcsv'],
                                 hyperopt_iterations)
    
    evaluate_models_task = evaluate_models_op(bucket,
                                              subfolder,
                                              svm_train_task.outputs['MSE'],
                                              svm_train_task.outputs['R2'],
                                              svm_train_task.outputs['hyperparams'],
                                              xgb_train_task.outputs['MSE'],
                                              xgb_train_task.outputs['R2'],
                                              xgb_train_task.outputs['hyperparams'],
                                              rf_train_task.outputs['MSE'],
                                              rf_train_task.outputs['R2'],
                                              rf_train_task.outputs['hyperparams']
                                             )
    
    train_best_model_task = train_best_model_op(evaluate_models_task.outputs['best_model'],
                                               evaluate_models_task.outputs['hyperparams'],
                                               preparation_task.outputs['output_xtraincsv'],
                                               preparation_task.outputs['output_ytraincsv'])
    
    model_predict_task = model_predict_op(train_best_model_task.outputs['output_pickle_model'],
                                          preparation_task.outputs['output_xtestcsv'])

We’ll save this code in the “pipeline” directory.

Upload the pipeline

We will now need to upload this pipeline to Kubeflow. What we’ll upload is not the python file but a compiled yaml file that is created with the shell command :

dsl-compile --py pipeline/pipeline.py --output pipeline/pipeline.yaml

We now have the pipeline yaml in the “pipeline” directory. In my previous article, I showed you how to upload it to Kubeflow using the Central Dashboard but here we’ll do it from a python command.

This is where it gets tricky. The easiest, most platform-agnostic way to connect to the Kubeflow pipelines manager is to open a client session like so :

client = kfp.Client(host=host, cookies=cookies)

Here we pass two parameters to kfp.Client():

  • The kubeflow host endpoint which should look like this :
    123-isitiosystem-istio-2af2-456.us-west-2.elb.amazonaws.com/pipeline
    which is your Central Dashboard’s URL followed by /pipeline
  • An authentication cookie that is stored in your browser when you log into your Central Dashboard with your login and password

You can copy and paste your cookie value into your code to open the client session, but that means leaving it in the clear for everyone to see, and the cookie inevitably expires. To work around that, we’ll script the login to the Dashboard with mechanize and retrieve the cookie from the session.
In the end, we’ll want to use GitHub Actions to launch the pipeline, which means that the login, password and URL can be saved as secrets and passed to the Python script as environment variables. Our pipeline launch script will therefore look like this:

import kfp
from datetime import datetime
import re
import os
import mechanize
from bs4 import BeautifulSoup
import urllib
import http.cookiejar as cookielib 
# Get today's date for tags
today = str(datetime.now())
# Def
def get_id(text):
    """
    Function that retrieves a pipelines's ID from its logs
    Parameters
    ----------
    text : str
        string version of the logs.
    Returns
    -------
    str : Id of the pipeline.
    """
    match = re.search('{\'id\': \'(.+?)\',\\n', text)        
    if match:
        found = match.group(1)
        return(found)
    
def get_cookie(text):
    """
    Function that retrieves login cookie
    Parameters
    ----------
    text : str
        string version of the logs.
    Returns
    -------
    str : cookie value.
    """
    match = re.search('authservice_session=(.+?) ', text)        
    if match:
        found = match.group(1)
        return(found)
# Parameters
URL = os.getenv('URL')
pipeline_name = "advanced_pipeline"
job_name = 'job' + today
ENDPOINT = os.getenv('ENDPOINT')
EMAIL = os.getenv('EMAIL')
PASSWORD = os.getenv('PASSWORD')
# Run parameters
experiment_id = 'fe0390c9-a311-4248-89e2-72522f17c26c'
version_id = '1'
params = {'bucket' : 'your-bucket-name',
        'data_2015' : 'temptest/2015-building-energy-benchmarking.csv',
        'data_2016' : 'temptest/2016-building-energy-benchmarking.csv',
        'hyperopt_iterations' : '50',
        'subfolder' : 'your-subfolder-name'}
# Do we create or update the pipeline ?
kind = "create" # or "update"
# Get cookie value by logging into the Central Dashboard
cj = cookielib.CookieJar()
br = mechanize.Browser()
br.set_cookiejar(cj)
br.open(URL)
br.select_form(nr=0)
br.form['login'] = EMAIL
br.form['password'] = PASSWORD
br.submit()
authservice_session = 'authservice_session={}'.format(get_cookie(str(cj)))
# Connect to Kubeflow Pipelines Manager
client = kfp.Client(host=ENDPOINT, cookies=authservice_session)
# Create pipeline
if kind == "create":
    pipe_logs = client.upload_pipeline(pipeline_package_path="./pipeline/pipeline.yaml",
                          pipeline_name=pipeline_name,
                          description="frend")
    
# Upload new version of existing pipeline
elif kind == "update":
    version_name = "update-pipeline-" + today
    pipe_logs = client.upload_pipeline_version(pipeline_package_path="./pipeline/pipeline.yaml",
                                   pipeline_version_name=version_name,
                                   pipeline_name = pipeline_name)
# Retrieve the pipeline ID from the upload response (pipe_logs only exists from this point on)
pipeline_id = get_id(str(pipe_logs))
# Run pipeline
client.run_pipeline(experiment_id=experiment_id,
                   job_name=job_name,
                   params=params,
                   pipeline_id=pipeline_id)

Running this script will create (or update) and launch your Kubeflow pipeline. Note that for this to work you need to have already created a Kubeflow experiment, but this is easy to do in the Central Dashboard, and experiments are reusable.

In my repo, this script is called run_pipeline.py and is in the main directory.
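
As a possible alternative to parsing str(cj) with a regular expression, the cookie jar can be iterated directly. The sketch below is an illustration under assumptions, not the script from the repo: find_cookie and make_cookie are hypothetical helpers, and the jar is filled by hand with a dummy value so that the example runs without a Kubeflow server.

```python
import http.cookiejar as cookielib

def find_cookie(jar, name="authservice_session"):
    """Return the value of the first cookie called `name`, or None if absent."""
    for cookie in jar:
        if cookie.name == name:
            return cookie.value
    return None

def make_cookie(name, value, domain="example.invalid"):
    """Build a throwaway cookie for the demo (hypothetical helper)."""
    return cookielib.Cookie(
        version=0, name=name, value=value, port=None, port_specified=False,
        domain=domain, domain_specified=True, domain_initial_dot=False,
        path="/", path_specified=True, secure=False, expires=None,
        discard=True, comment=None, comment_url=None, rest={})

# In run_pipeline.py the jar would be filled by the mechanize login instead
jar = cookielib.CookieJar()
jar.set_cookie(make_cookie("authservice_session", "dummy-session-value"))

cookie_value = find_cookie(jar)
print("authservice_session={}".format(cookie_value))
```

Looking the cookie up by name does not depend on how the jar happens to be printed, and a missing cookie shows up as None rather than as a failed regex match.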

We now have all the tools necessary to have a fully automated process.

Automation — Step 1 : prepare the GitHub secrets

The run_pipeline.py script will use 4 secret values :

  • URL : Your Kubeflow server URL, which should look like this :
    123-isitiosystem-istio-2af2-456.us-west-2.elb.amazonaws.com
  • ENDPOINT : The kubeflow host endpoint which should look like this :
    123-isitiosystem-istio-2af2-456.us-west-2.elb.amazonaws.com/pipeline
  • EMAIL and PASSWORD : your Kubeflow Central Dashboard login credentials
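
Because run_pipeline.py reads these four values with os.getenv, a secret that was never set simply becomes None and the failure only appears later, at login time. A small check at the top of the script can make that explicit. This is only a sketch: load_settings and REQUIRED_VARS are hypothetical names, and the values in demo_env are dummies.

```python
import os

# The four variables that run_pipeline.py expects
REQUIRED_VARS = ("URL", "ENDPOINT", "EMAIL", "PASSWORD")

def load_settings(environ=None):
    """Return the required settings, or raise if any of them is missing or empty."""
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}

# Dummy values for illustration only
demo_env = {
    "URL": "https://example.invalid",
    "ENDPOINT": "https://example.invalid/pipeline",
    "EMAIL": "user@example.invalid",
    "PASSWORD": "not-a-real-password",
}
settings = load_settings(demo_env)
print(sorted(settings))
```

Calling load_settings() with no argument reads from the real environment, which is what the GitHub Actions step would populate from the secrets.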

Go to your GitHub repo and go to Settings, then Secrets :

GitHub Actions Secrets. Image by author

From there, add your secrets. Let’s call them respectively KBFL_URL, KBFL_ENDPOINT, KBFL_LOGIN, and KBFL_PASSWORD.

Automation — Step 2: setup GitHub Actions

Go to your repo and click on Actions. You should see suggested actions displayed on this page. Choose any one of them, for instance the one called “Python Application”, and click on “configure”.

GitHub Actions main page. Image by author

In the following configuration window, replace the yaml code with this :

name: Create and launch pipeline
on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
    
env:
  AWS_REGION: eu-west-1
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout
      uses: actions/checkout@v2
    - name: Set up Python 3.10
      uses: actions/setup-python@v2
      with:
        python-version: "3.10"
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install flake8 pytest kfp mechanize beautifulsoup4 cookiejar
        if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
        
    - name: Install kfp
      run: |
        pip3 install kfp --upgrade
        export PATH=$PATH:~/.local/bin
        
    - name: Rebuild pipeline
      run: |
        dsl-compile --py pipeline/pipeline.py --output pipeline/pipeline.yaml
    
    - name: execute py script 
      run: |
        python run_pipeline.py
      env:
        URL: ${{ secrets.KBFL_URL }}
        ENDPOINT: ${{ secrets.KBFL_ENDPOINT }}
        EMAIL: ${{ secrets.KBFL_LOGIN }}
        PASSWORD: ${{ secrets.KBFL_PASSWORD }}

What this action does is that, on each push or pull request targeting the main branch, GitHub Actions will create an environment, install Python and its dependencies on it, install kfp, rebuild the pipeline with dsl-compile, and launch the run_pipeline.py script with the proper environment variables imported from the GitHub secrets.

While building all of these assets, GitHub Actions will perform the CI / CD steps, and any inconsistency in the Kubeflow pipeline will be detected during the dsl-compile step and return an error.

If you want to implement continuous training (CT), you can replace the client.run_pipeline() call in the run_pipeline.py script with client.create_recurring_run().

And there you have it! When you need to modify any component of the pipeline, you can create a branch, modify the component, run the component script to recreate the op, commit and create a pull request. GitHub will recreate and upload the pipeline and launch it on Kubeflow; it’s all automated.

Next steps

The automation here is not really a proper CD and CT in terms of MLOps good practices, but you now have all the necessary tools to do that.

From there, the next steps would be to have only one model that is periodically retrained with client.create_recurring_run(), which would make what we just built your training pipeline.
You should then create an inference pipeline that only loads the model and makes predictions, which would allow you to set up another type of recurring — or on-demand — runs, without having to retrain the model every time. Finally, you should also have a monitoring pipeline that triggers the training pipeline when it detects a drop in the model’s performance.
You can also add a performance criterion in your CD so that your GitHub action only succeeds when a newly added feature improves the performance of the model.
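
One possible shape for such a criterion is a small gate script whose exit code decides whether the GitHub Actions step passes. This is only a sketch: the gate_exit_code function, its min_gain parameter and the example scores are assumptions, and how the new and baseline R2 values reach the script (for example from files written by the pipeline) is left open.

```python
def gate_exit_code(new_r2: float, baseline_r2: float, min_gain: float = 0.0) -> int:
    """Return 0 (success) if the new R2 reaches the baseline plus min_gain, else 1."""
    return 0 if new_r2 >= baseline_r2 + min_gain else 1

# Made-up scores for illustration
improved = gate_exit_code(0.74, 0.70)
regressed = gate_exit_code(0.65, 0.70)
print(improved, regressed)
```

In a workflow step, passing this value to sys.exit() would make the job fail whenever the new model does not meet the threshold.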

Article originally posted here. Reposted with permission.

ODSC Community

The Open Data Science community is passionate and diverse, and we always welcome contributions from data science professionals! All of the articles under this profile are from our community, with individual authors mentioned in the text itself.
