AWS Secrets

Caution

This example is for reference only. It is not extensively tested, and it not intended to be a fully-fledged Concourse resource for production pipelines. Copy and paste at your own risk.

This example will showcase the TriggerOnChangeConcourseResource, and how to build a resource to emit versions whenever something has changed, rather than when a new linear version becomes available. In this example, the resource will watch an secret in AWS SecretsManager, and yield a new version whenever the value of that secret has changed. It will also allow the user to optionally download the secret value, and also to update the secret via a string or a file. The functionality will depend heavily on boto3 in order to reduce the amount of code needed to function.

Secrets Version

Every secret in SecretsManager has a number of versions representing changes to the secret value. This is all the version needs to contain:

@dataclass(unsafe_hash=True)
class SecretVersion(TypedVersion):
    version_id: str

Here, we only inherit from TypedVersion to save some lines of code.

Secrets Resource

We start by inheriting from TriggerOnChangeConcourseResource and passing in the version class. The resource should take an ARN string of the secret (AWS recommends this over the secret name). We also define the AWS SecretsManager client in the __init__ method for ease. To get the AWS region, we need only parse the ARN string to avoid asking for duplicate values from the user:

def __init__(self, secret: str) -> None:
    super().__init__(SecretVersion)
    self.secret = secret

    # arn:aws:secretsmanager:<region>:<account>:secret:<name>
    _, _, _, region, _, _, _ = secret.split(":")
    self._client = boto3.client("secretsmanager", region_name=region)

With this resource type, we don’t overload from fetch_new_versions(), as it would force us to define the behaviour of the trigger. Instead, we let the resource implement this for us, and overload fetch_latest_version(). If the version we fetch is the same, then no new versions are emitted. If the version is different, then that new version is sent which which will trigger the pipeline:

def fetch_latest_version(self) -> SecretVersion:
    try:
        response = self._client.list_secret_version_ids(SecretId=self.secret,
                                                        IncludeDeprecated=False)
    except ClientError:
        raise ValueError(f"Cannot find secret: {self.secret!r}")

    versions = response["Versions"]
    for version in versions:
        version_stages = version["VersionStages"]
        if "AWSCURRENT" in version_stages:
            version_id = version["VersionId"]
            return SecretVersion(version_id)
    raise RuntimeError("No current version of the secret could be found.")

We use list_secret_version_ids with IncludeDeprecated=False to ensure that we only get versions which are either current or pending. We then iterate over the versions and find the one marked with AWSCURRENT. If we don’t find it then we raise an error to alert the user.

Next, we overload download_version() to allow us to download the metadata (and optionally the value) of the new secret:

def download_version(self, version: SecretVersion, destination_dir: Path,
                     build_metadata: BuildMetadata, value: bool = False,
                     metadata_file: str = "metadata.json",
                     value_file: str = "value") -> tuple[SecretVersion, dict[str, str]]:
    meta_response: dict[str, Any] = self._client.describe_secret(SecretId=self.secret)
    meta_response.pop("ResponseMetadata")

    metadata_path = destination_dir / metadata_file
    metadata_path.write_text(json_package.dumps(meta_response,
                                                cls=DatetimeSafeJSONEncoder))

    if value:
        value_response = self._client.get_secret_value(SecretId=self.secret)
        value_path = destination_dir / value_file

        try:
            secret_value = value_response["SecretString"]
        except KeyError:
            secret_value_as_bytes: bytes = value_response["SecretBinary"]
            value_path.write_bytes(secret_value_as_bytes)
        else:
            value_path.write_text(secret_value)

    return version, {}

The behaviour of the resource is as follows:

  1. The metadata of the secret is fetched from AWS using describe_secret. The response metadata is removed, but could potentially be output as Step Metadata.

  2. The metadata is saved to a file. By default this is named metadata.json, but the user can customise this with the parameters of the get step.

  3. If the user has requested the secret value also (which is not the default behaviour), then this is fetched using get_secret_value to be saved to a file, which defaults to value but is again customisable by the user.

  4. If the response contains a string, then this is written directly to the file using write_text(), but if it contains a binary then it is instead written using write_bytes().

The json module is imported as json_package to avoid a name collision with a future argument. The metadata of the secret contains some datetime objects, and so a custom JSONEncoder is required:

class DatetimeSafeJSONEncoder(json_package.JSONEncoder):

    def default(self, o: object) -> object:
        if isinstance(o, datetime):
            return o.isoformat()
        return super().default(o)

Finally, we overload publish_new_version() to allow the user to update the secret. We could make this rotate the secret, but for the purposes of this example we will allow the user to specify a new value exactly:

def publish_new_version(self, sources_dir: Path,
                        build_metadata: BuildMetadata, string: str | None = None,
                        file: str | None = None,
                        json: dict[str, str] | None = None) -> tuple[SecretVersion, dict[str, str]]:
    if json is not None:
        string = json_package.dumps(json)

    if string is not None:
        response = self._client.put_secret_value(SecretId=self.secret,
                                                 SecretString=string)
    elif file is not None:
        file_path = sources_dir / file
        file_contents = file_path.read_bytes()
        response = self._client.put_secret_value(SecretId=self.secret,
                                                 SecretBinary=file_contents)
    else:
        raise ValueError("Missing new value for the secret.")

    version_id = response["VersionId"]
    metadata = {
        "Version Staging Labels": ", ".join(response["VersionStages"])
    }
    return SecretVersion(version_id), metadata

