Use fal to integrate SageMaker with dbt

Oftentimes we want to use our dbt models in ML contexts so that we can make predictions based on our data. Amazon SageMaker is an ML service that makes it easy to train ML models and deploy them into production-level environments. Wouldn't it be awesome to integrate SageMaker into a dbt project and vice versa? With fal, you can do exactly that!

Here we discuss a straightforward way to integrate your dbt project with SageMaker by using fal. We will:

  • train a SageMaker model
  • store model data in a dbt model
  • use a SageMaker model to make some test predictions
  • store prediction results in another dbt model.

Prerequisites

Example project

Our example project is based on the same example as the official SageMaker tutorial. Here’s the directory structure:

example-project
├── data
│   ├── raw_sample_batch.csv
│   └── raw_training_data.csv
├── models
│   ├── sagemaker_models.sql
│   ├── sample_batch.sql
│   ├── sample_batch_with_predictions.sql
│   ├── schema.yml
│   └── training_sample.sql
├── scripts
│   ├── predict.py
│   └── train.py
├── dbt_project.yml
├── profiles.yml
├── README.md
└── requirements.txt

The profiles.yml defines a local postgres output. You can change it to whatever is more suitable for you. Once ready, we can run dbt seed --profiles-dir . from the root directory of our example project.

We're looking at the Adult Census dataset from 1994, and we would like to predict whether a person in this dataset makes more than 50,000 USD per year. The raw data is available in the data directory of the example project. It’s split into two datasets: raw_training_data.csv, which we will use for training, and raw_sample_batch.csv, which we will use to make predictions. In total there are 32,561 entries.
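
If you want a quick look at the seed files before loading them, here is a minimal pandas sketch (not part of the example project; it assumes the CSVs include a header row and that you run it from the project root):

import pandas as pd

# Peek at the raw seed data before running dbt seed
training_raw = pd.read_csv("data/raw_training_data.csv")
batch_raw = pd.read_csv("data/raw_sample_batch.csv")

print(training_raw.shape, batch_raw.shape)  # the two files together hold the 32,561 entries
print(training_raw.columns.tolist())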

Here’s what our schema.yml looks like:

version: 2

models:
  - name: training_sample
    config:
      materialized: table

  - name: sagemaker_models
    meta:
      fal:
        scripts:
          after:
            - train.py

  - name: sample_batch
    config:
      materialized: table

  - name: sample_batch_with_predictions
    meta:
      fal:
        scripts:
          after:
            - predict.py

There are four models defined and two of them are associated with Python scripts: sagemaker_models is associated with train.py, whereas sample_batch_with_predictions is associated with predict.py. See our docs for more details on associating models and scripts.
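
Inside these scripts, fal provides a few magic names that we rely on below: ref() reads a dbt model as a pandas DataFrame, context describes the model the script is attached to, and write_to_model() writes a DataFrame back to it. As a tiny illustration (not part of the example project), an after script can be as small as:

# ref, context and write_to_model are provided by fal at runtime
df = ref("training_sample")
print(f"Attached to {context.current_model.name}, got {len(df)} rows")
write_to_model(df)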

Here’s how the data will flow in our project:

Dashed lines refer to data flow and solid lines show association between Python scripts and dbt models.

First, in the training pipeline, train.py will access the data from training_sample and use it to train a new SageMaker model. This model information will then be stored in sagemaker_models.

Then, in the prediction pipeline, predict.py will access the batch data from sample_batch and use the latest SageMaker model data from sagemaker_models in order to make predictions. predict.py will store the prediction results in sample_batch_with_predictions.

Our Python scripts will populate both sagemaker_models and sample_batch_with_predictions, so dbt doesn’t need to calculate these models. We accomplish this by making these models ephemeral. See, for example, sagemaker_models:

{{ config(materialized='ephemeral') }}

-- {{ ref('training_sample') }}

SELECT * FROM {{ target.schema }}.{{ model.name }}

Note the use of ref inside a SQL comment. dbt still picks up the reference and builds the dependency, so training_sample is calculated before the scripts associated with sagemaker_models are run.

Training script

In this section we’re looking at train.py. The entire file can be found here.

Preparing SageMaker inputs

First, we import everything that’s necessary:

import pandas as pd
import boto3
import os
from sklearn.model_selection import train_test_split
import sagemaker
from sagemaker.debugger import Rule, rule_configs
from sagemaker.session import TrainingInput
import time

In order for SageMaker to train a model, we need to create input files, which are CSV files in an S3 bucket. We start by downloading the data from training_sample as a pandas DataFrame using fal's ref magic function, and then split the dataset into features and labels_vector:

training_sample = ref("training_sample")

labels_vector = training_sample["Income>50K"].to_numpy()

features = training_sample.drop("Income>50K", axis=1)

We then further split our data into training and validation sets:

X_train, X_val, y_train, y_val = train_test_split(
    features, labels_vector, test_size=0.25, random_state=1)

train = pd.concat(
    [pd.Series(y_train, index=X_train.index, name="Income>50K", dtype=int), X_train],
    axis=1)

validation = pd.concat(
    [pd.Series(y_val, index=X_val.index, name="Income>50K", dtype=int), X_val], axis=1)

The training and validation sets need to be converted to CSV files:

train.to_csv("train.csv", index=False, header=False)

validation.to_csv("validation.csv", index=False, header=False)

Next, we upload these CSV files to an S3 bucket:

bucket = os.environ.get("s3_bucket")
prefix = os.environ.get("s3_prefix")

boto3.Session().resource("s3").Bucket(bucket).Object(
    os.path.join(prefix, "data/train.csv")
).upload_file("train.csv")
boto3.Session().resource("s3").Bucket(bucket).Object(
    os.path.join(prefix, "data/validation.csv")
).upload_file("validation.csv")

print("Finished preparing data")

We’re using environment variables s3_bucket and s3_prefix to store our bucket and prefix strings.
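
A missing bucket or prefix would only surface later as a confusing boto3 error, so it can be worth failing fast. An optional guard (not in the original script) might look like this:

import os

# Fail early if the required environment variables are not set
for var in ("s3_bucket", "s3_prefix"):
    if not os.environ.get(var):
        raise RuntimeError(f"Environment variable {var} must be set before running the training script")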

Training a SageMaker model

We can use the sagemaker.estimator.Estimator class to create an XGBoost estimator model and then set the necessary hyperparameters:

role = os.environ.get("sagemaker_role")
region = sagemaker.Session().boto_region_name  # AWS region used to look up the container image
s3_output_location = "s3://{}/{}/{}".format(bucket, prefix, "xgboost_model")
container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-1")

xgb_model = sagemaker.estimator.Estimator(
    image_uri=container,
    role=role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    volume_size=5,
    output_path=s3_output_location,
    sagemaker_session=sagemaker.Session(),
    rules=[Rule.sagemaker(rule_configs.create_xgboost_report())],
)

xgb_model.set_hyperparameters(
    max_depth = 5,
    eta = 0.2,
    gamma = 4,
    min_child_weight = 6,
    subsample = 0.7,
    objective = "binary:logistic",
    num_round = 1000
)

This is how you configure inputs for training a SageMaker model:

train_input = TrainingInput(
    "s3://{}/{}/{}".format(bucket, prefix, "data/train.csv"), content_type="csv")

validation_input = TrainingInput(
    "s3://{}/{}/{}".format(bucket, prefix, "data/validation.csv"), content_type="csv")

Now, we're ready to start training our model:

xgb_model.fit({"train": train_input, "validation": validation_input}, wait=True)

print("Model training complete.")

When executed, the code will wait for training to complete. Once done, the model artifacts will be saved in the same S3 bucket.
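
If you want to confirm where the artifacts ended up, the estimator exposes the S3 path of the trained model:

# S3 location of the trained model artifact, available once fit() has finished
print("Model artifact:", xgb_model.model_data)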

Storing SageMaker job data in a dbt model

We store the training job results in sagemaker_models:

data = {
    "dbt_model": [context.current_model.name],
    "created_at": [time.time()],
    "job_name": [sagemaker_model.latest_training_job.name],
}

model_df = pd.DataFrame.from_dict(data)
write_to_model(model_df, mode="append")

That’s it for training!

Prediction Script

In this section we’re looking at predict.py. The entire file can be found here.

Preparing batch data

Similar to train.py, we start by importing the necessary libraries and setting the bucket and prefix string variables:

import boto3
import sagemaker
import os
from io import BytesIO
import numpy as np
import pandas as pd

bucket = os.environ.get("s3_bucket")
prefix = os.environ.get("s3_prefix")

We download the data from sample_batch as a DataFrame, and upload it to our S3 bucket:

batch_df = ref("sample_batch")
batch_df.to_csv("batch.csv", index=False, header=False)

print("Uploading batch data")

boto3.Session().resource("s3").Bucket(bucket).Object(
    os.path.join(prefix, "batch/batch.csv")
).upload_file("batch.csv")

The input and output strings for our predictions also need to be set:

# Batch input
batch_input = "s3://{}/{}/batch".format(bucket, prefix)

# Batch transform output
batch_output = "s3://{}/{}/batch-prediction".format(bucket, prefix)

Setting the latest SageMaker model

In order to use the SageMaker model that we trained above, we download the data from sagemaker_models, get the latest entry and convert it into a SageMaker Estimator object:

sagemaker_models = ref("sagemaker_models")
model_name = sagemaker_models.sort_values(by="created_at", ascending=False).iloc[0]["job_name"]

print(f"Using model: {model_name}")

sagemaker_model = sagemaker.estimator.Estimator.attach(model_name)

Making a batch prediction

We need to create a Transformer object from sagemaker_model and use the new transformer to make a prediction:

transformer = sagemaker_model.transformer(
    instance_count=1, instance_type="ml.m4.xlarge", output_path=batch_output
)

print("Start prediction")

transformer.transform(
    data=batch_input, data_type="S3Prefix", content_type="text/csv", split_type="Line"
)
transformer.wait()

Storing prediction data in a dbt model

We download the predicted values from S3, load them into a NumPy array, append them to the batch DataFrame as a new column, and write the result to the dbt model sample_batch_with_predictions:

prediction_obj = (
    boto3.Session()
    .resource("s3")
    .Bucket(bucket)
    .Object(os.path.join(prefix, "batch-prediction/batch.csv.out"))
)

print("Downloading predictions")
with BytesIO(prediction_obj.get()["Body"].read()) as prediction_raw:
    predictions = np.loadtxt(prediction_raw, dtype="float")
    output_df = pd.concat(
        [
            batch_df,
            pd.Series(predictions, index=batch_df.index, name="Above50k", dtype=float),
        ],
        axis=1,
    )

    print("Writing predictions to the Data Warehouse")
    write_to_model(output_df)

With this finished, we’re ready to run our pipelines.

Running the training and prediction pipelines

From the root of our example project we can trigger a fal flow run:

fal flow run -m +sagemaker_models+ --profiles-dir .

This command will first run dbt to calculate the dbt model training_sample, followed immediately by running train.py. At the end of this run, we should have a new table in our database, sagemaker_models, that contains data associated with the SageMaker model that just finished training.

Next, we trigger a prediction pipeline:

fal flow run -m +sample_batch_with_predictions+ --profiles-dir .

Again, this command will first run dbt to calculate sample_batch and then run predict.py. At the end of this run, there should be another new table in our database, sample_batch_with_predictions, which contains the same data as sample_batch plus an extra column, Above50k, holding the prediction results.
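
As a quick sanity check, you can query the new table directly, for example with pandas and SQLAlchemy against the local Postgres target (the connection string and schema below are placeholders; adjust them to match your profiles.yml):

import pandas as pd
from sqlalchemy import create_engine

# Placeholder credentials; use the ones from your profiles.yml
engine = create_engine("postgresql://user:password@localhost:5432/dbt_db")

preds = pd.read_sql(
    'SELECT "Above50k" FROM dbt_fal.sample_batch_with_predictions', engine
)
print("Share of batch predicted to earn >50K:", (preds["Above50k"] > 0.5).mean())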

Troubleshooting

If you run into issues with fal, you can check our docs or contact us on our Discord server. You can also check the SageMaker guide and documentation.

Summary

We have walked through a project that shows you how to use dbt models as data inputs for SageMaker. We stored SageMaker model information in a dbt model and then we used this information to make batch predictions. Finally, we stored the prediction results in a separate dbt model. All of this was possible because fal makes it easy to integrate Python scripts into your dbt project.