Use fal to integrate SageMaker with dbt
Oftentimes we want to use our dbt models in ML contexts so that we can make predictions based on our data. Amazon SageMaker is an ML service that makes it easy to train ML models and to deploy them into production-level environments. Wouldn't it be awesome to integrate SageMaker into a dbt project and vice versa? By using fal, you can do this!
Here we discuss a straightforward way to integrate your dbt project with SageMaker by using fal. We will:
- train a SageMaker model
- store model data in a dbt model
- use a SageMaker model to make some test predictions
- store prediction results in another dbt model.
Prerequisites
- AWS account
- Locally configured AWS CLI
- S3 bucket
- SageMaker execution role
- Ability to run dbt locally
Example project
Our example project uses the same example as the official SageMaker tutorial. Here’s the directory structure:
example-project
├── data
│ ├── raw_sample_batch.csv
│ └── raw_training_data.csv
├── models
│ ├── sagemaker_models.sql
│ ├── sample_batch.sql
│ ├── sample_batch_with_predictions.sql
│ ├── schema.yml
│ └── training_sample.sql
├── scripts
│ ├── predict.py
│ └── train.py
├── dbt_project.yml
├── profiles.yml
├── README.md
└── requirements.txt
The profiles.yml defines a local postgres output. You can change it to whatever is more suitable for you. Once ready, we can run dbt seed --profiles-dir . from the root directory of our example project.
We're looking at the Adult Census dataset from 1994, and we would like to predict whether a person in this dataset makes more than 50,000 USD per year. The raw data is available in the data directory of the example project. It's split into two datasets: raw_training_data.csv, which we will use for training, and raw_sample_batch.csv, which we will use to make predictions. In total there are 32,561 entries.
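If you want to peek at the seed data before loading it, a quick pandas check works from the project root. This is just a sketch, not part of the example project; it assumes the CSVs ship with a header row (which dbt seeds require) and that the target column is named Income>50K, as it is in training_sample:
import pandas as pd

# Preview the raw training data before running `dbt seed`
training_raw = pd.read_csv("data/raw_training_data.csv")
print(training_raw.shape)
print(training_raw["Income>50K"].value_counts(normalize=True))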
Here’s what our schema.yml looks like:
version: 2
models:
  - name: training_sample
    config:
      materialized: table
  - name: sagemaker_models
    meta:
      fal:
        scripts:
          after:
            - train.py
  - name: sample_batch
    config:
      materialized: table
  - name: sample_batch_with_predictions
    meta:
      fal:
        scripts:
          after:
            - predict.py
There are four models defined, and two of them are associated with Python scripts: sagemaker_models is associated with train.py, whereas sample_batch_with_predictions is associated with predict.py. See our docs for more details on associating models and scripts.
Here’s how the data will flow in our project:

Dashed lines refer to data flow and solid lines show association between Python scripts and dbt models.
First, in the training pipeline, train.py will access the data from training_sample and use it to train a new SageMaker model. This model information will then be stored in sagemaker_models.
Then, in the prediction pipeline, predict.py will access the batch data from sample_batch and use the latest SageMaker model data from sagemaker_models in order to make predictions. predict.py will store the prediction results in sample_batch_with_predictions.
Our Python scripts will populate both sagemaker_models and sample_batch_with_predictions, so dbt doesn’t need to calculate these models. We accomplish this by making these models ephemeral. See, for example, sagemaker_models:
{{ config(materialized='ephemeral') }}
-- {{ ref('training_sample') }}
SELECT * FROM {{ target.schema }}.{{ model.name }}
Note the use of ref in a comment: dbt still picks up the dependency from it, so training_sample is calculated before we run the scripts associated with sagemaker_models.
Training script
In this section we’re looking at train.py. The entire file can be found here.
Preparing SageMaker inputs
First, we import everything that’s necessary:
import pandas as pd
import boto3
import os
from sklearn.model_selection import train_test_split
import sagemaker
from sagemaker.debugger import Rule, rule_configs
from sagemaker.session import TrainingInput
import time
In order for SageMaker to train a model, we need to create input files, which are CSV files in an S3 bucket. We start by downloading the data from training_sample as a pandas DataFrame using fal's ref magic function, and then split the dataset into features and labels_vector:
training_sample = ref("training_sample")
labels_vector = training_sample["Income>50K"].to_numpy()
features = training_sample.drop("Income>50K", axis=1)
We then further split our data into training and validation sets:
X_train, X_val, y_train, y_val = train_test_split(
    features, labels_vector, test_size=0.25, random_state=1)
train = pd.concat(
    [pd.Series(y_train, index=X_train.index, name="Income>50K", dtype=int), X_train],
    axis=1)
validation = pd.concat(
    [pd.Series(y_val, index=X_val.index, name="Income>50K", dtype=int), X_val], axis=1)
The training and validation sets need to be converted to CSV files:
train.to_csv("train.csv", index=False, header=False)
validation.to_csv("validation.csv", index=False, header=False)
Next, we upload these CSV files to an S3 bucket:
bucket = os.environ.get("s3_bucket")
prefix = os.environ.get("s3_prefix")
boto3.Session().resource("s3").Bucket(bucket).Object(
    os.path.join(prefix, "data/train.csv")
).upload_file("train.csv")
boto3.Session().resource("s3").Bucket(bucket).Object(
    os.path.join(prefix, "data/validation.csv")
).upload_file("validation.csv")
print("Finished preparing data")
We’re using environment variables s3_bucket and s3_prefix to store our bucket and prefix strings.
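Since os.environ.get silently returns None when a variable is missing, it can be worth failing fast before kicking off a training job. Here is a minimal sanity check, sketched as an optional addition (it also covers the sagemaker_role variable used further below):
import os
import boto3

# Fail fast if any of the required environment variables are missing
for var in ("s3_bucket", "s3_prefix", "sagemaker_role"):
    assert os.environ.get(var), f"environment variable {var} is not set"

# Confirm the bucket is reachable with the locally configured AWS credentials
boto3.Session().client("s3").head_bucket(Bucket=os.environ["s3_bucket"])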
Training a SageMaker model
We can use the sagemaker.estimator.Estimator class to create an XGBoost estimator model and then set the necessary hyperparameters:
role = os.environ.get("sagemaker_role")
region = boto3.Session().region_name  # AWS region of the locally configured session
s3_output_location = "s3://{}/{}/{}".format(bucket, prefix, "xgboost_model")
container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-1")
xgb_model = sagemaker.estimator.Estimator(
    image_uri=container,
    role=role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    volume_size=5,
    output_path=s3_output_location,
    sagemaker_session=sagemaker.Session(),
    rules=[Rule.sagemaker(rule_configs.create_xgboost_report())],
)
xgb_model.set_hyperparameters(
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    objective="binary:logistic",
    num_round=1000,
)
This is how you configure inputs for training a SageMaker model:
train_input = TrainingInput(
    "s3://{}/{}/{}".format(bucket, prefix, "data/train.csv"), content_type="csv")
validation_input = TrainingInput(
    "s3://{}/{}/{}".format(bucket, prefix, "data/validation.csv"), content_type="csv")
Now, we're ready to start training our model:
xgb_model.fit({"train": train_input, "validation": validation_input}, wait=True)
print("Model training complete.")
When executed, the code will wait for training to be complete. Once done, the model artifacts will be saved in the same S3 bucket.
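If you want to confirm where those artifacts ended up, you can list the output_path prefix with boto3. This is an optional sketch, not part of train.py:
import os
import boto3

bucket = os.environ.get("s3_bucket")
prefix = os.environ.get("s3_prefix")

# List everything SageMaker wrote under the output_path prefix
s3 = boto3.Session().client("s3")
response = s3.list_objects_v2(Bucket=bucket, Prefix="{}/xgboost_model/".format(prefix))
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])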
Storing SageMaker job data in a dbt model
We store the training job results in sagemaker_models:
data = {
    "dbt_model": [context.current_model.name],
    "created_at": [time.time()],
    "job_name": [xgb_model.latest_training_job.name],
}
model_df = pd.DataFrame.from_dict(data)
write_to_model(model_df, mode="append")
That’s it for training!
Prediction script
In this section we’re looking at predict.py. The entire file can be found here.
Preparing batch data
Similar to train.py, we start by importing the necessary libraries and setting the bucket and prefix variables:
import boto3
import sagemaker
import os
from io import BytesIO
import numpy as np
import pandas as pd
bucket = os.environ.get("s3_bucket")
prefix = os.environ.get("s3_prefix")
We download the data from sample_batch as a DataFrame and upload it to our S3 bucket:
batch_df = ref("sample_batch")
batch_df.to_csv("batch.csv", index=False, header=False)
print("Uploading batch data")
boto3.Session().resource("s3").Bucket(bucket).Object(
    os.path.join(prefix, "batch/batch.csv")
).upload_file("batch.csv")
The input and output strings for our predictions also need to be set:
# Batch input
batch_input = "s3://{}/{}/batch".format(bucket, prefix)
# Batch transform output
batch_output = "s3://{}/{}/batch-prediction".format(bucket, prefix)
Setting the latest SageMaker model
In order to use the SageMaker model that we trained above, we download the data from sagemaker_models, get the latest entry, and convert it into a SageMaker Estimator object:
sagemaker_models = ref("sagemaker_models")
model_name = sagemaker_models.sort_values(by="created_at", ascending=False).iloc[0]["job_name"]
print(f"Using model: {model_name}")
sagemaker_model = sagemaker.estimator.Estimator.attach(model_name)
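One caveat: if the training pipeline has not run yet, sagemaker_models will be empty and the .iloc[0] lookup above fails with a bare IndexError. A small guard right after the ref call, sketched below, makes that failure easier to understand:
# Guard against running predictions before any model has been trained
if sagemaker_models.empty:
    raise RuntimeError("sagemaker_models is empty; run the training pipeline first")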
Making a batch prediction
We need to create a Transformer object from sagemaker_model and use the new transformer to make a prediction:
transformer = sagemaker_model.transformer(
    instance_count=1, instance_type="ml.m4.xlarge", output_path=batch_output
)
print("Start prediction")
transformer.transform(
    data=batch_input, data_type="S3Prefix", content_type="text/csv", split_type="Line"
)
transformer.wait()
Storing prediction data in a dbt model
We download the predicted values from S3 as a text file, convert them into a DataFrame, and write the DataFrame to the dbt model sample_batch_with_predictions:
prediction_obj = (
    boto3.Session()
    .resource("s3")
    .Bucket(bucket)
    .Object(os.path.join(prefix, "batch-prediction/batch.csv.out"))
)
print("Downloading predictions")
with BytesIO(prediction_obj.get()["Body"].read()) as prediction_raw:
    predictions = np.loadtxt(prediction_raw, dtype="float")

output_df = pd.concat(
    [
        batch_df,
        pd.Series(predictions, index=batch_df.index, name="Above50k", dtype=float),
    ],
    axis=1,
)
print("Writing predictions to the Data Warehouse")
write_to_model(output_df)
With this finished, we’re ready to run our pipelines.
Running the training and prediction pipelines
From the root of our example project we can trigger a fal flow run:
fal flow run -m +sagemaker_models+ --profiles-dir .
This command will first run dbt to calculate the dbt model training_sample, and then immediately run train.py. At the end of this run, we should have a new table in our database, sagemaker_models, that contains data associated with the SageMaker model that just finished training.
Next, we trigger a prediction pipeline:
fal flow run -m +sample_batch_with_predictions+ --profiles-dir .
Again, this command will first run dbt to calculate sample_batch and then run predict.py. At the end of this run, there should be another new table in our database, sample_batch_with_predictions, which contains the same data as sample_batch plus an extra column, Above50k, holding the prediction results.
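One detail worth knowing: because the model was trained with objective="binary:logistic", the values in Above50k are predicted probabilities between 0 and 1 rather than hard labels. If you need class labels downstream, you could threshold them in another fal script; here is a sketch, assuming a 0.5 cutoff and a hypothetical Above50k_label column name:
# Inside a fal script: turn the probability scores into 0/1 labels
predictions_df = ref("sample_batch_with_predictions")
predictions_df["Above50k_label"] = (predictions_df["Above50k"] >= 0.5).astype(int)
print(predictions_df["Above50k_label"].value_counts())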
Troubleshooting
If you run into issues with fal, you can check our docs or contact us on our Discord server. You can also check the SageMaker guide and documentation.
Summary
We have walked through a project that shows you how to use dbt models as data inputs for SageMaker. We stored SageMaker model information in a dbt model and then we used this information to make batch predictions. Finally, we stored the prediction results in a separate dbt model. All of this was possible because fal makes it easy to integrate Python scripts into your dbt project.