The behaviour is as follows:

  1. If the user has specified the secret value as JSON, then encode that as a string as pass it forward.

  2. If the secret is specified as a string, then attempt to set the secret value with put_secret_value.

  3. If the secret is specified as a file path, then use read_bytes() to get the contents of the file (more resilient than reading the text) and set the SecretBinary instead.

  4. If none of these have been set, then raise an error.

  5. Pull out the new ID from the response to establish the new version to return, and also pass the VersionStages as metadata to be output to the console.

AWS Secrets Conclusion

We have added a lot of functionality for this resource in only 105 lines of code. The final module looks like this:

  1# (C) Crown Copyright GCHQ
  2from __future__ import annotations
  3
  4from dataclasses import dataclass
  5from datetime import datetime
  6import json as json_package
  7from pathlib import Path
  8from typing import Any
  9
 10import boto3
 11from botocore.exceptions import ClientError
 12
 13from concoursetools import BuildMetadata
 14from concoursetools.additional import TriggerOnChangeConcourseResource
 15from concoursetools.version import TypedVersion
 16
 17
 18class DatetimeSafeJSONEncoder(json_package.JSONEncoder):
 19
 20    def default(self, o: object) -> object:
 21        if isinstance(o, datetime):
 22            return o.isoformat()
 23        return super().default(o)
 24
 25
 26@dataclass(unsafe_hash=True)
 27class SecretVersion(TypedVersion):
 28    version_id: str
 29
 30
 31class Resource(TriggerOnChangeConcourseResource[SecretVersion]):
 32    """
 33    :param secret: The full Amazon Resource Name (ARN) of the secret.
 34    """
 35    def __init__(self, secret: str) -> None:
 36        super().__init__(SecretVersion)
 37        self.secret = secret
 38
 39        # arn:aws:secretsmanager:<region>:<account>:secret:<name>
 40        _, _, _, region, _, _, _ = secret.split(":")
 41        self._client = boto3.client("secretsmanager", region_name=region)
 42
 43    def fetch_latest_version(self) -> SecretVersion:
 44        try:
 45            response = self._client.list_secret_version_ids(SecretId=self.secret,
 46                                                            IncludeDeprecated=False)
 47        except ClientError:
 48            raise ValueError(f"Cannot find secret: {self.secret!r}")
 49
 50        versions = response["Versions"]
 51        for version in versions:
 52            version_stages = version["VersionStages"]
 53            if "AWSCURRENT" in version_stages:
 54                version_id = version["VersionId"]
 55                return SecretVersion(version_id)
 56        raise RuntimeError("No current version of the secret could be found.")
 57
 58    def download_version(self, version: SecretVersion, destination_dir: Path,
 59                         build_metadata: BuildMetadata, value: bool = False,
 60                         metadata_file: str = "metadata.json",
 61                         value_file: str = "value") -> tuple[SecretVersion, dict[str, str]]:
 62        meta_response: dict[str, Any] = self._client.describe_secret(SecretId=self.secret)
 63        meta_response.pop("ResponseMetadata")
 64
 65        metadata_path = destination_dir / metadata_file
 66        metadata_path.write_text(json_package.dumps(meta_response,
 67                                                    cls=DatetimeSafeJSONEncoder))
 68
 69        if value:
 70            value_response = self._client.get_secret_value(SecretId=self.secret)
 71            value_path = destination_dir / value_file
 72
 73            try:
 74                secret_value = value_response["SecretString"]
 75            except KeyError:
 76                secret_value_as_bytes: bytes = value_response["SecretBinary"]
 77                value_path.write_bytes(secret_value_as_bytes)
 78            else:
 79                value_path.write_text(secret_value)
 80
 81        return version, {}
 82
 83    def publish_new_version(self, sources_dir: Path,
 84                            build_metadata: BuildMetadata, string: str | None = None,
 85                            file: str | None = None,
 86                            json: dict[str, str] | None = None) -> tuple[SecretVersion, dict[str, str]]:
 87        if json is not None:
 88            string = json_package.dumps(json)
 89
 90        if string is not None:
 91            response = self._client.put_secret_value(SecretId=self.secret,
 92                                                     SecretString=string)
 93        elif file is not None:
 94            file_path = sources_dir / file
 95            file_contents = file_path.read_bytes()
 96            response = self._client.put_secret_value(SecretId=self.secret,
 97                                                     SecretBinary=file_contents)
 98        else:
 99            raise ValueError("Missing new value for the secret.")
100
101        version_id = response["VersionId"]
102        metadata = {
103            "Version Staging Labels": ", ".join(response["VersionStages"])
104        }
105        return SecretVersion(version_id), metadata