Configuration parsers#

Create the default wellies argument parser.

Returns: ArgumentParser: An ArgumentParser object including the wellies options.

Source code in wellies/config.py
def get_parser() -> ArgumentParser:
    """
    Build the command line parser used by wellies deployment scripts.

    The parser expects one positional profile name and accepts options for
    the profiles file, key-value overrides, the git commit message, the
    build directory, the files to deploy, prompt skipping, deployment
    skipping and the logging level.

    Returns:
    ArgumentParser: An ArgumentParser object including the wellies options.
    """
    summary = "\nGenerate required files for a pyflow suite project.\n"
    set_help = (
        "Set a number of key-value pairs "
        "(do not put spaces before or after the = sign). "
        "If a value contains spaces, you should define "
        "it with double quotes: "
        'foo="this is a sentence". Note that '
        "values are always treated as strings."
    )
    build_dir_help = (
        "Build directory for suite deployment, by default a "
        "temporary directory is created"
    )
    log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]

    # One (flags, settings) pair per argument, listed in registration
    # order, which is also the order in which --help displays them.
    argument_table = [
        (
            ("name",),
            {
                "metavar": "PROFILE",
                "help": "YAML configuration profile name",
            },
        ),
        (
            ("-p", "--profiles"),
            {
                "default": "profiles.yaml",
                "metavar": "CONFIG_NAME",
                "help": "YAML configuration profiles ",
            },
        ),
        (
            ("-s", "--set"),
            {"metavar": "KEY=VALUE", "nargs": "+", "help": set_help},
        ),
        (
            ("-m", "--message"),
            {"help": "Deployment git commit message"},
        ),
        (
            ("-b", "--build_dir"),
            {"help": build_dir_help},
        ),
        (
            ("-f", "--files"),
            {
                "nargs": "+",
                "help": (
                    "Specific files to deploy, "
                    "by default everything is deployed"
                ),
            },
        ),
        (
            ("-y",),
            {"help": "Answers yes to all prompts", "action": "store_true"},
        ),
        (
            ("-n", "--no_deploy"),
            {"help": "Skip deployment", "action": "store_true"},
        ),
        (
            ("--log_level",),
            {
                "default": "INFO",
                "choices": log_levels,
                "help": "Set the logging level for the suite deployment",
            },
        ),
    ]

    parser = ArgumentParser(description=summary, usage="%(prog)s <PROFILE>")
    for flags, settings in argument_table:
        parser.add_argument(*flags, **settings)
    return parser
usage: python -m deploy <PROFILE>

Generate required files for a pyflow suite project.

positional arguments:
  PROFILE               YAML configuration profile name

options:
  -h, --help            show this help message and exit
  -p, --profiles CONFIG_NAME
                        YAML configuration profiles
  -s, --set KEY=VALUE [KEY=VALUE ...]
                        Set a number of key-value pairs (do not put spaces
                        before or after the = sign). If a value contains
                        spaces, you should define it with double quotes:
                        foo="this is a sentence". Note that values are always
                        treated as strings.
  -m, --message MESSAGE
                        Deployment git commit message
  -b, --build_dir BUILD_DIR
                        Build directory for suite deployment, by default a
                        temporary directory is created
  -f, --files FILES [FILES ...]
                        Specific files to deploy, by default everything is
                        deployed
  -y                    Answers yes to all prompts
  -n, --no_deploy       Skip deployment
  --log_level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
                        Set the logging level for the suite deployment

Create an environment tool based on the given options.

Parameters#

- lib_dir : str — The lib directory of the suite where the tools are installed.
- name : str — The name of the environment.
- options : dict — A dictionary containing the environment options.

Returns#

Tool: A Tool object representing the environment.

Raises#

Exception: If the environment type is not supported or if certain options are used together. NotImplementedError: If the environment type is not implemented.

