Additional upload options with S3 Storage and S3 Results

I’m trying to utilize the S3 storage option for my flows. We have some organizational standards/requirements that force us to use some additional options when uploading data into S3, in particular, setting the ACL and ServerSideEncryption parameters.

I was able to register my flow successfully to S3 by doing this:

flow.storage = prefect.storage.S3(bucket=“mybucket”, upload_options={‘ACL’: ‘private’, ‘ServerSideEncryption’: ‘aws:kms’})

However, when I run my flow, I get failures when it tries to upload the task results to S3:

[2022-03-23 08:04:35-0400] ERROR - prefect.S3Result | Error uploading to S3: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
[2022-03-23 08:04:35-0400] ERROR - prefect.CloudTaskRunner | Unexpected error: ClientError(‘An error occurred (AccessDenied) when calling the PutObject operation: Access Denied’)

The task result upload to S3 does not appear to be using the same upload_options that were specified when I registered the flow. Is there a way to add those options for task uploads somewhere?

1 Like

Hi @derekkromm, welcome to Prefect Discourse! :wave:

A general recommendation from AWS is to always use IAM roles to attach permissions to your execution layer instead of embedding credentials directly. Below you can see how you can do that.

IAM roles

So for example, if you are running your Prefect agent on an EC2 instance, you could attach an IAM role with S3 access to the instance:

For convenience in a simple PoC, you may create a role with S3FullAccess permissions.

Similarly, if you are using an ECSAgent, you could attach S3 permissions to your ECS task role.

If you are running your flows on AWS EKS, then IAM Roles for Service Accounts is the right option.

Embedding credentials directly

Even though IAM roles are much more secure, some users attach their credentials directly. You could do it as follows:

from prefect import Flow
from prefect.storage import S3
from prefect.run_config import UniversalRun

with Flow(
    FLOW_NAME,
    storage=S3(bucket=“mybucket”, upload_options={‘ACL’: ‘private’, ‘ServerSideEncryption’: ‘aws:kms’}),
    run_config=UniversalRun(
        env=dict(AWS_ACCESS_KEY_ID="xxxxx", AWS_SECRET_ACCESS_KEY="yyyyy"),
    ),
) as flow:
    hw = hello_world()

LMK if you have any more questions.

1 Like

Hi @anna_geller , thank you for the detailed reply.

I understand what you’re saying about IAM roles. I was trying to do a quick POC locally just to understand how the S3 storage option works.

I’ve been able to get the flow to register and download from S3 by adding those upload_options parameters to the flow storage, but the problem I’m running into is, at run time, when tasks complete, it appears that Prefect attempts to upload the results to S3 as well (and presumably other tasks would download them?). However, when the task runner tries to upload the result to S3, it isn’t able to, because it isn’t setting those additional upload options (e.g., ACL/ServerSideEncryption) the same way that the flow storage is. Is there a way to set these options at the task level as well? Or have the task storage options inherit the flow storage options?

Edit to add: I’m relatively sure that, even if I have this running in ECS, I would still need to specify these additional upload_options to ensure that the ACL/ServerSideEncryption are being explicitly set. So yes, in a proper environment, I would definitely be using IAM roles within ECS to handle S3 authentication, but I would still need to be able to configure the task’s result upload options per the above in order to utilize the S3 storage mechanism.

Gotcha. If you don’t want that behavior, you can disable checkpointing on a per-task level using this syntax:

@task(checkpoint=False)

I think the easiest way of solving this problem would be to set up S3 default server-side encryption on a bucket level. You could set this up as described on this AWS docs page:

This way, there is no need to set up those extra options for each and every object, especially given that you said:

1 Like

Thank you @anna_geller ! The checkpoint=False option worked and I’m definitely going to take a look at the default bucket options. Cheers!

1 Like

@anna_geller , it turns out our security team has configured all buckets with a default policy configuration that includes an explicit deny if the server side encryption parameter is not explicitly set to aws:kms, despite having aws:kms encryption configured as the default encryption mechanism on the bucket.

Presumably, this is to prevent other encryption methods from being used/specified, but throws a bit of a wrinkle in for me.

I tried to find the checkpoint=False option in the documentation, but came up empty. What exactly does setting this do, what feature(s) would be unavailable for me if I used it? And I guess perhaps I’d like to know, what would the level of effort be to include some sort of option for task upload options similar to the flow ones? We are in the process of evaluating a number of options (airflow, prefect, and a couple others), and so I just want to know what the workarounds/options are.

Thank you so much for your time!

You can always remove that policy on a given bucket that you use with Prefect? This setting seems a little redundant when the encryption is set by default, don’t you think? :smile: Maybe you can talk to your security team?

If all objects are encrypted by default, forcing to set this explicitly to encrypt objects manually doesn’t have any effect because everything is server-side encrypted anyway by default! :upside_down_face: So this policy seems rather unnecessary (no judgment here, just a recommendation).

Disabling checkpoints means the task run results won’t be stored. If you would want to restart failed flow run from the UI from some task run in the middle of the flow, you would need the results to be configured. So this is nothing necessary, but often quite useful.

Here you can read more about Results and checkpointing: