Parallelizing Python Workflows in fal

Today's release introduces parallelization of Python scripts in your fal runs. During a run, fal builds a directed acyclic graph (DAG) linking models and scripts. The number of threads sets the maximum number of tasks fal runs at the same time.
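To make the thread limit concrete, here is a minimal sketch in plain Python of running a DAG with a bounded pool of threads. It is an illustration, not fal's implementation: `run_dag`, `tasks`, and `deps` are hypothetical names, and each node is assumed to be a zero-argument callable.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable, Dict, List, Set


def run_dag(
    tasks: Dict[str, Callable[[], None]],
    deps: Dict[str, Set[str]],
    threads: int,
) -> List[str]:
    """Run every task after all of its dependencies have finished.

    At most `threads` tasks execute at once; the rest wait in the pool's queue.
    Returns task names in the order they completed.
    """
    # Copy the dependency sets so we can shrink them as tasks finish.
    remaining = {name: set(deps.get(name, ())) for name in tasks}
    running: Dict[Future, str] = {}
    completed: List[str] = []

    with ThreadPoolExecutor(max_workers=threads) as pool:
        while remaining or running:
            # Submit every task whose dependencies are all satisfied.
            for name in [n for n, pending in remaining.items() if not pending]:
                del remaining[name]
                running[pool.submit(tasks[name])] = name
            if not running:
                raise ValueError(f"unsatisfiable dependencies: {sorted(remaining)}")
            # Block until at least one running task finishes.
            finished, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in finished:
                name = running.pop(future)
                future.result()  # re-raise any exception from the task
                completed.append(name)
                # A finished task no longer blocks its dependents.
                for pending in remaining.values():
                    pending.discard(name)
    return completed


# A model with two scripts attached to it (names are illustrative).
order = run_dag(
    tasks={"model": lambda: None, "script_a": lambda: None, "script_b": lambda: None},
    deps={"script_a": {"model"}, "script_b": {"model"}},
    threads=4,
)
print(order)  # "model" always comes first; the two scripts may swap places
```

Submitting every ready task to the pool is safe because `max_workers` already caps how many of them run simultaneously.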
fal reads the thread configuration the same way dbt does, so we can set it in the `profiles.yml` file like this:
```yaml
fal_test:
  target: dev
  outputs:
    dev:
      type: postgres
      threads: 4
      # ...
```
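The lookup goes through the active `target` to its entry under `outputs`, and, as in dbt, a `--threads` value given on the command line takes precedence. The sketch below models that with a hypothetical `resolve_threads` helper working on an already-parsed dict, so it needs no YAML library. It is an illustration rather than fal's code, and the fallback of 1 thread when none is configured is an assumption of this sketch.

```python
import argparse
from typing import Any, Dict, Optional


def resolve_threads(profile: Dict[str, Any], cli_threads: Optional[int] = None) -> int:
    """Pick the thread count for a profile, letting a CLI value take precedence."""
    if cli_threads is not None:
        return cli_threads
    # The active target names the entry under `outputs` whose settings apply.
    output = profile["outputs"][profile["target"]]
    # Assumption for this sketch: fall back to 1 thread if none is configured.
    return int(output.get("threads", 1))


parser = argparse.ArgumentParser(prog="threads-sketch")
parser.add_argument("--threads", type=int, default=None)

# The fal_test profile above, as it looks once parsed from YAML.
profile = {"target": "dev", "outputs": {"dev": {"type": "postgres", "threads": 4}}}

print(resolve_threads(profile, parser.parse_args([]).threads))  # 4
print(resolve_threads(profile, parser.parse_args(["--threads", "1"]).threads))  # 1
```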
```
❯ fal run --profiles-dir .
16:48:50 Found 3 models, 3 tests, 0 snapshots, 0 analyses, 165 macros, 0 operations, 2 seed files, 1 source, 0 exposures, 0 metrics
12:48:51 | Starting fal run for following models and scripts:
agent_wait_time: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
zendesk_ticket_data: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
john_table: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/john_test.py
Concurrency: 4 threads
   float_col  integer_col string_col
0       15.0         10.0         12
float_col      float64
integer_col    float64
string_col      object
dtype: object
Test done for agent_wait_time
Test done for zendesk_ticket_data
```
Alternatively, we can pass the `--threads n` flag:
```
❯ fal run --profiles-dir . --threads 1
16:48:39 Found 3 models, 3 tests, 0 snapshots, 0 analyses, 165 macros, 0 operations, 2 seed files, 1 source, 0 exposures, 0 metrics
12:48:39 | Starting fal run for following models and scripts:
agent_wait_time: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
zendesk_ticket_data: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
john_table: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/john_test.py
Concurrency: 1 threads
Test done for agent_wait_time
Test done for zendesk_ticket_data
   float_col  integer_col string_col
0       15.0         10.0         12
float_col      float64
integer_col    float64
string_col      object
dtype: object
```
In both cases, fal reports the thread count in a `Concurrency: n threads` line. Notice that the order of the log output differs between the two runs: with 1 thread, the scripts log in the same order in which they were announced (`<agent_wait_time, test.py>`, `<zendesk_ticket_data, test.py>`, `<john_table, john_test.py>`), while with 4 threads they log in a different order. This is expected: concurrent threads finish in no guaranteed order, so the output follows whichever script completes first.
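The reordering is easy to reproduce outside fal. In the sketch below, `time.sleep` stands in for each script's work and the durations are made up; `completion_order` is a hypothetical helper. With one worker the pool runs the scripts strictly in submission order, while with four they finish in whatever order their work completes.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List


def completion_order(durations: Dict[str, float], threads: int) -> List[str]:
    """Simulate scripts with time.sleep and return the order in which they finish."""
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # Submit in dict order, i.e. the order the scripts were announced.
        futures = {pool.submit(time.sleep, secs): name for name, secs in durations.items()}
        # as_completed yields each future as soon as it is done.
        return [futures[future] for future in as_completed(futures)]


# Made-up run times; only their relative sizes matter here.
durations = {"agent_wait_time": 0.3, "zendesk_ticket_data": 0.2, "john_table": 0.1}

print(completion_order(durations, threads=1))  # always the announced order
print(completion_order(durations, threads=4))  # typically the fastest script first
```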
The `profiles.yml` setting and the `--threads n` flag also work with the `fal flow run` command, where the same thread count is passed on to the `dbt run` step.
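One way to picture this is the simplified sketch below. The names `flow_run`, `run_scripts`, and `build_dbt_run_command` are hypothetical, and the three-phase structure (before scripts, `dbt run`, after scripts) is read off the log below rather than taken from fal's source. The dbt arguments copy the `Executing command:` line, and a single `threads` value is shared by every phase.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence

Script = Callable[[], None]


def build_dbt_run_command(project_dir: str, profiles_dir: str, threads: int) -> List[str]:
    """Mirror the arguments of the 'Executing command' line in the log."""
    return [
        "dbt", "--log-format", "json", "run",
        "--project-dir", project_dir,
        "--profiles-dir", profiles_dir,
        "--threads", str(threads),
    ]


def run_scripts(scripts: Sequence[Script], threads: int) -> None:
    """Run scripts with at most `threads` at a time; a script's exception is re-raised."""
    with ThreadPoolExecutor(max_workers=threads) as pool:
        list(pool.map(lambda script: script(), scripts))


def run_dbt(command: List[str]) -> None:
    subprocess.run(command, check=True)


def flow_run(
    before: Sequence[Script],
    after: Sequence[Script],
    threads: int,
    project_dir: str = ".",
    profiles_dir: str = ".",
    execute: Callable[[List[str]], None] = run_dbt,
) -> None:
    """Hypothetical flow: before scripts, then dbt run, then after scripts."""
    run_scripts(before, threads)
    execute(build_dbt_run_command(project_dir, profiles_dir, threads))
    run_scripts(after, threads)


# Print the dbt command instead of running it:
# prints "before", the command as a list, then "after".
flow_run([lambda: print("before")], [lambda: print("after")], threads=2, execute=print)
```

Injecting `execute` keeps the sketch testable without dbt installed; the default actually launches `dbt`.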
```
❯ fal flow run --profiles-dir . --threads 2
16:55:33 Found 3 models, 3 tests, 0 snapshots, 0 analyses, 165 macros, 0 operations, 2 seed files, 1 source, 0 exposures, 0 metrics
12:55:34 | Starting fal run for following models and scripts:
agent_wait_time: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/before.py
Concurrency: 2 threads
Before for agent_wait_time
Executing command: dbt --log-format json run --project-dir /Users/matteo/Projects/fal/fal/integration_tests/mock --profiles-dir . --threads 2
Running with dbt=1.0.4
Found 3 models, 3 tests, 0 snapshots, 0 analyses, 165 macros, 0 operations, 2 seed files, 1 source, 0 exposures, 0 metrics
Concurrency: 2 threads (target='dev')
1 of 3 START table model dbt_fal.agent_wait_time................................ [RUN]
2 of 3 START table model dbt_fal.john_table..................................... [RUN]
1 of 3 OK created table model dbt_fal.agent_wait_time........................... [SELECT 76 in 0.24s]
3 of 3 START table model dbt_fal.zendesk_ticket_data............................ [RUN]
2 of 3 OK created table model dbt_fal.john_table................................ [SELECT 1 in 0.26s]
3 of 3 OK created table model dbt_fal.zendesk_ticket_data....................... [SELECT 10 in 0.11s]
Finished running 3 table models in 0.58s.
Completed successfully
Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
12:55:39 | Starting fal run for following models and scripts:
zendesk_ticket_data: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
agent_wait_time: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
john_table: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/john_test.py
Concurrency: 2 threads
Test done for zendesk_ticket_data
Test done for agent_wait_time
   float_col  integer_col string_col
0       15.0         10.0         12
float_col      float64
integer_col    float64
string_col      object
dtype: object
```
Notice that the thread information appears three times: twice from fal (`Concurrency: 2 threads`, once before and once after the dbt run) and once from dbt (`Concurrency: 2 threads (target='dev')`).
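If you capture this output, a quick check that fal and dbt agree on the thread count can be scripted. The sketch below uses a hypothetical helper, `concurrency_lines`, that relies only on the two line formats shown in the log above; telling dbt's line apart by its `(target=...)` suffix is a heuristic, not a documented guarantee.

```python
import re
from typing import List, Tuple

# Matches fal's "Concurrency: 2 threads" and dbt's "Concurrency: 2 threads (target='dev')".
CONCURRENCY = re.compile(r"Concurrency: (\d+) threads(?: \(target='([^']*)'\))?")


def concurrency_lines(log: str) -> List[Tuple[int, str]]:
    """Return (threads, source) for each Concurrency line in a captured log."""
    found = []
    for match in CONCURRENCY.finditer(log):
        threads, target = match.groups()
        # Heuristic: only dbt's line names a target.
        found.append((int(threads), "dbt" if target else "fal"))
    return found


log = """Concurrency: 2 threads
Concurrency: 2 threads (target='dev')
Concurrency: 2 threads"""

lines = concurrency_lines(log)
print(lines)  # [(2, 'fal'), (2, 'dbt'), (2, 'fal')]
print(len({threads for threads, _ in lines}) == 1)  # True: every step used 2 threads
```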
Our next step for parallelization is to make the logs of each script easier to tell apart.
To start working with parallel Python workflows, get the latest version of fal from the fal-ai/fal repository!