Custom Prefect Result

I use Prefect 2 to run image analysis workflows. An image analysis workflow often consists of a series of image transformations which are applied to individual images in sequence. A very basic workflow would take raw image data, apply some filtering to reduce noise and then threshold the data to obtain a semantic segmentation (foreground vs background).

I can easily build this workflow with Prefect 2 and use caching to reduce computations when a flow is re-run. However, it gets trickier if I want to access and inspect cached task-run results, because Prefect 2 currently has no support for custom result types. One workaround is to read and write the image data inside each task and pass only file paths between tasks. This works, but adding all this extra code to every task does not feel very nice.
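A minimal sketch of that workaround, with plain functions standing in for Prefect tasks and raw bytes standing in for image data (the function names, file names and the toy filter are made up for illustration):

```python
import tempfile
from pathlib import Path


def smooth(in_path: str, out_dir: str) -> str:
    """Stand-in for a denoising task: 3-point moving average over raw bytes."""
    data = Path(in_path).read_bytes()            # extra code: load the input
    padded = data[:1] + data + data[-1:]         # repeat the edge values
    result = bytes(sum(padded[i:i + 3]) // 3 for i in range(len(data)))
    out_path = Path(out_dir) / "smoothed.bin"
    out_path.write_bytes(result)                 # extra code: save the output
    return str(out_path)                         # only the path is passed on


def threshold(in_path: str, out_dir: str, level: int) -> str:
    """Stand-in for a segmentation task: foreground is everything above level."""
    data = Path(in_path).read_bytes()            # same loading code again
    result = bytes(255 if b > level else 0 for b in data)
    out_path = Path(out_dir) / "segmented.bin"
    out_path.write_bytes(result)                 # same saving code again
    return str(out_path)


def run_pipeline(raw: bytes, level: int) -> bytes:
    """Chain both steps; the steps only ever exchange file paths."""
    with tempfile.TemporaryDirectory() as tmp:
        raw_path = Path(tmp) / "raw.bin"
        raw_path.write_bytes(raw)
        seg_path = threshold(smooth(str(raw_path), tmp), tmp, level)
        return Path(seg_path).read_bytes()
```

The load and save lines are repeated in every step, which is the extra code the workaround forces into each task.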

I think I found a better solution, which uses Prefect’s JSONSerializer with custom implementations of object_encoder and object_decoder. I would like to get feedback and inspiration from the community. The code can be found in fmi-faim/custom-prefect-result.

Custom Prefect Result (cpr) introduces custom result targets which have the following fields:

  • location: storage location
  • name: file name
  • ext: file extension
  • data: actual result data
  • data_hash: hash of the data

When the data of such a target is set via set_data, the data_hash is computed. When a task returns such a target and the customized JSONSerializer encounters it, only location, name, ext and data_hash are serialized to JSON, while the actual data is written to the storage location. On decoding, the custom result target is restored and the data can be accessed via get_data, which loads the actual data from the storage location, computes its hash and compares it against the stored data_hash.

Importantly, when the data is stored the data_hash is appended to the file name, which ensures that existing results are not overwritten by task runs with different parameters.
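The mechanism can be sketched with the standard library alone. This is not the code from the cpr repository: BytesTarget, encode and decode are hypothetical names, SHA-256 and the name-hash file naming are assumptions, and the default and object_hook arguments of the json module stand in for the object_encoder and object_decoder of Prefect's JSONSerializer.

```python
import hashlib
import json
from pathlib import Path


class BytesTarget:
    """Hypothetical stand-in for a cpr target; wraps raw bytes, not an image."""

    def __init__(self, location, name, ext, data_hash=None):
        self.location, self.name, self.ext = location, name, ext
        self.data_hash = data_hash
        self._data = None

    def _path(self):
        # The hash is part of the file name, so results computed with
        # different parameters end up in different files.
        return Path(self.location) / f"{self.name}-{self.data_hash}{self.ext}"

    def set_data(self, data: bytes):
        self._data = data
        self.data_hash = hashlib.sha256(data).hexdigest()  # assumed hash function

    def get_data(self) -> bytes:
        if self._data is None:
            data = self._path().read_bytes()
            # Check that the file on disk still matches the recorded hash.
            if hashlib.sha256(data).hexdigest() != self.data_hash:
                raise ValueError(f"hash mismatch for {self._path()}")
            self._data = data
        return self._data


def encode(obj):
    """For json.dumps(default=...): write the data, serialize only metadata."""
    if isinstance(obj, BytesTarget):
        Path(obj.location).mkdir(parents=True, exist_ok=True)
        obj._path().write_bytes(obj.get_data())
        return {"__target__": True, "location": obj.location,
                "name": obj.name, "ext": obj.ext, "data_hash": obj.data_hash}
    raise TypeError(f"cannot serialize {type(obj).__name__}")


def decode(d):
    """For json.loads(object_hook=...): rebuild the target, load data lazily."""
    if d.get("__target__"):
        return BytesTarget(d["location"], d["name"], d["ext"], d["data_hash"])
    return d
```

With these pieces, json.dumps(target, default=encode) produces a small JSON document that contains no payload, and json.loads(text, object_hook=decode).get_data() reads the file back and raises if its hash no longer matches.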

Usage

Installation: pip install git+https://github.com/fmi-faim/custom-prefect-result@main

To use CPR, the task results must be of type cpr.Target.Target, the cache_key_fn must use the customized JSONSerializer, and the result_serializer must be set to the customized JSONSerializer.

So far, I have only two custom result types:

  • ImageTarget wraps a numpy array and saves it to a compressed TIFF file.
  • CSVTarget wraps a pandas DataFrame and saves it to a CSV file.

I have also added ImageSource and CSVSource, which don’t have the set_data method and only point to existing data. Since ...Source and ...Target are both of type cpr.Resource.Resource, task arguments can use Resource as their type hint.

Example Flow

from glob import glob
from os import makedirs
from os.path import join, exists

from cpr.Resource import Resource
from cpr.Serializer import cpr_serializer
from cpr.image.ImageSource import ImageSource
from cpr.image.ImageTarget import ImageTarget
from cpr.utilities.utilities import task_input_hash
from prefect import flow, task
from prefect_dask import DaskTaskRunner

from scipy.ndimage import gaussian_filter


@task()
def list_files(input_data_dir):
    images = []
    for f in glob(join(input_data_dir, "*.tif")):
        images.append(ImageSource.from_path(f))

    return images


# Use cpr.utilities.utilities.task_input_hash to hash
# cpr.Resource.Resource input parameters correctly.
@task(cache_key_fn=task_input_hash)
def denoise_image(result_dir,
                  image: Resource,
                  sigma):
    output = ImageTarget.from_path(
        path=join(result_dir, f"{image.get_name()}.tif"))

    output.set_data(gaussian_filter(image.get_data(), sigma))

    return output

@task(cache_key_fn=task_input_hash)
def segment_image(result_dir: str,
                  denoised: Resource,
                  threshold: float):
    output = ImageTarget.from_path(
        path=join(result_dir, f"{denoised.get_name()}.tif"))

    output.set_data(denoised.get_data() > threshold)

    return output


task_runner = DaskTaskRunner(
    cluster_class="dask.distributed.LocalCluster",
    cluster_kwargs={
        "n_workers": 8,
        "processes": True,
        "threads_per_worker": 1,
    }
)


@flow(name="Example",
      cache_result_in_memory=False,
      persist_result=True,
      result_serializer=cpr_serializer(), # Use CPR serializer to catch cpr.Resource.Resource objects.
      task_runner=task_runner)
def measure_flow(
        input_data_dir: str = "/home/tibuch/Data/broad/nuclei_U2OS/images/",
        result_root_dir: str = "/home/tibuch/Data/broad/nuclei_U2OS/",
        denoise_sigma: float = 2,
        threshold: float = 400):

    assert exists(result_root_dir), "Output directory does not exist."

    denoising_results = join(result_root_dir, "denoised")
    segmentation_results = join(result_root_dir, "segmented")
    makedirs(denoising_results, exist_ok=True)
    makedirs(segmentation_results, exist_ok=True)

    img_files = list_files.submit(input_data_dir).result()

    for file in img_files:
        denoised = denoise_image.submit(denoising_results, file,
                                        denoise_sigma)

        segmented = segment_image.submit(segmentation_results, denoised,
                                         threshold)

if __name__ == "__main__":
    measure_flow(input_data_dir="/home/tibuch/Data/broad/nuclei_U2OS/images/",
                 result_root_dir="/home/tibuch/Data/broad/nuclei_U2OS/",
                 denoise_sigma=2,
                 threshold=400)

There is still a bit of extra code, because the special targets have to be created inside the task.

Happy to get feedback!


thanks so much for sharing this!

CPR to the rescue!
