07 Parallel Computing on Clusters#

This example demonstrates how to use parallel computing on a cluster to accelerate CFAST fire simulations using PyCFAST and Dask.

Locally you would normally use multiprocessing or joblib library to run simulations in parallel on multiple CPU cores. However, for larger parameter studies or optimization tasks, you may want to scale up to an HPC cluster or cloud environment.

We’ll compare sequential (single-core) vs parallel execution and show how to set up distributed computing for CFAST simulations.

Step 1: Import Required Libraries#

We’ll need the following libraries:

  • dask.distributed: For parallel computing

  • NumPy: Numerical operations

import os
import shutil
import time
import uuid
from pathlib import Path

import numpy as np
from dask.distributed import Client, LocalCluster, get_worker

from pycfast.parsers import parse_cfast_file

Step 2: Setting Up the Dask Client#

Dask provides a flexible framework for parallel computing. It can be used with a variety of cluster managers, including local clusters, HPC schedulers (like SLURM), and cloud services.

Here we create a local cluster that will use 4 CPU cores on your machine.

Cluster Configuration:

  • n_workers: Number of worker processes (typically = number of CPU cores)

  • threads_per_worker: Threads per worker (1 for CPU-bound tasks like CFAST)

  • memory_limit: Memory limit per worker to prevent system overload

cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=1,
    memory_limit="256MB",
)
client = Client(cluster)

The Dask client is now set up and ready to manage our parallel computations. You can monitor the cluster’s performance and task progress using the Dask dashboard at http://localhost:8787/status while the simulations are running.

client

Client

Client-6e1d67de-52a9-11f1-8b75-7db9c78bb37c

Connection method: Cluster object Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info



Step 3: Load Base Model#

We start with an existing CFAST model as our template. We’ll use parse_cfast_file to load the USN_Hawaii_Test_03.in model. This model serves as the foundation that we’ll modify systematically to generate our dataset.

model = parse_cfast_file("data/USN_Hawaii_Test_03.in")

The parsed model is displayed below.

print(model.summary())
Model: USN_Hawaii_Test_03_parsed.in
Simulation: 'HawaiiTest 3' (570s)

Components:
  Material Properties (2):
    Material 'CONCRETE' (Concrete Normal Weight (6 in)): k=1.6, ρ=2400.0, c=0.75, t=0.15, ε=0.94
    Material 'STEELSHT' (Steel Plain Carbon (1/16 in)): k=60.0, ρ=7850.0, c=0.48, t=0.0015, ε=0.9
  Compartment (1):
    Compartment 'Bay 1': 97.6x74.0x14.8 m, volume: 106891.52 m³ (ceiling: CONCRETE, wall: CONCRETE, floor: CONCRETE)
  Fire (1):
    Fire 'Hawaii_03' (Hawaii_03_Fire) in 'Bay 1' at (36.7, 39.9) (peak: 1 kW, duration: 11min, χr: 0.4)
  Device (1):
    Target 'Targ 1' (PLATE) in 'Bay 1' at (36.7, 39.9, 14.7) (material: STEELSHT, depth: 0.00075m)

Step 4: Generate Parameter Combinations#

We use NumPy to create systematic parameter variations. For this study, we’ll vary two key fire parameters:

  • Heat of combustion: Energy released per unit mass of fuel (affects fire intensity)

  • Radiative fraction: Portion of fire energy released as radiation (affects heat transfer)

For demonstration, we’ll use a smaller sample size. In practice, you might use hundreds or thousands of combinations.

n_samples = 100

heat_of_combustion_values = np.linspace(15000, 25000, n_samples)  # kJ/kg
radiative_fraction_values = np.linspace(0.2, 0.4, n_samples)  # Fraction

parameter_combinations = list(
    zip(heat_of_combustion_values, radiative_fraction_values, strict=False)
)

print(f"Generated {len(parameter_combinations)} parameter combinations")
print(
    f"Heat of combustion range: {heat_of_combustion_values[0]:.0f}"
    f" - {heat_of_combustion_values[-1]:.0f} kJ/kg"
)
print(
    f"Radiative fraction range: {radiative_fraction_values[0]:.2f}"
    f" - {radiative_fraction_values[-1]:.2f}"
)
Generated 100 parameter combinations
Heat of combustion range: 15000 - 25000 kJ/kg
Radiative fraction range: 0.20 - 0.40

Step 5: Sequential Execution (Single Core)#

First, let’s run simulations sequentially using a traditional for loop. This will serve as our baseline for performance comparison.

Sequential approach characteristics:

  • Uses only one CPU core

  • Simulations run one after another

  • Simple but slower for multiple runs

def run_sequential(heat_of_combustion, radiative_fraction, file_name=None):
    temp_model = model.update_fire_params(
        fire="Hawaii_03_Fire",
        heat_of_combustion=heat_of_combustion,
        radiative_fraction=radiative_fraction,
    )

    results = temp_model.run(file_name=file_name)

    return results

