跳到主要内容

任务与派发器

任务 (Task)

任务是在 DAG (有向无环图) 中的每一个节点。在分布式执行环境中,任务可通过 resourceName 属性绑定到派发器(Dispatcher),派发器负责将任务信息分发给不同的执行器(Executor)。

任务架构图

任务属性

任务属性在 DAG 图的 YAML 描述文件中的 tasks 字段中定义。这些属性包括:

属性名必选类型说明
namestring任务的名称
categorystring任务的分类,详情见category
patternstring任务的执行模式,可选值为同步(task_sync)或异步(task_async
resourceProtocolstring指定派发器资源协议,若为空,则使用 resourceName 解析的协议。resourceProtocolresourceName 不能同时为空
resourceNamestring资源描述符,详见派发器
nextstring下一个任务的名称
inputMappingsmap输入映射,详见参数映射
parametersmap任务输入的默认值,若 inputMappingparameters 同时定义了同一键,则以 inputMappings 为准
outputMappingsmap输出映射,详见参数映射
toleranceboolean任务失败时是否忽略并继续执行
successConditionsstring定义成功条件,优先级高于 result_type,若输出满足所有条件则任务成功,否则失败
failConditionsstring定义失败条件,优先级高于 successConditions,若输出满足所有条件则任务失败,否则成功
retrymap对于计算类任务,如果执行失败,Rill Flow 将按照该选项配置的策略进行重试,详见retry

category

任务根据类型分为以下类别:

  • 计算类任务
    • function:执行具体的计算任务,例如 HTTP 或 RPC 调用任务。
  • 流程控制类任务
    • choice:在多个子任务中选择一个执行。
    • foreach:循环执行一组子任务。
    • pass:空任务,执行后直接标记为成功。
    • return:根据条件决定是否跳过后续任务。
    更多信息请参考流程控制

pattern

通过 pattern 属性可以指定任务在派发器与执行器之间的执行模式,支持同步(task_sync)和异步(task_async)模式。

retry

重试策略,对于计算类任务,有可能存在调用失败的情况。该参数设置了失败后的重试策略,例如:

retry:
maxRetryTimes:3
intervalInSeconds:2
multiplier:1

retry 结构中共有三个选项:

  • maxRetryTimes:最大重试次数,默认值为 0,即:不重试。
  • intervalInSeconds:重试间隔秒数,默认值为 0,即:失败后立即重试。
  • multiplier:重试间隔放大引子,默认值为 1,即:不放大。

Rill Flow 在计算任务执行失败后,将以上述配置中的策略进行重试。假设当前已经重试过 n 次,那么下一次重试的间隔时间为:intervalInSeconds*multiplier^n,最多重试 maxRetryTimes 次。

同步与异步任务模式

同步模式(task_sync

在同步模式下,派发器与执行器保持连接直到任务执行完成或超时。这种模式适合执行时间在毫秒级的快速任务。

异步模式(task_async

在异步模式下,派发器会在 Header 中添加回调地址 X-Callback-Url,并将任务信息传递给执行器。执行器完成任务后,会通过调用 X-Callback-Url 将执行结果返回给 Rill Flow。这种模式适用于重型计算或执行时间较长的任务,如 AIGC 模型生成任务。

派发器

概述

任务通过 resourceProtocol 属性选择派发器。同一类型的派发器可以对应多种执行器。任务通过 resourceName 属性绑定派发器和执行器。Rill Flow 支持多种派发器,如 HTTP 协议派发器、K8s Service 派发器、阿里云通义千问派发器、ChatGPT 派发器等。Rill Flow 也支持通过插件对派发器进行扩展,提供了实现自定义派发器的灵活性。

resourceProtocol

任务通过 resourceProtocol 指定派发器。这是一个可选字段。如果任务的 resourceProtocol 为空,则会通过 resourceName 解析出 resourceProtocol

resourceName

任务通过 resourceName 属性使用统一资源定位符(URL)的简化格式来描述派发器和执行器。常见格式为:

[协议类型]://[服务器地址]:[端口号]/[资源层级UNIX文件路径][文件名]?[查询参数]#[片段ID]

例如:

http://www.sample.com/callback.json

支持的派发器类型

Rill Flow 支持以下类型的派发器:

HTTP协议派发器

HTTP 派发器用于转发任务信息,发起 HTTP 请求时使用 application-json 作为 content-type,并以接收到的 json 字符串作为输出。

任务属性
参数参数值说明
resourceProtocolhttp/https指定协议类型
resourceNamehttp://www.sample.com/execute.jsonHTTP URL
patterntask_sync/task_async指定任务执行模式,同步(task_sync)或异步(task_async)
requestTypeget/postHTTP 请求类型,默认为 post
输入参数
值类型说明
query_params_*mapGET 请求的参数,以 query_params_ 前缀的键对应的值需为 map 类型,所有键/值以键=值形式拼接至请求 URL
request_header_*map请求头,以 request_header_ 前缀的键对应的值需为 map 类型,所有键/值将加入请求头
其余键stringPOST 请求体参数,目前仅支持 json 类型,其余键/值将加入 POST 请求体的 json 结构中
输出参数

HTTP 请求返回的 json 结构体将被赋值给 $.output 变量。

派发至 K8s Service: 在 K8S 环境下,建议通过 HTTP 派发器对接 K8s Service 域名,例如 http://service_name.namespace/execute.json。有关 Service 的更多信息,请参考 Kubernetes Service。也可以使用 Istio 等服务网格或其他高级服务负载均衡机制。

ChatGPT 派发器

Rill Flow 支持 OpenAI 的 ChatGPT 模型派发器。使用此派发器需要拥有 OpenAI 模型调用 Apikey。具体信息可参考OpenAI ChatGPT 模型文档

任务属性
参数参数值说明
resourceProtocolchatgpt使用 ChatGPT 派发器
输入参数
值类型说明
apikeystringApikey 用于模型调用
modelstring模型名称,详见OpenAI支持的模型列表
promptstring请求模型的文本内容
输出参数

ChatGPT请求返回的 json 结构体将被赋值给 $.output.result 变量。其他返回值可参考OpenAI SDK文档

阿里云模型服务派发器

Rill Flow 支持阿里云的灵积模型服务派发器。使用此派发器需要拥有阿里云通义千问的模型调用 Apikey。具体信息可参考阿里云灵积模型服务文档

任务属性
参数参数值说明
resourceProtocolaliyun_ai使用阿里云 AI 派发器
输入参数
值类型说明
apikeystringApikey 用于模型调用
modelstring模型名称,详见阿里云支持的模型列表
messagestring请求模型的文本内容
message_suffixstring(可选)请求模型的内容后缀
message_prefixstring(可选)请求模型的内容前缀
输出参数

阿里云请求返回的 json 结构体将被赋值给 $.output 变量,其中 $.output.output.text 为模型的文本返回。其他返回值可参考阿里云SDK文档