Parallelizing Python Workflows in fal

Today's release introduces parallel execution of Python scripts in your fal runs. During a run, fal builds a directed acyclic graph (DAG) of the dependencies between models and scripts. The number of threads sets the maximum number of tasks fal runs at the same time.
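To make the threads setting concrete, here is a minimal sketch of how a DAG of tasks can be executed with a bounded pool of threads. It is not fal's actual scheduler: it relies on Python's standard graphlib.TopologicalSorter (Python 3.9+) and concurrent.futures, and run_dag, its graph format, and the example node names are our own illustration.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_dag(graph, run_task, threads):
    """Run every node of `graph`, with at most `threads` tasks executing at once.

    `graph` maps each node to the set of nodes it depends on (its predecessors),
    which is the format graphlib.TopologicalSorter expects.
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG
    results = {}
    with ThreadPoolExecutor(max_workers=threads) as pool:
        in_flight = {}
        while sorter.is_active():
            # Submit every node whose dependencies have all finished;
            # the pool runs at most `threads` of them, the rest wait in its queue.
            for node in sorter.get_ready():
                in_flight[pool.submit(run_task, node)] = node
            # Block until at least one task finishes, then unlock its dependents.
            done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
            for future in done:
                node = in_flight.pop(future)
                results[node] = future.result()
                sorter.done(node)
    return results


if __name__ == "__main__":
    # Hypothetical graph: each script depends on the model it is attached to.
    graph = {
        "test.py@agent_wait_time": {"agent_wait_time"},
        "test.py@zendesk_ticket_data": {"zendesk_ticket_data"},
        "john_test.py@john_table": {"john_table"},
    }
    print(run_dag(graph, lambda node: f"ran {node}", threads=4))
```

A node is only submitted once all of its predecessors are marked done, so dependencies are respected no matter how many threads are available; the thread count only limits how many independent tasks overlap.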

When fal runs, it reads the threads configuration the same way dbt does, so we can set it in the profiles.yml file like this:

fal_test:
  target: dev
  outputs:
    dev:
      type: postgres
      threads: 4
      # ...
profiles.yml
❯ fal run --profiles-dir .
16:48:50  Found 3 models, 3 tests, 0 snapshots, 0 analyses, 165 macros, 0 operations, 2 seed files, 1 source, 0 exposures, 0 metrics
12:48:51 | Starting fal run for following models and scripts:
agent_wait_time: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
zendesk_ticket_data: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
john_table: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/john_test.py

Concurrency: 4 threads
   float_col  integer_col string_col
0       15.0         10.0         12
float_col      float64
integer_col    float64
string_col      object
dtype: object
Test done for agent_wait_time
Test done for zendesk_ticket_data
fal run of our integration tests project
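The lookup behind the threads setting can be sketched as a walk through the parsed profiles.yml: the profile, then its default target, then that target's output entry. threads_from_profile is a hypothetical helper, not fal's code; the dictionary is written by hand so the sketch needs no YAML library (a loader such as PyYAML's yaml.safe_load would produce the same nesting), and falling back to 1 thread when the key is missing is our assumption.

```python
def threads_from_profile(profiles, profile_name, target=None):
    """Hypothetical helper: return `threads` for one profile/target of parsed profiles.yml data."""
    profile = profiles[profile_name]
    # Use the profile's default `target` (here: dev) unless one is given explicitly.
    target = target or profile["target"]
    output = profile["outputs"][target]
    # Assumption: default to a single thread when `threads` is not set.
    return int(output.get("threads", 1))


profiles = {  # mirrors the profiles.yml shown above
    "fal_test": {
        "target": "dev",
        "outputs": {"dev": {"type": "postgres", "threads": 4}},
    }
}
print(threads_from_profile(profiles, "fal_test"))  # 4
```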

Or we can pass a --threads n flag:

❯ fal run --profiles-dir . --threads 1
16:48:39  Found 3 models, 3 tests, 0 snapshots, 0 analyses, 165 macros, 0 operations, 2 seed files, 1 source, 0 exposures, 0 metrics
12:48:39 | Starting fal run for following models and scripts:
agent_wait_time: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
zendesk_ticket_data: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
john_table: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/john_test.py

Concurrency: 1 threads
Test done for agent_wait_time
Test done for zendesk_ticket_data
   float_col  integer_col string_col
0       15.0         10.0         12
float_col      float64
integer_col    float64
string_col      object
dtype: object
fal run of our integration tests project with only 1 thread
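This run used the same profiles.yml with threads: 4, yet fal reported Concurrency: 1 threads, so the flag takes precedence over the profile. A minimal sketch of that precedence using argparse follows; resolve_threads is a hypothetical helper and not part of fal's CLI code.

```python
import argparse


def resolve_threads(argv, profile_threads):
    """Hypothetical helper: an explicit --threads flag wins over the profiles.yml value."""
    parser = argparse.ArgumentParser(prog="fal-sketch")
    parser.add_argument("--threads", type=int, default=None)
    parser.add_argument("--profiles-dir", default=".")
    # parse_known_args ignores options this sketch does not define.
    args, _unknown = parser.parse_known_args(argv)
    return args.threads if args.threads is not None else profile_threads


print(resolve_threads(["--profiles-dir", ".", "--threads", "1"], profile_threads=4))  # 1
print(resolve_threads(["--profiles-dir", "."], profile_threads=4))  # 4
```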

In both cases, fal reports the number of threads in a line such as Concurrency: 1 threads. Notice that the order of the log output differs between the two runs: the 1-thread run prints in the same order the scripts were announced (<agent_wait_time, test.py>, <zendesk_ticket_data, test.py>, <john_table, john_test.py>), while the 4-thread run prints the john_table output first. This is expected: with several threads the scripts run concurrently, so the order in which they finish, and therefore print, is not deterministic.
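The ordering difference can be reproduced with a small, self-contained experiment. The script names come from the runs above, but the sleep durations are made up to simulate scripts of different lengths; run_scripts is a hypothetical helper, not fal code.

```python
import time
from concurrent.futures import ThreadPoolExecutor

names = ["agent_wait_time", "zendesk_ticket_data", "john_table"]
durations = {"agent_wait_time": 0.2, "zendesk_ticket_data": 0.1}  # made-up run times


def run_scripts(names, threads, durations):
    """Run fake scripts on a thread pool and return the order in which they finished."""
    finished = []

    def script(name):
        time.sleep(durations.get(name, 0))  # stand-in for the script's real work
        finished.append(name)  # list.append is atomic in CPython

    with ThreadPoolExecutor(max_workers=threads) as pool:
        for name in names:
            pool.submit(script, name)
    # Leaving the with-block waits for every submitted script to finish.
    return finished


print(run_scripts(names, 1, durations))
print(run_scripts(names, 3, durations))
```

With threads=1 each script starts only after the previous one ends, so the finish order always equals the submission order. With threads=3 all scripts overlap and the short john_table script will usually finish first, much like the 4-thread run above.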

The profiles.yml setting and the --threads n flag also work for the fal flow run command, which additionally passes the thread count to its dbt run step.
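The dbt invocation in the log below can be assembled as an argument list that forwards the thread count. build_dbt_run_command is a hypothetical helper that only reproduces the shape of the logged command (the project path is a placeholder); it is a sketch, not fal's implementation.

```python
import shlex


def build_dbt_run_command(project_dir, profiles_dir, threads):
    """Hypothetical helper: build a `dbt run` call that forwards the thread count."""
    return [
        "dbt", "--log-format", "json", "run",
        "--project-dir", project_dir,
        "--profiles-dir", profiles_dir,
        "--threads", str(threads),
    ]


cmd = build_dbt_run_command("/path/to/mock", ".", 2)
print("Executing command:", shlex.join(cmd))
# Executing command: dbt --log-format json run --project-dir /path/to/mock --profiles-dir . --threads 2
# The list could then be launched with subprocess.run(cmd, check=True).
```

Keeping the command as a list rather than a single string avoids shell quoting problems when the project path contains spaces; shlex.join is only used for the human-readable log line.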

❯ fal flow run --profiles-dir . --threads 2
16:55:33  Found 3 models, 3 tests, 0 snapshots, 0 analyses, 165 macros, 0 operations, 2 seed files, 1 source, 0 exposures, 0 metrics
12:55:34 | Starting fal run for following models and scripts:
agent_wait_time: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/before.py

Concurrency: 2 threads
Before for agent_wait_time

Executing command: dbt --log-format json run --project-dir /Users/matteo/Projects/fal/fal/integration_tests/mock --profiles-dir . --threads 2
Running with dbt=1.0.4
Found 3 models, 3 tests, 0 snapshots, 0 analyses, 165 macros, 0 operations, 2 seed files, 1 source, 0 exposures, 0 metrics
Concurrency: 2 threads (target='dev')
1 of 3 START table model dbt_fal.agent_wait_time................................ [RUN]
2 of 3 START table model dbt_fal.john_table..................................... [RUN]
1 of 3 OK created table model dbt_fal.agent_wait_time........................... [SELECT 76 in 0.24s]
3 of 3 START table model dbt_fal.zendesk_ticket_data............................ [RUN]
2 of 3 OK created table model dbt_fal.john_table................................ [SELECT 1 in 0.26s]
3 of 3 OK created table model dbt_fal.zendesk_ticket_data....................... [SELECT 10 in 0.11s]
Finished running 3 table models in 0.58s.
Completed successfully
Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3

12:55:39 | Starting fal run for following models and scripts:
zendesk_ticket_data: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
agent_wait_time: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/test.py
john_table: /Users/matteo/Projects/fal/fal/integration_tests/mock/fal_scripts/john_test.py

Concurrency: 2 threads
Test done for zendesk_ticket_data
Test done for agent_wait_time
   float_col  integer_col string_col
0       15.0         10.0         12
float_col      float64
integer_col    float64
string_col      object
dtype: object
fal flow run of our integration tests project with 2 threads

Notice the three lines that report the thread count: two Concurrency: 2 threads lines from fal, one before and one after the dbt run, and one Concurrency: 2 threads (target='dev') line from dbt.
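Those three lines line up with the three phases of a flow run visible in the log: scripts before dbt, dbt itself, and scripts after it. The outline below is a hypothetical sketch, assuming each phase simply reuses the same thread count; flow_run, run_phase, and the callables passed to them are illustrative stand-ins, not fal's API.

```python
from concurrent.futures import ThreadPoolExecutor


def run_phase(scripts, threads):
    """Run one phase's scripts (zero-argument callables) on a bounded pool."""
    print(f"Concurrency: {threads} threads")
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # list() drains the iterator, so a failing script re-raises here.
        return list(pool.map(lambda script: script(), scripts))


def flow_run(before_scripts, run_dbt, after_scripts, threads):
    """Hypothetical outline of a flow run: one thread count shared by all phases."""
    before = run_phase(before_scripts, threads)
    run_dbt(threads)  # e.g. forwards --threads to `dbt run`
    after = run_phase(after_scripts, threads)
    return before, after


if __name__ == "__main__":
    flow_run(
        [lambda: print("Before for agent_wait_time")],
        lambda n: print(f"dbt run --threads {n}"),
        [lambda: print("Test done for agent_wait_time")],
        threads=2,
    )
```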

The next step for parallelization is making the log output of each script easier to tell apart.
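One possible approach, not necessarily the one fal will take, is to tag every log record with the model and script that produced it. This sketch uses the standard logging.LoggerAdapter; script_logger and the logger name fal_sketch are hypothetical, and output goes to an io.StringIO only so it can be inspected (a real setup would log to sys.stdout).

```python
import io
import logging

# One shared handler for all scripts; a handler holds its lock around each emit,
# so lines from concurrently running scripts are not split into each other.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("[%(origin)s] %(message)s"))
logger = logging.getLogger("fal_sketch")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep these records away from the root logger


def script_logger(model, script):
    """Hypothetical helper: tag every record with the model and script it came from."""
    return logging.LoggerAdapter(logger, {"origin": f"{model} | {script}"})


script_logger("agent_wait_time", "test.py").info("Test done")
script_logger("john_table", "john_test.py").info("dtype: object")
print(stream.getvalue(), end="")
```

Even when the order of lines still varies between runs, each line now says which model and script it belongs to.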

To start working with parallel Python workflows, get the latest version of fal from the fal-ai/fal repository!