I use Prefect 2 to run image analysis workflows. An image analysis workflow often contains a series of image transformations which are applied to individual images in sequence. A very basic workflow would take raw image data, apply some filtering to reduce noise, and then threshold the data to obtain a semantic segmentation (foreground vs. background).
I can easily build this workflow with Prefect 2 and use caching to reduce computation when a flow is re-run. However, it gets trickier if I want to access and inspect cached task-run results, because Prefect 2 currently has no support for custom result types. One work-around is to do all reading and writing of image data inside the task and only pass file paths between tasks. This works, but it does not feel very nice to add all this extra code to every task.
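For illustration, this work-around looks roughly like the following minimal sketch (the task and its path handling are hypothetical):

```python
import tifffile
from os.path import basename, join
from prefect import task
from scipy.ndimage import gaussian_filter

@task
def denoise_image(input_path: str, result_dir: str, sigma: float) -> str:
    # All I/O happens inside the task; only plain file paths cross task
    # boundaries, so Prefect can cache and serialize the string result.
    image = tifffile.imread(input_path)
    output_path = join(result_dir, basename(input_path))
    tifffile.imwrite(output_path, gaussian_filter(image, sigma))
    return output_path
```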
I think I found a better solution which uses Prefect's `JSONSerializer` with custom implementations for the `object_encoder` and `object_decoder`, and I would like to get feedback and inspiration from the community. The code can be found in [fmi-faim/custom-prefect-result](https://github.com/fmi-faim/custom-prefect-result).
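Conceptually, the customized serializer is just Prefect's `JSONSerializer` pointed at two custom hook functions. A minimal sketch, assuming hypothetical dotted paths for the hooks (the actual CPR module layout may differ):

```python
from prefect.serializers import JSONSerializer

def cpr_serializer() -> JSONSerializer:
    # Prefect resolves these dotted import paths to callables and uses
    # them as json.dumps(default=...) and json.loads(object_hook=...).
    return JSONSerializer(
        object_encoder="cpr.Serializer.target_encoder",  # assumed path
        object_decoder="cpr.Serializer.target_decoder",  # assumed path
    )
```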
Custom Prefect Result (CPR) introduces custom result targets, which have

- a `location`: storage location
- a `name`: file name
- an `ext`: file extension
- a `data`: actual result data
- a `data_hash`: hash of the `data` field
When the data is set for such a target via `set_data`, the `data_hash` is computed. When such a target is returned by a task and the customized `JSONSerializer` encounters it, only the `location`, `name`, `ext` and `data_hash` are serialized to JSON, while the actual `data` is written to the storage location. On decoding, the custom result target is restored and the data can be accessed via `get_data`, which loads the actual data from the storage location, computes the hash of the loaded data, and compares it against the stored `data_hash`.
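A minimal sketch of such a target, with assumed hashing and I/O details (the real `cpr` classes differ):

```python
import hashlib
from os.path import join

import numpy as np
import tifffile

class SketchImageTarget:
    """Minimal sketch of a CPR-style result target."""

    def __init__(self, location: str, name: str, ext: str = ".tif"):
        self.location, self.name, self.ext = location, name, ext
        self._data = None
        self.data_hash = None

    def set_data(self, data: np.ndarray):
        # The hash is computed as soon as the data is set, so it stays
        # available even after the in-memory data has been dropped.
        self._data = data
        self.data_hash = hashlib.sha256(data.tobytes()).hexdigest()

    def get_path(self) -> str:
        # The hash becomes part of the file name (see below).
        return join(self.location, f"{self.name}_{self.data_hash}{self.ext}")

    def get_data(self) -> np.ndarray:
        if self._data is None:
            self._data = tifffile.imread(self.get_path())
            loaded_hash = hashlib.sha256(self._data.tobytes()).hexdigest()
            assert loaded_hash == self.data_hash, "stored result was modified"
        return self._data
```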
Importantly, when the data is stored, the `data_hash` is appended to the file name, which ensures that existing results are not overwritten by task runs with different parameters.
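The encode/decode hooks then only have to persist the data and round-trip the metadata. Again a sketch building on the class above, with assumed names:

```python
import tifffile

def target_encoder(obj):
    # json.dumps(default=...): persist the pixel data, emit only metadata.
    if isinstance(obj, SketchImageTarget):
        tifffile.imwrite(obj.get_path(), obj.get_data())
        return {"__sketch_target__": True, "location": obj.location,
                "name": obj.name, "ext": obj.ext, "data_hash": obj.data_hash}
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

def target_decoder(dct):
    # json.loads(object_hook=...): restore the target without touching disk;
    # the data is only read (and hash-checked) on the first get_data() call.
    if dct.get("__sketch_target__"):
        target = SketchImageTarget(dct["location"], dct["name"], dct["ext"])
        target.data_hash = dct["data_hash"]
        return target
    return dct
```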
Usage
Installation: `pip install git+https://github.com/fmi-faim/custom-prefect-result@main`
To use CPR, the task results must be of type `cpr.Target.Target`, the `cache_key_fn` must use the customized `JSONSerializer`, and the `result_serializer` must be set to the customized `JSONSerializer`.
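Put together, these three requirements map onto a task/flow definition like this (a condensed sketch of the full example below; `my_task` and `my_flow` are placeholders):

```python
from cpr.image.ImageTarget import ImageTarget
from cpr.Serializer import cpr_serializer
from cpr.utilities.utilities import task_input_hash
from prefect import flow, task

@task(cache_key_fn=task_input_hash)              # 1. hash Resource inputs correctly
def my_task(image: ImageTarget) -> ImageTarget:  # 2. results are cpr targets
    return image

@flow(result_serializer=cpr_serializer())        # 3. serialize targets via CPR
def my_flow():
    ...
```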
So far, I have only two custom result types:

- `ImageTarget` wraps a numpy array and saves it to a compressed tif-file.
- `CSVTarget` wraps a pandas DataFrame and saves it to a csv-file.
I have also added `ImageSource` and `CSVSource`, which don't have the `set_data` method and only point to existing data. Since `...Source` and `...Target` are both of type `cpr.Resource.Resource`, task arguments can use `Resource` as type hint.
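The CSV variants do not appear in the example below, so here is a hedged usage sketch; the `cpr.csv.CSVTarget` import path mirrors the image module layout and is an assumption, and the measurement task is hypothetical:

```python
from os.path import join

import pandas as pd
from cpr.csv.CSVTarget import CSVTarget  # assumed module path
from cpr.Resource import Resource
from cpr.utilities.utilities import task_input_hash
from prefect import task

@task(cache_key_fn=task_input_hash)
def count_foreground(result_dir: str, segmentation: Resource) -> CSVTarget:
    # Wrapping the DataFrame in a CSVTarget lets the CPR serializer write
    # it to a csv-file next to the image results.
    counts = pd.DataFrame({"n_foreground_px": [int(segmentation.get_data().sum())]})
    output = CSVTarget.from_path(
        path=join(result_dir, f"{segmentation.get_name()}.csv"))
    output.set_data(counts)
    return output
```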
Example Flow
```python
from glob import glob
from os import makedirs
from os.path import exists, join

from cpr.Resource import Resource
from cpr.Serializer import cpr_serializer
from cpr.image.ImageSource import ImageSource
from cpr.image.ImageTarget import ImageTarget
from cpr.utilities.utilities import task_input_hash
from prefect import flow, task
from prefect_dask import DaskTaskRunner
from scipy.ndimage import gaussian_filter


@task()
def list_files(input_data_dir):
    images = []
    for f in glob(join(input_data_dir, "*.tif")):
        images.append(ImageSource.from_path(f))
    return images


# Use cpr.utilities.utilities.task_input_hash to hash
# cpr.Resource.Resource input parameters correctly.
@task(cache_key_fn=task_input_hash)
def denoise_image(result_dir: str,
                  image: Resource,
                  sigma: float):
    output = ImageTarget.from_path(
        path=join(result_dir, f"{image.get_name()}.tif"))
    output.set_data(gaussian_filter(image.get_data(), sigma))
    return output


@task(cache_key_fn=task_input_hash)
def segment_image(result_dir: str,
                  denoised: Resource,
                  threshold: float):
    output = ImageTarget.from_path(
        path=join(result_dir, f"{denoised.get_name()}.tif"))
    output.set_data(denoised.get_data() > threshold)
    return output


task_runner = DaskTaskRunner(
    cluster_class="dask.distributed.LocalCluster",
    cluster_kwargs={
        "n_workers": 8,
        "processes": True,
        "threads_per_worker": 1,
    }
)


@flow(name="Example",
      cache_result_in_memory=False,
      persist_result=True,
      result_serializer=cpr_serializer(),  # Catch cpr.Resource.Resource objects.
      task_runner=task_runner)
def measure_flow(
        input_data_dir: str = "/home/tibuch/Data/broad/nuclei_U2OS/images/",
        result_root_dir: str = "/home/tibuch/Data/broad/nuclei_U2OS/",
        denoise_sigma: float = 2,
        threshold: float = 400):
    assert exists(result_root_dir), "Output directory does not exist."

    denoising_results = join(result_root_dir, "denoised")
    segmentation_results = join(result_root_dir, "segmented")
    makedirs(denoising_results, exist_ok=True)
    makedirs(segmentation_results, exist_ok=True)

    img_files = list_files.submit(input_data_dir).result()

    for file in img_files:
        denoised = denoise_image.submit(denoising_results, file,
                                        denoise_sigma)
        segmented = segment_image.submit(segmentation_results, denoised,
                                         threshold)


if __name__ == "__main__":
    measure_flow(input_data_dir="/home/tibuch/Data/broad/nuclei_U2OS/images/",
                 result_root_dir="/home/tibuch/Data/broad/nuclei_U2OS/",
                 denoise_sigma=2,
                 threshold=400)
```
There is still a bit of extra code, because the special targets have to be created inside the task.
Happy to get feedback!