Source code in wellies/tools.py
def parse_environment(
    lib_dir: str, name: str, options: Dict[str, any]
) -> Tool:
    """
    Create an environment tool based on the given options.

    The ``type`` entry of ``options`` selects the tool class:

    - ``folder``: FolderTool
    - ``system_venv``: SystemEnvTool
    - ``venv``: VirtualEnvTool
    - ``custom``: Tool, built from the ``load``, ``unload`` and ``setup``
      entries
    - ``conda``: CondaEnvTool when ``environment`` is given,
      FileCondaEnvTool when ``env_file`` is given, SimpleCondaEnvTool when
      only ``extra_packages`` is given. When both ``env_file`` and
      ``extra_packages`` are present, ``env_file`` is used.

    Parameters
    ----------
    lib_dir : str
        The lib directory of the suite where the tools are installed.
    name : str
        The name of the environment.
    options : dict
        A dictionary containing the environment options. It must contain
        a ``type`` entry; ``depends`` is optional and defaults to an
        empty list.

    Returns
    -------
    Tool: A Tool object representing the environment.

    Raises
    ------
    KeyError: If ``options`` has no ``type`` entry.
    Exception: If the environment type is not supported, if
        ``environment`` is combined with ``env_file`` or
        ``extra_packages`` for conda, or if a conda environment defines
        none of these three options.
    """
    # Named env_type rather than type so the builtin is not shadowed.
    env_type = options["type"]
    depends = options.get("depends", [])

    if env_type == "folder":
        return FolderTool(name, lib_dir, depends=depends)

    if env_type == "system_venv":
        return SystemEnvTool(
            name,
            lib_dir,
            venv_options=options.get("venv_options", []),
            extra_packages=options.get("extra_packages", []),
            depends=depends,
        )

    if env_type == "conda":
        activate_cmd = options.get("conda_activate_cmd", "conda")
        if "environment" in options:
            # An existing environment cannot also be built from a file or
            # a package list. This is the only place where the conflict
            # needs checking: the branches below are reached only when
            # "environment" is absent.
            if "env_file" in options or "extra_packages" in options:
                raise Exception(
                    "environment, file and extra_packages options cannot be used at the same time for conda"  # noqa: E501
                )
            return CondaEnvTool(
                name,
                options["environment"],
                depends=depends,
                conda_activate_cmd=activate_cmd,
            )
        if "env_file" in options:
            return FileCondaEnvTool(
                name,
                lib_dir,
                options["env_file"],
                depends,
                conda_cmd=options.get("conda_cmd", "conda"),
                conda_activate_cmd=activate_cmd,
            )
        if "extra_packages" in options:
            return SimpleCondaEnvTool(
                name,
                lib_dir,
                options["extra_packages"],
                depends,
                conda_cmd=options.get("conda_cmd", "conda"),
                conda_activate_cmd=activate_cmd,
            )
        # The type itself is supported, so report what is missing instead
        # of claiming that conda is an unsupported type.
        raise Exception(
            "conda environment requires one of the environment, "
            "env_file or extra_packages options"
        )

    if env_type == "custom":
        return Tool(
            name,
            depends,
            options.get("load", ""),
            options.get("unload", ""),
            options.get("setup", None),
        )

    if env_type == "venv":
        return VirtualEnvTool(
            name,
            lib_dir,
            venv_options=options.get("venv_options", ""),
            extra_packages=options.get("extra_packages", []),
            depends=depends,
            options=options,
        )

    raise Exception("Environment type {} not supported".format(env_type))

Parse submission arguments to be used on pyflow.Task definitions.

If a defaults mapping is defined it will be used as a default definition for all other mappings defined.

:Attention: "defaults" can't be used as a context key name.

Parameters#

options : dict submit_arguments configuration dictionary (from yaml file)

Returns#

submit_arguments, submit_arguments_defaults : dict, dict Return parsed options as two dictionaries. The base submit arguments and a second one with the defaults options in a compatible format to be passed to a pyflow.Node variables argument.

