本文已發布超過一年。較舊的文章可能包含過時的內容。請檢查頁面中的資訊自發布以來是否已變得不正確。

Kubernetes 上的 Airflow (第一部分):一種不同的運算元

簡介

作為 Bloomberg 持續致力於開發 Kubernetes 生態系統 的一部分,我們很高興宣布推出 Kubernetes Airflow Operator;這是一種機制,讓 Apache Airflow 這個熱門的工作流程編排框架,可以使用 Kubernetes API 原生啟動任意 Kubernetes Pod。

什麼是 Airflow?

Apache Airflow 是 DevOps「組態即程式碼」哲學的一種實現方式。Airflow 允許使用者使用簡單的 Python 物件 DAG (有向無環圖) 啟動多步驟管線。您可以定義依賴關係、以程式設計方式建構複雜的工作流程,並在易於閱讀的 UI 中監控排程的工作。

Airflow DAGs Airflow UI

為何在 Kubernetes 上使用 Airflow?

自創立以來,Airflow 最強大的優勢一直是其靈活性。Airflow 為各種服務提供廣泛的整合,從 Spark 和 HBase,到各種雲端供應商提供的服務。Airflow 還透過其外掛程式框架提供簡單的擴充性。但是,該專案的一個限制是,Airflow 使用者受限於執行時 Airflow 工作節點上存在的框架和用戶端。單一組織可以擁有各種 Airflow 工作流程,從資料科學管線到應用程式部署。這種用例的差異會在依賴關係管理中產生問題,因為兩個團隊可能為其工作流程使用截然不同的程式庫。

為了解決這個問題,我們利用 Kubernetes 允許使用者啟動任意 Kubernetes Pod 和組態。Airflow 使用者現在可以完全掌控其執行時環境、資源和密碼,基本上將 Airflow 變成「您想要的任何工作」工作流程編排器。

Kubernetes Operator

在我們繼續深入之前,我們應該澄清 Airflow 中的 Operator 是一個任務定義。當使用者建立 DAG 時,他們會使用像「SparkSubmitOperator」或「PythonOperator」這樣的 Operator 來提交/監控 Spark 工作或 Python 函數。Airflow 隨附用於 Apache Spark、BigQuery、Hive 和 EMR 等框架的內建 Operator。它還提供了一個外掛程式入口點,允許 DevOps 工程師開發自己的連接器。

Airflow 使用者一直在尋找使部署和 ETL 管線更易於管理的方法。任何可以在增加監控的同時解耦管線步驟的機會,都可以減少未來的中斷和緊急應變。以下是 Airflow Kubernetes Operator 提供的好處清單

  • 提高部署的靈活性
    Airflow 的外掛程式 API 一直為希望在其 DAG 中測試新功能的工程師提供顯著的優勢。不利的一面是,每當開發人員想要建立新的 Operator 時,他們都必須開發一個全新的外掛程式。現在,任何可以在 Docker 容器中執行的任務都可以透過完全相同的 Operator 存取,而無需維護額外的 Airflow 程式碼。

  • 組態和依賴關係的靈活性: 對於在靜態 Airflow 工作節點中執行的 Operator,依賴關係管理可能會變得非常困難。如果開發人員想要執行一個需要 SciPy 的任務和另一個需要 NumPy 的任務,則開發人員必須在所有 Airflow 工作節點中維護這兩個依賴關係,或者將任務卸載到外部機器 (如果該外部機器以未追蹤的方式變更,則可能會導致錯誤)。自訂 Docker 映像允許使用者確保任務環境、組態和依賴關係是完全等冪的。

  • 使用 Kubernetes 密碼以增加安全性: 處理敏感資料是任何 DevOps 工程師的核心責任。在每一個機會中,Airflow 使用者都希望在嚴格的需要知道的基礎上隔離任何 API 金鑰、資料庫密碼和登入憑證。使用 Kubernetes Operator,使用者可以利用 Kubernetes Vault 技術來儲存所有敏感資料。這表示 Airflow 工作節點永遠無法存取此資訊,而只能請求建構僅具有所需密碼的 Pod。

架構

Airflow Architecture

Kubernetes Operator 使用 Kubernetes Python 用戶端 來產生由 APIServer (1) 處理的請求。然後 Kubernetes 將使用您定義的任何規格啟動您的 Pod (2)。映像將載入所有必要的環境變數、密碼和依賴關係,執行單一命令。工作啟動後,Operator 只需要監控追蹤記錄的健康狀況 (3)。使用者可以選擇將記錄在本機收集到排程器,或收集到 Kubernetes 叢集中目前存在的任何分散式記錄服務。

使用 Kubernetes Operator

基本範例

以下 DAG 可能是我們可以編寫的最簡單範例,以展示 Kubernetes Operator 的運作方式。此 DAG 在 Kubernetes 上建立兩個 Pod:一個具有 Python 的 Linux 發行版和一個沒有 Python 的基本 Ubuntu 發行版。Python Pod 將正確執行 Python 請求,而沒有 Python 的 Pod 將向使用者報告失敗。如果 Operator 運作正常,則 passing-task Pod 應完成,而 failing-task Pod 應向 Airflow Web 伺服器傳回失敗。

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="Python:3.6",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:1604",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)
Basic DAG Run

