Production Machine Learning with Kubeflow

On a recent project with Singapore's most recognized airline, I had the pleasure of building out a machine learning platform that enabled Data Scientists to productionize their workflows.

I've helped build a few start-ups by now, and on smaller projects where we need to push out a new business entity within 6-9 months, the ML workflow usually consisted of Data Scientists ingesting, manipulating and building their models on local machines before dumping the model somewhere to be used. A very error-prone, manual process.

Working with this larger client, I needed a way to onboard Data Scientists quickly and have reproducible workflows. At the same time, we needed flexibility in how we would serve these models, redirect traffic, and so on.

Eventually I settled on Kubeflow, which met the need for consistent, reproducible pipelines and offered familiar tooling for Data Scientists, without requiring them to know much about Kubernetes. The back end systems were also served on Kubernetes, so the Terraform scripts and DevOps side of things stayed manageable: we didn't need to learn much new tooling and could reuse similar infrastructure.

With Kubeflow and CI/CD agents, I could have the model deployed and versioned on any of several targets:

  • The Kubernetes cluster itself, if inference can run on CPU/GPU and doesn't require crazy scaling
  • Azure
  • GCP ML Engine
  • AWS Sagemaker

Additionally, I could use any of those platforms to run hyperparameter tuning. I opted not to in this case so that the data wouldn't be exposed outside of the enclosed network. Only the final inference model would be exposed to another cloud provider.

Kubeflow

Kubeflow itself doesn't solve the data ingestion problem, but it enables experimentation, model deployment and reproducible results. Each module of the pipeline was built with simple Python code and defines inputs and outputs that feed into the next step of the pipeline. Each module is then built as a Docker container, which we can automate with CI agents. Containerized modules run consistently on Kubernetes and across machines, and we get version control.

Then we have our pipeline Python file and use a command line tool to describe what the pipeline looks like. The CLI produces a YAML file which runs on the Kubernetes cluster once we upload it to the Kubeflow UI.

Here's some terminology we need to go over:

Pipeline - A description/flow of how the various Docker modules interact with each other.
Experiment - A collection of runs/pipelines.

Here's the UI for a list of pipelines we've uploaded:
kubeflow-pipeline-view

This is what a pipeline graph looks like:
pipeline-graph

List of experiments:
experiments

Sample Run:
new-experiment-run

Sample Artifact Output:
confusion-matrix

We'll go into what a piece of module code will look like in a bit.

Infrastructure

For the infrastructure, I opted for a GKE private cluster with ingress and egress disabled. This has a few ramifications we need to think about. But first, let's look at the infrastructure at a high level:

infrastructure

A few things to note which may not be pictured here:

  • 1 private subnet for back end services, which has outgoing internet access via the NAT.
  • 1 private subnet for Kubeflow which has no ingress or egress.
  • GCR, GCS and BigQuery are locked down via VPC Service Controls in GCP, so hitting their RESTful APIs from outside wouldn't work. Just gsutil, and only if you're on the selected users list.
  • There's a public VPC for the master nodes. You can further restrict access via a Bastion Host.
  • There's also a service mesh (Istio) installed.
  • An Identity Aware Proxy is the only way for the Data Scientists to get access to the Kubeflow UI.
  • Versioning was enabled for the GCS buckets, so if files get overwritten we can retrieve their older versions.

The ML side of things with Kubeflow was more restricted so that training data and potentially personally identifiable information remained isolated. Backend services in the other subnet don't have access to the data, because a rogue application with internet access could call home with it. We can restrict that further with a service mesh ingress.

In terms of company setup, I used G Suite on a specific domain before creating all of this under an organizational account. Best practices still apply when setting up a GCP organization, which I won't get into here. IAP gave me a few layers of security:

  • First, to restrict Kubeflow to organizational members only.
  • DevOps has to explicitly grant access to a specific user or group of users in order for them to access the UI.
  • 2 factor auth is enforced.
  • I can remove a member from the organization and completely shut them out no matter where they are in GCP.

Here's what the org structure looks like from a tech side. It follows Google's standard best practices:

gcp-org-structure

Python Modules

I had each step of the pipeline in its own folder, with its own Dockerfile for that step.
Let's take a look at an example Dockerfile:

FROM python:3.6.9-stretch

COPY . /
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt

ENTRYPOINT ["python", "main.py"]

I used Debian stretch even though it was larger than Alpine, because certain Python modules required compiling C extensions. Alpine, at the time of writing, had issues compiling some of that C code, so installing from requirements.txt would always fail (e.g. numpy). No issues on the stretch image though.

It goes without saying that dependency versions were locked with pipenv run pip freeze > requirements.txt.
Also yes, I used pipenv to lock my version of Python as well.

So what's in the main.py of the module?

Something like this:

import argparse
import os
import subprocess
from google.cloud import storage
import pandas as pd
import pickle
from sklearn.model_selection import train_test_split

def main(argv=None):
    parser = argparse.ArgumentParser(description='Load CSV Data From GCS Bucket')
    parser.add_argument('-bucket', type=str, help='GCS Bucket name e.g. mybucket')
    parser.add_argument('-data', type=str, help='Data CSV Filename')
    parser.add_argument('-train', type=str, help='Training data Filename')
    parser.add_argument('-test', type=str, help='Test data Filename')
    args = parser.parse_args()

    data_fname = download_blob(args.bucket, args.data)
    data_split(data_fname, args.train, args.test) # 'training_data.pickle', 'test_data.pickle'

    upload_blob(args.bucket, args.train)
    upload_blob(args.bucket, args.test)


def download_blob(bucket_name, source_blob_name):
    destination_file_name = 'result.csv'

    # Fall back to gsutil when running locally without the injected service account key
    if not os.getenv('GOOGLE_APPLICATION_CREDENTIALS'):
        subprocess.run(["gsutil", "cp", "gs://{}/{}".format(bucket_name, source_blob_name), destination_file_name])
    else:
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        blob.download_to_filename(destination_file_name)

    print('Blob {} downloaded to {}.'.format(
        source_blob_name,
        destination_file_name))

    return destination_file_name


def data_split(data_fname, training_data_fname, test_data_fname):
    data = pd.read_csv(data_fname)

    X = data.drop(['Id', 'Species'], axis=1)
    y = data['Species']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=5)

    training_data_dict = {}
    training_data_dict['X_train'] = X_train
    training_data_dict['y_train'] = y_train

    with open(training_data_fname, 'wb') as handle:
        pickle.dump(training_data_dict, handle)

    test_data_dict = {}
    test_data_dict['X_test'] = X_test
    test_data_dict['y_test'] = y_test

    with open(test_data_fname, 'wb') as handle:
        pickle.dump(test_data_dict, handle)


def upload_blob(bucket_name, file_name):
    # Fall back to gsutil when running locally without the injected service account key
    if not os.getenv('GOOGLE_APPLICATION_CREDENTIALS'):
        subprocess.run(["gsutil", "cp", file_name, "gs://{}/".format(bucket_name)])
    else:
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.blob(file_name)
        blob.upload_from_filename(file_name)

    print('File {} uploaded to {}.'.format(
        file_name,
        bucket_name))


if __name__ == "__main__":
    main()

Pretty self-explanatory. Things to note are:

  • main() takes arguments which will be reflected in the Kubeflow Pipelines UI.
  • GOOGLE_APPLICATION_CREDENTIALS will be injected by the pipeline code. The secret already exists in the cluster when you set up Kubeflow, but it does not exist locally, so we fall back to gsutil when running locally (assuming VPC Service Controls allow the user running it access to GCS).
  • The module then splits the data, and we write the paths to the resulting files as the module's output.

One thing I learned through some experimentation here is that the output of one module becomes the input of another. Pipelines will automatically read the output file and pipe its contents to the next step, so don't try to use the contents of a CSV file as the input to the next step. It's not what you expect.
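To make that concrete, here's a minimal, hypothetical sketch (not the project's actual code) of the mechanism using the DSL's file_outputs: the first step writes a short string, such as a GCS path, into a file inside its container, and Kubeflow Pipelines reads that file and passes the string as an argument to the downstream step.

from kfp import dsl

def preprocess_op():
    return dsl.ContainerOp(
        name='preprocess',
        image='gcr.io/<MYPROJECT>/data_process:latest',
        # The container writes e.g. gs://mybucket/training_data.pickle into /output.txt
        file_outputs={'train_path': '/output.txt'}
    )

def train_op(train_path):
    return dsl.ContainerOp(
        name='train',
        image='gcr.io/<MYPROJECT>/train:latest',
        arguments=['-train', train_path]
    )

@dsl.pipeline(name='Output piping example', description='file_outputs demo')
def example_pipeline():
    preprocess = preprocess_op()
    # The string read from /output.txt (not the file itself) becomes the -train argument,
    # and the dependency between the two steps is inferred from this reference.
    train = train_op(preprocess.outputs['train_path'])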

We also use GCS for file versioning and theoretically unlimited storage instead of working with files in the cluster. We can mount volumes through pipeline code, but that seems like more of a hassle.

Pipeline Code

The pipeline.py describes what the pipeline looks like in the graph we saw earlier. We use the Python DSL to compile a pipeline.yaml file, which then gets zipped up into a tarball for us to upload to Kubeflow Pipelines.

Before we do this though, ensure that the CI/CD agent has built the Docker container for each module and pushed it to a Docker registry.

import kfp
from kfp import components
from kfp import dsl
from kfp import gcp

#========== Operations ============
def data_process_op(bucket, data, train, test):
    return dsl.ContainerOp(
        name='data preprocess',
        image='gcr.io/<MYPROJECT>/data_process:latest',
        arguments=[
            '-bucket', bucket,
            '-data', data,
            '-train', train,
            '-test', test
        ]
    )

def train_op(bucket, train, model):
    return dsl.ContainerOp(
        name='train',
        image='gcr.io/<MYPROJECT>/train:latest',
        arguments=[
            '-bucket', bucket,
            '-train', train,
            '-model', model]
    )

def test_op(bucket, test, model):
    return dsl.ContainerOp(
        name='test',
        image='gcr.io/<MYPROJECT>/test:latest',
        arguments=[
            '-bucket', bucket,
            '-test', test,
            '-model', model]
    )


#========== PIPELINE ==============

@dsl.pipeline(
  name='Iris Pipeline Example',
  description='Example with the Iris classification'
)
def iris_train_pipeline(
    bucket='data.datascience.encountr.co',
    source_data='Iris.csv',
    train_data='training_data.pickle',
    test_data='test_data.pickle',
    model='model.sav'
):
    data_process = data_process_op(bucket, source_data, train_data, test_data)
    train = train_op(bucket, train_data, model)
    train.after(data_process)
    test = test_op(bucket, test_data, model)
    test.after(train)

    steps = [data_process, train, test]

    for step in steps:
        step.apply(gcp.use_gcp_secret('user-gcp-sa'))


if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(iris_train_pipeline, __file__ + '.tar.gz')

Generally the flow is as follows:

  1. Define all the operations, which Docker images they use, and what arguments will be passed in.
  2. Use the pipeline DSL code (seen after the comment line) to wire the operations together. Note that the output of some is the input of others; that's how the UI knows how to draw the graph.
  3. Use dsl-compile --py pipeline.py --out pipeline.tar.gz to build the tarball.

Now here's where it gets tricky. Due to some legacy behaviour in the DSL compiler, the compiler automatically adds a few lines of YAML to the pipeline.yaml (inside the tarball) saying that each module outputs some artifacts for visualization. Namely, it will show:

outputs:
  artifacts:
  - name: mlpipeline-ui-metadata
    optional: true
    path: /mlpipeline-ui-metadata.json
  - name: mlpipeline-metrics
    optional: true
    path: /mlpipeline-metrics.json

Unless your module produces visualizations, remove these items from the steps in the pipeline (a way to script this is sketched after the list). Do this by:

  • untarring pipeline.tar.gz
  • editing the YAML file to remove the artifacts
  • zipping the YAML file back up into the tarball
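If you'd rather not do this by hand every time, here's a rough sketch of scripting the cleanup, assuming the tarball contains a single compiled YAML file whose structure matches the snippet above (requires PyYAML; the function and file names are just placeholders):

import io
import tarfile
import yaml

def strip_ui_artifacts(in_path='pipeline.tar.gz', out_path='pipeline-clean.tar.gz'):
    # Read the compiled workflow YAML out of the tarball.
    with tarfile.open(in_path, 'r:gz') as tar:
        member = tar.getmembers()[0]
        workflow = yaml.safe_load(tar.extractfile(member).read())

    # Drop the auto-generated visualization artifacts from every step.
    for template in workflow['spec']['templates']:
        artifacts = template.get('outputs', {}).get('artifacts', [])
        if artifacts:
            template['outputs']['artifacts'] = [
                a for a in artifacts
                if a.get('name') not in ('mlpipeline-ui-metadata', 'mlpipeline-metrics')
            ]

    # Write the cleaned YAML back into a fresh tarball.
    data = yaml.safe_dump(workflow, default_flow_style=False).encode('utf-8')
    with tarfile.open(out_path, 'w:gz') as tar:
        info = tarfile.TarInfo(name=member.name)
        info.size = len(data)
        tar.addfile(info, io.BytesIO(data))

strip_ui_artifacts()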

Yeah, it sucks. If we don't do it, the step will fail during execution as of Kubeflow v0.5.1 (at the time of writing).

Finally we upload the tarball to the UI and execute a run.
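As a side note, uploading by hand isn't the only option: the kfp SDK also ships a client that can create an experiment and kick off a run from the tarball, which is handy from a CI agent. A rough sketch, assuming the client can actually reach the Kubeflow Pipelines endpoint (here via a hypothetical port-forward; the experiment and run names are placeholders):

import kfp

# Hypothetical endpoint; with IAP in front you'd first need a tunnel or the IAP client id flow.
client = kfp.Client(host='http://localhost:8888')

experiment = client.create_experiment('iris-example')   # placeholder experiment name
run = client.run_pipeline(
    experiment.id,
    'iris-train-run',                                    # placeholder run name
    pipeline_package_path='pipeline.tar.gz'
)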

Thanks for reading!

Tony Truong
