Passing Payloads
This section shows how to pass outputs of one FuncX Function to another using Globus Flows.
Simple Payloads
Sometimes a state needs to depend on the output of another state. This is useful for splitting up complex functions, or for having FuncX parallelize payloads into multiple simultaneous tasks.
By default, all Gladier Tools take input from the main input source, defined as $.input. However, by using modifiers (see more in Flow Generation), this default can be changed to use the output of another FuncX function.
@generate_flow_definition(modifiers={
    my_second_function: {'payload': '$.MyFirstFunction.details.result[0]'},
})
class MyTool(GladierBaseTool):
    funcx_functions = [
        my_first_function,
        my_second_function,
    ]
In the example above, the first function is given the full input to work with on $.input. The output of my_first_function will be produced with the name MyFirstFunction.details.result as a list of FuncX task results. By default, only one FuncX task is run per function, so typically this will be a list with only one entry. The path $.MyFirstFunction.details.result[0] references the exact output returned by a single invocation of my_first_function.
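To make the path concrete, the sketch below models the flow state as a plain Python dictionary and looks up the same location by hand. The contents (the name and greeting keys and their values) are made up for illustration; only the nesting under MyFirstFunction.details.result comes from the description above.

```python
# Hypothetical flow state after my_first_function has finished.
# Only the nesting (state name -> details -> result -> list) mirrors the docs;
# the keys and values inside are invented for this illustration.
flow_state = {
    'input': {'name': 'foo'},
    'MyFirstFunction': {
        'details': {
            'result': [
                {'greeting': 'hello foo'},  # one entry per FuncX task
            ],
        },
    },
}

# Equivalent of the path $.MyFirstFunction.details.result[0]
first_result = flow_state['MyFirstFunction']['details']['result'][0]
print(first_result)
```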
Note
Gladier automatically creates flow state names by translating them from snake case to upper camel case. For example, my_first_function results in the state name MyFirstFunction.
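The naming rule can be approximated in a few lines of Python, which is handy when working out the path to use in a modifier. This is a minimal sketch of the rule as described here, not Gladier's own implementation; snake_to_upper_camel is a hypothetical helper name.

```python
def snake_to_upper_camel(name):
    """Illustrative helper: turn a snake_case name into UpperCamelCase."""
    # Upper-case the first letter of each underscore-separated part.
    return ''.join(part[:1].upper() + part[1:] for part in name.split('_') if part)


state_name = snake_to_upper_camel('my_first_function')
result_path = f'$.{state_name}.details.result[0]'
print(state_name)
print(result_path)
```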
When my_first_function finishes and my_second_function begins, my_second_function will be given the input stored in $.MyFirstFunction.details.result[0]. This value MUST be a dictionary containing the parameters my_second_function expects; otherwise a flow exception will be raised and the flow will be marked as a failure.
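As an illustration of that contract, the two functions below are hypothetical bodies for my_first_function and my_second_function. In a real flow they run remotely through FuncX; here they are called locally only to show that the first function's return value must unpack cleanly into the second function's keyword arguments.

```python
def my_first_function(name, **data):
    # Return a dictionary whose keys match my_second_function's parameters.
    return {'greeting': f'hello {name}'}


def my_second_function(greeting, **data):
    # Receives the dictionary returned above as keyword arguments.
    return greeting.upper()


# Local stand-in for passing result[0] of the first state as the payload.
payload = my_first_function(name='foo')
if not isinstance(payload, dict):
    raise TypeError('payload must be a dictionary of keyword arguments')
output = my_second_function(**payload)
print(output)
```

If my_first_function returned a bare string or list instead, the keyword unpacking would fail, which corresponds to the failure case described above.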
Warning
Using function outputs as payloads with ExceptionOnActionFailure: false can result in cascading failures, where the stringified exception results are used as input to the next function. It’s recommended you either set ExceptionOnActionFailure: true or pass payloads as $.MyFirstFunction.details.
Multiple FuncX Tasks
FuncX is built to run many tasks in parallel. You can instruct Gladier to pass multiple task payloads with the tasks modifier. However, at this level FuncX also needs an explicit FuncX endpoint and function ID for each task it will process. It’s common to use one FuncX function to build the list of payloads to be run in parallel.
def parallel_workload_input_builder(funcx_endpoint_compute, parallel_workload_funcx_id, parallel_workloads, **data):
    return [{
        'endpoint': funcx_endpoint_compute,
        'function': parallel_workload_funcx_id,
        'payload': payload,
    } for payload in parallel_workloads]


def parallel_workload(name, **data):
    import time
    return f'{name} finished at {time.time()}!'


@generate_flow_definition(modifiers={
    parallel_workload: {'tasks': '$.ParallelWorkloadInputBuilder.details.result[0]'},
})
class ParallelWorkloadsTool(GladierBaseTool):
    funcx_functions = [
        parallel_workload_input_builder,
        parallel_workload,
    ]
    required_input = [
        'funcx_endpoint_compute',
        'parallel_workloads',
        'parallel_workload_funcx_id'
    ]
Above, the parallel_workload_input_builder function is run first and generates the list of FuncX tasks. This can be an arbitrarily long list determined at runtime. Each task in the list must contain three elements: endpoint, function and payload.
endpoint above is typically specified by the user at input time, and is by default funcx_endpoint_compute. The FuncX function ID, however, is updated by Gladier each time the function changes, and its input name is determined automatically. By default, Gladier appends _funcx_id to the name of each function listed in funcx_functions and automatically adds the resulting entries to $.input. The name parallel_workload_funcx_id above follows from this rule, or it can be verified via the flow output.
payload must be a dictionary containing keyword parameters for the function which match the function signature. This is similar to all other FuncX functions used in Gladier, which are called with all input data specified on $.input.
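Because the task list is built at runtime, it can help to check its shape locally before running a flow. The sketch below reuses the builder from above and adds a hypothetical check_tasks helper (not part of Gladier or FuncX) that enforces the three required keys; the endpoint and function IDs are placeholder strings.

```python
REQUIRED_TASK_KEYS = {'endpoint', 'function', 'payload'}


def check_tasks(tasks):
    """Hypothetical local check; raises ValueError on a malformed task list."""
    if not isinstance(tasks, list):
        raise ValueError('tasks must be a list')
    for index, task in enumerate(tasks):
        missing = REQUIRED_TASK_KEYS - set(task)
        if missing:
            raise ValueError(f'task {index} is missing {sorted(missing)}')
        if not isinstance(task['payload'], dict):
            raise ValueError(f'task {index} payload must be a dictionary')
    return tasks


def parallel_workload_input_builder(funcx_endpoint_compute, parallel_workload_funcx_id, parallel_workloads, **data):
    return [{
        'endpoint': funcx_endpoint_compute,
        'function': parallel_workload_funcx_id,
        'payload': payload,
    } for payload in parallel_workloads]


# Placeholder IDs; real values come from $.input at flow run time.
tasks = check_tasks(parallel_workload_input_builder(
    funcx_endpoint_compute='example-endpoint-id',
    parallel_workload_funcx_id='example-function-id',
    parallel_workloads=[{'name': 'foo'}, {'name': 'bar'}],
))
print(len(tasks), 'tasks look well formed')
```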
When parallel_workload runs, it will execute all tasks in parallel, or according to any rules defined by your particular FuncX endpoint. Each of the outputs will be listed in $.ParallelWorkload.details.result once all tasks finish. If any task fails, a stack trace will be returned as a string. If all tasks fail, the flow will be marked as “FAILED”.
A full example of the Flow Definition as JSON output is below:
{
  "Comment": "Flow with states: ParallelWorkloadInputBuilder, ParallelWorkload",
  "StartAt": "ParallelWorkloadInputBuilder",
  "States": {
    "ParallelWorkloadInputBuilder": {
      "Comment": null,
      "Type": "Action",
      "ActionUrl": "https://automate.funcx.org",
      "ActionScope": "https://auth.globus.org/scopes/b3db7e59-a6f1-4947-95c2-59d6b7a70f8c/action_all",
      "ExceptionOnActionFailure": false,
      "Parameters": {
        "tasks": [
          {
            "endpoint.$": "$.input.funcx_endpoint_compute",
            "function.$": "$.input.parallel_workload_input_builder_funcx_id",
            "payload.$": "$.input"
          }
        ]
      },
      "ResultPath": "$.ParallelWorkloadInputBuilder",
      "WaitTime": 300,
      "Next": "ParallelWorkload"
    },
    "ParallelWorkload": {
      "Comment": null,
      "Type": "Action",
      "ActionUrl": "https://automate.funcx.org",
      "ActionScope": "https://auth.globus.org/scopes/b3db7e59-a6f1-4947-95c2-59d6b7a70f8c/action_all",
      "Parameters": {
        "tasks.$": "$.ParallelWorkloadInputBuilder.details.result[0]"
      },
      "ResultPath": "$.ParallelWorkload",
      "WaitTime": 300,
      "ExceptionOnActionFailure": true,
      "End": true
    }
  }
}
Parallel Processing Example
Below is a full runnable example, using the FuncX tutorial endpoint.
from gladier import GladierBaseClient, GladierBaseTool, generate_flow_definition
from pprint import pprint


def parallel_workload_input_builder(funcx_endpoint_compute, parallel_workload_funcx_id, parallel_workloads, **data):
    return [{
        'endpoint': funcx_endpoint_compute,
        'function': parallel_workload_funcx_id,
        'payload': payload,
    } for payload in parallel_workloads]


def parallel_workload(name, **data):
    import time
    return f'{name} finished at {time.time()}!'


@generate_flow_definition(modifiers={
    parallel_workload: {'tasks': '$.ParallelWorkloadInputBuilder.details.result[0]'},
})
class ParallelWorkloadsTool(GladierBaseTool):
    funcx_functions = [
        parallel_workload_input_builder,
        parallel_workload,
    ]
    required_input = [
        'funcx_endpoint_compute',
        'parallel_workloads',
        'parallel_workload_funcx_id'
    ]


@generate_flow_definition
class ParallelWorkloadsClient(GladierBaseClient):
    gladier_tools = [
        ParallelWorkloadsTool,
    ]


if __name__ == '__main__':
    flow_input = {
        'input': {
            'parallel_workloads': [
                {'name': 'foo'},
                {'name': 'bar'},
                {'name': 'baz'},
            ],
            'funcx_endpoint_compute': '553e7b64-0480-473c-beef-be762ba979a9',
        }
    }

    work_flow = ParallelWorkloadsClient()
    pprint(work_flow.flow_definition)
    flow = work_flow.run_flow(flow_input=flow_input)
    run_id = flow['run_id']
    work_flow.progress(run_id)
    pprint(work_flow.get_status(run_id))