使用工作佇列進行粗略平行處理

在此範例中,您將執行具有多個平行工作程序的 Kubernetes Job。

在此範例中,當每個 Pod 建立時,它會從任務佇列中提取一個工作單元、完成它、從佇列中刪除它,然後結束。

以下是此範例中步驟的總覽

  1. 啟動訊息佇列服務。 在此範例中,您使用 RabbitMQ,但您可以使用另一個。實際上,您會設定訊息佇列服務一次,並將其重複用於多個 Job。
  2. 建立佇列,並用訊息填滿它。 每則訊息代表要完成的一項任務。在此範例中,訊息是一個整數,我們將對其進行耗時的運算。
  3. 啟動處理佇列中任務的 Job。 Job 會啟動多個 Pod。每個 Pod 會從訊息佇列中提取一項任務、處理它,然後結束。

開始之前

您應該已熟悉 Job 的基本、非平行使用方式。

您需要有一個 Kubernetes 叢集,並且必須設定 kubectl 命令列工具以與您的叢集通訊。建議在至少有兩個節點且未充當控制平面主機的叢集上執行本教學課程。如果您還沒有叢集,您可以使用 minikube 建立一個,或者您可以使用以下 Kubernetes 實驗環境之一

您將需要一個容器映像檔登錄檔,您可以在其中上傳映像檔以在叢集中執行。

此任務範例也假設您已在本機安裝 Docker。

啟動訊息佇列服務

此範例使用 RabbitMQ,但是,您可以調整範例以使用另一個 AMQP 類型訊息服務。

實際上,您可以在叢集中設定訊息佇列服務一次,並將其重複用於多個 Job,以及用於長時間執行的服務。

如下所示啟動 RabbitMQ

# make a Service for the StatefulSet to use
kubectl create -f https://kubernetes.dev.org.tw/examples/application/job/rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
kubectl create -f https://kubernetes.dev.org.tw/examples/application/job/rabbitmq/rabbitmq-statefulset.yaml
statefulset "rabbitmq" created

測試訊息佇列服務

現在,我們可以實驗存取訊息佇列。我們將建立一個臨時互動式 Pod、在其上安裝一些工具,並實驗佇列。

首先,建立一個臨時互動式 Pod。

# Create a temporary interactive container
kubectl run -i --tty temp --image ubuntu:22.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...

請注意,您的 Pod 名稱和命令提示字元會有所不同。

接下來,安裝 amqp-tools,以便您可以使用訊息佇列。下一個命令顯示您需要在該 Pod 的互動式 Shell 中執行的內容

apt-get update && apt-get install -y curl ca-certificates amqp-tools python3 dnsutils

稍後,您將製作一個包含這些套件的容器映像檔。

接下來,您將檢查是否可以探索 RabbitMQ 的服務

# Run these commands inside the Pod
# Note the rabbitmq-service has a DNS name, provided by Kubernetes:
nslookup rabbitmq-service
Server:        10.0.0.10
Address:    10.0.0.10#53

Name:    rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152

(IP 位址會有所不同)

如果 kube-dns 附加元件未正確設定,則先前的步驟可能不適用於你。你也可以在環境變數中找到該服務的 IP 位址

# run this check inside the Pod
env | grep RABBITMQ_SERVICE | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152

(IP 位址會有所不同)

接下來,你將驗證是否可以建立佇列,以及發布和取用訊息。

# Run these commands inside the Pod
# In the next line, rabbitmq-service is the hostname where the rabbitmq-service
# can be reached.  5672 is the standard port for rabbitmq.
export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# If you could not resolve "rabbitmq-service" in the previous step,
# then use this command instead:
BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672

# Now create a queue:

/usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo

發布一則訊息到佇列

/usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello

# And get it back.

/usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo 1>&2
Hello

在最後一個指令中,amqp-consume 工具從佇列中取用了一則訊息(-c 1),並將該訊息傳遞到任意指令的標準輸入。在本例中,cat 程式會印出從標準輸入讀取的字元,而 echo 則會加入一個歸位字元,使範例易於閱讀。

將佇列填滿任務

現在,用一些模擬任務填滿佇列。在本範例中,任務是要列印的字串。

在實務上,訊息的內容可能是

  • 需要處理的檔案名稱
  • 程式的額外旗標
  • 資料庫表格中的索引鍵範圍
  • 模擬的組態參數
  • 要渲染的場景影格編號

如果 Job 的所有 Pod 在唯讀模式下需要大量資料,你通常會將其放在共用檔案系統(如 NFS)中,並以唯讀方式掛載在所有 Pod 上,或在 Pod 中編寫程式,使其能夠原生讀取叢集檔案系統中的資料(例如:HDFS)。