Source code in wellies/config.py
def parse_submit_arguments(options: dict) -> tuple:
    """Parse submission arguments to be used on [pyflow.Task][]
    definitions.

    If a `defaults` mapping is defined it will be used as a default
    definition for all other mappings defined: each context starts from a
    copy of `defaults` and is then updated with its own entries.

    The input dictionary is not modified; new dictionaries are returned,
    so the same configuration can safely be parsed more than once.

    :Attention: "defaults" can't be used as a context key name.

    Parameters
    ----------
    options : dict
        submit_arguments configuration dictionary (from yaml file).
        Contexts or a `defaults` entry set to None (an empty yaml
        mapping) are treated as empty mappings.

    Returns
    -------
    submit_arguments, submit_arguments_defaults : dict, dict
        Return parsed options as two dictionaries. The base submit arguments
        and a second one with the defaults options in a compatible format
        to be passed to a pyflow.Node variables argument: keys are
        upper-cased, `sthost` is dropped and `tmpdir` is renamed to
        `ssdtmp`. If `options` is empty or None it is returned unchanged
        together with an empty defaults dictionary.
    """
    protected = {"sthost"}
    replacements = {"tmpdir": "ssdtmp"}

    if not options:
        return options, {}

    # Copy the global values instead of popping them from the caller's
    # dictionary, otherwise a second call on the same configuration would
    # silently lose its defaults.
    global_rsrc = dict(options.get("defaults") or {})

    # add the global values to each defined context, context values win
    parsed = {
        ctx: {**global_rsrc, **(resources or {})}
        for ctx, resources in options.items()
        if ctx != "defaults"
    }

    default_vars = {}
    for name, val in global_rsrc.items():
        lowered = name.lower()
        # remove STHOST. We don't want to create new var at the suite level.
        # Compared case-insensitively, like the replacements below, because
        # every key is upper-cased in the returned variables anyway.
        if lowered in protected:
            continue
        # replace other protected names for new safe variable names
        default_vars[replacements.get(lowered, name).upper()] = val

    return parsed, default_vars

Concatenates the config dictionaries and checks for duplicates. Overrides values in the files with the entries given in set_variables.

Source code in wellies/config.py
def parse_yaml_files(
    config_files: list, set_variables=None, global_vars=None
) -> dict:
    """
    Build a single configuration dictionary from a list of YAML files.

    The processing steps run in this order:

    1. concatenate_yaml_files merges ``config_files`` into one dictionary
       (the duplicate check is presumably done there).
    2. overwrite_entries applies the ``set_variables`` overrides.
    3. substitute_variables resolves variables using ``global_vars``.
    4. validate_main_keys checks the resulting dictionary.

    The dictionary produced by step 3 is returned.
    """
    merged = concatenate_yaml_files(config_files)

    # entries given on the command line take precedence over the files
    overridden = overwrite_entries(merged, set_variables)

    # substitute variables once all overrides are in place
    resolved = substitute_variables(overridden, global_vars)

    validate_main_keys(resolved)
    return resolved

Deploy Tools#

Deploy a suite to a remote repository.

Parameters#

suite (dict): The suite to deploy. user (str): The username to use for the deployment. name (str): The name of the suite. hostname (str): The hostname of the remote repository. deploy_dir (str): The target to deploy the suite to. backup_deploy (str, optional): The backup repository to use. Defaults to None. build_dir (str, optional): The build directory to use. If None, a temporary directory will be created. Defaults to None. no_prompt (bool, optional): Skip all prompts and answer yes to all. Defaults to False. no_deploy (bool, optional): Whether to skip the deployment. Defaults to False. message (str, optional): The commit message to use for the deployment. Defaults to None. files (list, optional): The files to deploy. If None, everything is deployed. Defaults to None.

