# (C) Crown Copyright GCHQ
"""
Resources are the heart and soul of Concourse.They represent all
external inputs to and outputs of :concourse:`jobs` in the pipeline.
Each resource represents a versioned artifact with an external source of truth.
Configuring the same resource in any pipeline on any Concourse cluster will
behave the exact same way. Concourse will continuously check each configured
resource to discover new versions. These versions then flow through the pipeline
via :concourse:`get steps <get-step.get-step>` configured on jobs.
Find out more about resources in the :concourse:`Concourse resource documentation <resources>`.
To learn more about how Concourse resource types are actually implemented under the hood,
check out :concourse:`implementing-resource-types` in Concourse.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
import contextlib
from pathlib import Path
import sys
from typing import Generic
from concoursetools import parsing
from concoursetools.metadata import BuildMetadata
from concoursetools.typing import Metadata, Params, ResourceConfig
from concoursetools.version import VersionT
[docs]
class ConcourseResource(ABC, Generic[VersionT]):
"""
Represents an external input or output to a pipeline.
The :concourse:`resource-types.schema.resource_type.source`
defined in a Concourse :concourse:`pipeline <pipelines>` is
parsed into JSON by Concourse, and will be passed to the initialiser
of the :class:`ConcourseResource` class.
All resource logic is contained in three methods to be overloaded:
:meth:`fetch_new_versions`, :meth:`download_version` and :meth:`publish_new_version`.
:param version_class: The resource parses all inputs with this version class.
:Example:
A resource that looks like this:
.. code-block:: yaml
resources:
- name: my-resource
type: my-resource-type
source:
project_key: concourse
repo: concourse
file_path: README.md
would translate to a resource a little like this:
.. code-block:: python3
class MyResource(ConcourseResource):
def __init__(self, project_key, repo, file_path, host="https://github.com/"):
super().__init__(MyVersion)
self.project_key = project_key
self.repo = repo
self.file_path = file_path
self.host = host.rstrip("/")
If the source contains lists or mappings, then these will be passed as
:class:`list` and :class:`dict` types respectively. The call to
:code:`super().__init__` sets the version class (a subclass of
:class:`~concoursetools.version.Version`) to be used by the resource,
ensuring that output can be properly parsed.
.. tip::
Note that the ``__init__`` method has a default value for host,
meaning that pipeline users need not include it in their source configuration.
The parameters need not be set as attributes if they can all be combined
into a single class, such as an API wrapper or other construct.
"""
def __init__(self, version_class: type[VersionT]):
self.version_class = version_class
@property
def certs_dir(self) -> Path:
"""
The path to the Concourse worker's certificate directory.
.. warning::
This folder may not always exist, depending on how the Concourse runner was configured.
See the :concourse:`implementing-resource-types.resource-certs` documentation for more information.
"""
return Path("/etc/ssl/certs")
[docs]
@abstractmethod
def fetch_new_versions(self, previous_version: VersionT | None = None) -> list[VersionT]:
"""
Fetch new versions of the resource.
The method will be passed the previous version (an instance of the :class:`~concoursetools.version.Version`
class) if it exists, or :data:`None` if this is the first version. It should return a list of version instances
in **chronological order with the oldest first**, "including the requested version if it's still valid.".
.. attention::
That means that if nothing has changed, you should return :code:`[previous_version]`.
Since get steps are cached by Concourse, this will **not** kick anything off.
.. important::
If there was no previous version, then this method should **only** return the latest version,
and **not** every version from the past. This is also the case if it is impossible to determine
newer versions due to something wrong with the external resource, such as a git repo which has been
force pushed and can no longer be properly compared.
:param previous_version: The most recent version of the resource. This will be set to :data:`None`
if the resource has never been run before.
:returns: A list of new versions.
"""
[docs]
@abstractmethod
def download_version(self, version: VersionT, destination_dir: Path, build_metadata: BuildMetadata) -> tuple[VersionT, Metadata]:
"""
Download a version and place its files within the resource directory in your pipeline.
This method is called on a get step, and the step parameters are passed as additional
keyword arguments. The method should return the version (unchanged, although *technically*
one could alter it slightly to no real effect), and a dictionary of metadata (see :ref:`Step Metadata`).
.. note::
If the desired resource version is unavailable (for example, if it was deleted),
the script must exit with error.
:Example:
If the resource code looks like this:
.. code:: python3
class MyResource(ConcourseResource):
def download_version(self, version, destination_dir, build_metadata,
download_metadata=False, metadata_file_name="metadata.json"):
...
metadata = {
"HTTP Status": 200,
}
return version, metadata
then the resource user would invoke it in the pipeline like this:
.. code:: yaml
- get: my-resource
params:
download_metadata: true
.. tip::
object version returned by the :meth:`publish_new_version` method is passed to
this method due to an implicit get step. The pipeline user has the option to
set some additional parameters for this step, and so if you intend to upload
something large in your put step, it might be worth including a flag in this
method to skip re-downloading that data.
:param version: The version to be downloaded.
:param destination_dir: A path to a folder into which resource files should be placed.
:param build_metadata: Metadata associated with this build.
:returns: The version (most likely unchanged), and a dictionary of metadata.
"""
[docs]
@abstractmethod
def publish_new_version(self, sources_dir: Path, build_metadata: BuildMetadata) -> tuple[VersionT, Metadata]:
"""
Update a resource by publishing a new version.
This method is called on a put step, and the step parameters are passed as additional
keyword arguments. The method should return the new, and a dictionary of metadata
(see :ref:`Step Metadata`).
.. warning::
The ``sources_dir`` argument does **not** behave the same as the ``destination_dir``
argument passed to the :meth:`~concoursetools.resource.ConcourseResource.download_version`
method. This is to more easily enable the resource to interact with other resources
(and task outputs), but makes it difficult to "track down" the files relating to
*this* resource. This is a deliberate design decision by Concourse, and you should
expect users to explicitly pass the path to those files should they be needed in this method.
:Example:
If the resource code looks like this:
.. code:: python3
class MyResource(ConcourseResource):
def publish_new_version(self, sources_dir, build_metadata,
file_path, overwrite=False):
...
metadata = {
"HTTP Status": 200,
}
return version, metadata
then the resource user would invoke it in the pipeline like this:
.. code:: yaml
- put: my-resource
params:
file_path: path/to/file.txt
overwrite: true
:param sources_dir: A path to folder containing all resources, **not** just this resource.
:param build_metadata: Metadata associated with this build.
:returns: The new version, and a dictionary of metadata.
"""
[docs]
@classmethod
def check_main(cls) -> None:
"""
Check for new versions.
.. caution::
This method should not be overloaded.
:returns: This method only prints output to ``stdout`` and ``stderr``.
"""
resource, previous_version = cls._parse_check_input()
with contextlib.redirect_stdout(sys.stderr):
new_versions = resource.fetch_new_versions(previous_version)
output = parsing.format_check_output([version.to_flat_dict() for version in new_versions])
_output(output)
[docs]
@classmethod
def in_main(cls) -> None:
"""
Fetch a given resource.
.. caution::
This method should not be overloaded.
:returns: This method only prints output to ``stdout`` and ``stderr``.
"""
resource, version, destination_dir, params = cls._parse_in_input()
build_metadata = BuildMetadata.from_env()
with contextlib.redirect_stdout(sys.stderr):
version, metadata = resource.download_version(version, destination_dir, build_metadata, **params)
output = parsing.format_in_out_output(version.to_flat_dict(), metadata)
_output(output)
[docs]
@classmethod
def out_main(cls) -> None:
"""
Update a resource.
.. caution::
This method should not be overloaded.
:returns: This method only prints output to ``stdout`` and ``stderr``.
"""
resource, sources_dir, params = cls._parse_out_input()
build_metadata = BuildMetadata.from_env()
with contextlib.redirect_stdout(sys.stderr):
version, metadata = resource.publish_new_version(sources_dir, build_metadata, **params)
output = parsing.format_in_out_output(version.to_flat_dict(), metadata)
_output(output)
@classmethod
def _parse_check_input(cls) -> tuple[ConcourseResource[VersionT], VersionT | None]:
"""Parse input from the command line."""
check_payload = sys.stdin.read()
resource_config, previous_version_config = parsing.parse_check_payload(check_payload)
resource = cls._from_resource_config(resource_config)
if previous_version_config is None:
previous_version = None
else:
previous_version = resource.version_class.from_flat_dict(previous_version_config)
return resource, previous_version
@classmethod
def _parse_in_input(cls) -> tuple[ConcourseResource[VersionT], VersionT, Path, Params]:
"""Parse input from the command line."""
in_payload = sys.stdin.read()
try:
destination_dir = Path(sys.argv[1])
except IndexError as error:
raise ValueError("Path to the destination directory for the resource "
"must be passed to the command line") from error
resource_config, version_config, params = parsing.parse_in_payload(in_payload)
resource = cls._from_resource_config(resource_config)
version = resource.version_class.from_flat_dict(version_config)
return resource, version, destination_dir, params
@classmethod
def _parse_out_input(cls) -> tuple[ConcourseResource[VersionT], Path, Params]:
"""Parse input from the command line."""
out_payload = sys.stdin.read()
try:
sources_dir = Path(sys.argv[1])
except IndexError as error:
raise ValueError("Path to the directory containing the build's full set of sources "
"must be passed to the command line") from error
resource_config, params = parsing.parse_out_payload(out_payload)
resource = cls._from_resource_config(resource_config)
return resource, sources_dir, params
@classmethod
def _from_resource_config(cls, resource_config: ResourceConfig) -> ConcourseResource[VersionT]:
return cls(**resource_config)
def _output(payload: str) -> None:
"""
Output data to Concourse to be carried to the next step.
This function is more or less equivalent to :func:`print`.
"""
print(payload, file=sys.stdout)