Type: | Package |
Title: | Orchestration of Data Pipelines |
Version: | 0.6.1 |
Maintainer: | Will Hipson <will.e.hipson@gmail.com> |
Description: | Framework for creating and orchestrating data pipelines. Organize, orchestrate, and monitor multiple pipelines in a single project. Use tags to decorate functions with scheduling parameters and configuration. |
License: | MIT + file LICENSE |
URL: | https://github.com/whipson/maestro, https://whipson.github.io/maestro/ |
BugReports: | https://github.com/whipson/maestro/issues |
Encoding: | UTF-8 |
LazyData: | true |
Imports: | cli (≥ 3.3.0), dplyr (≥ 1.1.0), glue, lifecycle, logger, lubridate (≥ 1.9.1), purrr (≥ 1.0.0), R.utils, R6, rlang (≥ 1.0.0), roxygen2, tictoc, timechange, utils |
RoxygenNote: | 7.3.2 |
Depends: | R (≥ 4.1.0) |
Suggests: | asciicast, DiagrammeR, furrr, future, knitr, quarto, rmarkdown, rstudioapi, testthat (≥ 3.0.0), withr |
Config/testthat/edition: | 3 |
VignetteBuilder: | knitr, quarto |
NeedsCompilation: | no |
Packaged: | 2025-07-18 11:13:37 UTC; willhipson |
Author: | Will Hipson |
Repository: | CRAN |
Date/Publication: | 2025-07-18 11:30:02 UTC |
maestro
package
Description
Lightweight pipeline orchestration in R
Details
Documentation: GitHub
Author(s)
Maintainer: Will Hipson will.e.hipson@gmail.com (ORCID) [copyright holder]
Authors:
Ryan Garnett ryangarnett78@gmail.com [contributor, copyright holder]
See Also
Useful links:
Report bugs at https://github.com/whipson/maestro/issues
Class for an individual maestro pipeline A pipeline is defined as a single R script with a schedule or input
Description
Class for an individual maestro pipeline A pipeline is defined as a single R script with a schedule or input
Class for an individual maestro pipeline A pipeline is defined as a single R script with a schedule or input
Methods
Public methods
Method new()
Create a new Pipeline object
Usage
MaestroPipeline$new( script_path, pipe_name, frequency = NA_character_, start_time = lubridate::NA_POSIXct_, tz = NA_character_, hours = NULL, days = NULL, months = NULL, skip = FALSE, log_level = "INFO", inputs = NULL, outputs = NULL, priority = Inf, flags = c() )
Arguments
script_path
path to the script
pipe_name
name of the pipeline
frequency
frequency of the pipeline (e.g., 1 day)
start_time
start time of the pipeline
tz
time zone of the pipeline
hours
specific hours of the day
days
specific days of week or month
months
specific months of year
skip
whether to skip the pipeline regardless of scheduling
log_level
log level of the pipeline
inputs
names of pipelines that this pipeline is dependent on for input
outputs
names of pipelines for which this pipeline is a dependency
priority
priority of the pipeline
flags
arbitrary pipelines flags
Returns
MaestroPipeline object
Method print()
Prints the pipeline
Usage
MaestroPipeline$print()
Returns
Method run()
Runs the pipeline
Usage
MaestroPipeline$run( resources = list(), log_file = tempfile(), quiet = FALSE, log_file_max_bytes = 1e+06, .input = NULL, cli_prepend = "", log_to_console = FALSE, ... )
Arguments
resources
named list of arguments and values to pass to the pipeline
log_file
path to the log file for logging
quiet
whether to silence console output
log_file_max_bytes
maximum bytes of the log file before trimming
.input
input values from upstream pipelines
cli_prepend
text to prepend to cli output
log_to_console
whether or not to output statements in the console (FALSE is to suppress and append to log)
...
additional arguments (unused)
Returns
invisible
Method get_pipe_name()
Get the pipeline name
Usage
MaestroPipeline$get_pipe_name()
Returns
pipeline_name
Method get_schedule()
Get the schedule as a data.frame
Usage
MaestroPipeline$get_schedule()
Returns
data.frame
Method check_timeliness()
Check whether a pipeline is scheduled to run based on orchestrator frequency and current time
Usage
MaestroPipeline$check_timeliness( orch_unit, orch_n, check_datetime = lubridate::now(), ... )
Arguments
orch_unit
unit of the orchestrator (e.g., day)
orch_n
number of units of the frequency
check_datetime
datetime against which to check the timeliness of the pipeline (should almost always be now)
...
unused
Returns
MaestroPipeline
Method get_status()
Get status of the pipeline as a data.frame
Usage
MaestroPipeline$get_status()
Returns
data.frame
Method get_status_chr()
Get status of the pipeline as a string
Usage
MaestroPipeline$get_status_chr()
Returns
character
Method get_outputs()
Names of pipelines that receive input from this pipeline
Usage
MaestroPipeline$get_outputs()
Returns
character
Method get_inputs()
Names of pipelines that input into this pipeline
Usage
MaestroPipeline$get_inputs()
Returns
character
Method get_priority()
Get priority of the pipeline
Usage
MaestroPipeline$get_priority()
Returns
numeric
Method get_artifacts()
Get artifacts (return values) from the pipeline
Usage
MaestroPipeline$get_artifacts()
Returns
list
Method get_errors()
Get list of errors from the pipeline
Usage
MaestroPipeline$get_errors()
Returns
list
Method get_warnings()
Get list of warnings from the pipeline
Usage
MaestroPipeline$get_warnings()
Returns
list
Method get_messages()
Get list of messages from the pipeline
Usage
MaestroPipeline$get_messages()
Returns
list
Method get_flags()
Get the flags of a pipeline as a vector
Usage
MaestroPipeline$get_flags()
Returns
character
Method update_inputs()
Update the inputs of a pipeline
Usage
MaestroPipeline$update_inputs(inputs)
Arguments
inputs
character vector of inputting pipeline names
Returns
vector
Method update_outputs()
Update the outputs of a pipeline
Usage
MaestroPipeline$update_outputs(outputs)
Arguments
outputs
character vector of outputting pipeline names
Returns
vector
Method get_run_sequence()
Get the run sequence of a pipeline
Usage
MaestroPipeline$get_run_sequence()
Arguments
outputs
character vector of times
Returns
vector
Method clone()
The objects of this class are cloneable with this method.
Usage
MaestroPipeline$clone(deep = FALSE)
Arguments
deep
Whether to make a deep clone.
Class for a list of MaestroPipelines A MaestroPipelineList is created when there are multiple maestro pipelines in a single script
Description
Class for a list of MaestroPipelines A MaestroPipelineList is created when there are multiple maestro pipelines in a single script
Class for a list of MaestroPipelines A MaestroPipelineList is created when there are multiple maestro pipelines in a single script
Public fields
MaestroPipelines
list of pipelines
n_pipelines
number of pipelines in the list
Methods
Public methods
Method new()
Create a MaestroPipelineList object
Usage
MaestroPipelineList$new(MaestroPipelines = list(), network = NULL)
Arguments
MaestroPipelines
list of MaestroPipelines
network
initialize a network
Returns
MaestroPipelineList
Method print()
Print the MaestroPipelineList
Usage
MaestroPipelineList$print()
Returns
Method add_pipelines()
Add pipelines to the list
Usage
MaestroPipelineList$add_pipelines(MaestroPipelines = NULL)
Arguments
MaestroPipelines
list of MaestroPipelines
Returns
invisible
Method get_pipe_names()
Get names of the pipelines in the list arranged by priority
Usage
MaestroPipelineList$get_pipe_names()
Returns
character
Method get_pipe_by_name()
Get a MaestroPipeline by its name
Usage
MaestroPipelineList$get_pipe_by_name(pipe_name)
Arguments
pipe_name
name of the pipeline
Returns
MaestroPipeline
Method get_priorities()
Get priorities
Usage
MaestroPipelineList$get_priorities()
Returns
numeric
Method get_schedule()
Get the schedule as a data.frame
Usage
MaestroPipelineList$get_schedule()
Returns
data.frame
Method get_timely_pipelines()
Get a new MaestroPipelineList containing only those pipelines scheduled to run
Usage
MaestroPipelineList$get_timely_pipelines(...)
Arguments
...
arguments passed to self$check_timeliness
Returns
MaestroPipelineList
Method get_primary_pipes()
Get pipelines that are primary (i.e., don't have an inputting pipeline)
Usage
MaestroPipelineList$get_primary_pipes()
Returns
list of MaestroPipelines
Method check_timeliness()
Check whether pipelines in the list are scheduled to run based on orchestrator frequency and current time
Usage
MaestroPipelineList$check_timeliness(...)
Arguments
...
arguments passed to self$check_timeliness
Returns
logical
Method get_status()
Get status of the pipelines as a data.frame
Usage
MaestroPipelineList$get_status()
Returns
data.frame
Method get_errors()
Get list of errors from the pipelines
Usage
MaestroPipelineList$get_errors()
Returns
list
Method get_warnings()
Get list of warnings from the pipelines
Usage
MaestroPipelineList$get_warnings()
Returns
list
Method get_messages()
Get list of messages from the pipelines
Usage
MaestroPipelineList$get_messages()
Returns
list
Method get_artifacts()
Get artifacts (return values) from the pipelines
Usage
MaestroPipelineList$get_artifacts()
Returns
list
Method get_run_sequences()
Get run sequences from the pipelines
Usage
MaestroPipelineList$get_run_sequences()
Returns
list
Method get_flags()
Get the flags of the pipelines as a named list
Usage
MaestroPipelineList$get_flags()
Returns
list
Method get_network()
Get the network structure as a edge list
Usage
MaestroPipelineList$get_network()
Returns
data.frame
Method validate_network()
Validates whether all inputs and outputs exist and that the network is a valid DAG
Usage
MaestroPipelineList$validate_network()
Returns
warning or invisible
Method run()
Runs all the pipelines in the list
Usage
MaestroPipelineList$run(..., cores = 1L, pipes_to_run = NULL)
Arguments
...
arguments passed to MaestroPipeline$run
cores
if using multicore number of cores to run in (uses
furrr
)pipes_to_run
an optional vector of pipe names to run. If
NULL
defaults to all primary pipelines
Returns
invisible
Method clone()
The objects of this class are cloneable with this method.
Usage
MaestroPipelineList$clone(deep = FALSE)
Arguments
deep
Whether to make a deep clone.
Class for a schedule of pipelines
Description
Class for a schedule of pipelines
Class for a schedule of pipelines
Public fields
PipelineList
object of type MaestroPipelineList
Methods
Public methods
Method new()
Create a MaestroSchedule object
Usage
MaestroSchedule$new(Pipelines = NULL)
Arguments
Pipelines
list of MaestroPipelines
Returns
MaestroSchedule
Method print()
Print the schedule object
Usage
MaestroSchedule$print()
Returns
Method run()
Run a MaestroSchedule
Usage
MaestroSchedule$run(..., quiet = FALSE, run_all = FALSE, n_show_next = 5)
Arguments
...
arguments passed to MaestroPipelineList$run
quiet
whether or not to silence console messages
run_all
run all pipelines regardless of the schedule (default is
FALSE
) - useful for testing.n_show_next
show the next n scheduled pipes
Returns
invisible
Method get_schedule()
Get the schedule as a data.frame
Usage
MaestroSchedule$get_schedule()
Returns
data.frame
Method get_status()
Get status of the pipelines as a data.frame
Usage
MaestroSchedule$get_status()
Returns
data.frame
Method get_artifacts()
Get artifacts (return values) from the pipelines
Usage
MaestroSchedule$get_artifacts()
Returns
list
Method get_network()
Get the network structure of the pipelines as an edge list (will be empty if there are no DAG pipelines)
Usage
MaestroSchedule$get_network()
Returns
data.frame
Method get_flags()
Get all pipeline flags as a long data.frame
Usage
MaestroSchedule$get_flags()
Returns
data.frame
Method show_network()
Visualize the DAG relationships between pipelines in the schedule
Usage
MaestroSchedule$show_network()
Returns
interactive visualization
Method clone()
The objects of this class are cloneable with this method.
Usage
MaestroSchedule$clone(deep = FALSE)
Arguments
deep
Whether to make a deep clone.
Examples
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
schedule <- build_schedule(pipeline_dir = pipeline_dir)
}
Build a schedule table
Description
Builds a schedule data.frame for scheduling pipelines in run_schedule()
.
Usage
build_schedule(pipeline_dir = "./pipelines", quiet = FALSE)
Arguments
pipeline_dir |
path to directory containing the pipeline scripts |
quiet |
silence metrics to the console (default = |
Details
This function parses the maestro tags of functions located in pipeline_dir
which is
conventionally called 'pipelines'. An orchestrator requires a schedule table
to determine which pipelines are to run and when. Each row in a schedule table
is a pipeline name and its scheduling parameters such as its frequency.
The schedule table is mostly intended to be used by run_schedule()
immediately.
In other words, it is not recommended to make changes to it.
Value
MaestroSchedule
Examples
# Creating a temporary directory for demo purposes! In practice, just
# create a 'pipelines' directory at the project level.
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
build_schedule(pipeline_dir = pipeline_dir)
}
Parse and validate tags then create and populate MaestroPipelineList
Description
Parse and validate tags then create and populate MaestroPipelineList
Usage
build_schedule_entry(script_path)
Arguments
script_path |
path to script |
Value
MaestroPipelineList R6 class
Convert a duration string to number of seconds
Description
Convert a duration string to number of seconds
Usage
convert_to_seconds(time_string)
Arguments
time_string |
string like 1 day, 1 week, 12 hours, etc. |
Value
number of seconds
Creates a new maestro project
Description
Creates a new maestro project
Usage
create_maestro(path, type = "R", overwrite = FALSE, quiet = FALSE, ...)
Arguments
path |
file path for the orchestrator script |
type |
file type for the orchestrator (supports R, Quarto, and RMarkdown) |
overwrite |
whether to overwrite an existing orchestrator or maestro project |
quiet |
whether to silence messages in the console (default = |
... |
unused |
Value
invisible
Examples
# Creates a new maestro project with an R orchestrator
if (interactive()) {
new_proj_dir <- tempdir()
create_maestro(new_proj_dir, type = "R", overwrite = TRUE)
create_maestro(new_proj_dir, type = "Quarto", overwrite = TRUE)
}
Create a new orchestrator
Description
Create a new orchestrator
Usage
create_orchestrator(
path,
type = c("R", "Quarto", "RMarkdown"),
open = interactive(),
quiet = FALSE,
overwrite = FALSE
)
Arguments
path |
file path for the orchestrator script |
type |
file type for the orchestrator (supports R, Quarto, and RMarkdown) |
open |
whether or not to open the script upon creation |
quiet |
whether to silence messages in the console (default = |
overwrite |
whether to overwrite an existing orchestrator or maestro project |
Value
invisible
Create a new pipeline in a pipelines directory
Description
Allows the creation of new pipelines (R scripts) and fills in the maestro tags as specified.
Usage
create_pipeline(
pipe_name,
pipeline_dir = "pipelines",
frequency = "1 day",
start_time = Sys.Date(),
tz = "UTC",
log_level = "INFO",
quiet = FALSE,
open = interactive(),
overwrite = FALSE,
skip = FALSE,
inputs = NULL,
outputs = NULL,
priority = NULL
)
Arguments
pipe_name |
name of the pipeline and function |
pipeline_dir |
directory containing the pipeline scripts |
frequency |
how often the pipeline should run (e.g., 1 day, daily, 3 hours, 4 months). Fills in maestroFrequency tag |
start_time |
start time of the pipeline schedule. Fills in maestroStartTime tag |
tz |
timezone that pipeline will be scheduled in. Fills in maestroTz tag |
log_level |
log level for the pipeline (e.g., INFO, WARN, ERROR). Fills in maestroLogLevel tag |
quiet |
whether to silence messages in the console (default = |
open |
whether or not to open the script upon creation |
overwrite |
whether or not to overwrite an existing pipeline of the same name and location. |
skip |
whether to skip the pipeline when running in the orchestrator (default = |
inputs |
vector of names of pipelines that input into this pipeline (default = |
outputs |
vector of names of pipelines that receive output from this pipeline (default = |
priority |
a single positive integer corresponding to the order in which this pipeline will be invoked in the presence of other simultaneously invoked pipelines. |
Value
invisible
Examples
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline(
"extract_data",
pipeline_dir = pipeline_dir,
frequency = "1 hour",
open = FALSE,
quiet = TRUE,
overwrite = TRUE
)
create_pipeline(
"new_job",
pipeline_dir = pipeline_dir,
frequency = "20 minutes",
start_time = as.POSIXct("2024-06-21 12:20:00"),
log_level = "ERROR",
open = FALSE,
quiet = TRUE,
overwrite = TRUE
)
}
Get the artifacts (return values) of the pipelines in a MaestroSchedule object.
Description
Artifacts are return values from pipelines. They are accessible as a named list where the names correspond to the names of the pipeline.
Usage
get_artifacts(schedule)
Arguments
schedule |
object of type MaestroSchedule created using |
Value
named list
Examples
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
schedule <- build_schedule(pipeline_dir = pipeline_dir)
schedule <- run_schedule(
schedule,
orch_frequency = "1 day",
quiet = TRUE
)
get_artifacts(schedule)
# Alternatively, use the underlying R6 method
schedule$get_artifacts()
}
Get the flags of pipelines in a MaestroSchedule object
Description
Creates a long data.frame where each row is a flag for each pipeline.
Usage
get_flags(schedule)
Arguments
schedule |
object of type MaestroSchedule created using |
Value
data.frame
Examples
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
schedule <- build_schedule(pipeline_dir = pipeline_dir)
get_flags(schedule)
# Alternatively, use the underlying R6 method
schedule$get_flags()
}
Generate a sequence of run times for a pipeline
Description
Generate a sequence of run times for a pipeline
Usage
get_pipeline_run_sequence(
pipeline_n,
pipeline_unit,
pipeline_datetime,
check_datetime,
pipeline_hours = 0:23,
pipeline_days_of_week = 1:7,
pipeline_days_of_month = 1:31,
pipeline_months = 1:12
)
Arguments
pipeline_n |
number of units for the pipeline frequency |
pipeline_unit |
unit for the pipeline frequency |
pipeline_datetime |
datetime of the first time the pipeline is to run |
check_datetime |
datetime against which to check the running of pipelines (default is current system time in UTC) |
pipeline_hours |
vector of integers [0-23] corresponding to hours of day for the pipeline to run |
pipeline_days_of_week |
vector of integers [1-7] corresponding to days of week for the pipeline to run (1 = Sunday) |
pipeline_days_of_month |
vector of integers [1-31] corresponding to days of month for the pipeline to run |
pipeline_months |
vector of integers [1-12] corresponding to months of year for the pipeline to run |
Value
vector of timestamps or dates
Get the schedule from a MaestroSchedule object
Description
A schedule is represented as a table where each row is a pipeline and the columns contain scheduling parameters such as the frequency and start time.
Usage
get_schedule(schedule)
Arguments
schedule |
object of type MaestroSchedule created using |
Details
The schedule table is used internally in a MaestroSchedule object but can be accessed using this function or accessing the R6 method of the MaestroSchedule object.
Value
data.frame
Examples
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
schedule <- build_schedule(pipeline_dir = pipeline_dir)
get_schedule(schedule)
# Alternatively, use the underlying R6 method
schedule$get_schedule()
}
Get time slot usage of a schedule
Description
Get the number of pipelines scheduled to run for each time slot at a particular slot interval. Time slots are times that the orchestrator runs and the slot interval determines the level of granularity to consider.
Usage
get_slot_usage(schedule, orch_frequency, slot_interval = "hour")
Arguments
schedule |
object of type MaestroSchedule created using |
orch_frequency |
of the orchestrator, a single string formatted like "1 day", "2 weeks", "hourly", etc. |
slot_interval |
a time unit indicating the interval of time to consider between slots (e.g., 'hour', 'day') |
Details
This function is particularly useful when you have multiple pipelines in a project and you want to see what recurring time intervals may be available or underused for new pipelines.
Note that this function is intended for use in an interactive session while developing a maestro project. It is not intended for use in the orchestrator.
As an example, consider we have four pipelines running at various frequencies
and the orchestrator running every hour. Then let's say there's to be a new
pipeline that runs every day. One might ask 'what hour should I schedule this new
pipeline to run on?'. By using get_slot_usage(schedule, orch_frequency = '1 hour', slot_interval = 'hour')
on the existing schedule, you could identify for each hour how many pipelines
are already scheduled to run and choose the ones with the lowest usage.
Value
data.frame
Examples
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
schedule <- build_schedule(pipeline_dir = pipeline_dir)
get_slot_usage(
schedule,
orch_frequency = "1 hour",
slot_interval = "hour"
)
}
Get the statuses of the pipelines in a MaestroSchedule object
Description
A status data.frame contains the names and locations of the pipelines as well as information around whether they were invoked, the status (error, warning, etc.), and the run time.
Usage
get_status(schedule)
Arguments
schedule |
object of type MaestroSchedule created using |
Value
data.frame
Examples
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
schedule <- build_schedule(pipeline_dir = pipeline_dir)
schedule <- run_schedule(
schedule,
orch_frequency = "1 day",
quiet = TRUE
)
get_status(schedule)
# Alternatively, use the underlying R6 method
schedule$get_status()
}
Manually run a pipeline regardless of schedule
Description
Instantly run a single pipeline from the schedule. This is useful for testing purposes or if you want to just run something one-off.
Usage
invoke(schedule, pipe_name, resources = list(), ...)
Arguments
schedule |
object of type MaestroSchedule created using |
pipe_name |
name of a single pipe name from the schedule |
resources |
named list of shared resources made available to pipelines as needed |
... |
other arguments passed to |
Details
Scheduling parameters such as the frequency, start time, and specifiers are ignored.
The pipeline will be run even if maestroSkip
is present. If the pipeline is a DAG
pipeline, invoke
will attempt to execute the full DAG.
Value
invisible
Examples
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
schedule <- build_schedule(pipeline_dir = pipeline_dir)
invoke(schedule, "my_new_pipeline")
}
Checks whether a DAG is valid (no loops)
Description
Checks whether a DAG is valid (no loops)
Usage
is_valid_dag(edges)
Arguments
edges |
a data.frame of edges (from, to) |
Value
boolean
Retrieve latest maestro build errors
Description
Gets the latest schedule build errors following use of build_schedule()
. If
the build succeeded or build_schedule()
has not been run it will be NULL
.
Usage
last_build_errors()
Value
error messages
Examples
last_build_errors()
Retrieve latest maestro pipeline errors
Description
Gets the latest pipeline errors following use of run_schedule()
. If
the all runs succeeded or run_schedule()
has not been run it will be NULL
.
Usage
last_run_errors()
Value
error messages
Examples
last_run_errors()
Retrieve latest maestro pipeline messages
Description
Gets the latest pipeline messages following use of run_schedule()
. If
there are no messages or run_schedule()
has not been run it will be NULL
.
Usage
last_run_messages()
Details
Note that setting maestroLogLevel
to something greater than INFO
will
ignore messages.
Value
messages
Examples
last_run_messages()
Retrieve latest maestro pipeline warnings
Description
Gets the latest pipeline warnings following use of run_schedule()
. If
there are no warnings or run_schedule()
has not been run it will be NULL
.
Usage
last_run_warnings()
Details
Note that setting maestroLogLevel
to something greater than WARN
will
ignore warnings.
Value
warning messages
Examples
last_run_warnings()
cli output for generate schedule table
Description
cli output for generate schedule table
Usage
maestro_parse_cli(parse_succeeds, parse_errors)
Arguments
parse_succeeds |
list of parse results (i.e., succeeded) |
parse_errors |
list of parse errors |
Value
cli output
Maestro Tags
Description
maestro tags are roxygen2 comments for configuring the scheduling and execution of pipelines.
Details
maestro tags follow the format: #' @maestroTagName value
Some tags may not take a value.
maestro tags must be written above the function that is to be included as a pipeline. A typical pipeline with tags could look like this:
#' @maestroFrequency 1 hour #' @maestroStartTime 12:30:00 #' @maestroLogLevel WARN my_pipeline <- function() { # Pipeline code }
Below are descriptions of all the tags currently available in maestro along with examples.
maestroFrequency
How often to run the pipeline. This tag takes a time unit indicating how long to wait between subsequent runs of the pipeline. Acceptable values include an integer value followed by one of minute(s), hour(s), day(s), week(s), month(s), and year(s). Note that some combinations of integer + unit may be invalid. Adverbs like 'hourly', 'daily', 'weekly', etc. are also valid.
Default: 1 day
Examples:
-
#' @maestroFrequency 1 hour
-
#' @maestroFrequency daily
-
#' @maestroFrequency 2 weeks
-
#' @maestroFrequency 3 months
maestroStartTime
Timestamp, date, or time corresponding to the start time of the pipeline. This
also sets the cadence of the pipeline in some cases. For instance, if the start time
is 2025-03-18 03:00:00
and the frequency is daily, the pipeline will run at 03:00
every day. A value in the future prevents the pipeline from running until that time
has been reached.
Default: 2024-01-01 00:00:00
Examples:
-
#' @maestroStartTime 2025-02-05 12:00:00
-
#' @maestroStartTime 2025-01-01
-
#' @maestroStartTime 08:00:00
maestroTz
Timezone in which the maestroStartTime is to be considered. Takes any valid timezone
string that can be found in OlsonNames()
.
Default: UTC
Examples:
-
#' @maestroTz Europe/Paris
-
#' @maestroTz America/Halifax
-
#' @maestroTz US/Pacific
maestroLogLevel
Minimum logging threshold for messages, warnings, and errors that come from the pipeline.
These levels correspond to those in logger:::log_levels_supported
but most commonly
used are ERROR, WARN, INFO. For example, if you use WARN
then any messages of lower
urgency (i.e., INFO) will be suppressed, but errors will be logged.
Default: INFO
Examples:
-
#' @maestroLogLevel ERROR
-
#' @maestroLogLevel WARN
maestroHours
Specific hours of the day in which to run the pipeline. This only applies for pipelines that run
at an hourly or minutely frequency. Acceptable values are integers from 0-23 separated
by spaces. If empty, pipeline runs at all hours. This tag uses the timezone specified by
maestroTz
(will be UTC if empty).
Default:
Examples:
-
#' @maestroHours 1 4 7 10
-
#' @maestroHours 0 5 20
maestroDays
Specific days of week or days of month on which to run the pipeline. This only applies for pipelines that run at a minutely, hourly, or daily frequency. Acceptable values are either integers from 1-31 or day of week strings like Mon, Tue, Wed, etc. These two options cannot be used in combination.
Default:
Examples
-
#' @maestroDays 1 11 21 31
-
#' @maestroDays Mon Tue Wed Thu Fri
maestroMonths
Specific months of the year on which to run the pipeline. This only applies for pipelines that do run at least monthly. Acceptable values are integers (1-12) corresponding to the month of the year (e.g., 1 = January, 2 = February, etc.).
Default:
Examples:
-
#' @maestroMonths 3 8 12
-
#' @maestroMonths 10
maestroInputs
For a DAG style pipeline, the names of pipelines that input into the pipeline.
These names must match the function name defining the inputting pipeline. Multiple
pipelines can be used as inputs and the input value is used in the target pipeline
via the required .input
parameter. Note that this tag could be redundant if the
inputting pipeline uses maestroOutputs
.
Default:
Examples:
-
#' @maestroInputs extract verify
maestroOutputs
For a DAG style pipeline, the names of pipelines that receive the return value of
this pipeline as input. These names must match the function name defining the
outputting pipeline. Multiple pipelines can be outputted into. The return value
of the pipeline will be passed into the receiving pipeline. Note that this tag
could be redundant if pipeline to be inputted into uses maestroInputs
.
Default:
Examples:
-
#' @maestroOutputs transform
maestroSkip
Flags a pipeline to never be executed even if it is scheduled to run. This can
be useful when developing or testing a pipeline. This tag takes no value, instead
the presence of the tag indicates whether it is skipped. This tag is ignored when
run_schedule(..., run_all = TRUE)
or when using invoke()
.
Default:
Examples:
-
#' @maestroSkip
maestroPriority
Determines the order in which pipelines that run at the same scheduled instance
are executed. Values are positive integers from 1 to N. Order is determined in
descending order such that 1 indicates the highest priority. Pipelines with the
same priority run in the order in which build_schedule()
parses the pipeline
(usually alphabetical according to file name and then line number within file).
By default, all pipelines are given equal priority.
Default:
Examples:
-
#' @maestroPriority 1
-
#' @maestroPriority 3
maestroFlags
Arbitrary labeling tags which are then made accessible via get_flags()
. A pipeline
can have multiple tags separated by spaces.
Default:
Examples:
-
#' @maestroFlags critical etl cloud
-
#' @maestroFlags aviation
maestro
Generic tag for identifying a maestro pipeline with all the defaults. Useful when you want a pipeline to be scheduled via maestro that accepts all default tag values. Only use this tag if you have no other maestro tags. The tag takes no value.
Default:
Examples:
-
#' @maestro
Parse a time string
Description
Parse a time string
Usage
parse_rounding_unit(time_string)
Arguments
time_string |
string like 1 day, daily, 1 week, 12 hours, etc. |
Value
nunit list
Run a schedule
Description
Given a schedule in a maestro
project, runs the pipelines that are scheduled to execute
based on the current time.
Usage
run_schedule(
schedule,
orch_frequency = "1 day",
check_datetime = lubridate::now(tzone = "UTC"),
resources = list(),
run_all = FALSE,
n_show_next = 5,
cores = 1,
log_file_max_bytes = 1e+06,
quiet = FALSE,
log_to_console = FALSE,
log_to_file = FALSE
)
Arguments
schedule |
object of type MaestroSchedule created using |
orch_frequency |
of the orchestrator, a single string formatted like "1 day", "2 weeks", "hourly", etc. |
check_datetime |
datetime against which to check the running of pipelines (default is current system time in UTC) |
resources |
named list of shared resources made available to pipelines as needed |
run_all |
run all pipelines regardless of the schedule (default is |
n_show_next |
show the next n scheduled pipes |
cores |
number of cpu cores to run if running in parallel. If > 1, |
log_file_max_bytes |
numeric specifying the maximum number of bytes allowed in the log file before purging the log (within a margin of error) |
quiet |
silence metrics to the console (default = |
log_to_console |
whether or not to include pipeline messages, warnings, errors to the console (default = |
log_to_file |
either a boolean to indicate whether to create and append to a |
Details
Pipeline schedule logic
The function run_schedule()
examines each pipeline in the schedule table and determines
whether it is scheduled to run at the current time using some simple time arithmetic. We assume
run_schedule(schedule, check_datetime = Sys.time())
, but this need not be the case.
Output
run_schedule()
returns the same MaestroSchedule object with modified attributes. Use get_status()
to examine the status of each pipeline and use get_artifacts()
to get any return values from the
pipelines as a list.
Pipelines with arguments (resources)
If a pipeline takes an argument that doesn't include a default value, these can be supplied
in the orchestrator via run_schedule(resources = list(arg1 = val))
. The name of the argument
used by the pipeline must match the name of the argument in the list. Currently, each named
resource must refer to a single object. In other words, you can't have two pipes using
the same argument but requiring different values.
Running in parallel
Pipelines can be run in parallel using the cores
argument. First, you must run future::plan(future::multisession)
in the orchestrator. Then, supply the desired number of cores to the cores
argument. Note that
console output appears different in multicore mode.
Logging & Console Output
By default, maestro
suppresses pipeline messages, warnings, and errors from appearing in the console, but
messages coming from print()
and other console logging packages like cli
and logger
are not suppressed
and will be interwoven into the output generated from run_schedule()
. Messages from cat()
and related functions are always suppressed
due to the nature of how those functions operate with standard output.
Users are advised to make use of R's message()
, warning()
, and stop()
functions in their pipelines
for managing conditions. Use log_to_console = TRUE
to print these to the console.
Maestro can generate a log file that is appended to each time the orchestrator is run. Use log_to_file = TRUE
or log_to_file = '[path-to-file]'
and
maestro will create/append to a file in the project directory.
This log file will be appended to until it exceeds the byte size defined in log_file_max_bytes
argument after which
the log file is deleted.
Value
MaestroSchedule object
Examples
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
schedule <- build_schedule(pipeline_dir = pipeline_dir)
# Runs the schedule every 1 day
run_schedule(
schedule,
orch_frequency = "1 day",
quiet = TRUE
)
# Runs the schedule every 15 minutes
run_schedule(
schedule,
orch_frequency = "15 minutes",
quiet = TRUE
)
}
Visualize the schedule as a DAG
Description
Create an interactive network visualization to show the dependency structure of pipelines in the schedule. This is only useful if there are pipelines in the schedule that take inputs/outputs from other pipelines.
Usage
show_network(schedule)
Arguments
schedule |
object of type MaestroSchedule created using |
Details
Note that running this function on a schedule with all independent pipelines will produce a network visual with no connections.
This function requires the installation of DiagrammeR
which is not automatically
installed with maestro
.
Value
DiagrammeR visualization
Examples
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
schedule <- build_schedule(pipeline_dir = pipeline_dir)
schedule <- run_schedule(
schedule,
orch_frequency = "1 day",
quiet = TRUE
)
show_network(schedule)
}
Suggest orchestrator frequency based on a schedule
Description
Suggests a frequency to run the orchestrator based on the frequencies of the pipelines in a schedule.
Usage
suggest_orch_frequency(
schedule,
check_datetime = lubridate::now(tzone = "UTC")
)
Arguments
schedule |
MaestroSchedule object created by |
check_datetime |
datetime against which to check the running of pipelines (default is current system time in UTC) |
Details
This function attempts to find the smallest interval of time between all pipelines. If the smallest interval is less than 15 minutes, it just uses the smallest interval.
Note this function is intended to be used interactively when deciding how often to schedule the orchestrator. Programmatic use is not recommended.
Value
frequency string
Examples
if (interactive()) {
pipeline_dir <- tempdir()
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
schedule <- build_schedule(pipeline_dir = pipeline_dir)
suggest_orch_frequency(schedule)
}