Skip to content

Commit

Permalink
[MINOR][INFRA-147] Logging with JSON format. (#8)
Browse files Browse the repository at this point in the history
- app now logs into json format
- updated chart version
- fixed log key

---------

Co-authored-by: Valentin Daviot <[email protected]>
  • Loading branch information
fabienduhamel and vdaviot committed Jan 5, 2024
1 parent df133f6 commit 1580673
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 81 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__/
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.11-alpine3.18
FROM python:3.12-alpine3.18

WORKDIR /app

Expand Down
2 changes: 1 addition & 1 deletion charts/aws-secrets-synchronizer/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
apiVersion: v2
name: aws-secrets-synchronizer
version: 0.2.0
version: 0.2.1
description: K8S Operator that allows you to sync AWS SecretManager secrets with K8S secrets.
type: application
keywords:
Expand Down
2 changes: 1 addition & 1 deletion charts/aws-secrets-synchronizer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ The following table lists the configurable parameters of the aws-secrets-synchro
|------------------------------|-----------------------------|----------------------------|
| `image.repository` | Image repository | `ghcr.io/reezogit` |
| `image.name` | Image name | `aws-secrets-synchronizer` |
| `image.tag` | Image tag | `0.2.0` |
| `image.tag` | Image tag | `0.2.1` |
| `image.pullPolicy` | Image pull policy | `IfNotPresent` |
| `replicaCount` | Replica count | `1` |
| `env` | Environment variables | `{}` |
Expand Down
2 changes: 1 addition & 1 deletion charts/aws-secrets-synchronizer/values.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
image:
repository: "ghcr.io/reezogit"
name: "aws-secrets-synchronizer"
tag: "0.2.0"
tag: "0.2.1"
pullPolicy: IfNotPresent

replicaCount: 1
Expand Down
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ awscli==1.29.4
boto3==1.28.4
botocore==1.31.4
cachetools==5.3.0
certifi==2023.5.7
certifi==2023.7.22
charset-normalizer==3.1.0
colorama==0.4.4
Cython==3.0.0
Expand All @@ -21,5 +21,6 @@ requests-oauthlib==1.3.1
rsa==4.7.2
s3transfer==0.6.1
six==1.16.0
urllib3==1.26.15
urllib3==1.26.18
websocket-client==1.5.1
structlog==23.2.0
242 changes: 167 additions & 75 deletions script.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,73 @@
import os
import time
import logging
import structlog
import boto3
from botocore.exceptions import ClientError
from kubernetes import client, config

AWS_REGION = os.environ['AWS_REGION']
SYNC_INTERVAL = os.getenv('SYNC_INTERVAL', 300)
SYNC_EMPTY = os.getenv('SYNC_EMPTY', 'true')
AWS_TAG_KEY = os.getenv('AWS_TAG_KEY', 'SyncedBy')
AWS_TAG_VALUE = os.getenv('AWS_TAG_VALUE', 'aws-secret-synchronizer')
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')


def get_base_logger(name=None):
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.EventRenamer("message"), # rename 'event' to 'message'
structlog.processors.TimeStamper(fmt="iso", utc=False),
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.make_filtering_bound_logger(logging.getLevelName(LOG_LEVEL)),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=False
)

return structlog.get_logger(name=name)


class SecretSyncer:
def __init__(self, v1_api, region_name, log_level, params):
self.v1_api = v1_api
"""
SecretSyncer is a class that synchronize AWS Secrets Manager secrets with Kubernetes secrets.
"""
_base_config = {
'aws_tag_key': AWS_TAG_KEY,
'aws_tag_value': AWS_TAG_VALUE,
'sync_empty': SYNC_EMPTY == 'true',
'sync_interval': SYNC_INTERVAL,
}

def __init__(self):
# Initialize Kubernetes client
config.load_incluster_config()
self.v1_api = client.CoreV1Api()

# Initiliaze AWS Secrets Manager client
self.client = boto3.client(
service_name='secretsmanager',
region_name=region_name,
region_name=AWS_REGION,
)
self.region_name = region_name
logging.basicConfig(level=log_level)
# merge default params with params passed in
self.params = {
'aws_tag_key': 'SyncedBy',
'aws_tag_value': 'aws-secrets-synchronizer',
'sync_empty': True,
'sync_interval': 300,
}
self.params.update(params)

