抓狐狸python_用Python操作Kubernetes的Job
本文給出Python SDK操作Kubernetes Job的更多示例代碼,以及相關解釋。
pip install kubernetes
初始化
from kubernetes.client import BatchV1Api
from kubernetes.config import load_kube_config
load_kube_config()
batch = BatchV1Api()
load_kube_config 是從默認位置,也就是 ~/.kube/config 加載配置。如果在其它位置,可以通過第一個參數傳入其路徑。
BatchV1Api() 可以當做Job的客戶端來用。命名上,Batch和Job是類似的概念,前者強調批量。
創建Job
以下來自官方樣例job_crud.py。
def create_job_object():
container = client.V1Container(
name="pi",
image="perl",
command=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"])
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": "pi"}),
spec=client.V1PodSpec(restart_policy="Never", containers=[container]))
spec = client.V1JobSpec(
template=template,
backoff_limit=4)
job = client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=client.V1ObjectMeta(name=JOB_NAME),
spec=spec)
return job
def create_job(api_instance, job):
api_response = api_instance.create_namespaced_job(
body=job,
namespace="default")
print("Job created. status='%s'" % str(api_response.status))
雖然,根據官方教程這樣的寫法,也能得到可用的 V1Job ,拿去執行創建操作。但還是過于陌生和偏門,不如主流、常見的YAML方便、易讀寫。
這里該出兩種更方便的做法。
直接使用YAML
---
apiVersion: batch/v1
kind: Job
metadata:
name: hello
spec:
template:
spec:
containers:
- name: echo
image: alpine:3.11
args:
- 'echo'
- 'Hello world!'
以上是一個最精簡的Job配置樣例,
通過讀取文件為dict,可以直接拿去使用。
from kubernetes.client import V1Job
import yaml
with open('job.yaml') as file:
cfg = yaml.safe_load(file)
job = batch.create_namespaced_job(namespace='default', body=cfg)
assert isinstance(job, V1Job)
create_namespaced_job 同樣接受字典作為body輸入,因此YAML配置可以讀出后直接傳入。
這里返回的 V1Job 只是創建時的狀態,但是會包含更多集群中的信息。
使用dict
由于 create_namespaced_job 接受字典作為 body 輸入,因此直接使用 dict 也是可行的。
cfg = {
'apiVersion': 'batch/v1',
'kind': 'Job',
'metadata': {
'name': 'hello'
},
'spec': {
'template': {
'spec': {
'restartPolicy':
'Never',
'containers': [{
'name': 'upload',
'image': 'alpine:3.11',
'args': ['echo', 'Hello world!']
}]
}
}
}
}
batch.create_namespaced_job(namespace='default', body=cfg)
由于 dict 結構與YAML相同,而又沒有類的束縛,所以也很靈活方便。
此外,從YAML讀出為 dict 后,也可以通過修改部分字段,達到動態變化的效果。這種結合YAML和dict的使用方式,是對官方用法的最佳替代。
監控Job運行
在創建Job后,通常需要監控Job的運行,做一些外圍處理。輪詢當然是下下策,而Kubernetes提供了一個 Watch 機制,通過接收Event,實現對狀態變化的掌控。Event只有在狀態變化時才會有,所以是非常理想的回調。
from kubernetes.client import V1Job
from kubernetes.watch import Watch
job_name = 'hello'
watcher = Watch()
for event in watcher.stream(
batch.list_namespaced_job,
namespace='default',
label_selector=f'job-name={job_name}',
):
assert isinstance(event, dict)
job = event['object']
assert isinstance(job, V1Job)
Watch().stream 就是前面說的理想回調,它第一個參數是列出類的函數,這里選擇 list_namespaced_job 。后面的參數,都是 list_namespaced_job 的參數。除了必備的namespace以外, label_selector也 是一個常用參數,可以避免關注無關的Job。每個Job在創建后,都會自動帶一個 f'job-name={job_name}' 的Label,可以借此篩選。 job_name 就是 metadata 里設置的 name ,如這里 job-name=hello 。
event是一個dict,只有三個值。其中 event['raw_object'] 只是 event['object'] 的 dict 形式,沒有太大意義。 event['type'] 常見三個值,對應增刪改。
ADDED ,創建時的信息,和
create_namespaced_job 的返回值通常沒有區別。
MODIFIED ,Job狀態變化時的信息。
DELETED ,Job刪除時的信息。
以上三個狀態值,對其它類型的資源也是通用的,比如Pod、Deployment等。
V1Job的使用
對于具體的 V1Job 實例,其它字段都是和創建時的配置差不多的,只是多一些集群中的具體信息。所以,常用的還是.status字段。
>>> from kubernetes.client import V1JobStatus
>>> isinstance(job.status, V1JobStatus)
True
>>> print(job.status)
{'active': None,
'completion_time': datetime.datetime(2020, 8, 10, 9, 49, 38, tzinfo=tzutc()),
'conditions': [{'last_probe_time': datetime.datetime(2020, 8, 10, 9, 49, 38, tzinfo=tzutc()),
'last_transition_time': datetime.datetime(2020, 8, 10, 9, 49, 38, tzinfo=tzutc()),
'message': None,
'reason': None,
'status': 'True',
'type': 'Complete'}],
'failed': None,
'start_time': datetime.datetime(2020, 8, 10, 9, 49, 32, tzinfo=tzutc()),
'succeeded': 1}
直接使用 job.status.succeeded ,可以得到成功的Container數量。下面幾乎每一級都有特定的類,可以連續使用.操作符。如果有特殊需要,也可以用 job.to_dict() 轉換成字典來用。
其它字段,作用基本上也和名稱相關,不難推測。
列出Job
列出所有Job的 list_job_for_all_namespaces 不常用,一般只列出指定Namespace的Job。
from kubernetes.client import V1JobList, V1Job
job_list = batch.list_namespaced_job(namespace='default')
assert isinstance(job_list, V1JobList)
assert isinstance(job_list.items, list)
for job in job_list.items:
assert isinstance(job, V1Job)
與監控的示例相比,這里去掉了 label_selector ,可以獲取Namespace中所有的Job。如果有需要,可以通過自定義Label把所有Job分類,并使用l abel_selector 獲取指定類型的Job。
讀取Job
如果知道Job的 name ,可以直接通過 read_* 系列接口,獲得指定的 V1Job 。
from kubernetes.client import V1Job
job = batch.read_namespaced_job(name='hello', namespace='default')
assert isinstance(job, V1Job)
如果更看重狀態,可以改用 read_namespaced_job_status 。雖然訪問的API不同,但在Python的 V1Job 這個結果層面,沒有本質差異。
列出一個Job的Pod
Pod是Kubernetes調度的最小單元,也是最常用的一種資源。
from typing import List
from kubernetes.client import CoreV1Api, V1Pod
def get_pods_by(job_name: str) -> List[V1Pod]:
core = CoreV1Api()
pods = core.list_namespaced_pod(
namespace='default',
label_selector=f'job-name={job_name}',
limit=1,
)
return pods.items
這里的 get_pods_by ,可以用 job_name 獲取對應的Pod。 limit=1 是在已知Pod只有一個的情況下做出的優化,可按需調整或去掉。
刪除Job
刪除一個Job:
from kubernetes.client import V1Status
status = batch.delete_namespaced_job(
namespace='default',
name=job_name,
propagation_policy='Background',
)
assert isinstance(status, V1Status)
其中, propagation_policy='Background' 是不可省略的關鍵,否則默認是 Orphan ,其Pod不會被刪除。這屬于API設計的一個失誤,與 kubectl 的默認行為不符合。而且,應該沒有人在刪除了Job之后,還要保留Pod的吧。這里也可以選擇 'Foreground' ,阻塞等待相關資源的刪除完畢。
刪除多個、或所有Job:
status = batch.delete_collection_namespaced_job(
namespace='default',
propagation_policy='Background',
label_selector='some-label=your-value',
)
assert isinstance(status, V1Status)
如果沒有 label_selector ,那就是刪除一個 Namespace 中的所有Job。
更新Job
這個比較少用,因為一般都是建新的。用法其實和 create_namespaced_job 差不多,參考官方樣例即可。
def update_job(api_instance, job):
job.spec.template.spec.containers[0].image = "perl"
api_response = api_instance.patch_namespaced_job(
name=JOB_NAME,
namespace="default",
body=job)
print("Job updated. status='%s'" % str(api_response.status))
總結
用Python操作Kubernetes的Job,總體上還是比較方便的,雖然有一些坑。
作者: 匿蟒
原文鏈接:https://note.qidong.name/2020/08/python-k8s-job/
總結
以上是生活随笔為你收集整理的抓狐狸python_用Python操作Kubernetes的Job的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: sql unicode转中文_SELEC
- 下一篇: python open ascii co