在此範例中,你將使用 AMQP 命令列工具建立佇列並填滿它。在實務上,你可能會編寫程式以使用 AMQP 客戶端程式庫填滿佇列。

# Run this on your computer, not in the Pod
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1  -d
job1

將項目新增至佇列

for f in apple banana cherry date fig grape lemon melon
do
  /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done

你已將 8 則訊息新增至佇列。

建立容器映像

現在你已準備好建立將作為 Job 執行的映像。

Job 將使用 amqp-consume 工具從佇列讀取訊息並執行實際工作。以下是一個非常簡單的範例程式

#!/usr/bin/env python

# Just prints standard out and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.readlines()[0])
time.sleep(10)

給予腳本執行權限

chmod +x worker.py

現在,建置映像。建立一個暫時目錄,切換到該目錄,下載 Dockerfileworker.py。在任一情況下,使用此指令建置映像

docker build -t job-wq-1 .

對於 Docker Hub,使用你的使用者名稱標記你的應用程式映像,並使用以下指令推送到 Hub。將 <username> 替換為你的 Hub 使用者名稱。

docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1

如果你使用的是替代的容器映像登錄檔,請標記映像並將其推送到該處。

定義 Job

以下是 Job 的 manifest 檔案。你需要複製 Job manifest 檔案(命名為 ./job.yaml),並編輯容器映像的名稱以符合你使用的名稱。

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-1
spec:
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
    spec:
      containers:
      - name: c
        image: gcr.io/<project>/job-wq-1
        env:
        - name: BROKER_URL
          value: amqp://guest:guest@rabbitmq-service:5672
        - name: QUEUE
          value: job1
      restartPolicy: OnFailure

在此範例中,每個 Pod 處理佇列中的一個項目,然後結束。因此,Job 的完成計數對應於已完成的工作項目數量。這就是為什麼範例 manifest 檔案將 .spec.completions 設定為 8

執行 Job

現在,執行 Job

# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml

你可以等待 Job 成功,並設定逾時時間

# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-1

接下來,檢查 Job

kubectl describe jobs/job-wq-1
Name:             job-wq-1
Namespace:        default
Selector:         controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-1
Annotations:      <none>
Parallelism:      2
Completions:      8
Start Time:       Wed, 06 Sep 2022 16:42:02 +0000
Pods Statuses:    0 Running / 8 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                job-name=job-wq-1
  Containers:
   c:
    Image:      container-registry.example/causal-jigsaw-637/job-wq-1
    Port:
    Environment:
      BROKER_URL:       amqp://guest:guest@rabbitmq-service:5672
      QUEUE:            job1
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen  LastSeen   Count    From    SubobjectPath    Type      Reason              Message
  ─────────  ────────   ─────    ────    ─────────────    ──────    ──────              ───────
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-hcobb
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-weytj
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-qaam5
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-b67sr
  26s        26s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-xe5hj
  15s        15s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-w2zqe
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-d6ppa
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-p17e0

該 Job 的所有 Pod 都成功了!你完成了。

替代方案

這種方法的優點是你不需要修改你的「worker」程式,使其意識到有工作佇列。你可以將未修改的 worker 程式包含在你的容器映像中。

使用這種方法確實需要你執行訊息佇列服務。如果執行佇列服務不方便,你可能需要考慮其他 job 模式之一。

這種方法為每個工作項目建立一個 Pod。但是,如果你的工作項目僅需幾秒鐘,則為每個工作項目建立 Pod 可能會增加大量額外負擔。請考慮另一種設計,例如 細粒度平行工作佇列範例中的設計,該設計在每個 Pod 中執行多個工作項目。

在本範例中,你使用了 amqp-consume 工具從佇列讀取訊息並執行實際程式。這具有優點,你不需要修改你的程式以使其意識到佇列。 細粒度平行工作佇列範例 示範了如何使用客戶端程式庫與工作佇列進行通訊。

注意事項

如果完成次數設定為小於佇列中的項目數量,則並非所有項目都會被處理。

如果完成次數設定為大於佇列中的項目數量,則即使佇列中的所有項目都已處理完畢,Job 也不會顯示為已完成。它將啟動額外的 Pod,這些 Pod 將會封鎖並等待訊息。你需要建立自己的機制來發現何時有工作要做並測量佇列的大小,並設定完成次數以符合。

此模式存在不太可能發生的競爭情況。如果在 amqp-consume 指令確認訊息的時間與容器成功退出的時間之間容器被終止,或者如果節點在 kubelet 能夠將 Pod 成功的訊息發布回 API 伺服器之前崩潰,則即使佇列中的所有項目都已處理完畢,Job 也不會顯示為已完成。

上次修改時間:2024 年 3 月 16 日下午 2:39 PST:修正平行處理工作佇列任務的文件。(bed970676c)