但這與我的工作流程有何關係?

雖然此範例僅使用基本映像,但 Docker 的魔力在於,相同的 DAG 將適用於您想要的任何映像/命令配對。以下是在 Airflow DAG 上執行生產就緒程式碼的建議 CI/CD 管線。

1:Github 中的 PR

使用 Travis 或 Jenkins 執行單元和整合測試,賄賂您最喜歡的隊友 PR 您的程式碼,然後合併到 master 分支以觸發自動化 CI 建置。

2:透過 Jenkins -> Docker 映像的 CI/CD

在您的 Jenkins 建置中產生您的 Docker 映像並遞增發行版本.

3:Airflow 啟動任務

最後,更新您的 DAG 以反映新的發行版本,您應該就可以開始使用了!

production_task = KubernetesPodOperator(namespace='default',
                          # image="my-production-job:release-1.0.1", <-- old release
                          image="my-production-job:release-1.0.2",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

啟動測試部署

由於 Kubernetes Operator 尚未發布,我們尚未發布官方 helm 圖表或 Operator (但是兩者目前都在進行中)。但是,我們在下面包含基本部署的說明,並且正在積極尋找冒失的 Beta 測試人員來試用這項新功能。若要試用此系統,請依照下列步驟操作

步驟 1:設定您的 kubeconfig 以指向 Kubernetes 叢集

步驟 2:複製 Airflow Repo

執行 git clone https://github.com/apache/incubator-airflow.git 以複製官方 Airflow Repo。

步驟 3:執行

為了執行此基本部署,我們正在採用我們目前用於 Kubernetes Executor 的整合測試腳本 (這將在本系列的下一篇文章中說明)。若要啟動此部署,請執行以下三個命令

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml
./scripts/ci/kubernetes/Docker/build.sh
./scripts/ci/kubernetes/kube/deploy.sh

在我們繼續之前,讓我們先討論這些命令的作用

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml

Kubernetes Executor 是另一個 Airflow 功能,允許將任務動態分配為等冪 Pod。我們將其切換為 LocalExecutor 的原因只是為了每次引入一個功能。如果您想嘗試 Kubernetes Executor,您可以隨時略過此步驟,但我們將在未來的文章中更詳細地介紹。

./scripts/ci/kubernetes/Docker/build.sh

此腳本將 tar Airflow master 原始碼建置基於 Airflow 發行版的 Docker 容器

./scripts/ci/kubernetes/kube/deploy.sh

最後,我們在您的叢集上建立完整的 Airflow 部署。這包括 Airflow 組態、Postgres 後端、Web 伺服器 + 排程器,以及所有必要的服務。需要注意的一點是,提供的角色繫結是叢集管理員,因此如果您在叢集上沒有該層級的權限,您可以在 scripts/ci/kubernetes/kube/airflow.yaml 中修改它

步驟 4:登入您的 Web 伺服器

現在您的 Airflow 執行個體正在執行中,讓我們看一下 UI!UI 位於 Airflow Pod 的 8080 埠中,因此只需執行

WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1)
kubectl port-forward $WEB 8080:8080

現在 Airflow UI 將存在於 https://#:8080。若要登入,只需輸入 airflow/airflow,您應該可以完全存取 Airflow Web UI。

步驟 5:上傳測試文件

若要修改/新增您自己的 DAG,您可以使用 kubectl cp 將本機檔案上傳到 Airflow 排程器的 DAG 資料夾中。然後 Airflow 將讀取新的 DAG 並自動將其上傳到其系統。以下命令會將任何本機檔案上傳到正確的目錄中

kubectl cp <本機檔案> <命名空間>/<Pod>:/root/airflow/dags -c 排程器

步驟 6:盡情享用!

那麼我什麼時候可以使用它呢?

雖然此功能仍處於早期階段,但我們希望在未來幾個月內看到它廣泛發布。

參與其中

此功能僅僅是多項重大努力的開始,旨在改進 Apache Airflow 與 Kubernetes 的整合。Kubernetes Operator 已合併到 Airflow 的 1.10 發行分支 (實驗模式下的執行器),以及一個稱為 Kubernetes Executor 的完全 K8s 原生排程器 (文章即將推出)。這些功能仍處於早期採用者/貢獻者可以對這些功能的未來產生巨大影響的階段。

對於那些有興趣加入這些努力的人,我建議查看以下步驟

  • 加入 airflow-dev 郵件清單,網址為 dev@airflow.apache.org
  • Apache Airflow JIRA 中提交問題
  • 在太平洋標準時間上午 10 點參加我們每週三的 SIG-BigData 會議。
  • 在 kubernetes.slack.com 上的 #sig-big-data Slack 頻道上聯絡我們

特別感謝 Apache Airflow 和 Kubernetes 社群,特別是 Grant Nicholas、Ben Goldberg、Anirudh Ramanathan、Fokko Dreisprong 和 Bolke de Bruin,感謝您們在這些功能以及我們未來的努力中提供的出色幫助。