
Tasks and Dispatchers

Task

A task represents a node in a Directed Acyclic Graph (DAG). In a distributed execution environment, tasks can be bound to dispatchers through the resourceName attribute. The dispatcher is responsible for distributing task information to different executors.

Task Architecture Diagram

Task Attributes

Task attributes are defined in the tasks field of the DAG's YAML description file. These attributes include:

| Attribute Name | Required | Type | Description |
| --- | --- | --- | --- |
| name | Yes | string | Name of the task |
| category | Yes | string | Task category, details at Category |
| pattern | Yes | string | Execution mode of the task: synchronous (task_sync) or asynchronous (task_async) |
| resourceProtocol | No | string | Specifies the dispatcher resource protocol; if empty, the protocol is parsed from resourceName. resourceProtocol and resourceName cannot both be empty |
| resourceName | No | string | Resource descriptor, see Dispatcher |
| next | No | string | Name of the next task |
| inputMappings | No | map | Input mappings, details at parameter mapping |
| parameters | No | map | Default values for task input; inputMappings takes precedence if the same key is defined in both |
| outputMappings | No | map | Output mappings, details at parameter mapping |
| tolerance | No | boolean | Whether to ignore a failure of this task and continue |
| successConditions | No | string | Defines success conditions; takes precedence over result_type. The task succeeds if the output meets all conditions, otherwise it fails |
| failConditions | No | string | Defines failure conditions; takes precedence over successConditions. The task fails if the output meets all conditions, otherwise it succeeds |
| retry | No | map | For computational tasks, if execution fails, Rill Flow retries according to the strategy configured here, see Retry |
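The constraints in the attribute table can be checked mechanically before a DAG is submitted. The following is a minimal sketch, not Rill Flow's own validation: `validate_task` is a hypothetical helper, it assumes a task has already been loaded from YAML into a Python dict with lowercase attribute keys, and it takes the allowed category names from the Category section of this page.

```python
# Hypothetical validator; attribute names follow the table above.
REQUIRED = ("name", "category", "pattern")
CATEGORIES = {"function", "choice", "foreach", "pass", "return"}
PATTERNS = {"task_sync", "task_async"}


def validate_task(task: dict) -> list[str]:
    """Return a list of problems; an empty list means the task looks valid."""
    problems = []
    # Every attribute marked as required must be present and non-empty.
    for field in REQUIRED:
        if not task.get(field):
            problems.append(f"missing required attribute: {field}")
    if task.get("category") and task["category"] not in CATEGORIES:
        problems.append(f"unknown category: {task['category']}")
    if task.get("pattern") and task["pattern"] not in PATTERNS:
        problems.append(f"unknown pattern: {task['pattern']}")
    # Literal reading of the table: the two resource attributes
    # cannot both be empty.
    if not task.get("resourceProtocol") and not task.get("resourceName"):
        problems.append("resourceProtocol and resourceName cannot both be empty")
    return problems
```

An empty result only means the task passes these table-level checks; the real engine may apply further rules.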

Category

Tasks are categorized as follows:

  • Computational Tasks
    • function: Executes specific computational tasks, such as HTTP or RPC call tasks.
  • Flow Control Tasks
    • choice: Chooses one task to execute from multiple sub-tasks.
    • foreach: Repeatedly executes a set of sub-tasks.
    • pass: A null task that is marked successful upon execution.
    • return: Decides whether to skip subsequent tasks based on conditions. For more information, refer to Flow Control.

Pattern

The pattern attribute specifies the mode of execution between the dispatcher and executor, supporting synchronous (task_sync) and asynchronous (task_async) modes.

Retry

Calls made by computational tasks may fail. The retry attribute sets the strategy for retrying after a failure, for example:

retry:
  maxRetryTimes: 3
  intervalInSeconds: 2
  multiplier: 1

The retry structure has three options:

  • maxRetryTimes: maximum number of retries. The default value is 0, i.e. no retry.
  • intervalInSeconds: retry interval in seconds. The default value is 0, i.e. retry immediately after a failure.
  • multiplier: factor by which the interval grows with each retry. The default value is 1, i.e. the interval stays constant.

When a computational task fails, Rill Flow retries it according to the strategy configured above. Assuming n retries have already been attempted, the interval before the next retry is intervalInSeconds * multiplier^n seconds, and at most maxRetryTimes retries are made.
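The interval formula can be made concrete with a few lines of Python. This is an illustrative sketch only: `retry_intervals` is a hypothetical helper that applies intervalInSeconds * multiplier^n for n = 0 up to maxRetryTimes - 1, and it is not taken from the Rill Flow code base.

```python
def retry_intervals(max_retry_times: int = 0,
                    interval_in_seconds: float = 0,
                    multiplier: float = 1) -> list:
    """Wait in seconds before each retry, where n retries were already made."""
    return [interval_in_seconds * multiplier ** n
            for n in range(max_retry_times)]


print(retry_intervals(3, 2, 1))  # [2, 2, 2]
print(retry_intervals(3, 2, 2))  # [2, 4, 8]
```

With the defaults (maxRetryTimes of 0) the list is empty, which matches the "no retry" behaviour described above.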

Synchronous and Asynchronous Task Modes

Synchronous Mode (task_sync)

In synchronous mode, the dispatcher maintains a connection with the executor until the task is completed or times out. This mode is suitable for quick tasks that take milliseconds to execute.

Asynchronous Mode (task_async)

In asynchronous mode, the dispatcher includes a callback address, X-Callback-Url, in the request header and passes the task information to the executor. After completing the task, the executor notifies Rill Flow of the result by calling the X-Callback-Url. This mode is suitable for heavy computational tasks or tasks that take longer to complete, such as AIGC model generation tasks.
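On the executor side, the asynchronous flow amounts to reading X-Callback-Url from the incoming request and reporting the result to that address once the work is done. The sketch below only prepares that callback request with the Python standard library. The POST method, the JSON payload shape, and the helper name `build_callback_request` are assumptions made for illustration; this page does not specify the callback body format.

```python
import json
import urllib.request


def build_callback_request(headers: dict, result: dict) -> urllib.request.Request:
    """Prepare the request that reports `result` to the X-Callback-Url address."""
    # HTTP header names are case-insensitive, so normalize before the lookup.
    normalized = {key.lower(): value for key, value in headers.items()}
    callback_url = normalized.get("x-callback-url")
    if not callback_url:
        raise ValueError("X-Callback-Url header is missing")
    # Assumption: the result is sent as a JSON body via POST.
    body = json.dumps(result).encode("utf-8")
    return urllib.request.Request(
        callback_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would actually send the notification; building and sending are kept separate here so the long-running work can finish before any network call is made.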

Dispatcher

Overview

Tasks select a dispatcher through the resourceProtocol attribute. The same type of dispatcher can correspond to multiple executors. Tasks bind to dispatchers and executors through the resourceName attribute. Rill Flow supports various dispatchers, including the HTTP protocol dispatcher, K8s Service dispatcher, Alibaba Cloud Tongyi Qianwen dispatcher, and ChatGPT dispatcher. Rill Flow also supports extending dispatchers through plugins, offering flexibility in implementing custom dispatchers.

resourceProtocol

Tasks specify a dispatcher using the resourceProtocol attribute. This field is optional. If the task's resourceProtocol is empty, the protocol is parsed from the resourceName.

resourceName

Tasks describe a dispatcher and executor using a simplified format of a Uniform Resource Locator (URL). The common format is:

[Protocol Type]://[Server Address]:[Port Number]/[Resource Hierarchy UNIX File Path][File Name]?[Query Parameters]#[Fragment ID]

For example:

http://www.sample.com/callback.json
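The fallback from resourceProtocol to the protocol part of resourceName can be sketched as follows. `resolve_protocol` is a hypothetical helper written for illustration, not Rill Flow's actual parser; it assumes the protocol is whatever precedes `://` in resourceName.

```python
def resolve_protocol(resource_protocol: str = "", resource_name: str = "") -> str:
    """Pick the dispatcher protocol for a task."""
    # An explicit resourceProtocol always wins.
    if resource_protocol:
        return resource_protocol
    # Otherwise take the part of resourceName before "://".
    scheme, separator, _ = resource_name.partition("://")
    if not separator or not scheme:
        raise ValueError("resourceProtocol and resourceName cannot both be empty")
    return scheme


print(resolve_protocol("", "http://www.sample.com/callback.json"))  # http
```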

Supported Dispatcher Types

Rill Flow supports the following types of dispatchers:

HTTP protocol dispatcher

The HTTP dispatcher is used to forward task information. When initiating an HTTP request, it uses application/json as the Content-Type, and the JSON string it receives is used as the output.

Task Attributes

| Parameter | Value | Description |
| --- | --- | --- |
| resourceProtocol | http/https | Specifies the protocol type |
| resourceName | http://www.sample.com/execute.json | HTTP URL |
| pattern | task_sync/task_async | Specifies the task execution mode, synchronous (task_sync) or asynchronous (task_async) |
| requestType | get/post | HTTP request type, default is post |

Input Parameters

| Key | Value Type | Description |
| --- | --- | --- |
| query_params_* | map | GET request parameters; keys with the query_params_ prefix should be of map type and are appended to the URL as key=value |
| request_header_* | map | Request headers; keys with the request_header_ prefix should be of map type and are included in the request headers |
| Other keys | string | POST request body parameters; currently only the JSON type is supported, and other key/values are included in the JSON structure of the POST body |

Output Parameters

The json structure returned by the HTTP request will be assigned to the $.output variable.
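One way to read the input-parameter rules is as a three-way split of the task input into query string, headers, and JSON body. The sketch below illustrates that reading with the standard library only. `build_http_request` is a hypothetical function, the prefix matching is an assumption about how the keys are recognized, and the real dispatcher may differ in detail.

```python
import json
from urllib.parse import urlencode


def build_http_request(resource_name: str, task_input: dict,
                       request_type: str = "post"):
    """Split task input into (url, headers, json_body) per the rules above."""
    query, body = {}, {}
    headers = {"Content-Type": "application/json"}
    for key, value in task_input.items():
        if key.startswith("query_params"):
            query.update(value)      # map entries become key=value pairs
        elif key.startswith("request_header"):
            headers.update(value)    # map entries become request headers
        else:
            body[key] = value        # everything else goes into the body
    url = resource_name
    if query:
        separator = "&" if "?" in url else "?"
        url = f"{url}{separator}{urlencode(query)}"
    # Only POST requests carry a JSON body in this sketch.
    payload = json.dumps(body) if request_type.lower() == "post" else None
    return url, headers, payload
```

For example, an input of `{"query_params_a": {"user": "Bob"}, "text": "hi"}` against `http://www.sample.com/execute.json` yields the URL `http://www.sample.com/execute.json?user=Bob` and a body containing only `text`.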

Dispatching to K8s Service: In a K8s environment, it is recommended to use the HTTP dispatcher to connect to a K8s Service domain, such as http://service_name.namespace/execute.json. For more details about Service, refer to Kubernetes Service (https://kubernetes.io/docs/concepts/services-networking/service/). You can also use a service mesh such as Istio or another advanced service load-balancing mechanism.

ChatGPT Dispatcher

Rill Flow supports a dispatcher for the OpenAI ChatGPT model. To use this dispatcher, an API key for OpenAI model invocation is required. For more details, refer to the OpenAI ChatGPT model documentation (https://platform.openai.com/api-keys).

Task Attributes

| Parameter | Value | Description |
| --- | --- | --- |
| resourceProtocol | chatgpt | Use the ChatGPT dispatcher |

Input Parameters

| Key | Value Type | Description |
| --- | --- | --- |
| apikey | string | API key for model invocation |
| model | string | Model name, see OpenAI's list of supported models |
| prompt | string | Text content for the model request |

Output Parameters

The JSON structure returned by the ChatGPT request will be assigned to the $.output.result variable. Other return values can be found in the OpenAI SDK documentation (https://platform.openai.com/docs/guides/text-generation/chat-completions-response-form).

Alibaba Cloud Model Service Dispatcher

Rill Flow supports the dispatcher for Alibaba Cloud's Lingji Model Service. To use this dispatcher, an API key for Alibaba Cloud's Tongyi Qianwen model invocation is required. For more details, refer to the Alibaba Cloud Lingji Model Service Documentation.

Task Attributes

| Parameter | Value | Description |
| --- | --- | --- |
| resourceProtocol | aliyun_ai | Use the Alibaba Cloud AI dispatcher |

Input Parameters

| Key | Value Type | Description |
| --- | --- | --- |
| apikey | string | API key for model invocation |
| model | string | Model name, see Alibaba Cloud's Supported Model List |
| message | string | Text content for the model request |
| message_suffix | string | (Optional) Suffix for the model request content |
| message_prefix | string | (Optional) Prefix for the model request content |

Output Parameters

The JSON structure returned by the Alibaba Cloud request will be assigned to the $.output variable, with $.output.output.text being the text returned by the model. Other return values can be referenced in the Alibaba Cloud SDK Documentation.