使用工作佇列進行精細平行處理

在此範例中,您將執行一個 Kubernetes Job,它將多個平行任務作為工作程序執行,每個程序都作為單獨的 Pod 執行。

在此範例中,當每個 Pod 建立時,它會從任務佇列中取出一個工作單元,處理它,然後重複,直到到達佇列的末端。

以下是此範例中步驟的概觀

  1. 啟動儲存服務以保留工作佇列。 在此範例中,您將使用 Redis 來儲存工作項目。在先前的範例中,您使用了 RabbitMQ。在此範例中,您將使用 Redis 和自訂工作佇列用戶端程式庫;這是因為 AMQP 沒有提供一種讓用戶端偵測有限長度工作佇列何時為空的好方法。實際上,您將設定一個儲存區(例如 Redis)一次,並將其重複用於許多 Job 的工作佇列和其他事物。
  2. 建立佇列,並用訊息填滿它。 每個訊息代表一個要完成的任務。在此範例中,訊息是一個整數,我們將對其進行冗長的計算。
  3. 啟動一個處理佇列中任務的 Job。 Job 啟動了多個 Pod。每個 Pod 從訊息佇列中取得一個任務,處理它,然後重複,直到到達佇列的末端。

開始之前

您需要有一個 Kubernetes 叢集,並且必須設定 kubectl 命令列工具以與您的叢集通訊。建議在至少有兩個節點且不充當控制平面主機的叢集上執行本教學課程。如果您還沒有叢集,可以使用 minikube 建立一個,或者您可以使用以下 Kubernetes playground 之一

您將需要一個容器映像檔登錄檔,您可以在其中上傳映像檔以在叢集中執行。此範例使用 Docker Hub,但您可以將其調整為不同的容器映像檔登錄檔。

此任務範例也假設您已在本機安裝 Docker。您可以使用 Docker 建置容器映像檔。

熟悉 Job 的基本、非平行使用方式。

啟動 Redis

在此範例中,為了簡單起見,您將啟動 Redis 的單一實例。有關可擴展且冗餘地部署 Redis 的範例,請參閱 Redis 範例

您也可以直接下載以下檔案

若要啟動 Redis 的單一實例,您需要建立 redis pod 和 redis 服務

kubectl apply -f https://k8s.io/examples/application/job/redis/redis-pod.yaml
kubectl apply -f https://k8s.io/examples/application/job/redis/redis-service.yaml

用任務填滿佇列

現在讓我們用一些「任務」填滿佇列。在此範例中,任務是要列印的字串。

啟動一個暫時的互動式 Pod 以執行 Redis CLI。

kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt

現在按下 Enter 鍵,啟動 Redis CLI,並建立一個包含一些工作項目的清單。

redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"

因此,索引鍵為 job2 的清單將會是工作佇列。

注意:如果您沒有正確設定 Kube DNS,您可能需要將上述區塊的第一步變更為 redis-cli -h $REDIS_SERVICE_HOST

建立容器映像檔

現在您可以建立一個映像檔,用於處理該佇列中的工作。

您將使用一個 Python worker 程式和一個 Redis 用戶端來讀取訊息佇列中的訊息。

提供了一個簡單的 Redis 工作佇列用戶端程式庫,稱為 rediswq.py (下載)。

Job 的每個 Pod 中的「worker」程式都使用工作佇列用戶端程式庫來取得工作。以下是它:

#!/usr/bin/env python

import time
import rediswq

host="redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " +  q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
  item = q.lease(lease_secs=10, block=True, timeout=2) 
  if item is not None:
    itemstr = item.decode("utf-8")
    print("Working on " + itemstr)
    time.sleep(10) # Put your actual work here instead of sleep.
    q.complete(item)
  else:
    print("Waiting for work")
print("Queue empty, exiting")

您也可以下載 worker.pyrediswq.pyDockerfile 檔案,然後建置容器映像檔。以下是使用 Docker 進行映像檔建置的範例

docker build -t job-wq-2 .

推送映像檔

對於 Docker Hub,請使用您的使用者名稱標記您的應用程式映像檔,並使用以下命令推送至 Hub。將 <username> 替換為您的 Hub 使用者名稱。

docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2

您需要推送至公用儲存庫,或設定您的叢集以能夠存取您的私有儲存庫

定義 Job

以下是您將建立的 Job 的資訊清單

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure

在此範例中,每個 Pod 處理佇列中的多個項目,然後在沒有更多項目時退出。由於 worker 本身會偵測工作佇列何時為空,且 Job 控制器不知道工作佇列,因此它依賴 worker 發出訊號告知它們何時完成工作。worker 通過成功退出來發出佇列為空的訊號。因此,一旦任何 worker 成功退出,控制器就知道工作已完成,且 Pod 將很快退出。因此,您需要將 Job 的完成計數保持未設定狀態。Job 控制器將等待其他 Pod 也完成。

執行 Job

因此,現在執行 Job

# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml

現在稍等一下,然後檢查 Job

kubectl describe jobs/job-wq-2
Name:             job-wq-2
Namespace:        default
Selector:         controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-2
Annotations:      <none>
Parallelism:      2
Completions:      <unset>
Start Time:       Mon, 11 Jan 2022 17:07:59 +0000
Pods Statuses:    1 Running / 0 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                job-name=job-wq-2
  Containers:
   c:
    Image:              container-registry.example/exampleproject/job-wq-2
    Port:
    Environment:        <none>
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen    LastSeen    Count    From            SubobjectPath    Type        Reason            Message
  ---------    --------    -----    ----            -------------    --------    ------            -------
  33s          33s         1        {job-controller }                Normal      SuccessfulCreate  Created pod: job-wq-2-lglf8

您可以等待 Job 成功,並設定逾時

# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-2
kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon

如您所見,此 Job 的其中一個 Pod 處理了多個工作單元。

替代方案

如果執行佇列服務或修改你的容器來使用工作佇列不方便,你可能會想考慮其他 Job 模式

如果你有持續不斷的背景處理工作要執行,那麼請考慮改用 ReplicaSet 來執行你的背景工作程序,並考慮使用背景處理函式庫,例如 https://github.com/resque/resque

最後修改時間為 2024 年 3 月 16 日 凌晨 2:39 PST:修正平行處理工作佇列任務的文件。(bed970676c)