Populate dbt models with CSV data

CSV files are a common source of raw data in ELT pipelines. These text files hold data in rows under a header line and need to be parsed and loaded into a data warehouse, ideally as part of an automated process. One way to load CSV data is the dbt seed command, but it requires the raw data files to be available locally. What if the CSVs are stored with a cloud storage provider? Here we describe an easy way to load your CSV data from Amazon S3 and make it available to dbt models.
Prerequisites
It’s a good idea to go through the following posts before implementing this one.
What are we building?
We have CSV files that contain data on ozone levels at different times in different counties in the USA. Ultimately, we want two things: to access our data from a data warehouse and to make forecasts. Here’s how it’s going to work:

- fal runs a Python script to extract and load raw CSV data
- dbt transformation prepares models for forecasting
- fal runs a separate Python script that does forecasting and sends data to Slack
We have described steps 2 and 3 in earlier posts: you can associate any Python script with a dbt model and run the script right after the model runs. With the new release of fal, Python scripts can also be set to run before a dbt model, and we leverage this new capability to load the raw CSV data.
Connecting scripts to dbt models
Starting with version 0.2.0, users can specify when a Python script should run: before or after a model run. Running fal run --before from the command line makes fal find and run all the scripts that are set to run before dbt models. The plain fal run command works as it did before, running the scripts that are set to run after model runs.
Our schema.yml file declares how Python scripts are associated with a dbt model and also defines a new source where the extracted data will be stored:
version: 2

sources:
  - name: results
    database: "{{ env_var('GCLOUD_PROJECT') }}"
    schema: "{{ env_var('BQ_DATASET') }}"
    tables:
      - name: raw_o3_values

models:
  - name: miami
    description: Ozone levels in Miami
    config:
      materialized: table
    meta:
      owner: "@meder"
      fal:
        scripts:
          before:
            - fal_scripts/load_o3_data.py
          after:
            - fal_scripts/slack.py
Extracting and loading CSV data
Here’s a snippet of what the raw CSV data looks like:
n,O3,state,county,month,geom
0,0.044,Alaska,Denali ,2017-12-01,
1,0.044,Alaska,Denali ,2018-11-01,
2,0.036,Alaska,Denali ,2019-09-01,
The first row contains the headers and subsequent rows contain the actual data. In our example project this CSV file is saved as data/raw_o3_values.csv.
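In this post the file is assumed to already be on S3. If you need to push a local copy there first, a small boto3 script does the job; this is just a sketch, with my_s3_bucket standing in for your own bucket, matching the placeholder used in the loading script below:

import boto3

# Upload the local CSV to the bucket that the fal before-script will read from
s3 = boto3.client("s3")
s3.upload_file(
    "data/raw_o3_values.csv",   # local file
    "my_s3_bucket",             # bucket name (placeholder)
    "data/raw_o3_values.csv",   # object key on S3
)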
A script that we will use to extract and load the raw data looks like this:
import pandas

cols = ['O3', 'state', 'county', 'month']

df = pandas.read_csv(
    "s3://my_s3_bucket/data/raw_o3_values.csv",
    storage_options={
        "key": "AWS_ACCESS_KEY_ID",
        "secret": "AWS_SECRET_ACCESS_KEY",
        "token": "AWS_SESSION_TOKEN",
    },
    usecols=cols)

write_to_source(df, "results", "raw_o3_values")
We’re using pandas to load the CSV data and then the fal magic function write_to_source, which lets you append a DataFrame to a source table defined in a schema.yml file. Watch out: running write_to_source on the same DataFrame multiple times will lead to duplicated data. This Python script is saved as fal_scripts/load_o3_data.py.
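Hard-coding credential strings like the ones above is fine for a demo, but in practice you would read them from the environment. Here is a minimal variation of the same script, assuming the standard AWS environment variable names are set wherever fal runs:

import os
import pandas

cols = ['O3', 'state', 'county', 'month']

# storage_options is passed through to s3fs, so it needs real credential
# values here, not the placeholder strings from the example above
df = pandas.read_csv(
    "s3://my_s3_bucket/data/raw_o3_values.csv",
    storage_options={
        "key": os.environ["AWS_ACCESS_KEY_ID"],
        "secret": os.environ["AWS_SECRET_ACCESS_KEY"],
        "token": os.environ.get("AWS_SESSION_TOKEN"),
    },
    usecols=cols)

# write_to_source is provided by fal when this runs as a before-script
write_to_source(df, "results", "raw_o3_values")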
dbt model
Our dbt model transforms the data into a form suitable for forecasting with the prophet library:
WITH o3values AS (
    SELECT * FROM `{{ env_var('GCLOUD_PROJECT') }}.{{ env_var('BQ_DATASET') }}.raw_o3_values`
)
SELECT
    O3 as y,
    month as ds
FROM
    o3values
WHERE
    state = 'Florida'
    AND county = 'Miami-Dade'
where {{ env_var('GCLOUD_PROJECT') }}.{{ env_var('BQ_DATASET') }}.raw_o3_values is the source table in the data warehouse into which the CSV data was loaded.
Making a forecast and sending a Slack message
The Python script that gets the model data and runs a forecast is described in detail here. Here’s an overview of what it does (a rough sketch follows):
- download the model data as a pandas DataFrame
- use the fbprophet package to make a forecast, stored in a separate DataFrame
- export the forecast as a PNG image
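Roughly, such a script could look like the sketch below. The ref and context helpers for reading the model’s data are assumptions about fal’s script API (the linked post shows the exact code), while the forecasting calls follow fbprophet’s standard API:

from fbprophet import Prophet

# Pull the transformed model (ds, y columns) as a DataFrame.
# `ref` and `context` are assumed to be provided by fal inside model scripts.
df = ref(context.current_model.name)

# Fit a Prophet model and forecast a year ahead, month by month
m = Prophet()
m.fit(df)
future = m.make_future_dataframe(periods=12, freq='M')
forecast = m.predict(future)

# Export the forecast plot as a PNG image for the Slack step
fig = m.plot(forecast)
fig.savefig("forecast.png")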
Next, we send the generated image to our Slack channel of choice by using a Slack bot.
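The Slack step itself needs little more than a bot token and a channel. With the official slack_sdk client it can be as short as the following sketch; the token environment variable and channel name are placeholders:

import os
from slack_sdk import WebClient

# Bot token for a Slack app that is allowed to upload files
client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])

# Upload the forecast image produced by the previous step
client.files_upload(
    channels="#ozone-forecasts",
    file="forecast.png",
    title="Ozone forecast for Miami-Dade",
)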
Running everything
On the command line, first run the EL step:
fal run --before
Next, run the dbt models:
dbt run
Finally, run the forecasting and Slack script:
fal run
Conclusion
You can now use fal to populate dbt models with data from raw CSV files. What's next? fal lets you run any Python script in your dbt project; maybe you want to make it part of your CI/CD pipeline and run it automatically. Check out the fal repository for more info, see an example dbt project that uses fal, and join our Discord server.