Populate dbt models with CSV data

CSV files are a common source of raw data in ELT pipelines. These text files hold data in multiple lines under a header row and need to be parsed and loaded into a data warehouse, ideally as part of an automated process. One way to load CSV data is the dbt seed command, which requires the raw data files to be available locally. But what if the CSVs are stored on a cloud storage provider? Here we describe an easy way to load your CSV data from Amazon S3 and make it available to dbt models.

Prerequisites

It’s a good idea to go through the following posts before implementing this one:

What are we building?

We have CSV files that contain data on ozone levels at different times in different counties in the US. Ultimately, we want two things: to access our data from a data warehouse and to make forecasts from it. Here’s how it’s going to work:

ELT pipeline we're building
  1. fal runs a Python script to extract and load raw CSV data
  2. dbt transformation prepares models for forecasting
  3. fal runs a separate Python script that does forecasting and sends data to Slack

We have described steps 2 and 3 in earlier posts: you can easily associate any Python script with a dbt model and run the script right after the model runs. With the new release of fal, we can now also associate Python scripts to be run before a dbt model run, and we leverage this new capability to load the raw CSV data.

Connecting scripts to a dbt model

Starting with version 0.2.0, users can specify when a Python script should run: before or after a model run. Running fal run --before from the command line makes fal find and run all the scripts that are set to run before a dbt model run. The plain fal run command works as before, finding and running the scripts that are set to run after model runs.
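For illustration, here is a minimal sketch of what a before script can look like; fal exposes a context magic variable describing the model a script is attached to. This snippet is only a sketch, not part of the example project:

# Minimal sketch of a "before" script; `context` is a fal magic variable
# describing the dbt model this script is attached to in schema.yml.
print(f"About to load data for model: {context.current_model.name}")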

Our schema.yml file declares how Python scripts are associated with a dbt model and defines a new source where the extracted data will be stored.

version: 2
sources:
  - name: results
    database: "{{ env_var('GCLOUD_PROJECT') }}"
    schema: "{{ env_var('BQ_DATASET') }}"
    tables:
      - name: raw_o3_values

models:
  - name: miami
    description: Ozone levels in Miami
    config:
      materialized: table
    meta:
      owner: "@meder"
      fal:
        scripts:
          before:
            - fal_scripts/load_o3_data.py
          after:
            - fal_scripts/slack.py

Extracting and loading CSV data

Here’s a snippet of what the raw CSV data looks like:

n,O3,state,county,month,geom
0,0.044,Alaska,Denali ,2017-12-01,
1,0.044,Alaska,Denali ,2018-11-01,
2,0.036,Alaska,Denali ,2019-09-01,

The first row contains the headers and subsequent rows contain the actual data. In our example project this CSV file is saved as data/raw_o3_values.csv.

A script that we will use to extract and load the raw data looks like this:

import pandas

cols = ['O3', 'state', 'county', 'month']

# Read the raw CSV directly from S3 (pandas uses the s3fs package for s3:// URLs);
# replace the storage_options values with your actual AWS credentials.
df = pandas.read_csv(
  "s3://my_s3_bucket/data/raw_o3_values.csv",
  storage_options={
    "key": "AWS_ACCESS_KEY_ID",
    "secret": "AWS_SECRET_ACCESS_KEY",
    "token": "AWS_SESSION_TOKEN",
  },
  usecols=cols)

# Append the selected columns to the results.raw_o3_values source defined in schema.yml
write_to_source(df, "results", "raw_o3_values")

We use pandas to load the CSV data and then call the fal magic function write_to_source, which appends a DataFrame to a source table defined in the schema.yml file. Watch out: running write_to_source with the same DataFrame multiple times will lead to duplicated data. This Python script is saved as fal_scripts/load_o3_data.py.
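Since write_to_source appends, one way to avoid accidental duplicates is to check whether the source table already contains data before writing. The following is only a hedged sketch: it assumes fal's source() magic function can read the table back as a DataFrame and that reading a not-yet-existing table raises an error.

# Sketch of a duplicate-load guard; `source` and `write_to_source` are fal
# magic functions, and the names match the source defined in schema.yml.
try:
    existing = source("results", "raw_o3_values")
    already_loaded = not existing.empty
except Exception:
    # Table does not exist yet (or cannot be read), so nothing was loaded.
    already_loaded = False

if not already_loaded:
    write_to_source(df, "results", "raw_o3_values")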

dbt model

Our dbt model transforms data to a form suitable for forecasting by the prophet library:

WITH o3values as
  (SELECT * FROM `{{ env_var('GCLOUD_PROJECT') }}.{{ env_var('BQ_DATASET') }}.raw_o3_values`)

SELECT
    O3 as y,
    month as ds
FROM
    o3values
WHERE
    state = 'Florida'
    AND county = 'Miami-Dade'

where {{ env_var('GCLOUD_PROJECT') }}.{{ env_var('BQ_DATASET') }}.raw_o3_values is the data warehouse source table into which the CSV data was loaded.
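As a quick sanity check, the transformed model can be read back in a fal after script and inspected. The sketch below assumes fal's ref and context magic variables and simply confirms that the ds and y columns prophet expects are present:

# Sketch: read the transformed model as a pandas DataFrame in an "after" script.
df = ref(context.current_model.name)

# prophet expects a timestamp column `ds` and a numeric value column `y`.
assert {"ds", "y"}.issubset(df.columns)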

Making a forecast and sending a Slack message

The Python script that gets the model data and runs a forecast is described in detail here.

Here’s an overview of this script:

  • download the model data as a pandas DataFrame,
  • use the fbprophet package to make a forecast, stored in a separate DataFrame,
  • export the forecast as a PNG image.

Next, we send the generated image to our Slack channel of choice by using a Slack bot.

Here’s a full script.
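For orientation, here is a condensed sketch of what such a script might look like. It is not the full script linked above: the environment variable names, the twelve-month horizon, and the use of the slack_sdk package are illustrative assumptions.

import os
from fbprophet import Prophet
from slack_sdk import WebClient

# Download the model data (columns ds and y) produced by the dbt run above;
# `ref` and `context` are fal magic variables.
df = ref(context.current_model.name)

# Fit a forecast and extend it twelve months into the future.
model = Prophet()
model.fit(df)
future = model.make_future_dataframe(periods=12, freq="M")
forecast = model.predict(future)

# Export the forecast plot as a PNG image.
fig = model.plot(forecast)
fig.savefig("forecast.png")

# Send the image to a Slack channel using a bot token.
client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
client.files_upload(
    channels=os.environ["SLACK_CHANNEL_ID"],
    file="forecast.png",
    title="Ozone forecast",
)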

Running everything

In the command line, first run the EL step:

fal run --before

Next, dbt model run:

dbt run

Finally, forecasting and sending a Slack message:

fal run
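
If you prefer a single invocation, the three steps can also be chained in the shell:

fal run --before && dbt run && fal run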

Conclusion

You can now use fal to populate dbt models with data from raw CSV files. What's next? fal lets you run any Python script in your dbt project; maybe you want to make it part of your CI/CD pipeline and run it automatically. Check out the fal repository for more info, see an example dbt project that uses fal, and join our Discord server.