Terminate based on cluster tag | Fix small issues

This commit is contained in:
abejgonzalez 2022-12-08 14:47:06 -08:00
parent c1e1c9e14a
commit 27b0c5e0da
3 changed files with 35 additions and 26 deletions

View File

@ -40,11 +40,11 @@ def cull_aws_instances(current_time: DateTime) -> None:
# Grab all instances with a CI-generated tag # Grab all instances with a CI-generated tag
aws_platform_lib = get_platform_lib(Platform.AWS) aws_platform_lib = get_platform_lib(Platform.AWS)
all_ci_instances = aws_platform_lib.find_all_ci_instances() all_ci_instances = aws_platform_lib.find_all_ci_instances()
select_ci_instances = aws_platform_lib.find_select_ci_instances() run_farm_ci_instances = aws_platform_lib.find_run_farm_ci_instances()
client = boto3.client('ec2') client = boto3.client('ec2')
instances_to_terminate = find_timed_out_resources(FPGA_INSTANCE_LIFETIME_LIMIT_HOURS, current_time, map(lambda x: (x, x['LaunchTime']), select_ci_instances)) instances_to_terminate = find_timed_out_resources(FPGA_INSTANCE_LIFETIME_LIMIT_HOURS, current_time, map(lambda x: (x, x['LaunchTime']), run_farm_ci_instances))
instances_to_terminate += find_timed_out_resources(INSTANCE_LIFETIME_LIMIT_HOURS, current_time, map(lambda x: (x, x['LaunchTime']), all_ci_instances)) instances_to_terminate += find_timed_out_resources(INSTANCE_LIFETIME_LIMIT_HOURS, current_time, map(lambda x: (x, x['LaunchTime']), all_ci_instances))
instances_to_terminate = list(set(instances_to_terminate)) instances_to_terminate = list(set(instances_to_terminate))
@ -60,10 +60,10 @@ def cull_aws_instances(current_time: DateTime) -> None:
def cull_azure_resources(current_time: DateTime) -> None: def cull_azure_resources(current_time: DateTime) -> None:
azure_platform_lib = get_platform_lib(Platform.AZURE) azure_platform_lib = get_platform_lib(Platform.AZURE)
all_azure_ci_vms = azure_platform_lib.find_all_ci_instances() all_azure_ci_vms = azure_platform_lib.find_all_ci_instances()
select_azure_ci_vms = azure_platform_lib.find_select_ci_instances() run_farm_azure_ci_vms = azure_platform_lib.find_run_farm_ci_instances()
vms_to_terminate = find_timed_out_resources(FPGA_INSTANCE_LIFETIME_LIMIT_HOURS, current_time, \ vms_to_terminate = find_timed_out_resources(FPGA_INSTANCE_LIFETIME_LIMIT_HOURS, current_time, \
map(lambda x: (x, datetime.datetime.strptime(x['LaunchTime'],'%Y-%m-%d %H:%M:%S.%f%z')), select_azure_ci_vms)) map(lambda x: (x, datetime.datetime.strptime(x['LaunchTime'],'%Y-%m-%d %H:%M:%S.%f%z')), run_farm_azure_ci_vms))
vms_to_terminate += find_timed_out_resources(INSTANCE_LIFETIME_LIMIT_HOURS, current_time, \ vms_to_terminate += find_timed_out_resources(INSTANCE_LIFETIME_LIMIT_HOURS, current_time, \
map(lambda x: (x, datetime.datetime.strptime(x['LaunchTime'],'%Y-%m-%d %H:%M:%S.%f%z')), all_azure_ci_vms)) map(lambda x: (x, datetime.datetime.strptime(x['LaunchTime'],'%Y-%m-%d %H:%M:%S.%f%z')), all_azure_ci_vms))
vms_to_terminate = list(set(vms_to_terminate)) vms_to_terminate = list(set(vms_to_terminate))

View File

