AWS SageMaker Pipeline¶
Caution
This example is for reference only. It is not extensively tested, and it not intended to be a fully-fledged Concourse resource for production pipelines. Copy and paste at your own risk.
In this example we will explore how to create a custom Concourse Resource using the Concourse Tools library. In this particular example we will consider a resource to interact with an AWS SageMaker pipeline, which needs to keep track of executions of the pipeline, and to also be able to trigger new executions. In particular, we want the following behaviour:
get step: Fetch the latest execution and download the metadata.
put step: Start a new pipeline execution.
The functionality of the resource will depend heavily on boto3 in order to reduce the amount of code needed to function.
Execution Version¶
When selecting the version for a custom resource, it is important to ensure that they will satisfy the following conditions:
Versions should be linearly ordered.
Versions should be unique within the context of the resource.
Versions should contain the minimum amount of information required to be unique within the resource.
It is possible to break some of these assumptions, but to do so requires great care. For example, the built-in git resource uses a git commit to map to a version, but these are only unique within the context of a specific branch. Therefore, the resource requires a branch to be specified, otherwise the versions would not be linear. Although it would be possible to also include commit-specific information (such as author and timestamp) within the version, this is unnecessary.
Previously, some custom Concourse resources (such as the Bitbucket pull request resource) attempt to use non-linear versions to be able to track both commits and pull request IDs, but this requires the user to set version: every on their resource in order to properly pick up every change, which can have some unintended consequences (such as multiple commits pushed to a branch at once being considered multiple versions). The correct way to deal with this now is to make use of instanced pipelines to correspond to each pull request, and to keep versions as linear. For more information on this pattern, see the GitHub Branches example.
In this example, each execution has a corresponding ARN, which uniquely defines it within the context of a pipeline (in fact, it uniquely defines it within an AWS account). Therefore, our version should contain this ARN as a string. This can be done by inheriting from the Version class, but it is generally easier (and requires less code) to instead make use of dataclasses and to inherit from TypedVersion:
@dataclass(unsafe_hash=True)
class ExecutionVersion(TypedVersion):
execution_arn: str
Passing around an instance of a class instead of a JSON object or even just a string is much easier, and allows us to make use of type hinting and other linter features. It also allows us to more finely specify how version equality and comparisons work, and also to define how to map the attributes of the version to and from the JSON object that Concourse will use.
Pipeline Resource¶
The first thing to do is to establish what the source of the resource will be, and what we require of the user to properly configure it. For Concourse Tools, a resource is a subclass of the ConcourseResource class, and the arguments in the __init__ method correspond to the source. In this example, we require one configuration option, and an additional optional option:
pipeline: This will be the ARN of the pipeline itself. We can parse this for the pipeline name we use for the boto3 calls, as well as the region we need for constructing the client. It should be a string.statuses: A pipeline execution can have one of five statuses at any time:Executing,Stopping,Stopped,FailedandSucceeded. By default, we don’t want to trigger a version for executions which are still going, but we want the user to have this choice. Therefore, we pass a default value of a list containing the statuses which trigger a version, and allow the user to pass their own.
These options correspond to the following __init__ method:
def __init__(self, pipeline: str, statuses: list[str] = ["Succeeded", "Stopped", "Failed"]) -> None:
super().__init__(ExecutionVersion)
# arn:aws:sagemaker:<region>:<account>:pipeline:<name>
_, _, _, region, _, _, pipeline_name = pipeline.split(":")
self._client = boto3.client("sagemaker", region_name=region)
self.pipeline_name = pipeline_name
self.statuses = statuses
A few things are happening here:
We inherited from
ConcourseResource, and we are required to pass in ourVersionsubclass so that the resource can properly parse the versions as JSON.We split the ARN into subgroups to fetch the region and pipeline name. Using tuple unpacking gives us an extra opportunity to catch when a user has passed a name instead of an ARN.
We create the client here to avoid storing the region in the class. It also helps in testing to allow us to avoid recreating it each time.
We set the pipeline name and statuses as instance attributes. Note that we don’t bother storing the pipeline ARN.
Checking for Executions¶
The first behaviour we will implement is the check, when the external resource is queried for new executions. There are a few cases that we need to handle:
If no previous version is passed (i.e., the parameter is
None), then we know that this is the first request and we need to return the latest valid version.If no versions are available at all then an empty list should be returned.
If there are new executions which have finished since the previous version, then they need to be returned in “oldest-to-newest” order, including the previous version “if it’s still valid”.
If there have been no more executions since the previous version then the response should only include the previous version.
If the previous version is no longer in the set of versions (i.e., something has gone wrong or the external resource has somehow changed) then the latest version should be returned. In practice this rarely happens, but should be planned for.
We start by defining a private method on the resource to yield “potential versions” from the external source. This is just to allow us to check equality with the previous version directly, which is cleaner than worrying about a potential AttributeError if the previous version is None. It also allows us to handle the way in which AWS batches up its response when calling list_pipeline_executions. All we need be concerned with is that this method will yield instances of ExecutionVersion in newest-to-oldest order until the server runs out.
def _yield_potential_execution_versions(self) -> Generator[ExecutionVersion]:
kwargs = {
"PipelineName": self.pipeline_name,
"SortOrder": "Descending",
}
first_response = self._client.list_pipeline_executions(**kwargs)
response = first_response
while True:
for summary in response["PipelineExecutionSummaries"]:
if summary["PipelineExecutionStatus"] in self.statuses:
yield ExecutionVersion(summary["PipelineExecutionArn"])
try:
next_token = response["NextToken"]
except KeyError:
break
response = self._client.list_pipeline_executions(**kwargs, NextToken=next_token)
We can then handle the actual behaviour, which is done by overloading fetch_new_versions() like so:
def fetch_new_versions(self, previous_version: ExecutionVersion | None = None) -> list[ExecutionVersion]:
potential_versions = iter(self._yield_potential_execution_versions())
if previous_version is None:
try:
first_version = next(potential_versions)
except StopIteration:
new_versions = []
else:
new_versions = [first_version]
else:
new_versions = []
for potential_version in potential_versions:
new_versions.append(potential_version)
if potential_version == previous_version:
break
else:
new_versions = [new_versions[0]]
new_versions.reverse()
return new_versions
We start by defining an iterator on the output of _yield_potential_execution_versions. We then handle the case where the previous version has not been passed. We try to pull the first version from the iterator. If this fails (it will raise a StopIteration error) then there are no available versions (case 2) and we return a list. If this succeeds, then we return this first (and newest) version (case 1). If a previous version has been passed, we begin to iterate through the versions, checking for equality with the previous version. If we reach the version then we return this list in reverse order (see the reversal at the end), which is case 3 or 4. If we do not reach the version (note the else clause in the for loop) then we know that the external source has changed, and we return the latest version (case 5).
Downloading Executions¶
The next functionality to consider is the downloading of a version in a get step. This is implemented within Concourse Tools by overloading download_version(). The behaviour we want for this step is:
Download the metadata of the execution (as described in describe_pipeline_execution) to a JSON file.
Optionally (but by default) download the definition of the pipeline to a different JSON file. This is done by calling describe_pipeline_definition_for_execution.
Return information about the pipeline as metadata to be displayed in the UI.
The code for doing this is as follows:
def download_version(self, version: ExecutionVersion, destination_dir: Path,
build_metadata: BuildMetadata, download_pipeline: bool = True,
metadata_file: str = "metadata.json",
pipeline_file: str = "pipeline.json") -> tuple[ExecutionVersion, dict[str, str]]:
response = self._client.describe_pipeline_execution(PipelineExecutionArn=version.execution_arn)
response.pop("ResponseMetadata")
metadata_path = destination_dir / metadata_file
metadata_path.write_text(json.dumps(response, cls=DatetimeSafeJSONEncoder))
if download_pipeline:
pipeline_response = self._client.describe_pipeline_definition_for_execution(PipelineExecutionArn=version.execution_arn)
pipeline_path = destination_dir / pipeline_file
pipeline_path.write_text(pipeline_response["PipelineDefinition"])
metadata = {
"Display Name": response.get("PipelineExecutionDisplayName"),
"Status": response["PipelineExecutionStatus"],
"Created By": response["CreatedBy"]["UserProfileName"],
"Description": response.get("PipelineExecutionDescription"),
}
if metadata["Status"] == "Failed":
metadata["Failure Reason"] = response["FailureReason"]
metadata = {key: value for key, value in metadata.items() if value is not None}
return version, metadata
We start by describing the execution, removing the ResponseMetadata (no reason to download this) and writing it to a file. We allow the user to configure the name of this file by making it an optional parameter of the function. Because the execution information contains datetime instances, we need a custom encoder:
class DatetimeSafeJSONEncoder(json.JSONEncoder):
def default(self, o: object) -> object:
if isinstance(o, datetime):
return o.isoformat()
return super().default(o)
Next, we check if the user wishes to download the pipeline, and then write the definition to file. Again, each of these are configurable and the defaults are handled with default parameters. Finally, we create the metadata. It is a lot easier to deal with a Python dict than JSON in bash, and we can take advantage of the numerous ways to update and mutate the dictionary. Here, we specifically filter out any pieces of metadata with a value of None to reduce the amount of code we need to write. Finally, both the original version and the metadata are returned.
Publishing New Executions¶
FInally, we consider the functionality of the put step, and creating new versions. To do this in Concourse Tools, we overload the publish_new_version() method. The code will rely on a call to start_pipeline_execution.
def publish_new_version(self, sources_dir: Path, build_metadata: BuildMetadata,
display_name: str | None = None, description: str | None = None,
parameters: dict[str, str] = {}) -> tuple[ExecutionVersion, dict[str, str]]:
default_description = (f"Execution from build #{build_metadata.BUILD_ID} "
f"of pipeline {build_metadata.BUILD_PIPELINE_NAME}")
kwargs: dict[str, object] = {
"PipelineName": self.pipeline_name,
"PipelineExecutionDescription": description or default_description,
}
if display_name:
kwargs["PipelineExecutionDisplayName"] = display_name
if parameters:
kwargs["PipelineParameters"] = [{"Name": name, "Value": value}
for name, value in parameters.items()]
metadata = {f"Parameter: {parameter}": value
for parameter, value in parameters.items()}
else:
metadata = {}
response = self._client.start_pipeline_execution(**kwargs)
execution_arn = response["PipelineExecutionArn"]
new_version = ExecutionVersion(execution_arn)
return new_version, metadata
We start with the execution description. We include an optional parameter to allow the user to specify this in their get params, but if they don’t then we create a default description based on the BuildMetadata. In instance of this class is passed to the download_version() and publish_new_version() methods, and wraps the build environment to more easily make use of environment variables which are made available to resources, as well as some useful methods for computing the build_url() and working with the instance_vars(). In particular here we’ve made use of the BUILD_ID and BUILD_PIPELINE_NAME attributes. We also allow the user to pass in parameters for the execution. These are passed in as a mapping, but need to be converted to a list to fit with the function call. We also add them to the metadata for the user’s benefit. Finally, we return the new version and the metadata.
Note
The response from the start_pipeline_execution function contains only the ARN of the new execution, and is a big reason why the version doesn’t contain the start time of the execution. To fill that in, we would need to make a second request to the server to fetch the information about the execution before creating and returning the version, which is not ideal.
Pipeline Conclusion¶
The final resource only requires 135 lines of code, and looks like this:
1# (C) Crown Copyright GCHQ
2from __future__ import annotations
3
4from collections.abc import Generator
5from dataclasses import dataclass
6from datetime import datetime
7import json
8from pathlib import Path
9
10import boto3
11
12from concoursetools import ConcourseResource
13from concoursetools.metadata import BuildMetadata
14from concoursetools.version import TypedVersion
15
16
17class DatetimeSafeJSONEncoder(json.JSONEncoder):
18
19 def default(self, o: object) -> object:
20 if isinstance(o, datetime):
21 return o.isoformat()
22 return super().default(o)
23
24
25@dataclass(unsafe_hash=True)
26class ExecutionVersion(TypedVersion):
27 execution_arn: str
28
29
30class PipelineResource(ConcourseResource[ExecutionVersion]):
31
32 def __init__(self, pipeline: str, statuses: list[str] = ["Succeeded", "Stopped", "Failed"]) -> None:
33 super().__init__(ExecutionVersion)
34 # arn:aws:sagemaker:<region>:<account>:pipeline:<name>
35 _, _, _, region, _, _, pipeline_name = pipeline.split(":")
36 self._client = boto3.client("sagemaker", region_name=region)
37 self.pipeline_name = pipeline_name
38 self.statuses = statuses
39
40 def fetch_new_versions(self, previous_version: ExecutionVersion | None = None) -> list[ExecutionVersion]:
41 potential_versions = iter(self._yield_potential_execution_versions())
42 if previous_version is None:
43 try:
44 first_version = next(potential_versions)
45 except StopIteration:
46 new_versions = []
47 else:
48 new_versions = [first_version]
49 else:
50 new_versions = []
51 for potential_version in potential_versions:
52 new_versions.append(potential_version)
53 if potential_version == previous_version:
54 break
55 else:
56 new_versions = [new_versions[0]]
57
58 new_versions.reverse()
59 return new_versions
60
61 def download_version(self, version: ExecutionVersion, destination_dir: Path,
62 build_metadata: BuildMetadata, download_pipeline: bool = True,
63 metadata_file: str = "metadata.json",
64 pipeline_file: str = "pipeline.json") -> tuple[ExecutionVersion, dict[str, str]]:
65 response = self._client.describe_pipeline_execution(PipelineExecutionArn=version.execution_arn)
66 response.pop("ResponseMetadata")
67
68 metadata_path = destination_dir / metadata_file
69 metadata_path.write_text(json.dumps(response, cls=DatetimeSafeJSONEncoder))
70
71 if download_pipeline:
72 pipeline_response = self._client.describe_pipeline_definition_for_execution(PipelineExecutionArn=version.execution_arn)
73 pipeline_path = destination_dir / pipeline_file
74 pipeline_path.write_text(pipeline_response["PipelineDefinition"])
75
76 metadata = {
77 "Display Name": response.get("PipelineExecutionDisplayName"),
78 "Status": response["PipelineExecutionStatus"],
79 "Created By": response["CreatedBy"]["UserProfileName"],
80 "Description": response.get("PipelineExecutionDescription"),
81 }
82
83 if metadata["Status"] == "Failed":
84 metadata["Failure Reason"] = response["FailureReason"]
85
86 metadata = {key: value for key, value in metadata.items() if value is not None}
87
88 return version, metadata
89
90 def publish_new_version(self, sources_dir: Path, build_metadata: BuildMetadata,
91 display_name: str | None = None, description: str | None = None,
92 parameters: dict[str, str] = {}) -> tuple[ExecutionVersion, dict[str, str]]:
93 default_description = (f"Execution from build #{build_metadata.BUILD_ID} "
94 f"of pipeline {build_metadata.BUILD_PIPELINE_NAME}")
95 kwargs: dict[str, object] = {
96 "PipelineName": self.pipeline_name,
97 "PipelineExecutionDescription": description or default_description,
98 }
99
100 if display_name:
101 kwargs["PipelineExecutionDisplayName"] = display_name
102
103 if parameters:
104 kwargs["PipelineParameters"] = [{"Name": name, "Value": value}
105 for name, value in parameters.items()]
106 metadata = {f"Parameter: {parameter}": value
107 for parameter, value in parameters.items()}
108 else:
109 metadata = {}
110
111 response = self._client.start_pipeline_execution(**kwargs)
112 execution_arn = response["PipelineExecutionArn"]
113 new_version = ExecutionVersion(execution_arn)
114 return new_version, metadata
115
116 def _yield_potential_execution_versions(self) -> Generator[ExecutionVersion]:
117 kwargs = {
118 "PipelineName": self.pipeline_name,
119 "SortOrder": "Descending",
120 }
121
122 first_response = self._client.list_pipeline_executions(**kwargs)
123
124 response = first_response
125 while True:
126 for summary in response["PipelineExecutionSummaries"]:
127 if summary["PipelineExecutionStatus"] in self.statuses:
128 yield ExecutionVersion(summary["PipelineExecutionArn"])
129
130 try:
131 next_token = response["NextToken"]
132 except KeyError:
133 break
134
135 response = self._client.list_pipeline_executions(**kwargs, NextToken=next_token)