On task mark as success, still inner loops are in progress

I’m running one flow, inside the flow there are 4,5 task, I’m mark the task as success using graphql based on task id, inside the task one while loop is running, after marking/changing state success, still inner while loop is running, till condition is satisfied & due to that next task is not able to initiate.

Also passing the result to task with change state, not able to receive the result into task.

Could you share your flow code to demonstrate how you are doing that?

@task
def task_1(timeout):
    while (timeout > datetime.datetime.now():
        print("running")
        
@task
def task_2(y):
    print("finished")

with Flow("test-flow") as flow:
    timeout = Parameter("timeout", default=None)
    y = task_1(timeout)
    task_2(y)

After this I have registered my flow in cloud and then I’m creating flow run in prefect cloud through flow run id.

Updating state by GraphQL

mutation ($state: JSON!) {
    setTaskRunState(
      input: {taskRunId: "5b8d9e50-2959-4298-9640-8775a8a57f96", version: 1, state: $state}
      ) {
      id
    }
}

{
"state": {
"type": "Success",
"result" : 2
}
}

**State change to success for task but still process is running for success task (task_1) **

Thanks for sharing your code. I’d love to help, but I struggle to understand what end goal you have in mind. Do you try to have some never-ending flow using this while loop in your task, and you try to stop this never-ending loop by setting the state to Success?

Could you explain your use case a bit more? Is this some sort of real-time streaming use case or polling for some file to arrive?

End goal for this workflow execute the task in interval and mark that task as success if task is completed before time or reschedule the task with new datetime, for that we are checking with while loop to reach that timeout.
if we mark the task as success, while loop continues run, its doesn’t stop it, that’s why it’s not jumping to next task.

Yes, we are trying to stop the flow by marking it success.

  1. What is the success meaning, it will not stop the internal running loop (like while loop) ?
  2. What is solution for it ?

Few more issue are coming,
3. We are trying to run simple task using client with key and schedule datetime, its appearing duplicate in prefect cloud console, one with schedule time and one immediate.
4. If we try to reschedule with new datetime, this also not working, only states are getting update
5. Can’t we reschedule the same flow ?

Please ans the above question.

Sort simple : Run flow (Working, Sometime not working normal schedule also)-> Schedule / Re-schedule with new date time(Not working) → setFlowRunState(Working , but not working in case of schedule with new datetime, i have used with “Schedule()”)

It looks like you could accomplish that by setting a timeout on your task - here is how you could do that:

from prefect import task, Flow


@task(timeout=15)  # timeout: Union[int, timedelta] = None
def hello_world():
    print("hello")


with Flow("timeout_flow") as flow:
    hw = hello_world()

One possible solution to end the task run early based on some condition would be raising signals, e.g.:

from prefect.engine import signals

@task
def signal_task(message):
    if message == 'go!':
        raise signals.SUCCESS(message='going!')
    elif message == 'stop!':
        raise signals.FAIL(message='stopping!')
    elif message == 'skip!':
        raise signals.SKIP(message='skipping!')

Could you share the code showing how you do it?

Sort of: you could start a new flow run, or when you are on Prefect Cloud, you could trigger a new flow run based on Automation SLA, e.g. if your flow run takes too long to start or complete. Example:

Scheduled with date and time but not running


@task
def check_schedule():
    print("success")
    
with Flow("vishwaas") as flow_2:
    check_schedule()
    
flow_2.register(project_name="amoga",idempotency_key=flow_2.serialized_hash())

**Running create flow after register**


flow_run_id = client.create_flow_run(
                "ebe79109-4259-409d-b705-484fcd876bda", 
    scheduled_start_time=pendulum.now().add(minutes=1),
                labels="amoga")

Preformatted text

Reply to : We are trying to run simple task using client with key and schedule datetime, its appearing duplicate in prefect cloud console, one with schedule time and one immediate.

schedule_2 = Schedule(

clocks=[DatesClock([pendulum.now().add(minutes=5)])]

)

# schedule = IntervalSchedule(interval=timedelta(minutes=2))

@task
def check_schedule():
    print("success")
    
with Flow("gaurav",schedule=schedule_2) as flow_2:
    check_schedule()
    
flow_2.register(project_name="amoga",idempotency_key=flow_2.serialized_hash())

Running flow

client = Client(api_key="api_key")
flow_run_id = client.create_flow_run(
                "b3f55da7-262f-4cad-90d7-465705a6c70b",
                labels="amoga")
print(flow_run_id)

Manual state change to success but still the inner task is in pending state
What do you mean by state change ??? is it only of make colour change or any implementation is there
related to it


Refer to above one

It looks like some of your flows are stuck in a Scheduled state, causing Late runs. This usually happens due to a label mismatch between your flow run config and your agent. This topic explains how to solve it:

And regarding the image - the Pending state refers to the task run state while Success refers to the flow run state.

This page will explain it all more:

Can we create a event based flow, which will trigger if some external event is created without doing polling ?

Yes! Here is the topic you are looking for: