mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 09:38:39 +08:00
128 lines
5.9 KiB
Python
128 lines
5.9 KiB
Python
from __future__ import print_function
|
|
from kubernetes import client, config
|
|
from kubernetes.client.rest import ApiException
|
|
from utils.util_log import test_log as log
|
|
import time
|
|
|
|
_GROUP = 'milvus.io'
|
|
_VERSION = 'v1alpha1'
|
|
_NAMESPACE = "default"
|
|
|
|
|
|
class CustomResourceOperations(object):
|
|
def __init__(self, kind, group=_GROUP, version=_VERSION, namespace=_NAMESPACE):
|
|
self.group = group
|
|
self.version = version
|
|
self.namespace = namespace
|
|
if kind.lower() == "schedule":
|
|
self.plural = "schedules"
|
|
else:
|
|
self.plural = kind.lower()
|
|
|
|
def create(self, body):
|
|
"""create or apply a custom resource in k8s"""
|
|
pretty = 'true'
|
|
config.load_kube_config()
|
|
api_instance = client.CustomObjectsApi()
|
|
try:
|
|
api_response = api_instance.create_namespaced_custom_object(self.group, self.version, self.namespace,
|
|
plural=self.plural, body=body, pretty=pretty)
|
|
log.debug(f"create custom resource response: {api_response}")
|
|
except ApiException as e:
|
|
log.error("Exception when calling CustomObjectsApi->create_namespaced_custom_object: %s\n" % e)
|
|
raise Exception(str(e))
|
|
return api_response
|
|
|
|
def delete(self, metadata_name, raise_ex=True):
|
|
"""delete or uninstall a custom resource in k8s"""
|
|
print(metadata_name)
|
|
try:
|
|
config.load_kube_config()
|
|
api_instance = client.CustomObjectsApi()
|
|
data = api_instance.delete_namespaced_custom_object(self.group, self.version, self.namespace, self.plural,
|
|
metadata_name)
|
|
log.debug(f"delete custom resource response: {data}")
|
|
except ApiException as e:
|
|
if raise_ex:
|
|
log.error("Exception when calling CustomObjectsApi->delete_namespaced_custom_object: %s\n" % e)
|
|
raise Exception(str(e))
|
|
|
|
def patch(self, metadata_name, body):
|
|
"""patch a custom resource in k8s"""
|
|
config.load_kube_config()
|
|
api_instance = client.CustomObjectsApi()
|
|
try:
|
|
api_response = api_instance.patch_namespaced_custom_object(self.group, self.version, self.namespace,
|
|
plural=self.plural,
|
|
name=metadata_name,
|
|
body=body)
|
|
log.debug(f"patch custom resource response: {api_response}")
|
|
except ApiException as e:
|
|
log.error("Exception when calling CustomObjectsApi->patch_namespaced_custom_object: %s\n" % e)
|
|
raise Exception(str(e))
|
|
return api_response
|
|
|
|
def list_all(self):
|
|
"""list all the customer resources in k8s"""
|
|
pretty = 'true'
|
|
try:
|
|
config.load_kube_config()
|
|
api_instance = client.CustomObjectsApi()
|
|
data = api_instance.list_namespaced_custom_object(self.group, self.version, self.namespace,
|
|
plural=self.plural, pretty=pretty)
|
|
log.debug(f"list custom resource response: {data}")
|
|
except ApiException as e:
|
|
log.error("Exception when calling CustomObjectsApi->list_namespaced_custom_object: %s\n" % e)
|
|
raise Exception(str(e))
|
|
return data
|
|
|
|
def get(self, metadata_name):
|
|
"""get a customer resources by name in k8s"""
|
|
try:
|
|
config.load_kube_config()
|
|
api_instance = client.CustomObjectsApi()
|
|
api_response = api_instance.get_namespaced_custom_object(self.group, self.version,
|
|
self.namespace, self.plural,
|
|
name=metadata_name)
|
|
log.debug(f"get custom resource response: {api_response}")
|
|
except ApiException as e:
|
|
log.error("Exception when calling CustomObjectsApi->get_namespaced_custom_object: %s\n" % e)
|
|
raise Exception(str(e))
|
|
return api_response
|
|
|
|
def delete_all(self):
|
|
"""delete all the customer resources in k8s"""
|
|
cus_objects = self.list_all()
|
|
if len(cus_objects["items"]) > 0:
|
|
for item in cus_objects["items"]:
|
|
metadata_name = item["metadata"]["name"]
|
|
self.delete(metadata_name)
|
|
|
|
def wait_pods_ready(self, namespace, label_selector):
|
|
"""wait pods with label selector all ready"""
|
|
config.load_kube_config()
|
|
api_instance = client.CoreV1Api()
|
|
try:
|
|
all_pos_ready_flag = False
|
|
timeout = 0
|
|
while (not all_pos_ready_flag and timeout < 360):
|
|
api_response = api_instance.list_namespaced_pod(namespace=namespace,label_selector=label_selector)
|
|
all_pos_ready_flag = True
|
|
for item in api_response.items:
|
|
for c in item.status.container_statuses:
|
|
log.info(f"{c.name} statu is {c.ready}")
|
|
if c.ready is False:
|
|
all_pos_ready_flag = False
|
|
break
|
|
if not all_pos_ready_flag:
|
|
log.info("all pods are not ready, please wait")
|
|
time.sleep(30)
|
|
timeout += 30
|
|
if all_pos_ready_flag:
|
|
log.info(f"all pods in namespace {namespace} with label {label_selector} are ready")
|
|
else:
|
|
log.info("timeout for waiting all pods in namespace {namespace} with label {label_selector} ready")
|
|
except ApiException as e:
|
|
log.error("Exception when calling CoreV1Api->list_namespaced_pod: %s\n" % e)
|
|
raise Exception(str(e))
|
|
return all_pos_ready_flag |