Bases: Family

A family whose tasks are only run once a month while another/superior YMD is daily.

Parameters#

- name : str — Family name.
- start : datetime | None — First date to calculate the first occurrence from; today if None (default).
- day_of_month : int — Day of the month. Negative values are relative to the 1st of the month, i.e. -1 is the last day of the month.
- ymd : pf.Variable | None — Other YMD to track; by default uses the superior YMD.
- **kwargs — Any arguments passed to the family.

Source code in wellies/triggers.py
class OnceAMonthFamily(pf.Family):
    """
    A family whose tasks are only run once a month while
    another/superior YMD is daily.

    The family carries its own ``YMD`` variable holding the next occurrence
    and completes whenever the tracked YMD differs from it. An extra task,
    triggered once every other executable child has completed, moves the
    family ``YMD`` (and the ``next_ymd`` label) to the following occurrence.

    Parameters
    ----------
    name : str
        Family name.
    start : datetime | None
        First date to calculate first occurrence from, today if None (default).
    day_of_month : int
        Day of the month. Negative values are relative to 1st of month,
        i.e. -1 is last day of month. Must not be 0.
    ymd : pf.Variable | None
        Other YMD to track, by default uses the superior YMD.
    **kwargs :
        Any arguments passed to the family.
    """

    update_ymd_script = """
        d={{day_of_month}}

        if (( $d < 0 )); then
            # the current occurrence lies |d| days before the 1st of a month:
            # step forward |d| days to reach that month, then count back from
            # the 1st of the month after it
            ref_ymd=$(date -d "$YMD + $(( -d )) day" +%Y%m01)
            next_ymd=$(date -d "$ref_ymd + 1 month $d day" +%Y%m%d)
        else
            # 1st of the month counting from current ymd + 1 as reference
            ref_ymd=$(date -d "$YMD + 1 day" +%Y%m01)
            # force base 10, days 08 and 09 are not valid octal numbers
            if (( 10#$(date -d $YMD +%d) >= $d )); then
                ref_ymd=$(date -d "$(date -d $YMD +%Y%m01) + 1 month" +%Y%m01)
            fi
            next_ymd=$(date -d "$ref_ymd + $d day - 1 day" +%Y%m%d)
        fi
        ecflow_client --alter change variable YMD $next_ymd {{family_path}}
        ecflow_client --alter change label next_ymd $next_ymd {{family_path}}
    """

    def __init__(
        self,
        name="once_a_month",
        start=None,
        day_of_month=-1,
        ymd=None,
        **kwargs,
    ):
        self.day_of_month = day_of_month
        assert day_of_month != 0, "day_of_month must not be 0."
        first_occ = self._first_occurrence(
            start or dt.date.today(), day_of_month
        )

        # an explicit YMD passed by the caller wins over the computed one
        kwargs.setdefault("YMD", first_occ.strftime("%Y%m%d"))
        super().__init__(
            name=name, labels={"next_ymd": kwargs["YMD"]}, **kwargs
        )
        self.completes = (ymd or self.superior_ymd) != self.YMD

        # add update task, trigger will be set when family is generated
        with self:
            self._update_ymd_task = self.update_ymd_task()

    @staticmethod
    def _first_occurrence(start, day_of_month):
        """Return the first occurrence that is not earlier than ``start``.

        Parameters
        ----------
        start : datetime.date | datetime.datetime
            Reference date; only its calendar date is used.
        day_of_month : int
            Positive: offset from the 1st of a month (1 is the 1st).
            Negative: offset back from the 1st of the following month.

        Returns
        -------
        datetime.date
        """
        if isinstance(start, dt.datetime):
            # date and datetime objects cannot be compared with each other
            start = start.date()

        def month_start(offset):
            # month arithmetic on a linear month index so that stepping
            # past December rolls over into the next year
            index = start.year * 12 + (start.month - 1) + offset
            return dt.date(index // 12, index % 12 + 1, 1)

        if day_of_month < 0:
            offset = 1
            shift = dt.timedelta(days=day_of_month)
        else:
            offset = 1 if start.day > day_of_month else 0
            shift = dt.timedelta(days=day_of_month - 1)

        occurrence = month_start(offset) + shift
        # negative offsets other than -1 may land before the start date
        while occurrence < start:
            offset += 1
            occurrence = month_start(offset) + shift
        return occurrence

    @property
    def superior_ymd(self):
        """YMD of the closest ancestor node that has one.

        Raises
        ------
        AttributeError
            If no ancestor has a ``YMD`` attribute.
        """
        p = self.parent
        while p is not None:
            if hasattr(p, "YMD"):
                return p.YMD
            p = p.parent
        raise AttributeError(
            f"No superior YMD found for family {self.name}; "
            "pass one explicitly with the ymd argument."
        )

    def update_ymd_task(self):
        """Create the task that moves the family YMD to its next occurrence.

        Returns
        -------
        pf.Task
            Task named ``<family name>_check_next_occurrence`` running
            ``update_ymd_script``.
        """
        # run this trivial update task on the ecflow server
        # rather than submit to troika
        job_cmd = "bash %ECF_JOB% > %ECF_JOBOUT%"
        script = [
            pf.TemplateScript(
                self.update_ymd_script,
                family_path=self.fullname,
                day_of_month=self.day_of_month,
            ),
        ]
        return pf.Task(
            f"{self.name}_check_next_occurrence",
            script=script,
            ECF_JOB_CMD=job_cmd,
        )

    def generate_node(self):
        """Before generating node, make sure update_ymd_task is only
        triggered when all other tasks have completed.
        """
        chld = [
            c
            for c in self.executable_children
            if c.name != self._update_ymd_task.name
        ]
        for c in chld:
            self._update_ymd_task.triggers &= c
        return super().generate_node()

generate_node() #

Before generating node, make sure update_ymd_task is only triggered when all other tasks have completed.

Source code in wellies/triggers.py
def generate_node(self):
    """Chain the update task behind every other executable child.

    The triggers of the update task are extended right before the node is
    generated; the result of the parent ``generate_node`` is returned.
    """
    update_task = self._update_ymd_task
    siblings = [
        child
        for child in self.executable_children
        if child.name != update_task.name
    ]
    for sibling in siblings:
        update_task.triggers &= sibling
    return super().generate_node()

Bases: Task

Class that automatically sets ecflow variables for each submit_arguments key:value pair as they are in the host configuration file.

Using variables makes it easier to change job directives values after deployment and using ecflow api.

Also, inspects task tree for identical key:values pairs defined in higher levels to avoid creating duplicated variable at every level.

Attention: the following names are protected and will not be touched: STHOST, ECF_*.

Attention: the following names will generate variables with other names than the config keys: TMPDIR -> SSDTMP, TIME -> TIMEOUT.

Source code in wellies/tasks.py
class EcfResourcesTask(pf.Task):
    """Class that automatically sets ecflow variables for each submit_arguments
    key:value pair as they are in the host configuration file.

    Using variables makes it easier to change job directives values after
    deployment and using ecflow api.

    Also, inspects task tree for identical key:values pairs defined in higher
    levels to avoid creating duplicated variable at every level.

    :Attention: Following names are protected and will not be touched:
      - STHOST
      - ECF_*

    !!! Attention
        Following names will generate variables with other names than config
        keys:
        - TMPDIR -> SSDTMP
        - TIME -> TIMEOUT

    Parameters
    ----------
    name : str
        Task name.
    submit_arguments : str | dict
        Either a key of ``self.host.submit_arguments`` or a mapping of
        submit arguments. The input mapping is copied, never modified.
    **kwargs :
        Passed to the task constructor. ``variables``, if given, is merged
        with the variables derived from the submit arguments; on a name
        clash the submit argument value wins.
    """

    # regular expressions, matched case-insensitively against config keys
    protected_list = [
        "^STHOST$",
        "^ECF_",
    ]
    # upper-cased config key -> name of the ecflow variable created for it
    replacements = {
        "TMPDIR": "SSDTMP",
        "TIME": "TIMEOUT",
    }

    def __init__(self, name, submit_arguments, **kwargs):
        # work on a copy so the caller's ``variables`` mapping is untouched
        new_vars = dict(kwargs.pop("variables", None) or {})
        super().__init__(name, **kwargs)

        # init with values as they are
        if isinstance(submit_arguments, str):
            submit_args = self.host.submit_arguments[submit_arguments].copy()
        else:
            submit_args = submit_arguments.copy()

        exposed = [k for k in submit_args if not self._is_protected(k)]

        # update variable lists with values set on submit_arguments config
        new_vars.update(
            {self._variable_name(k): submit_args[k] for k in exposed}
        )
        # replace non-protected keys by ecf_variables
        submit_args.update(
            {k: f"%{self._variable_name(k)}%" for k in exposed}
        )
        self.submit_arguments = submit_args

        # after node is created, if variable in parent node with same value,
        # remove from task
        for key, val in new_vars.items():
            try:
                parent_val = self._parent_lookup(key)
            except AttributeError:
                make_variable(self, key, val)
            else:
                if val != parent_val:
                    make_variable(self, key, val)

    def _variable_name(self, key):
        """Return the ecflow variable name used for the config key."""
        upper = key.upper()
        return self.replacements.get(upper, upper)

    def _is_protected(self, key):
        """Tell whether the key matches one of the protected patterns."""
        stripped = key.strip()
        return any(
            re.search(pattern, stripped, re.IGNORECASE) is not None
            for pattern in self.protected_list
        )

    def _parent_lookup(self, name):
        """Return the value of ``name`` as seen from the parent node.

        The direct parent is checked first, then the lookup is delegated
        to ``lookup_variable`` of the grandparent.
        """
        parent = self.parent
        if name in parent._nodes:
            return parent._nodes[name].value
        return parent.parent.lookup_variable(name)

Bases: AnchorFamily

Source code in wellies/data.py
class DeployDataFamily(pf.AnchorFamily):
    def __init__(
        self,
        data_store: StaticDataStore,
        submit_arguments: Optional[Dict] = None,
        name: str = "deploy_data",
        **kwargs,
    ):
        """Build the family holding one deployment task per dataset of a
        [data.StaticDataStore].

        The family is set to complete by default when the store is empty.

        Parameters
        ----------
        data_store : data.StaticDataStore
            A static data store object, usually created from a dictionary-like
            definition.
        submit_arguments : dict, optional
            Submit arguments mapping used for every task whose dataset
            options do not define "submit_arguments", by default None
            (treated as an empty mapping).
        name : str, optional
            Family name, by default "deploy_data".
        **kwargs :
            Any arguments passed to the family.
        """
        super().__init__(name=name, **kwargs)

        default_args = {} if submit_arguments is None else submit_arguments

        with self:
            entries = list(data_store.items())
            for dataset, data in entries:
                task_args = data.options.get("submit_arguments", default_args)
                pf.Task(
                    name=dataset,
                    submit_arguments=task_args,
                    script=data.script,
                    labels={"version": "NA"},
                )
            # nothing to deploy: do not block whatever waits on this family
            if not entries:
                self.defstatus = pf.state.complete

__init__(data_store, submit_arguments=None, name='deploy_data', **kwargs) #

Defines "static_data" family containing all tasks needed to deploy all types of datasets defined on a [data.StaticDataStore].

Parameters#

- data_store : data.StaticDataStore — A static data store object, usually created from a dictionary-like definition.
- submit_arguments : dict, optional — A submit arguments mapping used to configure each task's submit arguments, by default None.

Source code in wellies/data.py
def __init__(
    self,
    data_store: StaticDataStore,
    submit_arguments: Optional[Dict] = None,
    name: str = "deploy_data",
    **kwargs,
):
    """Build the family holding one deployment task per dataset of a
    [data.StaticDataStore].

    The family is set to complete by default when the store is empty.

    Parameters
    ----------
    data_store : data.StaticDataStore
        A static data store object, usually created from a dictionary-like
        definition.
    submit_arguments : dict, optional
        Submit arguments mapping used for every task whose dataset
        options do not define "submit_arguments", by default None
        (treated as an empty mapping).
    name : str, optional
        Family name, by default "deploy_data".
    **kwargs :
        Any arguments passed to the family.
    """
    super().__init__(name=name, **kwargs)

    default_args = {} if submit_arguments is None else submit_arguments

    with self:
        entries = list(data_store.items())
        for dataset, data in entries:
            task_args = data.options.get("submit_arguments", default_args)
            pf.Task(
                name=dataset,
                submit_arguments=task_args,
                script=data.script,
                labels={"version": "NA"},
            )
        # nothing to deploy: do not block whatever waits on this family
        if not entries:
            self.defstatus = pf.state.complete

Bases: AnchorFamily

The deployTools Family contains all the nodes required to install the tools of the suite in the target location (lib directory, $LIB_DIR).

The structure is the following
  • The first level contains the environments, which includes a setup Task.
  • The second level contains the packages to install in the environment.

The environment will be created in $LIB_DIR/env_name.

Source code in wellies/tools.py
class DeployToolsFamily(pf.AnchorFamily):
    """
    The deployTools Family contains all the nodes required to install the tools
    of the suite in the target location (lib directory, $LIB_DIR).

    The structure is the following:
        - The first level contains the environments, which includes a setup
        Task.
        - The second level contains the packages to install in the environment.

    The environment will be created in $LIB_DIR/env_name.
    """

    def __init__(
        self,
        tools: ToolStore,
        submit_arguments: Optional[Dict[str, str]] = None,
        name: str = "deploy_tools",
        **kwargs,
    ):
        """
        Creates the DeployToolsFamily and passes the extra arguments to the
        Family constructor.

        Parameters
        ----------
        tools : ToolStore
            The tool store; its ``environments`` mapping is iterated and the
            setup scripts are taken from it.
        submit_arguments : Dict[str, str], optional
            The execution context of the nodes, used for every task whose
            tool options do not define "submit_arguments". By default None,
            treated as an empty mapping.
        name : str, optional
            Family name, by default "deploy_tools".
        **kwargs :
            Any arguments passed to the family.
        """
        super().__init__(name=name, **kwargs)

        if submit_arguments is None:
            submit_arguments = {}

        with self:
            env_families = {}
            for env, options in tools.environments.items():
                env_tool = tools[env]
                # environments without a setup script get no family
                if env_tool.scripts["setup"] is None:
                    continue
                with pf.AnchorFamily(
                    name=env, variables={"ENV_NAME": env}
                ) as env_family:
                    env_families[env] = env_family
                    env_task = pf.Task(
                        name="setup",
                        submit_arguments=env_tool.options.get(
                            "submit_arguments", submit_arguments
                        ),
                        script=tools.setup(env),
                    )
                    if "packages" in options:
                        # packages are installed once the environment is set
                        package_family = DeployPackagesFamily(
                            tools, options["packages"], env, submit_arguments
                        )
                        package_family.triggers = env_task.complete

            # create dependencies between environments tasks; only the
            # environments that got a family can carry a trigger, iterating
            # over all of tools.environments would raise KeyError for the
            # ones skipped above
            for env, env_family in env_families.items():
                for depend in tools[env].depends:
                    if depend in env_families:
                        env_family.triggers = (
                            env_family.triggers
                            & env_families[depend].complete
                        )

            # nothing to deploy: do not block whatever waits on this family
            if not env_families:
                self.defstatus = pf.state.complete

__init__(tools, submit_arguments=None, name='deploy_tools', **kwargs) #

Creates the DeployToolsFamily and passes the extra arguments to the Family constructor.

Parameters#

- tools : ToolStore — The tool store describing the environments and packages to deploy.
- submit_arguments : Dict[str, str], optional — The execution context of the nodes, by default None (treated as an empty mapping).

Source code in wellies/tools.py
def __init__(
    self,
    tools: ToolStore,
    submit_arguments: Optional[Dict[str, str]] = None,
    name: str = "deploy_tools",
    **kwargs,
):
    """
    Creates the DeployToolsFamily and passes the extra arguments to the
    Family constructor.

    Parameters
    ----------
    tools : ToolStore
        The tool store; its ``environments`` mapping is iterated and the
        setup scripts are taken from it.
    submit_arguments : Dict[str, str], optional
        The execution context of the nodes, used for every task whose
        tool options do not define "submit_arguments". By default None,
        treated as an empty mapping.
    name : str, optional
        Family name, by default "deploy_tools".
    **kwargs :
        Any arguments passed to the family.
    """
    super().__init__(name=name, **kwargs)

    if submit_arguments is None:
        submit_arguments = {}

    with self:
        env_families = {}
        for env, options in tools.environments.items():
            env_tool = tools[env]
            # environments without a setup script get no family
            if env_tool.scripts["setup"] is None:
                continue
            with pf.AnchorFamily(
                name=env, variables={"ENV_NAME": env}
            ) as env_family:
                env_families[env] = env_family
                env_task = pf.Task(
                    name="setup",
                    submit_arguments=env_tool.options.get(
                        "submit_arguments", submit_arguments
                    ),
                    script=tools.setup(env),
                )
                if "packages" in options:
                    # packages are installed once the environment is set
                    package_family = DeployPackagesFamily(
                        tools, options["packages"], env, submit_arguments
                    )
                    package_family.triggers = env_task.complete

        # create dependencies between environments tasks; only the
        # environments that got a family can carry a trigger, iterating
        # over all of tools.environments would raise KeyError for the
        # ones skipped above
        for env, env_family in env_families.items():
            for depend in tools[env].depends:
                if depend in env_families:
                    env_family.triggers = (
                        env_family.triggers
                        & env_families[depend].complete
                    )

        # nothing to deploy: do not block whatever waits on this family
        if not env_families:
            self.defstatus = pf.state.complete

Bases: Family

A deployment family class for the packages of an environment.

Source code in wellies/tools.py
class DeployPackagesFamily(pf.Family):
    """
    Family named "packages" with one deployment task per package of an
    environment.
    """

    def __init__(
        self,
        tools: ToolStore,
        env_packages: List[str],
        env: str,
        submit_arguments: Dict[str, str],
        **kwargs,
    ):
        """
        Create one task per package and chain them by their dependencies.

        Parameters
        ----------
        tools : ToolStore
            Store providing the tool of each package and the load/setup
            scripts.
        env_packages : List[str]
            Names of the packages to deploy.
        env : str
            Name of the environment loaded before each package setup.
        submit_arguments : Dict[str, str]
            Submit arguments used for every task whose tool options do not
            define "submit_arguments".
        **kwargs :
            Any arguments passed to the family.
        """
        super().__init__(name="packages", **kwargs)

        tasks_by_package = {}
        with self:
            for package in env_packages:
                tool = tools[package]
                # load the environment first, then run the package setup
                steps = [tools.load(env), tools.setup(package)]
                task_args = tool.options.get(
                    "submit_arguments", submit_arguments
                )
                tasks_by_package[package] = pf.Task(
                    name=package,
                    submit_arguments=task_args,
                    script=steps,
                    labels={"version": "NA"},
                )

        # a package waits for each of its dependencies deployed in this family
        for package in env_packages:
            task = tasks_by_package[package]
            for depend in tools[package].depends:
                if depend not in env_packages:
                    continue
                task.triggers = (
                    task.triggers & tasks_by_package[depend].complete
                )