---
title: "Launcher plugins"
output:
  rmarkdown::html_vignette:
    toc: true
    number_sections: true
vignette: >
  %\VignetteIndexEntry{Launcher plugins}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
library(crew)
```

# About

`crew` lets users write custom [launchers](https://wlandau.github.io/crew/reference/crew_class_launcher.html) for different types of workers that connect over the local network. The [`crew.cluster`](https://wlandau.github.io/crew.cluster/) package already has plugins for traditional high-performance computing schedulers ([SLURM](https://slurm.schedmd.com/), SGE, LSF, and PBS/TORQUE).

# How it works

These [launcher](https://wlandau.github.io/crew/reference/crew_class_launcher.html) plugins need not become part of the `crew` package itself. You can write your plugin in a simple R script, or you write it in a custom R package that [depends on](https://r-pkgs.org/dependencies-in-practice.html) `crew`. Published packages with [launcher](https://wlandau.github.io/crew/reference/crew_class_launcher.html) plugins are powerful extensions that enhance `crew` for the entire open-source community. See [R Packages](https://r-pkgs.org/) by [Hadley Wickham](https://github.com/hadley) and [Jenny Bryan](https://github.com/jennybc) for how to write an R package.

# Scope

This vignette demonstrates how to write a `crew` launcher plugin. It assumes prior familiarity with [`R6` classes](https://r6.r-lib.org/articles/Introduction.html) and the computing platform of your plugin.

# Implementation

To create your own launcher plugin, write an [`R6`](https://r6.r-lib.org/articles/Introduction.html) subclass of [`crew_class_launcher`](https://wlandau.github.io/crew/reference/crew_class_launcher.html) with a [`launch_worker()`](https://wlandau.github.io/crew/reference/crew_class_launcher_local.html#method-launch-worker-) method analogous the one in the [local process launcher](https://wlandau.github.io/crew/reference/crew_class_launcher_local.html). `launch_worker()` must accept the same arguments as the [local process `launch_worker()` method](https://wlandau.github.io/crew/reference/crew_class_launcher_local.html#method-launch-worker-), generate a call to [`crew_worker()`](https://wlandau.github.io/crew/reference/crew_worker.html), and then submit a new job or process to run that call.

# Network

Each worker that launches must be able to dial into the client over the local network. The `host` argument of `crew_client()` provides the local IP address, and the `port` argument provides the TCP port. The controller helper function (see below) should expose arguments `host` and `port` in order to solve potential network problems like [this one](https://github.com/wlandau/crew.cluster/issues/1#issuecomment-1546024163).

By default, `host` is the local IP address. `crew` assumes the local network is secure. Please take the time to assess the network security risks of your computing environment. Use at your own risk.

# Example

The following is a custom custom launcher class whose workers are local R processes on Unix-like systems.

```r
custom_launcher_class <- R6::R6Class(
  classname = "custom_launcher_class",
  inherit = crew::crew_class_launcher,
  public = list(
    launch_worker = function(call) {
      bin <- file.path(R.home("bin"), "Rscript")
      processx::process$new(
        command = bin,
        args = c(self$r_arguments, "-e", call),
        cleanup = FALSE
      )
    }
  )
)
```

Inside `launch_worker()`, the `processx::process$new(command = bin, args = c(self$r_arguments, "-e", call))` line runs the [`crew_worker()`](https://wlandau.github.io/crew/reference/crew_worker.html) call in an external R process with the command line arguments from `r_arguments` (supplied when the launcher is created). This process runs in the background, connects back to `crew` and `mirai` over the local network, and accepts the tasks you push to the controller. 

Every `launch_worker()` method must accept a `call` argument. This argument is a text string with an R function call to  [`crew_worker()`](https://wlandau.github.io/crew/reference/crew_worker.html). `launch_worker()` must launch a worker that runs the R code in `call`.

To see what the `call` argument will look like from inside `launch_worker()`, create a new launcher and run the `call()` method.

```r
library(crew)
launcher <- crew_launcher_local()
launcher$start(url = "tcp://127.0.0.1:57000", profile = "example_profile")
launcher$call()
#> [1] "crew::crew_worker(settings = list(url = \"tcp://127.0.0.1:57000\", dispatcher = TRUE, asyncdial = FALSE, autoexit = 15L, cleanup = FALSE, output = TRUE, maxtasks = Inf, idletime = Inf, walltime = Inf, timerstart = 0L, tlscert = NULL, rs = NULL), controller = \"a28f357a\", options_metrics = crew::crew_options_metrics(path = NULL, seconds_interval = 5))"
```

# Batched launches

Some platforms support launching multiple workers from a single system call. For example, clusters like SLURM and cloud services like AWS Batch support job arrays. To leverage this feature in `crew`, define a method called `launch_workers()` (plural) instead of `launch_worker()` (singular). The former supersedes the latter when it is user-defined.^[The default `launch_workers()` method just calls `launch_worker()` `n` times.] For example:

```r
R6::R6Class(
  classname = "slurm_launcher_class",
  inherit = crew::crew_class_launcher,
  public = list(
    launch_workers = function(call, n) {
      template <- c(
        "#!/bin/bash",
        "#SBATCH --array=1-%s",
        "module load R",
        "Rscript -e '%s'"
      )
      script <- tempfile()
      writeLines(sprintf(template, n, call), script)
      system2("sbatch", script, wait = FALSE)
    }
  )
)
```

Above, `call` is the same as before: a call to `mirai::daemon()` to run a single worker. `n` is the number of `crew` workers (i.e. SLURM jobs) to launch in the current round of auto-scaling. The body of the function creates a job script for an array job, then submits the script to the cluster with `sbatch`.

# Controllers

It is useful to have a helper function that creates controllers with your custom launcher. It should:

1. Accept all the same arguments as [`crew_controller_local()`](https://wlandau.github.io/crew/reference/crew_controller_local.html).
2. Create a client object using [`crew_client()`](https://wlandau.github.io/crew/reference/crew_client.html).
3. Create a launcher object with the [`new()` method](https://wlandau.github.io/crew/reference/crew_class_launcher.html#method-crew_class_launcher-new) of your custom launcher class.
4. Create a new controller using [`crew_controller()`](https://wlandau.github.io/crew/reference/crew_controller.html).
5. Scan the controller for obvious errors using the [`validate()`](https://wlandau.github.io/crew/reference/crew_class_controller.html#method-crew_class_controller-validate) method of the controller.

Feel free to borrow from the [`crew_controller_local()` source code](https://github.com/wlandau/crew/blob/main/R/crew_controller_local.R). For packages, you can use the `@inheritParams` [`roxygen2`](https://roxygen2.r-lib.org/) tag to inherit the documentation of all the arguments instead of writing it by hand. You may want to adjust the default arguments based on the specifics of your platform, especially `seconds_launch` if workers take a long time to launch.

```r
#' @title Create a controller with the custom launcher.
#' @export
#' @description Create an `R6` object to submit tasks and
#'   launch workers.
#' @inheritParams crew::crew_controller_local
crew_controller_custom <- function(
  name = "custom controller name",
  workers = 1L,
  host = NULL,
  port = NULL,
  tls = crew::crew_tls(),
  serialization = NULL,
  profile = crew::crew_random_name(),
  seconds_interval = 0.5,
  seconds_timeout = 30,
  seconds_launch = 30,
  seconds_idle = Inf,
  seconds_wall = Inf,
  tasks_max = Inf,
  tasks_timers = 0L,
  reset_globals = TRUE,
  reset_packages = FALSE,
  reset_options = FALSE,
  garbage_collection = FALSE,
  r_arguments = NULL,
  options_metrics = crew::crew_options_metrics(),
  crashes_max = 5L,
  backup = NULL
) {
  client <- crew::crew_client(
    host = host,
    port = port,
    tls = tls,
    serialization = serialization,
    profile = profile,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout
  )
  launcher <- custom_launcher_class$new(
    name = name,
    workers = workers,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout,
    seconds_launch = seconds_launch,
    seconds_idle = seconds_idle,
    seconds_wall = seconds_wall,
    tasks_max = tasks_max,
    tasks_timers = tasks_timers,
    tls = tls,
    r_arguments = r_arguments,
    options_metrics = options_metrics
  )
  controller <- crew::crew_controller(
    client = client,
    launcher = launcher,
    reset_globals = reset_globals,
    reset_packages = reset_packages,
    reset_options = reset_options,
    garbage_collection = garbage_collection,
    crashes_max = crashes_max,
    backup = backup
  )
  controller$validate()
  controller
}
```

# Informal testing

Before you begin testing, please begin monitoring local processes and remote jobs on your platform. In the case of the above `crew` launcher which only creates local processes, it is sufficient to start [`htop`](https://htop.dev/) and filter for R processes, or launch a new R session to monitor the process table from [`ps::ps()`](https://ps.r-lib.org/reference/ps.html). However, for more ambitious launchers that submit workers to e.g. [AWS Batch](https://aws.amazon.com/batch/), you may need to open the [CloudWatch](https://aws.amazon.com/cloudwatch/) dashboard, then view the AWS billing dashboard after testing.

When you are ready to begin testing, try out the example in the [README](https://wlandau.github.io/crew/index.html#usage), but use your your custom controller helper instead of [`crew_controller_local()`](https://wlandau.github.io/crew/reference/crew_controller_local.html). 

First, create and start a controller.

```r
library(crew)
controller <- crew_controller_custom(workers = 2)
controller$start()
```

Try pushing a task that gets the local IP address and process ID of the worker instance.

```r
controller$push(
  name = "get worker IP address and process ID",
  command = paste(nanonext::ip_addr()[1], ps::ps_pid())
)
```

Wait for the task to complete and look at the result.

```r
controller$wait()
result <- controller$pop()
result$result[[1]]
#> [1] "192.168.0.2 27336"
```

Please use the result to verify that the task really ran on a worker as intended. The process ID above should agree with the one from the handle ([except on Windows](https://github.com/r-lib/processx/issues/364) because the actual R process may be different from the `Rscript.exe` process created first).

```r
controller$launcher$instances$handle[[1]]$get_pid()
#> [1] 27336
```

In addition, please compare the worker IP address to the IP address of the local R session. Since our custom launcher creates local processes, the IP addresses are the same in this case. However, if the worker runs on a different computer (as with the[SLURM](https://slurm.schedmd.com/) or [AWS Batch](https://aws.amazon.com/batch/) launcher) then the worker IP address should be different from the one you get from the local R session. 

```r
as.character(nanonext::ip_addr())[1]
#> "192.168.0.2"
```

If you did not set any timeouts or task limits, the worker that ran the task should still be running. The other worker had no tasks, so it did not need to launch.

```r
controller$launcher$instances$handle[[1]]$is_alive()
#> [1] TRUE
```

When you are done, either close the local R session or terminate the controller manually.

```r
controller$terminate()
```

Finally, use the process monitoring interface of your computing platform or operating system (e.g. `crew::crew_monitor_local()` if using `crew_controller_local()`) to verify that all `crew` workers are terminated.

# Load testing

If the informal testing succeeded, we recommend you scale up testing to more ambitious scenarios. As one example, you can test that your workers can auto-scale and quickly churn through a large number of tasks.

```r
library(crew)
controller <- crew_controller_custom(
  seconds_idle = 2L,
  workers = 2L
)
controller$start()

