Argo Workflows 工作流引擎

引言

“工作流”是在系统开发中经常被提及的一个概念,比如机器学习平台里需要用工作流串联从建模、训练到预测的全流程,CICD系统需要用工作流打通从开发、构建再到部署的链路,办公自动化中也经常要用到工作流实现逐级审批等,因此很有必要为项目引入一个简单易用,又具有一定抽象度和扩展性的工作流框架——Argo Workflows,它是一个基于 Kubernetes 的工作流组件,通常用来编排节点(任务)依赖关系,然后调度这些节点,让他们以串行或并行方式运行。本文想从开发者角度介绍一下 Argo Workflows 的总体框架与核心能力,还有一些实际工作中很容易用到的特性。

总体架构

Argo Workflows 遵循 Kubernetes 的开发范式,总体架构还是比较清晰的,如下图所示:

从图中可以看到下面几个核心组件:

  • Workflow CRD:Workflow 资源描述文件,文件定义了 Workflow 内各执行节点的依赖关系、串/并行行为、入参出参、退出回调等。
  • Workflow Controller:Workflow 调度与生命周期管理控制器,负责监视 Workflow CR 的创建、更新与删除,并控制 Workflow 实例及每个实例下节点的运行。
  • Argo UI:提供 Web GUI 页面工具,可视化观测 Workflow 实例的运行动态,以及一些运维工具。
  • Argo Cli:命令行工具,可以更加方便简洁地对 Workflow 实例进行创建、查看、删除等操作。

Argo UI 和 Arg Cli 作为客户端为用户提供了更便捷的访问入口,而 Workflow CRD 和 Workflow Controller 作为服务端提供了控制 Workflow 实例调度、运转与管理的能力,最终 Workflow 内每个节点将作为核心的业务执行单元,以 Kubernetes 原生 Pod 形式运行。

Workflow CRD

Workflow CRD 遵循 Kubernetes CRD 标准规范定义,可以简单理解为是描述 Workflow 配置的一份文件,里面包含了对 Workflow 本身的配置定义,又包含了对每个运行节点及它们之间依赖关系的配置定义。下面是一个最简单的 Workflow CR:

apiVersion: argoproj.io/v1alpha1
kind: Workflow                  # new type of k8s spec
metadata:
  generateName: hello-world-    # name of the workflow spec
spec:
  entrypoint: whalesay          # invoke the whalesay template
  templates:
    - name: whalesay              # name of the template
      container:
        image: docker/whalesay
        command: [ cowsay ]
        args: [ "hello world" ]
        resources: # limit the resources
          limits:
            memory: 32Mi
            cpu: 100m

Workflow CRD 里很重要的一个概念是 templates,它用来描述这个工作流下节点的执行内容与参数,以及节点与节点间的依赖关系,串行还是并行执行。templates 有三种类型:

  • Container:不可再细分的原子类型 template,也可以理解为必须要定义某个节点对应的 container 执行行为的 template(比如上面的 whalesay 就是),包括容器名称、资源定义、镜像、启动命令、执行参数等,与我们定义 pod 里的 container 无异。
  • Steps:分步执行类型的 template。这种 template 将一系列节点以串行或并行方式编排,并指定每个节点用到的 template,这个 template 通常就引用 container 类型的 template,但如果有复杂场景,也支持嵌套引用 steps 或 dag 类型的 template。关于 steps 后面会给出一个示例讲解。
  • DAG:依赖关系类型的 template。这种 template 更加灵活,是上面 steps 的另外一种表示方式,它针对每个节点定义了他们所依赖的上游节点名称,或串行或并行执行,每个节点的 template 同上可以任意引用。关于 dag 后面也会给出一个示例讲解。

Arguments

上面展示了一个最简单的 container template 用法,但既然被叫做模版,那每个节点总不能都执行一样的内容吧,因此 Argo Workflows 还提供了在 template.inputs 定义参数的功能,然后用 {{ inputs.parameters.xxx }} 来引用参数,如下所示:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-parameters-
spec:
  # invoke the whalesay template with
  # "hello world" as the argument
  # to the message parameter
  entrypoint: whalesay
  templates:
  - name: whalesay
    inputs:
      parameters:
      - name: message       # parameter declaration
        value: hello world
    container:
      # run cowsay with that message input parameter as args
      image: docker/whalesay
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

但如果多个 template 都想引用同一个参数,那么可以省略 inputs 这块,直接在 spec.arguments 定义参数,然后在 template 中用 {{ workflow.parameters.xxx }} 来引用参数,像下面这样:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: global-parameters-
spec:
  entrypoint: A
  arguments:
    parameters:
    - name: log-level
      value: INFO

  templates:
  - name: A
    container:
      image: containerA
      env:
      - name: LOG_LEVEL
        value: "{{workflow.parameters.log-level}}"
      command: [runA]
  - name: B
    container:
      image: containerB
      env:
      - name: LOG_LEVEL
        value: "{{workflow.parameters.log-level}}"
      command: [runB]

Steps Template

Steps Template 其实很好理解,就是当你要编排一个 Workflow 下各节点的执行顺序和依赖关系的时候,就可以用它。下面是一个示例:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-
spec:
  entrypoint: hello-hello-hello

  # This spec contains two templates: hello-hello-hello and whalesay
  templates:
  - name: hello-hello-hello
    # Instead of just running a container
    # This template has a sequence of steps
    steps:
    - - name: hello1            # hello1 is run before the following steps
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello1"
    - - name: hello2a           # double dash => run after previous step
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2a"
      - name: hello2b           # single dash => run in parallel with previous step
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2b"

  # This is the same template as from the previous example
  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

