AWS Secrets
Caution
This example is for reference only. It is not extensively tested, and it is not intended to be a fully-fledged Concourse resource for production pipelines. Copy and paste at your own risk.
This example showcases the TriggerOnChangeConcourseResource, and shows how to build a resource that emits a version whenever something has changed, rather than when a new linear version becomes available. In this example, the resource watches a secret in AWS SecretsManager, and yields a new version whenever the value of that secret changes. It also allows the user to optionally download the secret value, and to update the secret via a string or a file. The functionality depends heavily on boto3 in order to reduce the amount of code required.
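To make the idea of "trigger on change" concrete, here is a minimal conceptual sketch. It is not the library's implementation: ExampleVersion and versions_to_emit are hypothetical names used only for illustration, and the comparison rule (emit the latest version only when it differs from the previous one) is our reading of the behaviour described above.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ExampleVersion:
    # Hypothetical stand-in for a resource version.
    version_id: str


def versions_to_emit(previous: ExampleVersion | None,
                     latest: ExampleVersion) -> list[ExampleVersion]:
    """Emit the latest version only if it differs from the previous one."""
    if previous == latest:
        return []
    return [latest]


unchanged = versions_to_emit(ExampleVersion("abc"), ExampleVersion("abc"))
changed = versions_to_emit(ExampleVersion("abc"), ExampleVersion("def"))
first_check = versions_to_emit(None, ExampleVersion("abc"))
```

Note that there is no ordering between versions here: any difference at all counts as a change, which is what distinguishes this from a resource with a linear version history.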
Secrets Version
Every secret in SecretsManager has a number of versions representing changes to the secret value. This is all the version needs to contain:
@dataclass(unsafe_hash=True)
class SecretVersion(TypedVersion):
    version_id: str
Here, we only inherit from TypedVersion to save some lines of code.
Secrets Resource
We start by inheriting from TriggerOnChangeConcourseResource and passing in the version class. The resource should take an ARN string of the secret (AWS recommends this over the secret name). We also define the AWS SecretsManager client in the __init__ method for ease. To get the AWS region, we need only parse the ARN string to avoid asking for duplicate values from the user:
def __init__(self, secret: str) -> None:
    super().__init__(SecretVersion)
    self.secret = secret

    # arn:aws:secretsmanager:<region>:<account>:secret:<name>
    _, _, _, region, _, _, _ = secret.split(":")
    self._client = boto3.client("secretsmanager", region_name=region)
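The tuple unpacking above fails with a fairly opaque error if the user passes something other than a full ARN (a bare secret name, for example). As a sketch of one way to make this more explicit, the hypothetical helper below (region_from_secret_arn is not part of the original resource) validates the shape of the ARN before returning the region. The example ARN is made up.

```python
def region_from_secret_arn(arn: str) -> str:
    """Extract the region from a SecretsManager ARN, validating its shape."""
    # arn:aws:secretsmanager:<region>:<account>:secret:<name>
    parts = arn.split(":")
    looks_valid = (
        len(parts) == 7
        and parts[0] == "arn"
        and parts[2] == "secretsmanager"
        and parts[5] == "secret"
    )
    if not looks_valid:
        raise ValueError(f"Not a SecretsManager secret ARN: {arn!r}")
    return parts[3]


example_arn = "arn:aws:secretsmanager:eu-west-1:123456789012:secret:my-secret-AbCdEf"
print(region_from_secret_arn(example_arn))  # eu-west-1
```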
With this resource type, we don’t overload fetch_new_versions(), as that would force us to define the behaviour of the trigger ourselves. Instead, we let the resource implement this for us, and overload fetch_latest_version(). If the version we fetch is the same as the previous one, then no new versions are emitted. If the version is different, then that new version is emitted, which will trigger the pipeline:
def fetch_latest_version(self) -> SecretVersion:
    try:
        response = self._client.list_secret_version_ids(SecretId=self.secret,
                                                        IncludeDeprecated=False)
    except ClientError:
        raise ValueError(f"Cannot find secret: {self.secret!r}")

    versions = response["Versions"]
    for version in versions:
        version_stages = version["VersionStages"]
        if "AWSCURRENT" in version_stages:
            version_id = version["VersionId"]
            return SecretVersion(version_id)
    raise RuntimeError("No current version of the secret could be found.")
We use list_secret_version_ids with IncludeDeprecated=False to ensure that we only get versions which are either current or pending. We then iterate over the versions and find the one marked with AWSCURRENT. If we don’t find it then we raise an error to alert the user.
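The selection logic can be tried out without touching AWS. The sketch below applies the same loop to a hand-written dictionary shaped like a list_secret_version_ids response; the version IDs are invented, and find_current_version_id is a hypothetical helper name rather than part of the resource.

```python
from __future__ import annotations

from typing import Any


def find_current_version_id(response: dict[str, Any]) -> str:
    """Return the ID of the version carrying the AWSCURRENT staging label."""
    for version in response["Versions"]:
        if "AWSCURRENT" in version["VersionStages"]:
            return version["VersionId"]
    raise RuntimeError("No current version of the secret could be found.")


# Made-up response containing only the keys the loop relies on.
sample_response = {
    "Versions": [
        {"VersionId": "11111111-aaaa", "VersionStages": ["AWSPREVIOUS"]},
        {"VersionId": "22222222-bbbb", "VersionStages": ["AWSCURRENT"]},
    ]
}

print(find_current_version_id(sample_response))  # 22222222-bbbb
```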
Next, we overload download_version() to allow us to download the metadata (and optionally the value) of the new secret:
def download_version(self, version: SecretVersion, destination_dir: Path,
                     build_metadata: BuildMetadata, value: bool = False,
                     metadata_file: str = "metadata.json",
                     value_file: str = "value") -> tuple[SecretVersion, dict[str, str]]:
    meta_response: dict[str, Any] = self._client.describe_secret(SecretId=self.secret)
    meta_response.pop("ResponseMetadata")

    metadata_path = destination_dir / metadata_file
    metadata_path.write_text(json_package.dumps(meta_response,
                                                cls=DatetimeSafeJSONEncoder))

    if value:
        value_response = self._client.get_secret_value(SecretId=self.secret)
        value_path = destination_dir / value_file

        try:
            secret_value = value_response["SecretString"]
        except KeyError:
            secret_value_as_bytes: bytes = value_response["SecretBinary"]
            value_path.write_bytes(secret_value_as_bytes)
        else:
            value_path.write_text(secret_value)

    return version, {}
The behaviour of the resource is as follows:
The metadata of the secret is fetched from AWS using describe_secret. The response metadata is removed, but could potentially be output as Step Metadata.
The metadata is saved to a file. By default this is named metadata.json, but the user can customise this with the parameters of the get step.
If the user has requested the secret value also (which is not the default behaviour), then this is fetched using get_secret_value to be saved to a file, which defaults to value but is again customisable by the user.
If the response contains a string, then this is written directly to the file using write_text(), but if it contains a binary then it is instead written using write_bytes().
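The string-or-binary branch can be exercised in isolation. The following sketch assumes only that the response dictionary contains either a SecretString or a SecretBinary key, as in the code above; write_secret_value is a hypothetical helper and the secret values are made up.

```python
from __future__ import annotations

from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any


def write_secret_value(value_response: dict[str, Any], value_path: Path) -> None:
    """Write the secret as text if it is a string, otherwise as raw bytes."""
    if "SecretString" in value_response:
        value_path.write_text(value_response["SecretString"])
    else:
        value_path.write_bytes(value_response["SecretBinary"])


with TemporaryDirectory() as temp_dir:
    string_path = Path(temp_dir) / "string_value"
    write_secret_value({"SecretString": "hunter2"}, string_path)
    string_contents = string_path.read_text()

    binary_path = Path(temp_dir) / "binary_value"
    write_secret_value({"SecretBinary": b"\x00\x01"}, binary_path)
    binary_contents = binary_path.read_bytes()
```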
The json module is imported as json_package to avoid a name collision with the json argument of publish_new_version(), which is defined later. The metadata of the secret contains some datetime objects, and so a custom JSONEncoder is required:
class DatetimeSafeJSONEncoder(json_package.JSONEncoder):

    def default(self, o: object) -> object:
        if isinstance(o, datetime):
            return o.isoformat()
        return super().default(o)
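As a quick illustration of why the encoder is needed, the sketch below serialises a small made-up metadata dictionary. Without the cls argument, json.dumps raises a TypeError for the datetime value; with it, the datetime is written as an ISO 8601 string.

```python
from datetime import datetime, timezone
import json


class DatetimeSafeJSONEncoder(json.JSONEncoder):

    def default(self, o: object) -> object:
        if isinstance(o, datetime):
            return o.isoformat()
        return super().default(o)


# Made-up metadata; a real describe_secret response contains more keys.
sample_metadata = {
    "Name": "my-secret",
    "CreatedDate": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
}

encoded = json.dumps(sample_metadata, cls=DatetimeSafeJSONEncoder)
print(encoded)
# {"Name": "my-secret", "CreatedDate": "2024-01-02T03:04:05+00:00"}
```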
Finally, we overload publish_new_version() to allow the user to update the secret. We could make this rotate the secret, but for the purposes of this example we will allow the user to specify a new value exactly:
def publish_new_version(self, sources_dir: Path,
                        build_metadata: BuildMetadata, string: str | None = None,
                        file: str | None = None,
                        json: dict[str, str] | None = None) -> tuple[SecretVersion, dict[str, str]]:
    if json is not None:
        string = json_package.dumps(json)

    if string is not None:
        response = self._client.put_secret_value(SecretId=self.secret,
                                                 SecretString=string)
    elif file is not None:
        file_path = sources_dir / file
        file_contents = file_path.read_bytes()
        response = self._client.put_secret_value(SecretId=self.secret,
                                                 SecretBinary=file_contents)
    else:
        raise ValueError("Missing new value for the secret.")

    version_id = response["VersionId"]
    metadata = {
        "Version Staging Labels": ", ".join(response["VersionStages"])
    }
    return SecretVersion(version_id), metadata
The behaviour is as follows:
If the user has specified the secret value as JSON, then encode that as a string and pass it forward.
If the secret is specified as a string, then attempt to set the secret value with put_secret_value.
If the secret is specified as a file path, then use read_bytes() to get the contents of the file (more resilient than reading the text) and set the SecretBinary instead.
If none of these have been set, then raise an error.
Pull out the new ID from the response to establish the new version to return, and also pass the VersionStages as metadata to be output to the console.
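The precedence between the three parameters (json, then string, then file) can be checked without calling AWS by separating the decision from the API call. The sketch below uses a hypothetical helper, build_put_arguments, which returns the keyword arguments that would be passed to put_secret_value; the secret identifier and values are made up.

```python
from __future__ import annotations

import json as json_package
from pathlib import Path
from typing import Any


def build_put_arguments(secret_id: str, sources_dir: Path,
                        string: str | None = None,
                        file: str | None = None,
                        json: dict[str, str] | None = None) -> dict[str, Any]:
    """Choose the put_secret_value keyword arguments from the put step parameters."""
    if json is not None:
        # JSON takes precedence and is sent as a string.
        string = json_package.dumps(json)

    if string is not None:
        return {"SecretId": secret_id, "SecretString": string}
    if file is not None:
        return {"SecretId": secret_id, "SecretBinary": (sources_dir / file).read_bytes()}
    raise ValueError("Missing new value for the secret.")


arguments = build_put_arguments("my-secret-arn", Path("."), json={"username": "admin"})
print(arguments)
# {'SecretId': 'my-secret-arn', 'SecretString': '{"username": "admin"}'}
```

One consequence of this ordering is that if the user sets both json and string, the string is silently overwritten by the encoded JSON.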
AWS Secrets Conclusion
We have added a lot of functionality for this resource in only 105 lines of code. The final module looks like this:
# (C) Crown Copyright GCHQ
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
import json as json_package
from pathlib import Path
from typing import Any

import boto3
from botocore.exceptions import ClientError

from concoursetools import BuildMetadata
from concoursetools.additional import TriggerOnChangeConcourseResource
from concoursetools.version import TypedVersion


class DatetimeSafeJSONEncoder(json_package.JSONEncoder):

    def default(self, o: object) -> object:
        if isinstance(o, datetime):
            return o.isoformat()
        return super().default(o)


@dataclass(unsafe_hash=True)
class SecretVersion(TypedVersion):
    version_id: str


class Resource(TriggerOnChangeConcourseResource[SecretVersion]):
    """
    :param secret: The full Amazon Resource Name (ARN) of the secret.
    """
    def __init__(self, secret: str) -> None:
        super().__init__(SecretVersion)
        self.secret = secret

        # arn:aws:secretsmanager:<region>:<account>:secret:<name>
        _, _, _, region, _, _, _ = secret.split(":")
        self._client = boto3.client("secretsmanager", region_name=region)

    def fetch_latest_version(self) -> SecretVersion:
        try:
            response = self._client.list_secret_version_ids(SecretId=self.secret,
                                                            IncludeDeprecated=False)
        except ClientError:
            raise ValueError(f"Cannot find secret: {self.secret!r}")

        versions = response["Versions"]
        for version in versions:
            version_stages = version["VersionStages"]
            if "AWSCURRENT" in version_stages:
                version_id = version["VersionId"]
                return SecretVersion(version_id)
        raise RuntimeError("No current version of the secret could be found.")

    def download_version(self, version: SecretVersion, destination_dir: Path,
                         build_metadata: BuildMetadata, value: bool = False,
                         metadata_file: str = "metadata.json",
                         value_file: str = "value") -> tuple[SecretVersion, dict[str, str]]:
        meta_response: dict[str, Any] = self._client.describe_secret(SecretId=self.secret)
        meta_response.pop("ResponseMetadata")

        metadata_path = destination_dir / metadata_file
        metadata_path.write_text(json_package.dumps(meta_response,
                                                    cls=DatetimeSafeJSONEncoder))

        if value:
            value_response = self._client.get_secret_value(SecretId=self.secret)
            value_path = destination_dir / value_file

            try:
                secret_value = value_response["SecretString"]
            except KeyError:
                secret_value_as_bytes: bytes = value_response["SecretBinary"]
                value_path.write_bytes(secret_value_as_bytes)
            else:
                value_path.write_text(secret_value)

        return version, {}

    def publish_new_version(self, sources_dir: Path,
                            build_metadata: BuildMetadata, string: str | None = None,
                            file: str | None = None,
                            json: dict[str, str] | None = None) -> tuple[SecretVersion, dict[str, str]]:
        if json is not None:
            string = json_package.dumps(json)

        if string is not None:
            response = self._client.put_secret_value(SecretId=self.secret,
                                                     SecretString=string)
        elif file is not None:
            file_path = sources_dir / file
            file_contents = file_path.read_bytes()
            response = self._client.put_secret_value(SecretId=self.secret,
                                                     SecretBinary=file_contents)
        else:
            raise ValueError("Missing new value for the secret.")

        version_id = response["VersionId"]
        metadata = {
            "Version Staging Labels": ", ".join(response["VersionStages"])
        }
        return SecretVersion(version_id), metadata