Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup IP race condition on EC2Architect #606

Merged
merged 16 commits into from
Nov 16, 2021
Merged
12 changes: 1 addition & 11 deletions mephisto/abstractions/architects/ec2/ec2_architect.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ def __init__(

self.server_dir: Optional[str] = None
self.server_id: Optional[str] = None
self.allocation_id: Optional[str] = None
self.association_id: Optional[str] = None
self.target_group_arn: Optional[str] = None
self.router_rule_arn: Optional[str] = None
self.created = False
Expand Down Expand Up @@ -231,11 +229,7 @@ def __setup_ec2_server(self) -> str:
print("EC2: Starting instance...")

# Launch server
(
server_id,
self.allocation_id,
self.association_id,
) = ec2_helpers.create_instance(
server_id = ec2_helpers.create_instance(
self.session,
self.fallback_details["key_pair_name"],
self.fallback_details["security_group_id"],
Expand Down Expand Up @@ -264,8 +258,6 @@ def __setup_ec2_server(self) -> str:
server_details = {
"balancer_rule_arn": self.router_rule_arn,
"instance_id": self.server_id,
"ip_allocation_id": self.allocation_id,
"ip_association_id": self.association_id,
"subdomain": self.subdomain,
"target_group_arn": self.target_group_arn,
}
Expand Down Expand Up @@ -301,8 +293,6 @@ def __delete_ec2_server(self):
ec2_helpers.delete_instance(
self.session,
server_id,
self.allocation_id,
self.association_id,
)
os.unlink(self.server_detail_path)

Expand Down
269 changes: 160 additions & 109 deletions mephisto/abstractions/architects/ec2/ec2_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
SCRIPTS_DIRECTORY = os.path.join(MY_DIR, "run_scripts")
DEFAULT_FALLBACK_FILE = os.path.join(DEFAULT_SERVER_DETAIL_LOCATION, "fallback.json")
FALLBACK_SERVER_LOC = os.path.join(MY_DIR, "fallback_server")
KNOWN_HOST_PATH = os.path.expanduser("~/.ssh/known_hosts")
MAX_RETRIES = 10


Expand Down Expand Up @@ -533,7 +534,7 @@ def create_instance(
instance_name: str,
volume_size: int = 8,
instance_type: str = DEFAULT_INSTANCE_TYPE,
) -> Tuple[str, str, str]:
) -> str:
"""
Create an instance, return the instance id, allocation id, and association id
"""
Expand Down Expand Up @@ -589,36 +590,13 @@ def create_instance(
)
instance_id = instance_response["Instances"][0]["InstanceId"]

allocation_response = client.allocate_address(
Domain="vpc",
TagSpecifications=[
{
"ResourceType": "elastic-ip",
"Tags": [
{
"Key": "Name",
"Value": f"{instance_name}-ip-address",
}
],
}
],
)
allocation_id = allocation_response["AllocationId"]

logger.debug(f"Waiting for instance {instance_id} to come up before continuing")
waiter = client.get_waiter("instance_running")
waiter.wait(
InstanceIds=[instance_id],
)

associate_response = client.associate_address(
AllocationId=allocation_id,
InstanceId=instance_id,
AllowReassociation=False,
)
association_id = associate_response["AssociationId"]

return instance_id, allocation_id, association_id
return instance_id


def create_target_group(
Expand Down Expand Up @@ -810,104 +788,191 @@ def configure_base_balancer(
return listener_arn


def deploy_fallback_server(
def get_instance_address(
session: boto3.Session,
instance_id: str,
key_pair: str,
log_access_pass: str,
) -> bool:
) -> Tuple[str, str, str]:
"""
Deploy the fallback server to the given instance,
return True if successful
Create a temporary publicly accessible IP for the given instance.
Return the IP address, the allocation id, and the association id.
"""
client = session.client("ec2")
server_response = client.describe_instances(InstanceIds=[instance_id])
server_host = server_response["Reservations"][0]["Instances"][0]["PublicIpAddress"]
keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")
password_file_name = os.path.join(FALLBACK_SERVER_LOC, f"access_key.txt")
with open(password_file_name, "w+") as password_file:
password_file.write(log_access_pass)

remote_server = f"{AMI_DEFAULT_USER}@{server_host}"
allocation_response = client.allocate_address(
Domain="vpc",
TagSpecifications=[
{
"ResourceType": "elastic-ip",
"Tags": [
{
"Key": "Name",
"Value": f"{instance_id}-ip-address",
}
],
}
],
)
ip_address = allocation_response["PublicIp"]
allocation_id = allocation_response["AllocationId"]

dest = f"{remote_server}:/home/ec2-user/"
subprocess.check_call(
[
"scp",
"-o",
"StrictHostKeyChecking=no",
"-i",
keypair_file,
"-r",
f"{FALLBACK_SERVER_LOC}",
dest,
]
associate_response = client.associate_address(
AllocationId=allocation_id,
InstanceId=instance_id,
AllowReassociation=False,
)
association_id = associate_response["AssociationId"]

# Remove this IP from known hosts in case it's there,
# as it's definitely not the old host anymore
subprocess.check_call(
[
"ssh",
"-i",
keypair_file,
remote_server,
"bash",
"/home/ec2-user/fallback_server/scripts/first_setup.sh",
"ssh-keygen",
"-f",
f"{KNOWN_HOST_PATH}",
"-R",
f'"{ip_address}"',
]
)

os.unlink(password_file_name)
return True
return ip_address, allocation_id, association_id


def deploy_to_routing_server(
def detete_instance_address(
session: boto3.Session,
instance_id: str,
key_pair: str,
push_directory: str,
) -> bool:
allocation_id: str,
association_id: str,
) -> None:
"""
Removes the public ip described by the given allocation and association ids
"""
client = session.client("ec2")
server_response = client.describe_instances(InstanceIds=[instance_id])
server_host = server_response["Reservations"][0]["Instances"][0]["PublicIpAddress"]
keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")
client.disassociate_address(
AssociationId=association_id,
)

remote_server = f"{AMI_DEFAULT_USER}@{server_host}"
client.release_address(
AllocationId=allocation_id,
)

print("Uploading files to server, then attempting to run")
dest = f"{remote_server}:/home/ec2-user/"
retries = 5
sleep_time = 10.0

def try_server_push(subprocess_args: List[str], retries=5, sleep_time=10.0):
"""
Try to execute the server push provided in subprocess args
"""
while retries > 0:
try:
subprocess.check_call(
[
"scp",
"-o",
"StrictHostKeyChecking=no",
"-i",
keypair_file,
"-r",
f"{push_directory}",
dest,
]
)
break
subprocess.check_call(subprocess_args)
return
except subprocess.CalledProcessError:
retries -= 1
sleep_time *= 1.5
logger.info(
f"Timed out trying to push to server. Retries remaining: {retries}"
)
time.sleep(sleep_time)
subprocess.check_call(
[
"ssh",
"-i",
keypair_file,
remote_server,
"bash",
"/home/ec2-user/routing_server/setup/init_server.sh",
]
raise Exception(
"Could not successfully push to the ec2 instance. See log for errors."
)
print("Server setup complete!")


def deploy_fallback_server(
session: boto3.Session,
instance_id: str,
key_pair: str,
log_access_pass: str,
) -> bool:
"""
Deploy the fallback server to the given instance,
return True if successful
"""
client = session.client("ec2")
server_host, allocation_id, association_id = get_instance_address(
session, instance_id
)
try:
keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")
password_file_name = os.path.join(FALLBACK_SERVER_LOC, f"access_key.txt")
with open(password_file_name, "w+") as password_file:
password_file.write(log_access_pass)

remote_server = f"{AMI_DEFAULT_USER}@{server_host}"

dest = f"{remote_server}:/home/ec2-user/"
try_server_push(
[
"scp",
"-o",
"StrictHostKeyChecking=no",
"-i",
keypair_file,
"-r",
f"{FALLBACK_SERVER_LOC}",
dest,
]
)
os.unlink(password_file_name)
subprocess.check_call(
[
"ssh",
"-i",
keypair_file,
remote_server,
"bash",
"/home/ec2-user/fallback_server/scripts/first_setup.sh",
]
)
detete_instance_address(session, allocation_id, association_id)
except Exception as e:
detete_instance_address(session, allocation_id, association_id)
raise e

return True


def deploy_to_routing_server(
session: boto3.Session,
instance_id: str,
key_pair: str,
push_directory: str,
) -> bool:
client = session.client("ec2")
server_host, allocation_id, association_id = get_instance_address(
session, instance_id
)
keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")

print("Uploading files to server, then attempting to run")
try:
remote_server = f"{AMI_DEFAULT_USER}@{server_host}"
dest = f"{remote_server}:/home/ec2-user/"
try_server_push(
[
"scp",
"-o",
"StrictHostKeyChecking=no",
"-i",
keypair_file,
"-r",
f"{push_directory}",
dest,
]
)

subprocess.check_call(
[
"ssh",
"-i",
keypair_file,
remote_server,
"bash",
"/home/ec2-user/routing_server/setup/init_server.sh",
]
)
detete_instance_address(session, allocation_id, association_id)
print("Server setup complete!")
except Exception as e:
detete_instance_address(session, allocation_id, association_id)
raise e

return True

Expand All @@ -933,21 +998,11 @@ def delete_rule(
def delete_instance(
session: boto3.Session,
instance_id: str,
allocation_id: str,
association_id: str,
) -> None:
"""
Remove the given instance and the associated elastic ip
"""
client = session.client("ec2")
client.disassociate_address(
AssociationId=association_id,
)

client.release_address(
AllocationId=allocation_id,
)

client.terminate_instances(InstanceIds=[instance_id])


Expand All @@ -970,8 +1025,6 @@ def remove_instance_and_cleanup(
delete_instance(
session,
details["instance_id"],
details["ip_allocation_id"],
details["ip_association_id"],
)
os.unlink(server_detail_path)
return None
Expand Down Expand Up @@ -1033,11 +1086,9 @@ def cleanup_fallback_server(
)

instance_id = details.get("instance_id")
ip_allocation_id = details.get("ip_allocation_id")
ip_association_id = details.get("ip_association_id")
if instance_id is not None:
print(f"Deleting instance {instance_id}...")
delete_instance(session, instance_id, ip_allocation_id, ip_association_id)
delete_instance(session, instance_id)

vpc_details = details.get("vpc_details")
if vpc_details is not None:
Expand Down
Loading