Source code in wellies/deployment.py
def deploy_suite(
    suite: pf.Suite,
    user: str,
    name: str,
    hostname: str,
    deploy_dir: str,
    backup_deploy: str = None,
    build_dir: str = None,
    no_prompt: bool = False,
    no_deploy: bool = False,
    message: str = None,
    files: list = None,
):
    """
    Deploy a suite to a remote repository.

    The suite is first generated in ``<build_dir>/staging``, then compared
    with the target repository through a git deployment object using
    ``<build_dir>/local`` as local repository. If creating that object
    fails with a git command error, the remote repository is initialised
    (after confirmation) and the object is created again. Unless
    ``no_deploy`` is set, the staged suite is finally pushed to
    ``deploy_dir``.

    Parameters
    ----------
    suite (pf.Suite):
        The suite to deploy.
    user (str):
        The username to use for the deployment.
    name (str):
        The name of the suite.
    hostname (str):
        The hostname of the remote repository.
    deploy_dir (str):
        The target to deploy the suite to.
    backup_deploy (str, optional):
        The backup repository to use. Defaults to None.
    build_dir (str, optional):
        The build directory to use. If None, a temporary
        directory will be created. Defaults to None.
    no_prompt (bool, optional):
        Skip all prompts and answer yes to all. Defaults to False.
    no_deploy (bool, optional):
        Whether to skip the deployment. Defaults to False.
    message (str, optional):
        The commit message to use for the deployment. Defaults to None.
    files (list, optional):
        The files to deploy. If None, everything is deployed.
        Defaults to None.

    Raises
    ------
    SystemExit:
        With exit code 1 if the user does not answer "y" to one of the
        confirmation prompts.
    """
    # Local import: sys.exit is used instead of the exit() helper, which
    # is injected by the site module for interactive use and is not
    # guaranteed to exist (e.g. when running with "python -S").
    import sys

    def confirmed(question: str) -> bool:
        """Ask ``question`` unless prompts are disabled; True means go on."""
        if no_prompt:
            return True
        # Surrounding blanks and letter case are ignored so that "Y" or
        # "y " are not mistaken for a refusal.
        return input(question).strip().lower() == "y"

    def new_deployer():
        """Create the git deployment object for the current directories."""
        return ts.GitDeployment(
            host=hostname,
            user=user,
            staging_dir=staging_dir,
            local_repo=local_repo,
            target_repo=target_repo,
            backup_repo=backup_deploy,
        )

    if build_dir is None:
        build_dir = tempfile.mkdtemp(prefix=f"build_{name}_")

    logger.info("------------------------------------------------------")
    logger.info(f"Staging suite to {build_dir}")
    logger.info("------------------------------------------------------")
    build_dir = os.path.realpath(build_dir)
    staging_dir = os.path.join(build_dir, "staging")
    local_repo = os.path.join(build_dir, "local")
    target_repo = deploy_dir

    _generate_suite(suite, staging_dir, name)

    try:
        deployer = new_deployer()
    except git.exc.GitCommandError:
        # Treated as "the remote repository is missing": offer to create
        # it, then retry once.
        if not confirmed(
            "Remote repository does not seem to exist. Do you want to initialise it? (N/y)"  # noqa: E501
        ):
            logger.error("Aborting deployment")
            sys.exit(1)
        ts.setup_remote(
            hostname, user, target_repo, remote=backup_deploy, force=True
        )
        deployer = new_deployer()
    deployer.pull_remotes()
    deployer.diff_staging()

    if no_deploy:
        logger.info("No deploy option activated. Deployment aborted")
        return

    logger.info("------------------------------------------------------")
    logger.info(f"Deploying suite to {target_repo}")
    logger.info("------------------------------------------------------")
    if files is not None:
        logger.info("Deploying only the following files:")
        for f in files:
            logger.info(f"    - {f}")
    if not confirmed(
        "You are about to push the staged suite to the target directory. Are you sure? (N/y)"  # noqa: E501
    ):
        logger.info("Aborting deployment")
        sys.exit(1)

    message = git_commit_message(message)

    if deployer.deploy(message, files):
        logger.info(f"Suite deployed to {target_repo}")
        logger.info(f"Definition file: {target_repo}/{name}.def")