@ -79,17 +79,20 @@ class PlatformLib(metaclass=abc.ABCMeta):
@abc.abstractmethod @abc.abstractmethod
def find_all_workflow_instances(self, workflow_tag: str) -> List: def find_all_workflow_instances(self, workflow_tag: str) -> List:
""" Returns all instances in this workflow (including manager) """ """ Returns all manager instances in this workflow """
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def find_all_ci_instances(self) -> List: def find_all_ci_instances(self) -> List:
""" Returns all instances across CI workflows """ """ Returns all manager instances across all CI workflows """
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def find_select_ci_instances(self, workflow_tag: str = '*') -> List: def find_run_farm_ci_instances(self, workflow_tag: str = '*') -> List:
""" Grabs a list of select instances across all CI using the CI unique tag key""" """
Returns all run farm instance types (normally FPGA instances) that have the
`workflow_tag` in the cluster name.
"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
@ -130,8 +133,8 @@ class PlatformLib(metaclass=abc.ABCMeta):
return f"centos@{self.get_manager_ip(workflow_tag)}" return f"centos@{self.get_manager_ip(workflow_tag)}"
@abc.abstractmethod @abc.abstractmethod
def check_and_terminate_select_instances(self, timeout: int, workflow_tag: str) -> None: def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str) -> None:
""" Check if platform-specific instances are running past a `timeout` minutes designated time. If so, then terminate them. """ """ Check if run farm instances are running past a `timeout` minutes designated time. If so, then terminate them. """
raise NotImplementedError raise NotImplementedError
@ -182,11 +185,11 @@ class AWSPlatformLib(PlatformLib):
all_ci_instances = get_instances_with_filter([all_ci_instances_filter], allowed_states=['*']) all_ci_instances = get_instances_with_filter([all_ci_instances_filter], allowed_states=['*'])
return all_ci_instances return all_ci_instances
def find_select_ci_instances(self, workflow_tag: str = '*') -> List: def find_run_farm_ci_instances(self, workflow_tag: str = '*') -> List:
""" Grabs a list of select instances across all CI using the CI unique tag key""" # on AWS run farm instances are marked with 'fsimcluster'
instances_filter = [ instances_filter = [
self.get_filter(workflow_tag), {'Name': 'tag:fsimcluster', 'Values': f'*{workflow_tag}*'},
{'Name': 'instance-type', 'Values': ['f1.2xlarge', 'f1.16xlarge']}, {'Name': 'instance-type', 'Values': ['f1.2xlarge', 'f1.4xlarge', 'f1.16xlarge']},
] ]
ci_instances = get_instances_with_filter(instances_filter, allowed_states=['*']) ci_instances = get_instances_with_filter(instances_filter, allowed_states=['*'])
return ci_instances return ci_instances
@ -245,7 +248,7 @@ class AWSPlatformLib(PlatformLib):
else: else:
raise ValueError(f"Unrecognized transition type: {state_change}") raise ValueError(f"Unrecognized transition type: {state_change}")
def get_platform_enum(self) -> None: def get_platform_enum(self) -> Platform:
return Platform.AWS return Platform.AWS
def get_manager_metadata_string(self, workflow_tag: str) -> str: def get_manager_metadata_string(self, workflow_tag: str) -> str:
@ -264,18 +267,21 @@ class AWSPlatformLib(PlatformLib):
return static_md + dynamic_md return static_md + dynamic_md
def check_and_terminate_select_instances(self, timeout: int, workflow_tag: str) -> None: def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str) -> None:
# terminate f1.{2,16}xlarge instances after timeout minutes of running (extra backup) # We need this in case terminate is called in setup-self-hosted-workflow before aws-configure is run
instances = self.find_select_ci_instances(workflow_tag) if self.client is None:
self.client = boto3.client('ec2')
instances = self.find_run_farm_ci_instances(workflow_tag)
terminated_insts = False terminated_insts = False
for inst in instances: for inst in instances:
if (datetime.datetime.now() - inst.launch_time) >= datetime.timedelta(minutes=timeout): if (datetime.datetime.now() - inst.launch_time) >= datetime.timedelta(minutes=timeout):
print("Uncaught FPGA instance shutdown detected") print("Uncaught run farm instance shutdown detected")
instids = [ inst.instance_id ] instids = [ inst.instance_id ]
terminate_instances(instids, False) self.client.terminate_instances(InstanceIds=instids, DryRun=False)
print(f"Terminated FPGA instance {instids}") print(f"Terminated run farm instance {instids}")
terminated_insts = True terminated_insts = True
# post comment after instances are terminated just in case there is an issue with posting # post comment after instances are terminated just in case there is an issue with posting
@ -428,8 +434,8 @@ class AzurePlatformLib(PlatformLib):
else: else:
print(f"Succeeded in deleting VM {vm['name']}") print(f"Succeeded in deleting VM {vm['name']}")
def check_and_terminate_select_instances(self, timeout: int, workflow_tag: str) -> None: def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str) -> None:
raise NotImplementedError raise NotImplementedError
def find_select_ci_instances(self, workflow_tag: str = '*') -> List: def find_run_farm_ci_instances(self, workflow_tag: str = '*') -> List:
raise NotImplementedError raise NotImplementedError

View File

@ -35,7 +35,7 @@ TERMINATE_STATES = ["cancelled", "success", "skipped", "stale", "failure", "time
STOP_STATES = [] STOP_STATES = []
NOP_STATES = ["action_required"] # TODO: unsure when this happens NOP_STATES = ["action_required"] # TODO: unsure when this happens
def wrap_in_code(wrap: str): def wrap_in_code(wrap: str) -> str:
return f"\n```\n{wrap}\n```" return f"\n```\n{wrap}\n```"
def main(platform: Platform): def main(platform: Platform):
@ -59,13 +59,16 @@ def main(platform: Platform):
print(f"Workflow {ci_env['GITHUB_RUN_ID']} status: {state_status} {state_concl}") print(f"Workflow {ci_env['GITHUB_RUN_ID']} status: {state_status} {state_concl}")
# check that select instances are terminated on time # check that select instances are terminated on time
platform_lib.check_and_terminate_select_instances(45, ci_env['GITHUB_RUN_ID']) platform_lib.check_and_terminate_run_farm_instances(45, ci_env['GITHUB_RUN_ID'])
if state_status in ['completed']: if state_status in ['completed']:
if state_concl in TERMINATE_STATES: if state_concl in TERMINATE_STATES:
platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'])
platform_lib.terminate_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID']) platform_lib.terminate_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID'])
return return
elif state_concl in STOP_STATES: elif state_concl in STOP_STATES:
# if we stop then we should terminate the run farm instances
platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'])
platform_lib.stop_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID']) platform_lib.stop_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID'])
return return
elif state_concl not in NOP_STATES: elif state_concl not in NOP_STATES:
@ -86,7 +89,7 @@ def main(platform: Platform):
issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], post_str) issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], post_str)
platform_lib.check_and_terminate_select_instances(0, ci_env['GITHUB_RUN_ID']) platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'])
platform_lib.terminate_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID']) platform_lib.terminate_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID'])
post_str = f"Instances for CI run {ci_env['GITHUB_RUN_ID']} were supposedly terminated. Verify termination manually.\n" post_str = f"Instances for CI run {ci_env['GITHUB_RUN_ID']} were supposedly terminated. Verify termination manually.\n"