Tasks and Dispatchers
Task
A task represents each node in a Directed Acyclic Graph (DAG). In a distributed execution environment, tasks can be bound to dispatchers (Dispatchers) through the resourceName attribute. The dispatcher is responsible for distributing task information to different executors (Executors).
Task Attributes
Task attributes are defined in the tasks field of the DAG's YAML description file. These attributes include:
Attribute Name | Required | Type | Description |
---|---|---|---|
name | Yes | string | Name of the task |
category | Yes | string | Task category, details at Category |
pattern | Yes | string | Execution mode of the task, options are synchronous (task_sync) or asynchronous (task_async) |
resourceProtocol | No | string | Specifies the dispatcher resource protocol; if empty, the protocol is parsed from resourceName. Can't be empty if resourceName is also empty |
resourceName | No | string | Resource descriptor, see Dispatcher |
next | No | string | Name of the next task |
inputMappings | No | map | Input mappings, details at parameter mapping |
parameters | No | map | Default values for task input, inputMappings takes precedence if the same key is defined in both |
outputMappings | No | map | Output mappings, details at parameter mapping |
tolerance | No | boolean | Whether to ignore failure and skip the current node |
successConditions | No | string | Defines success conditions, takes precedence over result_type; the task succeeds if the output meets all conditions, otherwise it fails |
failConditions | No | string | Defines failure conditions, takes precedence over successConditions; the task fails if the output meets all conditions, otherwise it succeeds |
retry | No | map | For computational tasks, if execution fails, Rill Flow retries according to the strategy configured by this option, see Retry |
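The constraints in the table can be checked mechanically. The following is a minimal Python sketch, not Rill Flow's own validation code: the function name validate_task and the plain-dict representation of a task are hypothetical, and it assumes the attribute keys are written as name, category and pattern. The rule on resourceProtocol and resourceName is applied literally to every task, which is also an assumption, since the table does not say whether flow control tasks are exempt.

```python
REQUIRED = ("name", "category", "pattern")
PATTERNS = ("task_sync", "task_async")


def validate_task(task: dict) -> list[str]:
    """Return a list of problems found in one task definition (empty if none)."""
    problems = []
    for key in REQUIRED:
        if not task.get(key):
            problems.append(f"missing required attribute: {key}")
    pattern = task.get("pattern")
    if pattern and pattern not in PATTERNS:
        problems.append(f"unknown pattern: {pattern}")
    # Rule from the table: resourceProtocol and resourceName cannot both be empty.
    # Assumption: applied to every category, which the table does not confirm.
    if not task.get("resourceProtocol") and not task.get("resourceName"):
        problems.append("resourceProtocol and resourceName are both empty")
    return problems
```

A task that sets only resourceName passes this check, because the protocol can then be taken from the resource descriptor.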
Category
Tasks are categorized as follows:
- Computational Tasks
  - function: Executes specific computational tasks, such as HTTP or RPC call tasks.
- Flow Control Tasks
  - choice: Chooses one task to execute from multiple sub-tasks.
  - foreach: Repeatedly executes a set of sub-tasks.
  - pass: A null task that is marked successful upon execution.
  - return: Decides whether to skip subsequent tasks based on conditions.
For more information, refer to Flow Control.
Pattern
The pattern attribute specifies the mode of execution between the dispatcher and executor, supporting synchronous (task_sync) and asynchronous (task_async) modes.
Retry
Retry strategy. Calls made by computational tasks may fail. This parameter sets the retry strategy for failed executions, e.g.:
retry:
  maxRetryTimes: 3
  intervalInSeconds: 2
  multiplier: 1
The retry structure has three options:
- maxRetryTimes: maximum number of retries, with a default value of 0, i.e. no retry.
- intervalInSeconds: retry interval in seconds, with a default value of 0, i.e. retry immediately after a failure.
- multiplier: factor by which the retry interval grows, with a default value of 1, i.e. the interval stays constant.
After a computational task fails, Rill Flow retries it with the strategy in the above configuration. Assuming n retries have already been attempted, the interval before the next retry is intervalInSeconds * multiplier^n, and the task is retried at most maxRetryTimes times.
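To make the interval formula concrete, here is a small Python sketch that lists the wait before each retry. It is an illustration only, not Rill Flow's scheduler: the function name retry_intervals is hypothetical, and it assumes n counts the retries already made, so n = 0 for the first retry.

```python
def retry_intervals(max_retry_times: int = 0,
                    interval_in_seconds: float = 0,
                    multiplier: float = 1) -> list[float]:
    """Wait in seconds before each retry: intervalInSeconds * multiplier^n."""
    # Assumption: n is the number of retries already attempted (0 for the first).
    return [interval_in_seconds * multiplier ** n for n in range(max_retry_times)]


print(retry_intervals(3, 2, 1))  # [2, 2, 2]
print(retry_intervals(3, 2, 2))  # [2, 4, 8]
```

With the default values the list is empty, which matches the documented default of no retry.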
Synchronous and Asynchronous Task Modes
Synchronous Mode (task_sync)
In synchronous mode, the dispatcher maintains a connection with the executor until the task is completed or times out. This mode is suitable for tasks that take milliseconds to execute.
Asynchronous Mode (task_async)
In asynchronous mode, the dispatcher includes a callback address X-Callback-Url in the Header and passes the task information to the executor. After completing the task, the executor notifies Rill Flow of the results by calling the X-Callback-Url. This mode is suitable for heavy computational tasks or tasks that take longer to complete, such as AIGC model generation tasks.
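On the executor side, the asynchronous flow comes down to reading X-Callback-Url from the incoming request headers and reporting the result to that address once the work is done. The Python sketch below only builds the callback request and does not send it. It rests on assumptions not stated in this document: that the callback is an HTTP POST with a JSON body, and that the result payload can be an arbitrary dict. The function name build_callback_request is hypothetical.

```python
import json
import urllib.request


def build_callback_request(headers: dict, result: dict) -> urllib.request.Request:
    """Build (but do not send) the request that reports a finished task."""
    # HTTP header names are case-insensitive, so normalise them before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    callback_url = lowered["x-callback-url"]
    body = json.dumps(result).encode("utf-8")
    # Assumption: the callback expects a JSON body sent with POST.
    return urllib.request.Request(
        callback_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

An executor would pass the returned object to urllib.request.urlopen after the long-running work completes, so the dispatcher does not need to hold a connection open in the meantime.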
Dispatcher
Overview
Tasks select a dispatcher through the resourceProtocol attribute. The same type of dispatcher can correspond to multiple executors. Tasks bind to dispatchers and executors through the resourceName attribute. Rill Flow supports various dispatchers, including HTTP protocol dispatcher, K8s Service dispatcher, Alibaba Cloud Tongyi Qianwen dispatcher, ChatGPT dispatcher, etc. Rill Flow also supports extending dispatchers through plugins, offering flexibility in implementing custom dispatchers.
resourceProtocol
Tasks specify a dispatcher using the resourceProtocol attribute. This field is optional. If the task's resourceProtocol is empty, the resourceProtocol will be parsed from the resourceName.
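The fallback described here can be sketched in a few lines of Python. This is an illustration under assumptions, not Rill Flow's parser: the function name resolve_protocol is hypothetical, and it assumes the protocol is everything before the first "://" in resourceName.

```python
from typing import Optional


def resolve_protocol(resource_protocol: Optional[str],
                     resource_name: Optional[str]) -> str:
    """Pick the dispatcher protocol for a task."""
    # An explicit resourceProtocol always wins.
    if resource_protocol:
        return resource_protocol
    # Otherwise take the part of resourceName before "://", e.g. "http".
    protocol, separator, _ = (resource_name or "").partition("://")
    if not separator or not protocol:
        raise ValueError("resourceProtocol and resourceName cannot both be empty")
    return protocol
```

For example, a task with an empty resourceProtocol and a resourceName of http://www.sample.com/execute.json resolves to http.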
resourceName
Tasks describe a dispatcher and executor using a simplified format of a Uniform Resource Locator (URL). The common format is:
[Protocol Type]://[Server Address]:[Port Number]/[Resource Hierarchy UNIX File Path][File Name]?[Query Parameters]#[Fragment ID]
For example:
http://www.sample.com/callback.json
Supported Dispatcher Types
Rill Flow supports the following types of dispatchers:
HTTP protocol dispatcher
The HTTP dispatcher is used to forward task information. When initiating an HTTP request, it uses application/json as the content-type, and the received json string is used as output.
Task Attributes
Parameter | Value | Description |
---|---|---|
resourceProtocol | http/https | Specifies the protocol type |
resourceName | http://www.sample.com/execute.json | HTTP URL |
pattern | task_sync/task_async | Specifies the task execution mode, synchronous (task_sync) or asynchronous (task_async) |
requestType | get/post | HTTP request type, default is post |
Input Parameters
Key | Value Type | Description |
---|---|---|
query_params_* | map | GET request parameters, keys with query_params_ prefix should be of map type, appended to the URL as key=value |
request_header_* | map | Request headers, keys with request_header_ prefix should be of map type, included in the request headers |
Other keys | String | POST request body parameters, currently only supports json type, other key/values will be included in the json structure of the post body |
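The routing of input keys in the table can be sketched as follows. This Python example is a hedged illustration rather than the dispatcher's actual code: the function name build_http_request is hypothetical, and it assumes that maps from several prefixed keys are merged and that all remaining keys form the JSON body.

```python
import json
from urllib.parse import urlencode


def build_http_request(url: str, task_input: dict) -> tuple[str, dict, str]:
    """Split task input into a final URL, request headers and a JSON body."""
    query, headers, body = {}, {"Content-Type": "application/json"}, {}
    for key, value in task_input.items():
        if key.startswith("query_params_"):
            query.update(value)      # map entries become key=value pairs
        elif key.startswith("request_header_"):
            headers.update(value)    # map entries become request headers
        else:
            body[key] = value        # every other key goes into the JSON body
    if query:
        # Respect a query string that is already part of the URL.
        separator = "&" if "?" in url else "?"
        url = url + separator + urlencode(query)
    return url, headers, json.dumps(body)
```

Passing an input with a query_params_ map of {"user": "Bob"} and a plain text key, for instance, yields a URL ending in ?user=Bob and a body that contains only the text key.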
Output Parameters
The json structure returned by the HTTP request will be assigned to the $.output variable.
Dispatching to K8s Service: In a K8S environment, it's recommended to use the HTTP dispatcher to connect to a K8s Service domain, like http://service_name.namespace/execute.json. For more details about Service, refer to [Kubernetes Service](https://kubernetes.io/docs/concepts/services-networking/service/). You can also use service meshes like Istio or other advanced service load balancing mechanisms.
ChatGPT Dispatcher
Rill Flow supports the dispatcher for OpenAI's ChatGPT models. To use this dispatcher, an Apikey for OpenAI model invocation is required. For more details, refer to the [OpenAI ChatGPT model documentation](https://platform.openai.com/api-keys).
Task Attributes
Parameter | Value | Description |
---|---|---|
resourceProtocol | chatgpt | Use ChatGPT dispatcher |
Input Parameters
Key | Value Type | Description |
---|---|---|
apikey | String | Apikey for model invocation |
model | String | Model name, see OpenAI's Supported Model List |
prompt | String | Text content for model request |
Output Parameters
The json structure returned by the ChatGPT request will be assigned to the $.output.result variable. Other return values can be referenced in the [OpenAI SDK documentation](https://platform.openai.com/docs/guides/text-generation/chat-completions-response-form).
Alibaba Cloud Model Service Dispatcher
Rill Flow supports the dispatcher for Alibaba Cloud's Lingji Model Service. To use this dispatcher, an Apikey for Alibaba Cloud's Tongyi Qianwen model invocation is required. For more details, refer to the Alibaba Cloud Lingji Model Service Documentation.
Task Attributes
Parameter | Value | Description |
---|---|---|
resourceProtocol | aliyun_ai | Use Alibaba Cloud AI dispatcher |
Input Parameters
Key | Value Type | Description |
---|---|---|
apikey | String | Apikey for model invocation |
model | String | Model name, see Alibaba Cloud's Supported Model List |
message | string | Text content for model request |
message_suffix | string | (Optional) Suffix for the model request content |
message_prefix | string | (Optional) Prefix for the model request content |
Output Parameters
The json structure returned by the Alibaba Cloud request will be assigned to the $.output variable, with $.output.output.text being the text return from the model. Other return values can be referenced in the Alibaba Cloud SDK Documentation.