引言
“工作流”是在系统开发中经常被提及的一个概念,比如机器学习平台里需要用工作流串联从建模、训练到预测的全流程,CICD系统需要用工作流打通从开发、构建再到部署的链路,办公自动化中也经常要用到工作流实现逐级审批等,因此很有必要为项目引入一个简单易用,又具有一定抽象度和扩展性的工作流框架——Argo Workflows,它是一个基于 Kubernetes 的工作流组件,通常用来编排节点(任务)依赖关系,然后调度这些节点,让他们以串行或并行方式运行。本文想从开发者角度介绍一下 Argo Workflows 的总体框架与核心能力,还有一些实际工作中很容易用到的特性。
总体架构
Argo Workflows 遵循 Kubernetes 的开发范式,总体架构还是比较清晰的,如下图所示:

从图中可以看到下面几个核心组件:
- Workflow CRD:Workflow 资源描述文件,文件定义了 Workflow 内各执行节点的依赖关系、串/并行行为、入参出参、退出回调等。
- Workflow Controller:Workflow 调度与生命周期管理控制器,负责监视 Workflow CR 的创建、更新与删除,并控制 Workflow 实例及每个实例下节点的运行。
- Argo UI:提供 Web GUI 页面工具,可视化观测 Workflow 实例的运行动态,以及一些运维工具。
- Argo Cli:命令行工具,可以更加方便简洁地对 Workflow 实例进行创建、查看、删除等操作。
Argo UI 和 Arg Cli 作为客户端为用户提供了更便捷的访问入口,而 Workflow CRD 和 Workflow Controller 作为服务端提供了控制 Workflow 实例调度、运转与管理的能力,最终 Workflow 内每个节点将作为核心的业务执行单元,以 Kubernetes 原生 Pod 形式运行。
Workflow CRD
Workflow CRD 遵循 Kubernetes CRD 标准规范定义,可以简单理解为是描述 Workflow 配置的一份文件,里面包含了对 Workflow 本身的配置定义,又包含了对每个运行节点及它们之间依赖关系的配置定义。下面是一个最简单的 Workflow CR:
apiVersion: argoproj.io/v1alpha1
kind: Workflow # new type of k8s spec
metadata:
generateName: hello-world- # name of the workflow spec
spec:
entrypoint: whalesay # invoke the whalesay template
templates:
- name: whalesay # name of the template
container:
image: docker/whalesay
command: [ cowsay ]
args: [ "hello world" ]
resources: # limit the resources
limits:
memory: 32Mi
cpu: 100m
Workflow CRD 里很重要的一个概念是 templates,它用来描述这个工作流下节点的执行内容与参数,以及节点与节点间的依赖关系,串行还是并行执行。templates 有三种类型:
- Container:不可再细分的原子类型 template,也可以理解为必须要定义某个节点对应的 container 执行行为的 template(比如上面的 whalesay 就是),包括容器名称、资源定义、镜像、启动命令、执行参数等,与我们定义 pod 里的 container 无异。
- Steps:分步执行类型的 template。这种 template 将一系列节点以串行或并行方式编排,并指定每个节点用到的 template,这个 template 通常就引用 container 类型的 template,但如果有复杂场景,也支持嵌套引用 steps 或 dag 类型的 template。关于 steps 后面会给出一个示例讲解。
- DAG:依赖关系类型的 template。这种 template 更加灵活,是上面 steps 的另外一种表示方式,它针对每个节点定义了他们所依赖的上游节点名称,或串行或并行执行,每个节点的 template 同上可以任意引用。关于 dag 后面也会给出一个示例讲解。
Arguments
上面展示了一个最简单的 container template 用法,但既然被叫做模版,那每个节点总不能都执行一样的内容吧,因此 Argo Workflows 还提供了在 template.inputs 定义参数的功能,然后用 {{ inputs.parameters.xxx }} 来引用参数,如下所示:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-parameters-
spec:
# invoke the whalesay template with
# "hello world" as the argument
# to the message parameter
entrypoint: whalesay
templates:
- name: whalesay
inputs:
parameters:
- name: message # parameter declaration
value: hello world
container:
# run cowsay with that message input parameter as args
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
但如果多个 template 都想引用同一个参数,那么可以省略 inputs 这块,直接在 spec.arguments 定义参数,然后在 template 中用 {{ workflow.parameters.xxx }} 来引用参数,像下面这样:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: global-parameters-
spec:
entrypoint: A
arguments:
parameters:
- name: log-level
value: INFO
templates:
- name: A
container:
image: containerA
env:
- name: LOG_LEVEL
value: "{{workflow.parameters.log-level}}"
command: [runA]
- name: B
container:
image: containerB
env:
- name: LOG_LEVEL
value: "{{workflow.parameters.log-level}}"
command: [runB]
Steps Template
Steps Template 其实很好理解,就是当你要编排一个 Workflow 下各节点的执行顺序和依赖关系的时候,就可以用它。下面是一个示例:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-
spec:
entrypoint: hello-hello-hello
# This spec contains two templates: hello-hello-hello and whalesay
templates:
- name: hello-hello-hello
# Instead of just running a container
# This template has a sequence of steps
steps:
- - name: hello1 # hello1 is run before the following steps
template: whalesay
arguments:
parameters:
- name: message
value: "hello1"
- - name: hello2a # double dash => run after previous step
template: whalesay
arguments:
parameters:
- name: message
value: "hello2a"
- name: hello2b # single dash => run in parallel with previous step
template: whalesay
arguments:
parameters:
- name: message
value: "hello2b"
# This is the same template as from the previous example
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
从上面可以看出定义了一个新的叫做 hello-hello-hello 的 Steps Template,其中双横线 “- -” 表示步骤自上而下、串行执行,而单横线 “-” 表示同级步骤并行执行。下面是用 Argo Cli 查看得到的这个 Workflow 实例的工作状态:
STEP TEMPLATE PODNAME DURATION MESSAGE ✔ steps-z2zdn hello-hello-hello ├───✔ hello1 whalesay steps-z2zdn-27420706 2s └─┬─✔ hello2a whalesay steps-z2zdn-2006760091 3s └─✔ hello2b whalesay steps-z2zdn-2023537710 3s
DAG Template
DAG Template 其实是与 Steps Template 等价的另外一种书写方式,只是它在描述依赖关系的时候更加精确,每个节点直接指定它所依赖的上游节点名字,像下面这样:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-diamond-
spec:
entrypoint: diamond
templates:
- name: echo
inputs:
parameters:
- name: message
container:
image: alpine:3.7
command: [echo, "{{inputs.parameters.message}}"]
- name: diamond
dag:
tasks:
- name: A
template: echo
continueOn:
error: true
failed: true
arguments:
parameters: [{name: message, value: A}]
- name: B
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: B}]
- name: C
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: C}]
- name: D
dependencies: [B, C]
template: echo
arguments:
parameters: [{name: message, value: D}]
从上面可以看到,DAG Template 大多配置与之前的都一样,比较关键的一项配置是节点的 dependencies,里面指定了节点所依赖的其他节点的名称。其中 task A 设置了参数 continueOn,这表明当这个节点运行产生 Failed 或 Error 的时候,都不会导致整个 Workflow 实例直接失败,而会继续运行下面的节点,但下面的节点其实又依赖节点 A,因此这种设置方法是自相矛盾的,仅适合一些特殊的业务场景使用。
RetryStrategy
节点执行的时候难免会因为特殊原因失败,Workflow 为此支持了重试策略 retryStrategy,可以定义在失败的时候是否重试、重试几次等,如下所示:
# This example demonstrates the use of retry back offs
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: retry-backoff-
spec:
entrypoint: retry-backoff
templates:
- name: retry-backoff
retryStrategy:
limit: 10
retryPolicy: "Always"
backoff:
duration: "1" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
factor: 2
maxDuration: "1m" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
affinity:
nodeAntiAffinity: {}
container:
image: python:alpine3.6
command: ["python", -c]
# fail with a 66% probability
args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
- limit:容器失败时最大重试次数
- retryPolicy:容器在何种失败场景下进行重试,如OnFailure(default), OnError, Always, OnTransientError
- backoff:每次重试之间时间间隔的衰减配置
- duration:起始时间间隔
- factor:每次重试的时间间隔为前一次的几倍
- maxDuration:最大时间间隔
- affinity
- nodeAntiAffinity:节点反亲和设置,避免每次重试调度到同一节点
Exit Handler
当一个 Workflow 运行结束后如果希望做一些事情,那么 Exit Handler 就可以派上用场。一些常见场景有:
- 发送消息通知
- 重新提交 Workflow 或 提交一个新的 Workflow
- 清理该 Workflow 产生的临时数据
- 上报该 Workflow 的完成状态给数据库
Exit Handler 的定义方法如下:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: exit-handlers-
spec:
entrypoint: intentional-fail
onExit: exit-handler # invoke exit-handler template at end of the workflow
templates:
# primary workflow template
- name: intentional-fail
container:
image: alpine:latest
command: [sh, -c]
args: ["echo intentional failure; exit 1"]
# Exit handler templates
# After the completion of the entrypoint template, the status of the
# workflow is made available in the global variable {{workflow.status}}.
# {{workflow.status}} will be one of: Succeeded, Failed, Error
- name: exit-handler
steps:
- - name: notify
template: send-email
- name: celebrate
template: celebrate
when: "{{workflow.status}} == Succeeded"
- name: cry
template: cry
when: "{{workflow.status}} != Succeeded"
- name: send-email
container:
image: alpine:latest
command: [sh, -c]
args: ["echo send e-mail: {{workflow.name}} {{workflow.status}} {{workflow.duration}}"]
- name: celebrate
container:
image: alpine:latest
command: [sh, -c]
args: ["echo hooray!"]
- name: cry
container:
image: alpine:latest
command: [sh, -c]
args: ["echo boohoo!"]
Timeouts
可以在 Workflow 或 template 上设置执行的超时时间,以免因为个别节点阻塞整个 Workflow 的运行,如下所示:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: timeouts-
spec:
activeDeadlineSeconds: 10 # terminate workflow after 10 seconds
entrypoint: sleep
templates:
- name: sleep
activeDeadlineSeconds: 5 # terminate container after 5 seconds
container:
image: alpine:latest
command: [sh, -c]
args: ["echo sleeping for 1m; sleep 60; echo done"]
Suspending
如果希望暂停 Workflow 的执行,可以加入一个包含 suspend 配置的 template,然后在需要暂停的地方引用它,如下所示:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: suspend-template-
spec:
entrypoint: suspend
templates:
- name: suspend
steps:
- - name: build
template: whalesay
- - name: approve
template: approve
- - name: delay
template: delay
- - name: release
template: whalesay
- name: approve
suspend: {}
- name: delay
suspend:
duration: "20" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h"
- name: whalesay
container:
image: docker/whalesay
command: [cowsay]
args: ["hello world"]
一旦暂停,Workflow 将不会再调度后面的节点,而是一直等到暂停的时间结束。
结语
Argo Workflows 其实还有很多特性本文没有介绍,比如生成/消费artifacts、执行scripts、挂载secrets/volumes等,更多详细的用法可以参考官方文档。另外需要注意的是,Argo Workflows 因为本质是基于 Kubernetes 的 etcd 数据库保存工作流配置和运行数据,数据会有时效性,因此如果希望持久保存,建议还是自己实现一个 Persist Controller 来监听 Workflow CR 的生命周期并持久化到关系型数据库。