# Push 100 tasks
for (index in seq_len(100L)) {
  name <- paste0("task_", index)
  controller$push(name = name, command = index, data = list(index = index))
  message(paste("push", name))
}

# Wait for the tasks to complete.
controller$wait(mode = "all")

# Do the same for 100 more tasks.
for (index in (seq_len(100L) + 100L)) {
  name <- paste0("task_", index)
  controller$push(name = name, command = index, data = list(index = index))
  message(paste("push", name))
}
controller$wait(mode = "all")

# Collect the results.
results <- controller$collect()

# Check the results
all(sort(unlist(results$result)) == seq_len(200L))
#> [1] TRUE

# View the controller summary.
controller$summary()

# Terminate the controller.
controller$terminate()

# Now outside crew, verify that all the
# crew workers successfully terminated.
```

# Managing workers

There are safeguards to make sure workers terminate when the controller terminates or the parent R session exits.
However, these safeguards are based on network connections and operating system signals, which are not guaranteed to work in all cases.
It is the user's responsibility to monitor and manage `crew` workers.
To make the user's job easier, it is good practice to implement job management utilities to go along with your launcher plugin.
The `crew` ecosystem implements "monitor" objects such as [`crew_monitor_local()`](https://wlandau.github.io/crew/reference/crew_class_monitor_local.html) and [`crew_monitor_aws_batch()`](https://wlandau.github.io/crew.aws.batch/reference/crew_class_monitor_aws_batch.html) to help users list and terminate workers, as well as view logs.

The essence of the local monitor is copied below:

```r
crew_monitor_local <- function() {
  crew_class_monitor_local$new()
}

crew_class_monitor_local <- R6::R6Class(
  classname = "crew_class_monitor_local",
  cloneable = FALSE,
  public = list(
    workers = function() {
      crew_monitor_pids(pattern = "crew::crew_worker")
    },
    terminate = function(pids) {
      lapply(as.integer(pids), crew::crew_terminate_process)
    }
  )
)

crew_monitor_pids <- function(pattern) {
  processes <- ps::ps()
  commands <- map(
    processes$ps_handle,
    ~tryCatch(ps::ps_cmdline(.x), error = function(condition) "")
  )
  filter <- grepl(pattern = pattern, x = as.character(commands), fixed = TRUE)
  as.integer(sort(processes$pid[filter]))
}
```

Example usage:

```r
monitor <- crew_monitor_local()
monitor$workers()
#> [1] 57001 57002
monitor$terminate(pids = c(57001, 57002))
monitor$workers()
#> integer(0)
```
