Cardinal Agent Builder
Release Agent
ECS Lambda Code

ECS Lambda Function Code Reference

This is the complete Python code for the ECS deployment tracker Lambda function.

Lambda Function

ecs-deployment-tracker.py:

import boto3
import json
import logging
import sys
import os
from urllib import request, error
from typing import Dict, List, Optional
 
ecs = boto3.client("ecs")
ecr = boto3.client("ecr")
 
# --- Structured JSON logger ---
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(message)s'))
logger.handlers = [handler]
 
def log_json(**kwargs):
    """Emit a single-line JSON log entry."""
    logger.info(json.dumps(kwargs))
 
def get_image_digest(image_uri: str) -> Optional[str]:
    """
    Extract image digest from ECR image URI.
    Handles both tag-based and digest-based URIs.
    Returns SHA256 digest or None if not found.
    """
    try:
        if '@sha256:' in image_uri:
            return image_uri.split('@')[1]
 
        parts = image_uri.split('/')
        if len(parts) < 2:
            return None
 
        registry = parts[0]
        repo_and_tag = '/'.join(parts[1:])
 
        if ':' in repo_and_tag:
            repository, tag = repo_and_tag.rsplit(':', 1)
        else:
            repository = repo_and_tag
            tag = 'latest'
 
        response = ecr.describe_images(
            repositoryName=repository,
            imageIds=[{'imageTag': tag}]
        )
 
        if response['imageDetails']:
            digest = response['imageDetails'][0]['imageDigest']
            log_json(level="debug", msg="Found digest", image=image_uri, digest=digest)
            return digest
 
        return None
 
    except ecr.exceptions.ImageNotFoundException:
        log_json(level="warn", msg="Image not found in ECR", image=image_uri)
        return None
    except Exception as e:
        log_json(level="error", msg="Failed to get image digest", image=image_uri, error=str(e))
        return None
 
def post_deployment(payload: Dict) -> bool:
    """Post deployment info to configured endpoint."""
    endpoint = os.environ.get('DEPLOYMENT_ENDPOINT_URL')
    if not endpoint:
        log_json(level="warn", msg="No DEPLOYMENT_ENDPOINT_URL configured, skipping POST")
        return False
 
    try:
        headers = {
            'Content-Type': 'application/json',
            'User-Agent': 'ECS-Deployment-Tracker/1.0'
        }
 
        api_key = os.environ.get('API_KEY')
        if api_key:
            headers['Authorization'] = f'Bearer {api_key}'
 
        data = json.dumps(payload).encode('utf-8')
        req = request.Request(endpoint, data=data, headers=headers, method='POST')
 
        with request.urlopen(req, timeout=10) as response:
            status = response.getcode()
            body = response.read().decode('utf-8')
            log_json(level="info", msg="Posted to endpoint", status=status, response=body)
            return status >= 200 and status < 300
 
    except error.HTTPError as e:
        log_json(level="error", msg="HTTP error posting deployment",
                status=e.code, error=e.read().decode('utf-8'))
        return False
    except Exception as e:
        log_json(level="error", msg="Failed to post deployment", error=str(e))
        return False
 
def lambda_handler(event, context):
    log_json(event="raw_ecs_event", data=event)
 
    resources = event.get("resources", [])
    if not resources:
        log_json(level="warn", msg="No resources found in event")
        return {"statusCode": 400, "body": "No resources"}
 
    service_arn = resources[0]
    parts = service_arn.split(":service/")[-1].split("/")
    if len(parts) != 2:
        log_json(level="error", msg="Unexpected ARN format", arn=service_arn)
        return {"statusCode": 400, "body": "Invalid ARN"}
 
    cluster_name, service_name = parts
 
    try:
        svc_resp = ecs.describe_services(cluster=cluster_name, services=[service_name])
    except Exception as e:
        log_json(level="error", msg="Failed to describe service", error=str(e))
        return {"statusCode": 500, "body": "ECS API error"}
 
    if not svc_resp.get("services"):
        log_json(level="error", msg="Service not found", cluster=cluster_name, service=service_name)
        return {"statusCode": 404, "body": "Service not found"}
 
    service = svc_resp["services"][0]
    task_def_arn = service["taskDefinition"]
 
    try:
        td_resp = ecs.describe_task_definition(taskDefinition=task_def_arn)
    except Exception as e:
        log_json(level="error", msg="Failed to describe task definition", error=str(e))
        return {"statusCode": 500, "body": "ECS API error"}
 
    task_def = td_resp["taskDefinition"]
    container_defs = task_def["containerDefinitions"]
 
    digests = []
    images_info = []
 
    for container in container_defs:
        image_uri = container["image"]
        digest = get_image_digest(image_uri)
 
        if digest:
            digests.append(digest)
 
        images_info.append({
            "containerName": container["name"],
            "image": image_uri,
            "digest": digest
        })
 
    region = event.get("region", "us-east-1")
    scope = f"{region}/{cluster_name}"
 
    deployment_payload = {
        "runtime": "ecs",
        "scope": scope,
        "workloads": [
            {
                "name": service_name,
                "properties": {
                    "serviceArn": service_arn,
                    "cluster": cluster_name,
                    "region": region,
                    "deploymentId": event["detail"].get("deploymentId"),
                    "eventType": event["detail"].get("eventName"),
                    "taskDefinition": task_def_arn,
                    "images": images_info,
                    "timestamp": event["time"]
                },
                "digests": digests
            }
        ]
    }
 
    log_json(event="ecs_deployment_enriched", payload=deployment_payload)
    success = post_deployment(deployment_payload)
 
    return {
        "statusCode": 200 if success else 500,
        "body": json.dumps(deployment_payload)
    }

What This Code Does

1. Event Processing

  • Receives ECS deployment completion events from EventBridge
  • Parses the service ARN to extract cluster and service names
  • Fetches service and task definition details from ECS API

2. Image Digest Extraction

  • Queries ECR for each container image's SHA256 digest
  • Handles both tag-based URIs (image:tag) and digest-based URIs (image@sha256:...)
  • Supports multiple containers per task definition

3. Payload Construction

  • Builds a standardized payload with runtime, scope, and workloads
  • Includes all container digests and metadata
  • Formats scope as {region}/{cluster}

4. API Integration

  • POSTs the deployment payload to Cardinal API
  • Includes API key authentication via Bearer token
  • Uses structured JSON logging for debugging

Environment Variables

VariableDescriptionRequired
DEPLOYMENT_ENDPOINT_URLCardinal API endpoint (e.g., https://app.cardinalhq.io/_/chip/workloads)Yes
API_KEYYour Cardinal API key for authenticationYes

Required AWS Permissions

The Lambda function needs these IAM permissions:

  • ecs:DescribeServices
  • ecs:DescribeTaskDefinition
  • ecr:DescribeImages
  • ecr:BatchGetImage
  • logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents

Usage

  1. Save this code as ecs-deployment-tracker.py
  2. Create a ZIP file:
    zip lambda.zip ecs-deployment-tracker.py
  3. Upload to S3 or include in your CloudFormation template
  4. Deploy using the CloudFormation template

Related Pages