# list secret from AWS Secrets manager
def list_aws_secrets_by_tags(self):

# Use the default logger if the user did not provide its own.
self.logger = get_base_logger('SecretSyncer') # not compatible with multiple instances of SecretSyncer

# Merge default params with user config
self.params = SecretSyncer._base_config


def list_aws_secrets_by_tags(self) -> list:
"""
List all AWS Secrets Manager secrets with the tag key/value pair
:return: list of secrets
"""
secrets = []
filters = [
{
Expand Down Expand Up @@ -58,6 +100,13 @@ def list_aws_secrets_by_tags(self):
return secrets

def aws_list_secrets_call(self, filters, next_token=None, max_results=100):
"""
Call AWS Secrets Manager list_secrets API
:param filters: array of filters
:param next_token: token to get next page
:param max_results: max results per page
:return: TODO
"""
try:
if next_token is None:
get_secret_value_response = self.client.list_secrets(
Expand All @@ -78,10 +127,12 @@ def aws_list_secrets_call(self, filters, next_token=None, max_results=100):

return get_secret_value_response

# get secret from AWS Secrets Manager
def get_secret_values(self, secret_name):
logging.info("Getting secret: %s", secret_name)

"""
Get secret value from AWS Secrets Manager
:param secret_name
:return: secret content
"""
try:
get_secret_value_response = self.client.get_secret_value(
SecretId=secret_name
Expand All @@ -91,72 +142,115 @@ def get_secret_values(self, secret_name):
# https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
raise e

self.logger.info("Secret found", secret_name=secret_name)

# Decrypts secret using the associated KMS key.
secret = get_secret_value_response['SecretString']

return secret

# get secret from AWS Secrets Manager
def get_secret_namespace_tag(self, aws_secret):
"""
Get secret namespace tag from AWS Secrets Manager
:param aws_secret
:return: namespace tag value
"""
for tag in aws_secret['Tags']:
if tag['Key'] == 'K8s-Namespace':
return tag['Value']
raise Exception("No Namespace tag found for secret: ", aws_secret['Name'])

def create_or_update_secret(self, namespace, name, data):
raise Exception("No Namespace tag found for secret: ", aws_secret['Name']) # FIXME Declare a custom error

def create_or_update_secret(self, namespace, secret_name, data):
"""
Create or update a secret in Kubernetes
:param namespace: namespace where to create the secret
:param secret_name: name of the secret
:param data: data to store in the secret
:return: None
"""
body = client.V1Secret(
api_version="v1",
kind="Secret",
metadata=client.V1ObjectMeta(name=name, annotations={},
labels={self.params['aws_tag_key']: self.params['aws_tag_value']}),
metadata=client.V1ObjectMeta(
name=secret_name,
annotations={},
labels={self.params['aws_tag_key']: self.params['aws_tag_value']}
),
data=data
)

try:
# Check if the secret already exists
existing_data = self.v1_api.read_namespaced_secret(name=name, namespace=namespace)
# Check if the secret exists
existing_data = self.v1_api.read_namespaced_secret(name=secret_name, namespace=namespace)

# If it exists, replace it
if existing_data.data != body.data:
logging.info("Updating secret: %s in namespace: %s", name, namespace)
self.v1_api.replace_namespaced_secret(
name=name,
name=secret_name,
namespace=namespace,
body=body
)
else:
logging.info("No change in secret: %s", name)

self.logger.info("Secret updated", secret_name=secret_name, k8s_namespace=namespace)

return

self.logger.info("Secret unchanged", secret_name=secret_name, k8s_namespace=namespace)

except client.rest.ApiException as e:
if e.status == 404:
# If it doesn't exist, create it
logging.info("Secret not found, creating secret: %s in namespace: %s", name, namespace)
self.v1_api.create_namespaced_secret(
namespace=namespace,
body=body
)
else:
raise e

self.logger.info("Secret created", secret_name=secret_name, k8s_namespace=namespace)

return

raise e

def delete_obsolete_secrets(self, existing_kube_secrets, aws_secrets):
"""
Delete secrets that are not in AWS Secrets Manager anymore
:param existing_kube_secrets: list of existing secrets in Kubernetes
:param aws_secrets: list of existing secrets in AWS Secrets Manager
:return: None
"""
for existing_kube_secret in existing_kube_secrets.items:
if existing_kube_secret.metadata.name not in [aws_secret['Name'] for aws_secret in aws_secrets]:
logging.info("Deleting secret: %s in namespace: %s", existing_kube_secret.metadata.name,
existing_kube_secret.metadata.namespace)
self.v1_api.delete_namespaced_secret(
name=existing_kube_secret.metadata.name,
namespace=existing_kube_secret.metadata.namespace,
body=client.V1DeleteOptions()
)

@staticmethod
def get_encoded_data_to_sync(data, sync_empty):
self.logger.info(
"Secret deleted",
secret_name=existing_kube_secret.metadata.name,
k8s_namespace=existing_kube_secret.metadata.namespace
)

def get_encoded_data_to_sync(self, data, sync_empty):
"""
Encode data to base64 and filter out empty values
:param data: data to encode
:param sync_empty: boolean to sync empty values
:return: encoded data
"""

# first, filter out empty values if sync_empty is False
filtered_data = {}
for key, value in data.items():
if value is None and sync_empty is False:
logging.warning("Key %s has an empty value, removed from synchronization.", key)
continue
elif value is None:
logging.warning("Key %s has an empty value.", key)
if not value:
if not sync_empty:
self.logger.warning("Empty key removed from synchronization", key=key)

continue

self.logger.warning("Empty key", key=key)
else:
filtered_data[key] = value

Expand All @@ -168,52 +262,50 @@ def get_encoded_data_to_sync(data, sync_empty):
return encoded_data

def run(self):
"""
Main loop
:return: None
"""
while True:
try:
self.logger.info("Syncing secrets")
aws_secrets = self.list_aws_secrets_by_tags()
self.logger.debug("Got list of secrets", secrets=aws_secrets)
existing_kube_secrets = self.v1_api.list_secret_for_all_namespaces(
watch=False,
label_selector= self.params['aws_tag_key'] + "=" + self.params['aws_tag_value'])
label_selector=self.params['aws_tag_key'] + "=" + self.params['aws_tag_value']
)
self.logger.debug("Existing secrets in k8s secrets", secrets=existing_kube_secrets.items)

for aws_secret in aws_secrets:
try:
namespace = self.get_secret_namespace_tag(aws_secret)
except Exception as e:
logging.error(e)
except Exception as e: # FIXME Should not catch generic exception, declare a custom one then catch it here
self.logger.error(
"Failed to get namespace tag from AWS secret",
secret_name=aws_secret['Name'],
err=e,
)

continue

# get secret data from AWS Secrets Manager
data = json.loads(self.get_secret_values(aws_secret['Name']))
self.create_or_update_secret(namespace=namespace,
name=aws_secret['Name'],
data=self.get_encoded_data_to_sync(data, self.params['sync_empty']))

self.create_or_update_secret(
namespace=namespace,
secret_name=aws_secret['Name'],
data=self.get_encoded_data_to_sync(
data,
self.params['sync_empty']
)
)

self.delete_obsolete_secrets(existing_kube_secrets, aws_secrets)
except Exception as e: # FIXME Really a bad practice
self.logger.error("Woops, something went wrong!", err=e)

except Exception as e:
logging.error(e)

time.sleep(self.params['sync_interval'])


def main():
params = {}
if 'SYNC_INTERVAL' in os.environ:
params['sync_interval'] = int(os.environ['SYNC_INTERVAL'])
if 'SYNC_EMPTY' in os.environ:
params['sync_empty'] = os.environ['SYNC_EMPTY'] == 'true'
if 'AWS_TAG_KEY' in os.environ:
params['aws_tag_key'] = os.environ['AWS_TAG_KEY']
if 'AWS_TAG_VALUE' in os.environ:
params['aws_tag_value'] = os.environ['AWS_TAG_VALUE']

config.load_incluster_config()
secret_syncer = SecretSyncer(
client.CoreV1Api(),
os.environ['AWS_REGION'],
os.environ.get('LOG_LEVEL', logging.INFO),
params
)
secret_syncer.run()
time.sleep(int(self.params['sync_interval']))


if __name__ == "__main__":
main()
SecretSyncer().run()

0 comments on commit 1580673

Please sign in to comment.