How to detect data anomalies in your dbt models

Anomalies are almost always present in time series data in some form or another, be it a sudden change in rainfall for a town due to climate change or, in our example here, a sudden upward trend in daily Covid-19 cases. Any system or business that runs on time series data has to detect and identify such anomalies, ensuring that it knows when something is off.
dbt is an extremely powerful and easy-to-use data transformation tool, and fal gives the user even more power by enabling them to run Python scripts on their models. This unlocks use cases such as running forecasts or, as in our example here, detecting anomalies on your models.
Now let's say you want to detect anomalies on a dbt model and then notify yourself, or maybe your colleagues, via Slack. For that we need a simple system that uses sklearn's DBSCAN to find anomalies in our time-series model and then uses slack_sdk to notify us via a Slack bot. We already have a blog post about how to set up a simple Slack bot that can notify us, so please do check it out, as it won't be explained in detail here.
The model
Now, we cannot detect anomalies out of thin air, so we will need a dbt model. You can use any model of your choice containing time-series numerical data, but for this post we will use a dbt model containing the number of daily new Covid-19 cases in the region of Lombardy, the most populous region of Italy. The model is from a dataset of Covid-19 cases in Italy, and you can use the same model by running dbt seed with the covid19_italy_region.csv file.
Linking our script to our model
Now, to detect anomalies on our model, we need to create a Python file named anomaly_detection.py, which will house our anomaly detection script. In the schema.yml file of your dbt project, under the model that you want to detect anomalies on, add the following:
meta:
  fal:
    scripts:
      - anomaly_detection.py
The anomaly detection script
Now, for us to find anomalies, we need to write our Python script that will take our model, tune the hyperparameters for DBSCAN with a little help from our side, feed the data to DBSCAN, plot the anomalies with the data on a Matplotlib figure and save it, then finally send the figure to a Slack channel using a bot, along with a message.
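Before diving into the functions, here is a minimal sketch of the imports and globals the snippets below assume sit at the top of anomaly_detection.py. The placeholder values for PATH_PREFIX, TIMEZONE, CHANNEL_ID and SLACK_TOKEN are just examples; set them for your own environment.
import datetime
import ssl
from math import floor

import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN, KMeans
from sklearn.metrics import silhouette_score
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

# ref() and context are provided by fal at runtime; they do not need to be imported.

# Globals referenced later in the script; placeholder values, set them for your environment.
PATH_PREFIX = '/path/to/your/project'
TIMEZONE = 'UTC'
CHANNEL_ID = 'your-channel-id'
SLACK_TOKEN = 'your-slack-bot-token'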
We first create a function called anomaly_detection, which will take the column of metric values of our model, apply sliding windows, find the anomalies, and return them in a numpy array:
def anomaly_detection(X: np.array, eps: float, min_samples: int, window_size: int):
    # Here we take the given column of values, apply sliding windows, save
    # the number of windows we have, and initialize the anomalies list,
    # where we will record the indices of our anomalies.
    X_windowed = np.lib.stride_tricks.sliding_window_view(x=X, window_shape=window_size, axis=0)
    (size_0, _, _) = X_windowed.shape
    anomalies = []
    # For each window, we use DBSCAN and take note of the locations of the
    # anomalous data points, or noise points to use the right term, which are
    # marked with the value -1 in the labels array.
    for window in range(size_0):
        clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(X_windowed[window][0][:].reshape(-1, 1))
        labels = clustering.labels_
        location = np.where(labels == -1)
        location = location[0]
        size = location.size
        # If there are anomalies in our current window, we append their indices
        # with respect to the input dataset, not the current window.
        if size != 0:
            if size == 1:
                anomalies.append(location[0] + window)
            else:
                for i in range(size):
                    anomalies.append(location[i] + window)
        else:
            continue
    # We find the unique values in the anomalies list and convert them to a
    # numpy array; as the window slides by 1 index, the indices of the anomalies
    # have been repeated in the list many times.
    anomalies = np.unique(np.array(anomalies))
    # And finally, we return the numpy array of indices of the anomalous data
    # points.
    return anomalies
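If you want to sanity-check anomaly_detection outside of dbt, here is a hedged smoke test on synthetic data (run with the imports above); the series and the eps, min_samples and window_size values are made up purely for illustration:
# Hypothetical smoke test: a smooth noisy sine wave with two spikes injected by hand.
rng = np.random.default_rng(42)
series = np.sin(np.linspace(0, 20, 500)) * 10 + rng.normal(0, 0.5, 500)
series[120] += 40   # injected anomaly
series[300] -= 35   # injected anomaly
series = series.reshape(-1, 1)

found = anomaly_detection(series, eps=3.0, min_samples=3, window_size=100)
print(found)  # indices near 120 and 300 should be reported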
Now, as you might have noticed, the DBSCAN function takes two arguments, eps and min_samples, which are passed from the anomaly_detection function. These are hyperparameters of DBSCAN; eps stands for epsilon, and min_samples stands for the minimum number of points that must be in the neighborhood of a given point within epsilon distance. To keep things light for this post, I won't be explaining DBSCAN that much. But if you want to know more, you can check out this Wikipedia article for a deeper understanding of DBSCAN.
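To make these two hyperparameters concrete, here is a tiny toy example (the numbers are not from our model) showing how DBSCAN labels an isolated point as noise with -1:
from sklearn.cluster import DBSCAN
import numpy as np

toy = np.array([1.0, 1.1, 1.2, 1.3, 9.0]).reshape(-1, 1)  # 9.0 sits far away from the rest
labels = DBSCAN(eps=0.5, min_samples=2).fit(toy).labels_
print(labels)  # the four close points share a cluster label; the isolated point gets -1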
Now, you could hand-tune these hyperparameters, but that is really time-consuming and requires a lot of guessing based on observation. To make life easier, there are methods to automatically tune the hyperparameters. We are going to use the method explained in this Medium article, so please check it out for a better grasp on what we are about to do.
The first hyperparameter to tune is min_samples. For that, we write the function find_ideal_min_samples:
def find_ideal_min_samples(X: np.array, range_min_samples: list):
    # We apply sliding windows to our column of values, as we will be using DBSCAN in each window. We also save
    # the number of windows and initialize min_sample_scores, a numpy array in which we will add the silhouette
    # score of each window for each min_samples value in separate columns.
    # Note: window_size is a global that the main part of the script sets before calling this function.
    X_windowed = np.lib.stride_tricks.sliding_window_view(x=X, window_shape=window_size, axis=0)
    (size_0, _, _) = X_windowed.shape
    min_sample_scores = np.zeros(shape=(1, len(range_min_samples)))
    # Below, we compute and add our silhouette scores to the min_sample_scores array.
    for window in range(size_0):
        for i in range(len(range_min_samples)):
            clustering = KMeans(n_clusters=range_min_samples[i])
            cluster_labels = clustering.fit_predict(X_windowed[window][0][:].reshape(-1, 1))
            silhouette_avg = silhouette_score(X_windowed[window][0][:].reshape(-1, 1), cluster_labels)
            min_sample_scores[0][i] = min_sample_scores[0][i] + silhouette_avg
    # Here, we divide the total scores for all min_samples values to find an average for each. From those, we
    # select the min_samples value with the highest score, which is our ideal min_samples, and we return it.
    min_sample_scores = min_sample_scores / size_0
    ideal_min_sample = range_min_samples[np.argmax(min_sample_scores)]
    return ideal_min_sample
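As a quick, toy illustration of why the silhouette score is a useful signal here: well-separated clusters score close to 1, while overlapping data scores noticeably lower. The numbers below are invented purely for intuition:
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import numpy as np

well_separated = np.array([1.0, 1.1, 1.2, 10.0, 10.1, 10.2]).reshape(-1, 1)
overlapping = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0]).reshape(-1, 1)
for data in (well_separated, overlapping):
    labels = KMeans(n_clusters=2, n_init=10).fit_predict(data)
    print(silhouette_score(data, labels))  # the well-separated case prints the higher score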
Next is eps. To find the ideal eps value, we have two steps: first we find a range of eps values, then we compute silhouette scores for each and select the highest-scoring one.
def find_eps_range(X: np.array, range_const: int):
    # The first thing we need to do is calculate the distances between each pair of consecutive samples,
    # store them in a numpy array, and sort them.
    dists = np.zeros_like(X)
    for i in range(X.size - 1):
        dist = np.linalg.norm(X[i] - X[i + 1])
        dists[i] = dist
    dists = np.sort(dists, axis=0)
    # Below, we save two Matplotlib figures; one is the entire set of distances and the other one is the
    # same, but with its distance-value axis bounded. PATH_PREFIX is a global constant that you need
    # to set beforehand, which is the path of the fal_dbt_exams repo on your local machine.
    plt.plot([i for i in range(dists.size)], dists, 'b.', markersize=4)
    plt.xlabel('Time')
    plt.ylabel('Value')
    plt.savefig(f'{PATH_PREFIX}/fal_scripts/anomaly_detection_other/distance_between_samples.jpg')
    plt.clf()
    bounded_dists_i = np.where(dists <= range_const * 5)[0]
    plt.plot(bounded_dists_i, [dists[i] for i in bounded_dists_i], 'b.', markersize=4)
    plt.xlabel('Time')
    plt.ylabel('Value')
    plt.savefig(f'{PATH_PREFIX}/fal_scripts/anomaly_detection_other/distance_between_samples_bounded.jpg')
    plt.clf()
This is where our system needs some hand tuning. Please note that this step can also be automated for a fully unsupervised anomaly detection system, but that requires more dedication (we include a small sketch of one way to do it after the range snippet below). For our example the extra work can't really be justified, and the hand tuning provides useful visual insight into how the eps is selected.
We need a curve with an elbow, or a knee, as that is what will give us the eps. However, we also need the smallest value possible. To do this, we can bound the graph and search for an elbow again. For ease of use we have a constant that controls the bound, namely range_const. Here we have the first figure, which has not been bounded yet:

As we can see, there is a curve with a really sharp bend. However, as we said before, for an optimal eps range we need to bound the plot.
Here we have a bounded curve with a range_const roughly equal to 2.5% of the maximum value of the distances. Setting range_const to this value often provides a good starting point. For our case, that value gives us a pretty good elbow curve with low values:

If we set the range_const even smaller, to roughly 1% of the maximum, we get a parabola-like curve, not the sharp elbow we are looking for.

Now all we have to do is create a relatively small range of eps values to test. Although you are free to change the stop and step of the range function, if we take a look at the bounded plot with the good range constant, it is visible that a higher stop won't be necessary. The step really comes down to your compute power; for our example, going up with a step size of range_const is OK.
range_const = int(floor(np.amax(column_y)*0.025))
range_eps = range(range_const, (range_const*5)+1, range_const)
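As mentioned earlier, this eps selection could also be automated instead of eyeballing the elbow. The sketch below is one possible approach and is not part of the original script: normalize the sorted distance curve and pick the point farthest from the straight line joining its endpoints, a simplified knee heuristic.
def find_knee_eps(dists: np.array) -> float:
    # dists: the sorted consecutive-sample distances computed in find_eps_range.
    d = dists.ravel().astype(float)
    n = d.size
    # Normalize both axes to [0, 1] so that neither index nor distance dominates.
    x = np.linspace(0.0, 1.0, n)
    y = (d - d.min()) / max(d.max() - d.min(), 1e-12)
    # Unit vector of the straight line from the first point of the curve to the last.
    line = np.array([x[-1] - x[0], y[-1] - y[0]])
    line /= np.linalg.norm(line)
    # The knee is the point with the largest perpendicular distance to that line.
    points = np.stack([x - x[0], y - y[0]], axis=1)
    proj = points @ line
    perp = points - np.outer(proj, line)
    knee_index = int(np.argmax(np.linalg.norm(perp, axis=1)))
    return float(d[knee_index])
To use it, you would have find_eps_range return its dists array (or recompute the distances) and build range_eps around the returned value; for this post we stick with the visual approach.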
Now that we have a range of eps values to test, it is time to see how they fare against each other. We set up a function called find_ideal_eps to do the job:
def find_ideal_eps(X: np.array, min_samples: int, window_size: int, range_eps: list):
    # Again, we apply sliding windows to our column of values, take note of the number of windows, and
    # initialize an array that is essentially the same as min_sample_scores, but for epsilon values.
    X_windowed = np.lib.stride_tricks.sliding_window_view(x=X, window_shape=window_size, axis=0)
    (size_0, _, _) = X_windowed.shape
    eps_scores = np.zeros(shape=(1, len(range_eps)))
    # Here, we compute silhouette scores for each eps value and sum them over all the windows. For the
    # clustering we use DBSCAN, which means that we will be using our recently found ideal min_samples value,
    # as the two hyperparameters affect each other.
    for window in range(size_0):
        for i in range(len(range_eps)):
            clustering = DBSCAN(eps=range_eps[i], min_samples=min_samples).fit(X_windowed[window][0][:].reshape(-1, 1))
            labels = clustering.labels_
            if np.unique(labels).size > 1:
                silhouette_avg = silhouette_score(X_windowed[window][0][:].reshape(-1, 1), labels)
                eps_scores[0][i] = eps_scores[0][i] + silhouette_avg
    # We calculate the average and return the ideal eps value.
    eps_scores = eps_scores / size_0
    ideal_eps = range_eps[np.argmax(eps_scores)]
    return ideal_eps
This is all we need to find anomalies on our model. Now we need to set up two more functions to get a Slack bot to message us some information and a plot of the anomalies in our model.
Sending results via Slack
The two functions we need are plot_anomalies and send_slack_file, which come from the metric forecast and Slack bot examples.
def plot_anomalies(column_y: np.array, column_date: np.array, anomalies: np.array):
    # Below, we plot the data and mark the anomalies in a Matplotlib figure. Then, we save the figure with
    # a name that is the timestamp of when the anomalies were computed. Finally, we return the path
    # of the figure for the Slack function.
    fig = plt.figure(figsize=(15, 5))
    axes = fig.add_axes([0.1, 0.1, 0.8, 0.8])
    axes.plot([column_date[i] for i in range(column_y.size)], column_y, 'b.', markersize=4)
    axes.plot([column_date[i] for i in anomalies], [column_y[i] for i in anomalies], 'r^', markersize=4)
    axes.set_xlabel('Time')
    axes.set_ylabel('Value')
    axes.legend(['Actual', 'Anomaly Found'])
    now = str(datetime.datetime.now())
    now = now[:10] + '-' + now[11:13] + now[14:16]
    fpath = f'{PATH_PREFIX}/fal_scripts/anomaly_detection/{now}-{TIMEZONE}.jpg'
    fig.savefig(fname=fpath)
    plt.clf()
    return fpath
def send_slack_file(file_path: str, message_text: str, channel_id: str, slack_token: str):
    # Here, we take the path of the saved figure and send it with a message.
    client = WebClient(token=slack_token)
    try:
        client.files_upload(
            channels=channel_id,
            file=file_path,
            title="fal.ai anomaly detection",
            initial_comment=message_text,
        )
    except SlackApiError as e:
        assert e.response["error"]
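One practical note: the CHANNEL_ID and SLACK_TOKEN globals used below have to come from somewhere. A reasonable option, instead of hardcoding the placeholders from the imports sketch above, is to read them from environment variables; the variable names here are just an example:
import os

# Hypothetical environment variable names; export them before running dbt and fal.
SLACK_TOKEN = os.environ["ANOMALY_BOT_SLACK_TOKEN"]
CHANNEL_ID = os.environ["ANOMALY_BOT_CHANNEL_ID"]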
Finding anomalies on a dbt model with fal
At last, we have all the functions needed for anomaly detection. However, there is a very important thing missing: the model. Using fal, we can load our example model, or your model, in the form of a DataFrame with ease, using the extremely handy ref() function and the context object.
model_df = ref(context.current_model.name).sort_values(by='ds')
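If you want to double-check what fal handed back before going further, a quick peek works; the rest of the script assumes a date column called 'ds' and a numeric column called 'y':
# Inspect the DataFrame returned by ref(); we expect 'ds' (dates) and 'y' (values) columns.
print(model_df.dtypes)
print(model_df.head())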
Now that we have all our functions and the model, we can write the following that ties everything together for our script:
# Then, we separate the DataFrame by columns into the values, 'y', and dates, 'ds'. Then we reshape them
# to fit our functions.
column_y = model_df['y'].to_numpy(dtype=float).reshape(-1, 1)
column_date = model_df['ds'].to_numpy(dtype=datetime.datetime).reshape(-1, 1)
# Here, we set the arbitrarily chosen window size for the sliding windows application. For the size of
# our model, 100 is a good window size.
window_size = 100
# Here, we set an arbitrary list of min_samples values; for numerical time-series data, the list below is good.
# With the range of possible min_samples values, we use find_ideal_min_samples to find the min_samples for
# our system.
range_min_samples = [2, 3, 4, 5]
min_samples = find_ideal_min_samples(column_y, range_min_samples)
# With our min_samples value, we find the optimal epsilon value.
range_const = int(floor(np.amax(column_y) * 0.025))
find_eps_range(column_y, range_const)
range_eps = range(range_const, (range_const * 5) + 1, range_const)
eps = find_ideal_eps(column_y, min_samples, window_size, range_eps)
# Finally, we find the anomalies using our anomaly_detection function and send them via Slack, while also printing
# a handy command line message.
anomalies = anomaly_detection(column_y, eps, min_samples, window_size)
fpath = plot_anomalies(column_y, column_date, anomalies)
now = str(datetime.datetime.now())
date = now[:10]
hour = now[11:13] + now[14:16]
print(f'anomalies: {anomalies.size}\ndata: {column_y.size}\npercentage: {(anomalies.size/column_y.size)*100}%')
# This is to get around an SSL certificate verification issue; usually it is not needed.
ssl._create_default_https_context = ssl._create_unverified_context
message = f'fal.ai anomaly detection.\nFound {anomalies.size} anomalies.\nModel: {context.current_model.name}\nDate: {date}\nTime: {hour}-{TIMEZONE}\neps: {eps}, min_samples: {min_samples}, window_size: {window_size}'
# CHANNEL_ID and SLACK_TOKEN are global variables that have been set beforehand to the Slack channel and bot
# that we are using.
send_slack_file(fpath, message, CHANNEL_ID, SLACK_TOKEN)

And that's it. We have our Slack bot notifying us about the anomalous data points. Now, let's take a look at the anomalies that have been spotted in our data and do a little analysis:
As we can see, the system finds 3 anomalies in the first wave and 1 while the wave flattens. The behavior in the first wave is unexpected, but it is mild enough that only the extreme outliers are reported as anomalous. In the second wave, we can observe a sharper upward trend, and the system classifies most of the wave as anomalous.
In this example, we can see the power of fal, helping us build operational data science workloads right from our dbt project.
If you want to check out the entire Python script, you can find it here.