Sequential execution with timing.

start_time = time.perf_counter()
all_runs_sequential = []

print("Running simulations sequentially")
for i, (hoc, rf) in enumerate(parameter_combinations):
    if i % 5 == 0:
        print(f"Running simulation {i + 1}/{len(parameter_combinations)}")

    outputs = run_sequential(heat_of_combustion=hoc, radiative_fraction=rf)
    all_runs_sequential.append(
        {
            "simulation_id": i,
            "hoc": hoc,
            "rf": rf,
            "outputs": outputs,
        }
    )

sequential_time = time.perf_counter() - start_time
print(f"\nSequential execution completed in {sequential_time:.2f} seconds")
print(
    f"Average time per simulation: "
    f"{sequential_time / len(parameter_combinations):.2f} seconds"
)
Running simulations sequentially
Running simulation 1/100
Running simulation 6/100
Running simulation 11/100
Running simulation 16/100
Running simulation 21/100
Running simulation 26/100
Running simulation 31/100
Running simulation 36/100
Running simulation 41/100
Running simulation 46/100
Running simulation 51/100
Running simulation 56/100
Running simulation 61/100
Running simulation 66/100
Running simulation 71/100
Running simulation 76/100
Running simulation 81/100
Running simulation 86/100
Running simulation 91/100
Running simulation 96/100

Sequential execution completed in 14.19 seconds
Average time per simulation: 0.14 seconds

Step 6: Parallel Execution with Dask#

Now let’s implement the same simulations using parallel execution. This approach distributes work across multiple CPU cores.

Parallel approach characteristics:

  • Uses multiple CPU cores simultaneously

  • Each worker runs in isolated temporary directories

  • Requires careful handling of file I/O to avoid conflicts

def _run_one(hoc, rf, sim_idx: int):
    w = get_worker()

    rundir = Path(w.local_directory) / f"cfast-{uuid.uuid4().hex}"
    rundir.mkdir(parents=True, exist_ok=True)

    try:
        in_name = rundir / f"parallel_sim_{sim_idx:03d}.in"

        outputs = run_sequential(
            heat_of_combustion=hoc, radiative_fraction=rf, file_name=str(in_name)
        )

        return {
            "simulation_id": sim_idx,
            "hoc": hoc,
            "rf": rf,
            "outputs": outputs,
        }
    finally:
        shutil.rmtree(rundir, ignore_errors=True)


def run_all_parallel(parameter_combinations, client: Client):
    futures = [
        client.submit(_run_one, hoc, rf, i, pure=False)
        for i, (hoc, rf) in enumerate(parameter_combinations)
    ]

    results = client.gather(futures)
    return results

As mentioned earlier, you can monitor progress in real time on the Dask dashboard at http://localhost:8787/status to see real-time progress and resource usage.

start_time = time.perf_counter()

all_runs_parallel = run_all_parallel(parameter_combinations, client)

parallel_time = time.perf_counter() - start_time
print(f"\nParallel execution completed in {parallel_time:.2f} seconds")
print(
    f"Average time per simulation: "
    f"{parallel_time / len(parameter_combinations):.2f} seconds"
)
Parallel execution completed in 6.92 seconds
Average time per simulation: 0.07 seconds

Step 7: Speed Comparison#

Note: For small workloads, parallel overhead may exceed benefits.

print(f"Sequential execution time: {sequential_time:.2f} seconds")
print(f"Parallel execution time:   {parallel_time:.2f} seconds")
Sequential execution time: 14.19 seconds
Parallel execution time:   6.92 seconds

Below we compute and display the speedup factor, parallel efficiency, and time saved by using parallel execution compared to sequential execution.

speedup = sequential_time / parallel_time
efficiency = speedup / len(client.scheduler_info()["workers"]) * 100

print(f"Speedup factor: {speedup:.2f}x")
print(f"Parallel efficiency: {efficiency:.1f}%")

time_saved = sequential_time - parallel_time
print(
    f"Time saved: {time_saved:.2f} seconds ({time_saved / sequential_time * 100:.1f}%)"
)
Speedup factor: 2.05x
Parallel efficiency: 51.2%
Time saved: 7.27 seconds (51.2%)

Cleanup#

Clean up generated files from sequential run and close the Dask cluster.

files_removed = 0
for fname in os.listdir("."):
    if fname.startswith("USN_Hawaii_Test_03_"):
        try:
            os.remove(fname)
            files_removed += 1
        except Exception as e:
            print(f"Could not remove {fname}: {e}")

print(f"Cleanup complete. Removed {files_removed} sequential simulation files.")

client.close()
cluster.close()
print("Dask cluster closed successfully")
Cleanup complete. Removed 12 sequential simulation files.
Dask cluster closed successfully

Total running time of the script: (0 minutes 23.207 seconds)

Gallery generated by Sphinx-Gallery