Returns a pyflow host object based on the given hostname and user.

Parameters#

- hostname (str): The name of the host to connect to.
- user (str): The username to use when connecting to the host.
- ecflow_path (str, optional): The path to the ecflow_client executable. If None, try to get the current path from the ecflow_client executable. Defaults to None.
- server_ecfvars (bool, optional): Whether to use server-side ECF_ variables. Defaults to False.
- extra_variables (dict, optional): Additional ecflow variables to set on the host.
- **kwargs: Additional keyword arguments to pass to the pyflow host constructor.

Returns:
  • Host

    Union[pf.LocalHost, pf.TroikaHost]: A pyflow host object.

Source code in wellies/hosts.py
def get_host(
    hostname: str,
    user: str,
    ecflow_path: str = None,
    server_ecfvars: bool = False,
    extra_variables: dict = None,
    submit_arguments: dict = None,
    **kwargs,
) -> tuple:
    """
    Returns a pyflow host object based on the given hostname and user,
    together with the variables derived from the submit arguments.

    The host type is ``localhost`` when ``hostname`` is listed in
    ``local_host`` and ``troika`` otherwise. It can be forced by passing
    the hostname as ``<host_type>:<hostname>``.

    Parameters
    ----------
    hostname (str):
        The name of the host to connect to, optionally prefixed with the
        host type and a colon.
    user (str):
        The username to use when connecting to the host.
    ecflow_path (str, optional):
        The path to the ecflow_client executable.
        If None, try to get the current path from the ecflow_client
        executable.
        Defaults to None.
    server_ecfvars (bool, optional):
        Whether to use server-side ECF_ variables. Defaults to False.
    extra_variables (dict, optional):
        Additional ecflow variables to set on the host. A ``HOST``
        variable is always added; the given dictionary is not modified.
    submit_arguments (dict, optional):
        Submit arguments configuration, processed with
        parse_submit_arguments. The given dictionary is not modified.
    **kwargs:
        Additional keyword arguments to pass to the pyflow host
        constructor. They take precedence over the HOST_DEFAULTS entries
        of the host type.

    Returns:
        tuple: ``(host, variables)`` where ``host`` is the object created
        by ``pf.host.host_factory`` and ``variables`` is the second value
        returned by parse_submit_arguments.

    Raises:
        FileNotFoundError: If ``ecflow_path`` is None and no
        ecflow_client executable can be found.
    """
    if ecflow_path is None:
        ecflow_client = shutil.which("ecflow_client")
        if ecflow_client is None:
            # shutil.which returns None when nothing is found; fail with
            # an explicit message instead of a TypeError from dirname.
            raise FileNotFoundError(
                "ecflow_client executable not found in PATH, "
                "please provide ecflow_path"
            )
        ecflow_path = os.path.dirname(ecflow_client)

    # Hand a copy to the parser so the caller's configuration survives
    # unchanged and can be reused for another host.
    submit_arguments, variables = parse_submit_arguments(
        dict(submit_arguments or {})
    )

    host_type = "localhost" if hostname in local_host else "troika"
    # Only a single "<host_type>:<hostname>" separator is interpreted; a
    # name with several colons is used as given.
    if hostname.count(":") == 1:
        host_type, hostname = hostname.split(":")

    # Build a new dictionary rather than adding HOST to the caller's one.
    extra_variables = {
        **(extra_variables or {}),
        "HOST": f"%SCHOST:{hostname}%",
    }

    defaults = HOST_DEFAULTS.get(host_type, {})
    kwargs = {**defaults, **kwargs}

    host = pf.host.host_factory(
        host_type,
        name="%HOST%",
        user=user,
        extra_variables=extra_variables,
        server_ecfvars=server_ecfvars,
        ecflow_path=ecflow_path,
        submit_arguments=submit_arguments,
        **kwargs,
    )
    print(f"Submitting jobs using {host_type} on host: {hostname}")

    return host, variables