
Orchestrating AI-driven Geospatial Workflows with Prefect

Introduction

In our latest project in the forestry industry, we faced the challenge of processing large-scale aerial imagery for AI-driven object detection and Machine Learning (ML) analysis. The goal was to combine multiple data sources to generate Canopy Height Models (CHM), perform statistical analyses, and efficiently produce delineation outputs, that is, a division of the forest into logical sectors.


With many required steps, we needed a robust workflow orchestration tool to handle our ETL (Extract, Transform, Load) processes, manage data ingestion, execute AI models, and aggregate results. After evaluating multiple options, we selected Prefect as our data orchestrator. In this article, we outline why Prefect was chosen and how it streamlined our MLOps pipeline.


The Challenge

The project required us to process high-resolution aerial imagery and LiDAR datasets, detect trees, calculate canopy heights, and extract meaningful statistics to divide the forest into logical sectors.


Our pipeline needed to handle these key components:

  1. Preprocessing

    1. Orthophotomap generation

    2. Geospatial transformations for coordinate system alignment

    3. Point cloud cropping to focus on regions of interest


  2. AI Object Detection

    1. Tree detection using deep learning models

    2. Instance segmentation for other types of objects (ground preparation, etc.)


  3. Single Tree Metrics

    1. Point cloud alignment for accurate measurements

    2. Tree top detection algorithms

    3. CHM (Canopy Height Model) calculation

    4. Individual tree metrics computation


  4. Forest Stand Delineation

    1. Metrics data loading and validation

    2. Stand boundary detection and processing

    3. Results storage in database


  5. Aggregated Stats Calculation

    1. Statistical analysis of tree populations

    2. Stand-level metrics computation


  6. Results Storage

    1. Structured data export to databases

    2. Artifact storage in cloud storage

    3. Result validation and verification

Flowchart diagram showing a pipeline for forestry data processing, including AI Object Detection, tree metrics, and result storage.
Pipeline overview

Managing these steps efficiently required a tool that could handle the pipeline's complexity and execute it in a flexible, scalable way.
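One way to picture the dependencies between these stages is as a directed acyclic graph. The sketch below uses Python's standard-library graphlib module (Python 3.9+) to derive a valid execution order. The stage names and the dependency edges are our reading of the list above, so treat them as assumptions rather than an export from the real pipeline.

```python
from graphlib import TopologicalSorter

# Each stage maps to the set of stages it depends on.
# Names and edges are illustrative assumptions based on the stage list above.
STAGE_DEPENDENCIES = {
    "preprocessing": set(),
    "ai_object_detection": {"preprocessing"},
    "single_tree_metrics": {"preprocessing", "ai_object_detection"},
    "forest_stand_delineation": {"single_tree_metrics"},
    "aggregated_stats": {"single_tree_metrics", "forest_stand_delineation"},
    "results_storage": {"aggregated_stats"},
}


def execution_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return one valid order in which the stages can run."""
    return list(TopologicalSorter(dependencies).static_order())


if __name__ == "__main__":
    for position, stage in enumerate(execution_order(STAGE_DEPENDENCIES), start=1):
        print(position, stage)
```

An orchestrator does this bookkeeping for you, and in addition decides where each stage runs and what happens when one of them fails.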


Why Prefect?

We evaluated several orchestration tools and chose Prefect, which stood out with:


  • Dynamic DAG (directed acyclic graph) processing – Prefect's workflows can be constructed dynamically at runtime, which makes them more flexible. We can define a workflow with specific steps and conditionally switch to other steps, either on request or depending on the results of previous steps.

  • Python-native – We already had functions that we could simply wrap with Prefect decorators (@flow, @task), which serve as the glue between our code and Prefect. Little boilerplate code was needed.

  • Error Handling & Resilience – Automatic retries, error tracking, and logging ensured that long-running tasks could be easily tracked, recovered and debugged.

  • Local Development & Cloud Execution – We could test locally and seamlessly transition to cloud execution with multiple options in each cloud provider.

  • UI – Prefect server provides a beautiful UI to monitor the pipeline, track the status of tasks, and view logs. It also allows other team members to run flows they are not actively working on.

  • Logging features – Prefect aggregates logging information from each step in the same format, enabling quick analysis of failure points instead of having to search through logs.
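"Dynamic" here means that the shape of a run is decided by ordinary Python control flow while it executes. The plain-Python sketch below illustrates the idea without Prefect. The step names, the tree_count field, and the early exit for empty orders are hypothetical, chosen only to show one branch on an input parameter and one branch on an earlier result.

```python
from typing import Callable


def detect(order: dict) -> int:
    """Stand-in for the detection step; returns the number of trees found."""
    return order.get("tree_count", 0)


def run_order(order: dict, run_preprocess: bool = True,
              log: Callable[[str], None] = print) -> list[str]:
    """Run the steps for one order and return the names of the executed steps."""
    executed: list[str] = []

    def step(name: str) -> None:
        log(f"running {name}")
        executed.append(name)

    if run_preprocess:          # branch on an input parameter
        step("preprocess")

    step("detect_trees")
    trees = detect(order)

    if trees == 0:              # branch on the result of a previous step
        step("report_empty_order")
        return executed

    for name in ("generate_chm", "raw_stats", "delineation", "aggregated_stats"):
        step(name)
    return executed
```

With Prefect, the same if statements would live inside a function decorated with @flow, and each step would be a task or sub-flow call.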


Technical Implementation

Here's a simplified example of how you could implement a pipeline like this using Prefect with different flows and tasks.

from prefect import flow, task
from prefect.deployments import run_deployment

# forest_app stands for the project's own package
from forest_app.data_processing import load_lidar_data, crop_lidar_data, save_lidar_data, calculate_chm
from forest_app.image_processing import generate_orthophotomap, geospatial_postprocessing
from forest_app.stats import compute_raw_stats, compute_aggregated_stats, compute_delineation
# aliased so it does not clash with the detect_trees flow defined below
from forest_app.ai_models import detect_trees as run_tree_detection
# storage helpers used below; this module path is illustrative
from forest_app.storage import (
    save_detections, load_detections, load_chm, load_stats,
    save_results, postprocess_artifacts,
)

# preprocess data
@task
def preprocess_lidar(order_id: str):
    """Processes LiDAR data"""
    load_lidar_data(order_id)
    crop_lidar_data(order_id)
    geospatial_postprocessing(order_id, 'lidar')
    save_lidar_data(order_id)

@task
def preprocess_images(order_id: str):
    """Processes images"""
    generate_orthophotomap(order_id)
    geospatial_postprocessing(order_id, 'images')

# Preprocessing is organized as a separate flow so it can be run on its own if needed
@flow
def preprocess_data(order_id: str):
    """Handles geospatial transformations and orthophotomap generation"""
    preprocess_lidar(order_id)
    preprocess_images(order_id)

# AI detection - separate flow so it can run in a different runtime environment.
# The explicit name keeps the deployment path used in run_deployment below valid
# (by default Prefect derives the flow name from the function name, using hyphens).
@flow(name="detect_trees")
def detect_trees(order_id: str, model: str):
    """Runs AI model for tree detection"""
    # assumed signature: takes the order ID and a model identifier, returns detections
    detections = run_tree_detection(order_id, model)
    save_detections(order_id, detections)


# CHM, raw stats, aggregated stats, delineation
@task
def generate_chm(order_id: str, method: str):
    """Generates Canopy Height Model"""
    detections = load_detections(order_id)
    # the method keyword is an assumed part of calculate_chm's signature
    calculate_chm(order_id, detections, method=method)

@task
def calculate_raw_stats(order_id: str):
    """Calculates raw statistics from CHM"""
    chm = load_chm(order_id)
    raw_stats = compute_raw_stats(order_id, chm)
    save_results(order_id, raw_stats)
    return raw_stats

@task
def calculate_aggregated_stats(order_id: str):
    """Calculates aggregated statistics"""
    raw_stats = load_stats(order_id)
    aggregated_stats = compute_aggregated_stats(raw_stats)
    save_results(order_id, aggregated_stats)
    return aggregated_stats

@task
def perform_delineation(order_id: str):
    """Performs delineation based on previously computed statistics"""
    stats = load_stats(order_id)
    # passing the loaded stats is an assumption about compute_delineation's signature
    delineation_results = compute_delineation(order_id, stats)
    save_results(order_id, delineation_results)

# main flow that orchestrates all the above flows and tasks
@flow(name="Forestry Analysis Pipeline", log_prints=True)
def forestry_pipeline(
    order_id: str,
    run_preprocess: bool = True,
    chm_method: str = 'chm_method_1',
    detection_model: str = 'tree_detection/latest',
):
    # run the sub-flow conditionally in the same environment,
    # only if the input data doesn't already come in the required format
    if run_preprocess:
        print("Running preprocess")
        preprocess_data(order_id)

    # run the AI detection flow through its own deployment, on different infrastructure;
    # with the default timeout=None the call waits until the remote run finishes
    print("Running AI detection")
    run_deployment(
        name='detect_trees/detect_trees_gpu_deployment',
        parameters={'order_id': order_id, 'model': detection_model},
    )

    # the rest of the pipeline is executed in the same environment
    print("Running CHM")
    generate_chm(order_id, chm_method)
    print("Running raw stats")
    calculate_raw_stats(order_id)
    print("Running delineation")
    perform_delineation(order_id)
    print("Running aggregated stats")
    calculate_aggregated_stats(order_id)
    print("Postprocessing artifacts")
    postprocess_artifacts(order_id)

The example shows the main building blocks of a Prefect pipeline:


  • Flow - the main unit of work in Prefect, used to orchestrate tasks. A flow is a function decorated with @flow. It is tied to a deployment, which decides where and how the flow will be executed. If you need different runtime environments for different parts of the pipeline, you can create separate flows for them and compose them together. It's good to think of flows as milestones in the pipeline that you can run separately or together.

  • Tasks - each task is a function decorated with @task. Tasks are smaller units of work orchestrated by the flow. They can contain the implementation itself or compose other, smaller functions.

  • Deployment - a server-side description of flow metadata that is used to run the flow. It also contains information about the infrastructure where the flow will be executed (the work pool). After writing the flow, you can deploy it to Prefect Cloud or a Prefect Server and then run it from the UI or from the command line. Think of deploying as pushing flow metadata to the server.

  • Parameters - parameters of flows and tasks are typed. They are not only function parameters: Prefect Server also uses the parameters of a deployed flow to build UI forms when you run it from the graphical interface.

  • Results - results of tasks can be passed to other tasks or stored in a database or cloud storage.

  • Logging - logging is enabled by default. You can even use log_prints=True to log print statements without setting up logging.
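To see why typed parameters matter, here is a rough sketch of how a typed function signature can be turned into a form description. It uses only inspect from the standard library. Prefect's own schema generation is more elaborate, so the describe_parameters helper and its output format are hypothetical and serve only to illustrate the idea.

```python
import inspect


def forestry_pipeline(order_id: str, run_preprocess: bool = True,
                      chm_method: str = "chm_method_1") -> None:
    """Signature modeled on the example flow; the body is irrelevant here."""


def describe_parameters(fn) -> list[dict]:
    """Build one form-field description per parameter of fn."""
    fields = []
    for name, param in inspect.signature(fn).parameters.items():
        required = param.default is inspect.Parameter.empty
        if param.annotation is inspect.Parameter.empty:
            type_name = "any"   # untyped parameter: no hint for the form
        else:
            type_name = getattr(param.annotation, "__name__", str(param.annotation))
        fields.append({
            "name": name,
            "type": type_name,
            "required": required,
            "default": None if required else param.default,
        })
    return fields


if __name__ == "__main__":
    for form_field in describe_parameters(forestry_pipeline):
        print(form_field)
```

A boolean parameter can then be rendered as a checkbox and a string as a text input, which is why adding type hints to flow parameters pays off in the UI.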


Additional properties not shown in the example:


  • Caching - tasks can be cached to avoid re-running the same task with the same input. This can be done with Prefect's native cache or with custom caching logic, such as saving results to external storage and loading them from there.

  • Retries/Timeouts/Cancellation - tasks can be retried on failure and timed out if they take too long to complete. You can also cancel a flow run from the UI, the command line, or the API.

  • Concurrency - when tasks run on one machine, you can limit the number of concurrent tasks to avoid overloading it.

  • Events - tasks can emit events that can be used to trigger other tasks or flows, and can in turn be triggered by such events.
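Prefect exposes retries and caching as task options, so you would not normally write them yourself. Purely to illustrate what these two features do, the following plain-Python sketch implements a retry wrapper and a cache keyed on the call inputs. The names with_retries and cached_by_inputs and the in-memory cache are hypothetical stand-ins, not Prefect APIs.

```python
import functools
import hashlib
import json
import time


def with_retries(retries: int, delay_seconds: float = 0.0):
    """Re-run the wrapped function up to `retries` extra times on failure."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise               # out of attempts: surface the error
                    time.sleep(delay_seconds)
        return wrapper
    return decorator


def cached_by_inputs(fn):
    """Skip re-computation when fn is called again with the same inputs."""
    cache: dict[str, object] = {}

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Hash the serialized inputs to get a stable cache key
        payload = json.dumps([args, kwargs], sort_keys=True, default=str)
        key = hashlib.sha256(payload.encode()).hexdigest()
        if key not in cache:
            cache[key] = fn(*args, **kwargs)
        return cache[key]
    return wrapper


calls = {"count": 0}


@cached_by_inputs
@with_retries(retries=2)
def flaky_stats(order_id: str) -> str:
    """Fails twice, then succeeds, to exercise the retry path."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return f"stats for {order_id}"
```

The first call to flaky_stats succeeds on its third attempt. A second call with the same order ID is served from the cache and does not execute the function body again.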


Evolution of Our Architecture

We showed how to implement a pipeline using Prefect. Now let's look at how it can be deployed on different infrastructure setups, since some tasks need to be executed on specific infrastructure.


To understand the evolution of our architecture, we need to understand the different deployment options in Prefect.


For details on how to deploy Prefect for different infrastructure setups, refer to the Prefect documentation.


At the heart of it all are three main components:

  • Prefect Server - An application responsible for storing the state of flow runs, scheduling, and orchestrating flow runs.

  • Work pool - Part of the server that acts as a bridge between the server and the workers. It defines runtime environment settings (CPU, memory, network, etc.) used by workers to run flows.

  • Worker - a standalone process responsible for executing flow runs on specific infrastructure; it polls the server for work to execute. Note that workers can be deployed on different infrastructure than the server, and they can be separate from the machines that run the jobs.

Flowchart showing Prefect API and execution environment. Deployment assigned to Work Pool. Worker polls, creates Infrastructure with Flow Runs A and B.
Prefect architecture
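The interplay between these components can be sketched as a toy model: scheduled flow runs are placed in a work pool, and a worker polls the pool and executes whatever it finds. The classes below are hypothetical in-memory stand-ins that illustrate the polling pattern only. They do not reflect Prefect's internals or API, and unlike a real worker, this one runs the work in its own process instead of creating separate infrastructure for it.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class FlowRun:
    name: str
    fn: Callable[[], object]
    state: str = "SCHEDULED"
    result: object = None


@dataclass
class WorkPool:
    """Holds scheduled runs plus the runtime settings workers should apply."""
    name: str
    job_settings: dict = field(default_factory=dict)
    queue: deque = field(default_factory=deque)

    def submit(self, run: FlowRun) -> None:
        self.queue.append(run)


class Worker:
    """Polls one work pool and executes the runs it picks up."""

    def __init__(self, pool: WorkPool):
        self.pool = pool

    def poll_once(self) -> list[FlowRun]:
        finished = []
        while self.pool.queue:
            run = self.pool.queue.popleft()
            run.state = "RUNNING"
            try:
                run.result = run.fn()
                run.state = "COMPLETED"
            except Exception:
                run.state = "FAILED"    # record the failure, keep polling
            finished.append(run)
        return finished
```

Because the worker only needs to reach the pool, it can live on a different machine than the server, which is what makes the deployment variants described below possible.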

Our architecture evolved through four distinct phases, each addressing specific needs while exploring different deployment options.


The final goal was to have Prefect Server running on scalable infrastructure and workers running on demand when needed. We had large spikes of work that could be parallelized with a lot of different orders coming in at once.


Different flows had different infrastructure requirements - some were CPU intensive, some were memory intensive, and AI models required GPUs. As a result, we couldn't rely on a single machine or configuration to run all the flows.


  1. Prefect Cloud initial setup

    • Used Prefect Cloud's managed service

    • Quick to set up (you host only the worker) but limited control over data, costs, and history

    • Good starting point to get familiar with Prefect and its UI


  2. Self-hosted single machine

    • Migrated to a self-hosted Prefect Server (see Prefect GitHub)

    • All components (Server, Work pool, Worker) on a single Google Cloud VM

    • Better control but limited isolation between runs


  3. Self-hosted single machine with containerized execution

    • Introduced Docker containerization to isolate each run in its own container

    • Improved reproducibility and dependency management, as we could package all necessary tools into the containers used by specific tasks

    • Still constrained by single machine resources


  4. Scaling with Vertex AI Custom Jobs

    • On-demand execution using Vertex AI Custom Jobs

    • Job infrastructure scales to zero when not in use, and high-CPU instances are provisioned automatically when needed. Only a small instance running Prefect Server and the workers stays up, which keeps costs low

Flowchart illustrating Prefect Cloud and Google Cloud Platform runtime environments, showing task execution in various setups using GCP VM and Docker.
Prefect architecture evolution

Outcomes & Benefits

By leveraging Prefect & Vertex AI Custom Jobs, we achieved:

  • Scalability – Parallel execution reduced processing time significantly.

  • Cost Efficiency – With Vertex AI Custom Jobs, we minimized costs by scaling resources dynamically: only a small instance for Prefect Server and the workers runs permanently, while flows run on on-demand instances.

  • Reliability – Built-in monitoring ensured that we could track the status of our pipeline and that it was resilient to failures.

  • Faster Iteration – ML models and preprocessing steps could be rapidly debugged, updated, and tested. Workflows were easy to modify as project requirements evolved: we could move steps around, add new ones, and add conditional logic.

  • UI – Prefect server provides a beautiful UI to monitor the pipeline, track the status of tasks, and view logs. It also allows other team members to run flows they are not actively working on.


Prefect also supports other cloud providers such as AWS and Azure, and has integrations with multiple services within each provider (e.g. Cloud Run, ECS), so we could also have deployed Prefect Server and the workers on different infrastructure if needed.


For more details on how to deploy Prefect for different infrastructure setups, refer to the Prefect documentation.


Conclusion On Orchestrating AI Workflows

Prefect proved to be an invaluable tool for orchestrating our forestry pipeline. Its flexibility, reliability, and ease of use enabled us to streamline complex workflows and deliver results faster. As MLOps continues to evolve, tools like Prefect will play a crucial role in operationalizing data and ML pipelines.


If you're considering an orchestration tool for ML pipelines, Prefect is definitely worth exploring.


This is one of the many production-grade AI/ML cloud systems we have built for our customers at Apptimia. If you're building scalable AI/ML cloud processing solutions and need help, get in touch with us!

Michal O.

Lead Software Engineer at Apptimia