从上面可以看出定义了一个新的叫做 hello-hello-hello 的 Steps Template,其中双横线 “- -” 表示步骤自上而下、串行执行,而单横线 “-” 表示同级步骤并行执行。下面是用 Argo Cli 查看得到的这个 Workflow 实例的工作状态:

STEP            TEMPLATE           PODNAME                 DURATION  MESSAGE
 ✔ steps-z2zdn  hello-hello-hello
 ├───✔ hello1   whalesay           steps-z2zdn-27420706    2s
 └─┬─✔ hello2a  whalesay           steps-z2zdn-2006760091  3s
   └─✔ hello2b  whalesay           steps-z2zdn-2023537710  3s

DAG Template

DAG Template 其实是与 Steps Template 等价的另外一种书写方式,只是它在描述依赖关系的时候更加精确,每个节点直接指定它所依赖的上游节点名字,像下面这样:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-diamond-
spec:
  entrypoint: diamond
  templates:
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]
  - name: diamond
    dag:
      tasks:
      - name: A
        template: echo
        continueOn:
          error: true
          failed: true
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]

从上面可以看到,DAG Template 大多配置与之前的都一样,比较关键的一项配置是节点的 dependencies,里面指定了节点所依赖的其他节点的名称。其中 task A 设置了参数 continueOn,这表明当这个节点运行产生 Failed 或 Error 的时候,都不会导致整个 Workflow 实例直接失败,而会继续运行下面的节点,但下面的节点其实又依赖节点 A,因此这种设置方法是自相矛盾的,仅适合一些特殊的业务场景使用。

RetryStrategy

节点执行的时候难免会因为特殊原因失败,Workflow 为此支持了重试策略 retryStrategy,可以定义在失败的时候是否重试、重试几次等,如下所示:

# This example demonstrates the use of retry back offs
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-backoff-
spec:
  entrypoint: retry-backoff
  templates:
  - name: retry-backoff
    retryStrategy:
      limit: 10
      retryPolicy: "Always"
      backoff:
        duration: "1"      # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
        factor: 2
        maxDuration: "1m"  # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
      affinity:
        nodeAntiAffinity: {}
    container:
      image: python:alpine3.6
      command: ["python", -c]
      # fail with a 66% probability
      args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
  • limit:容器失败时最大重试次数
  • retryPolicy:容器在何种失败场景下进行重试,如OnFailure(default), OnError, Always, OnTransientError
  • backoff:每次重试之间时间间隔的衰减配置
    • duration:起始时间间隔
    • factor:每次重试的时间间隔为前一次的几倍
    • maxDuration:最大时间间隔
  • affinity
    • nodeAntiAffinity:节点反亲和设置,避免每次重试调度到同一节点

Exit Handler

当一个 Workflow 运行结束后如果希望做一些事情,那么 Exit Handler 就可以派上用场。一些常见场景有:

  • 发送消息通知
  • 重新提交 Workflow 或 提交一个新的 Workflow
  • 清理该 Workflow 产生的临时数据
  • 上报该 Workflow 的完成状态给数据库

Exit Handler 的定义方法如下:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: exit-handlers-
spec:
  entrypoint: intentional-fail
  onExit: exit-handler                  # invoke exit-handler template at end of the workflow
  templates:
  # primary workflow template
  - name: intentional-fail
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo intentional failure; exit 1"]

  # Exit handler templates
  # After the completion of the entrypoint template, the status of the
  # workflow is made available in the global variable {{workflow.status}}.
  # {{workflow.status}} will be one of: Succeeded, Failed, Error
  - name: exit-handler
    steps:
    - - name: notify
        template: send-email
      - name: celebrate
        template: celebrate
        when: "{{workflow.status}} == Succeeded"
      - name: cry
        template: cry
        when: "{{workflow.status}} != Succeeded"
  - name: send-email
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo send e-mail: {{workflow.name}} {{workflow.status}} {{workflow.duration}}"]
  - name: celebrate
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo hooray!"]
  - name: cry
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo boohoo!"]

Timeouts

可以在 Workflow 或 template 上设置执行的超时时间,以免因为个别节点阻塞整个 Workflow 的运行,如下所示:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: timeouts-
spec:
  activeDeadlineSeconds: 10 # terminate workflow after 10 seconds
  entrypoint: sleep
  templates:
  - name: sleep
    activeDeadlineSeconds: 5 # terminate container after 5 seconds
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo sleeping for 1m; sleep 60; echo done"]

Suspending

如果希望暂停 Workflow 的执行,可以加入一个包含 suspend 配置的 template,然后在需要暂停的地方引用它,如下所示:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: suspend-template-
spec:
  entrypoint: suspend
  templates:
  - name: suspend
    steps:
    - - name: build
        template: whalesay
    - - name: approve
        template: approve
    - - name: delay
        template: delay
    - - name: release
        template: whalesay

  - name: approve
    suspend: {}

  - name: delay
    suspend:
      duration: "20"    # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h"

  - name: whalesay
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello world"]

一旦暂停,Workflow 将不会再调度后面的节点,而是一直等到暂停的时间结束。

结语

Argo Workflows 其实还有很多特性本文没有介绍,比如生成/消费artifacts、执行scripts、挂载secrets/volumes等,更多详细的用法可以参考官方文档。另外需要注意的是,Argo Workflows 因为本质是基于 Kubernetes 的 etcd 数据库保存工作流配置和运行数据,数据会有时效性,因此如果希望持久保存,建议还是自己实现一个 Persist Controller 来监听 Workflow CR 的生命周期并持久化到关系型数据库。