---
title: "Introduction to crew"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Introduction to crew}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

`crew` is a distributed computing framework with a centralized interface and auto-scaling. A `crew` controller is an object in R which accepts tasks, returns results, and launches workers. Workers can be local processes, jobs on traditional clusters such as SLURM, or jobs on cloud services such as AWS Batch, depending on the [launcher plugin](plugins.html) of the controller.

# Tasks vs workers

A *task* is a piece of R code, such as an expression or a function call. A *worker* is a [non-interactive](https://stat.ethz.ch/R-manual/R-devel/library/base/html/interactive.html) R process that runs one or more tasks. When tasks run on workers, the local R session is free and responsive, and work gets done faster. For example, [this vignette](shiny.html) shows how `crew` and [`mirai`](https://mirai.r-lib.org/) work together to speed up [Shiny](https://rstudio.github.io/shiny/) apps.

# How to use `crew`

First, create a controller object to manage tasks and workers.

```r
library(crew)
controller <- crew_controller_local(
  name = "example",
  workers = 2,
  seconds_idle = 10
)
```

Next, start the controller to create the [`mirai`](https://mirai.r-lib.org/) client. Later, when you are done with the controller, call `controller$terminate()` to clean up your resources.

```r
controller$start()
```

Use `push()` to submit a new task and `pop()` to return a completed task.

```r
controller$push(name = "get pid", command = ps::ps_pid())
```

As a side effect, methods `push()`, `pop()`, and `scale()` also launch workers to run the tasks. However, by default^[when the `throttle` argument is `TRUE`], workers only launch at certain times. This is for efficiency: it avoids the slowdown of frequent system calls, and it gives plugins like `crew.cluster` enough time to collect tasks for large, efficient job arrays. To ensure workers are launched, either set the `throttle` argument to `FALSE` or repeat calls to `pop()`, `collect()`, or `scale()`.

```r
controller$pop() # No workers started yet and the task is not done.
#> NULL

task <- controller$pop() # Worker started, task complete.
task
#> # A tibble: 1 × 13
#>   name    command    result status error  code trace warnings seconds  seed
#>   <chr>   <chr>      <list> <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 get pid ps::ps_pi… <int>  succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>
```
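Repeated calls to `pop()` can be wrapped in a small polling loop. The sketch below is a hypothetical helper, not part of `crew`: the name `pop_when_ready()` and its `seconds_timeout` and `seconds_interval` arguments are assumptions made for illustration. It relies only on `pop()` returning `NULL` until a task is ready.

```r
# Hypothetical helper (not part of crew): call pop() repeatedly until
# a completed task comes back or the timeout elapses.
pop_when_ready <- function(
  controller,
  seconds_timeout = 30,
  seconds_interval = 0.1
) {
  deadline <- Sys.time() + seconds_timeout
  repeat {
    # Each call to pop() also gives the controller a chance to launch workers.
    task <- controller$pop()
    if (!is.null(task)) {
      return(task)
    }
    if (Sys.time() >= deadline) {
      return(NULL) # Timed out: no completed task was available.
    }
    Sys.sleep(seconds_interval)
  }
}
```

For example, after `controller$push(name = "sum", command = 1 + 1)`, calling `pop_when_ready(controller)` should return the one-row task tibble once a worker finishes the task, or `NULL` if the timeout elapses first.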

Alternatively, `wait()` is a loop that repeatedly checks tasks and launches workers as needed until all tasks complete.

```r
controller$wait(mode = "all")
```

The return value of the task is in the `result` column.

```r
task$result[[1]] # return value of the task
#> [1] 69631
```

See the [reference](../reference/crew_class_controller.html#returns-18) for the full list of output in the `task` object returned by `pop()`.

# Synchronous functional programming

The [`map()`](../reference/crew_class_controller.html#method-crew_class_controller-map) method of the controller supports functional programming similar to [`purrr::map()`](https://purrr.tidyverse.org/reference/map.html) and [`clustermq::Q()`](https://mschubert.github.io/clustermq/reference/Q.html). The arguments of [`map()`](../reference/crew_class_controller.html#method-crew_class_controller-map) are mostly the same as those of [`push()`](../reference/crew_class_controller.html#method-crew_class_controller-push), but there is a new `iterate` argument to define the inputs of individual tasks. [`map()`](../reference/crew_class_controller.html#method-crew_class_controller-map) submits a whole collection of tasks, auto-scales the workers, waits for all the tasks to finish, and returns the results in a `tibble`.

Below, [`map()`](../reference/crew_class_controller.html#method-crew_class_controller-map) submits one task to compute `1 + 2 + 5 + 6` and another task to compute `3 + 4 + 5 + 6`. The lists and vectors inside `iterate` vary from task to task, while the elements of `data` and `globals` stay constant across tasks.

```r
results <- controller$map(
  command = a + b + c + d,
  iterate = list(
    a = c(1, 3),
    b = c(2, 4)
  ),
  data = list(c = 5),
  globals = list(d = 6)
)

results
#> # A tibble: 2 × 13
#>   name       command result status error  code trace warnings seconds  seed
#>   <chr>      <chr>   <list> <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> 2 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>

as.numeric(results$result)
#> [1] 14 18
```

If at least one task in [`map()`](../reference/crew_class_controller.html#method-crew_class_controller-map) throws an error, the default behavior is to error out in the main session and not return the results. If that happens, the results are available in `controller$error`. To return the results instead of setting `controller$error`, regardless of error status, set `error = "warn"` or `"silent"` in [`map()`](../reference/crew_class_controller.html#method-crew_class_controller-map). To conserve memory, consider setting `controller$error <- NULL` when you are done troubleshooting.
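When troubleshooting, it helps to isolate the tasks that did not succeed. The sketch below is a minimal illustration that assumes only the `name`, `status`, and `error` columns of the results, and that successful tasks have status `"success"`. The function `failed_tasks()` is a hypothetical helper, not part of `crew`.

```r
# Hypothetical helper (not part of crew): keep only the tasks that did not
# succeed, along with their status and error message.
failed_tasks <- function(results) {
  is_failure <- results$status != "success"
  results[is_failure, c("name", "status", "error")]
}
```

The helper should work the same way on the return value of `map()` with `error = "warn"` or `"silent"`, or on `controller$error` after a `map()` call that errored out.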

# Asynchronous functional programming

The [`walk()`](../reference/crew_class_controller.html#method-crew_class_controller-walk) method is just like `map()`, but it does not wait for any tasks to complete. Instead, it returns control to the local R session immediately and lets you do other things while the tasks run in the background.

```r
controller$walk(
  command = a + b + c + d,
  iterate = list(
    a = c(1, 3),
    b = c(2, 4)
  ),
  data = list(c = 5),
  globals = list(d = 6)
)
```

The [`collect()`](../reference/crew_class_controller.html#method-crew_class_controller-collect) method pops all completed tasks. Put together, `walk()`, `wait(mode = "all")`, and `collect()` have the same overall effect as `map()`.

```r
controller$wait(mode = "all")

controller$collect()
#> # A tibble: 2 × 13
#>   name       command result status error  code trace warnings seconds  seed
#>   <chr>      <chr>   <list> <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> 2 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>
```

However, there are subtle differences between the synchronous and asynchronous functional programming methods:

1. `map()` requires an empty controller to start with (no prior tasks). But with `walk()`, the controller can have any number of running or unpopped tasks beforehand.
2. `wait()` does not show a progress bar because it would be misleading if there are a lot of prior tasks. Because `map()` requires the controller to be empty initially (see item 1), it can show a progress bar that correctly represents the amount of work left to do.
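Because `walk()` returns immediately, results can also be processed in batches as they arrive instead of all at once. The sketch below assumes the controller's `nonempty()` method, which is taken here to report whether any tasks are still running or waiting to be popped. The function `collect_in_batches()` and its `seconds_interval` argument are hypothetical names, not part of `crew`.

```r
# Hypothetical helper (not part of crew): gather completed tasks in batches
# while other tasks are still running.
collect_in_batches <- function(controller, seconds_interval = 0.5) {
  batches <- list()
  # Assumption: nonempty() is TRUE while tasks are running or unpopped.
  while (controller$nonempty()) {
    batch <- controller$collect()
    if (is.null(batch)) {
      # Nothing has finished since the last check, so pause before retrying.
      Sys.sleep(seconds_interval)
    } else {
      batches[[length(batches) + 1L]] <- batch
    }
  }
  batches # A list of tibbles, one per non-empty call to collect().
}
```

After a call to `walk()`, `collect_in_batches(controller)` would return a list of result tibbles. In real use, the body of the loop is a natural place to save or summarize each batch as soon as it arrives.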

# Summaries

The controller summary shows how many tasks were completed and popped, how many total seconds the workers spent running tasks, how many tasks threw warnings or errors, etc.

```r
controller$summary()
#> # A tibble: 1 × 8
#>   controller tasks seconds success error crash cancel warning
#>   <chr>      <int>   <dbl>   <int> <int> <int>  <int>   <int>
#> 1 example        5       0       5     0     0      0       0
```

# Termination

The `terminate()` method signals the workers to close and severs the connection.

```r
controller$terminate()
```

Alternatively, closing the local R session also terminates the workers.

# Monitoring local processes

`crew` controllers launch workers to run tasks.
Although `crew` and `mirai` try to clean up superfluous worker processes, cleanup is not guaranteed to succeed.
It is the user's responsibility to monitor and manage worker processes.
For workers launched through `crew_controller_local()`, you can list and terminate worker processes with the `crew_monitor_local()` object:

```r
monitor <- crew_monitor_local()
monitor$workers()
#> [1] 57001 57002
monitor$terminate(pid = c(57001, 57002))
monitor$workers()
#> integer(0)
```

`crew_monitor_local()` only manages processes running on your local computer.
Other [launcher plugins](plugins.html), as in [`crew.cluster`](https://wlandau.github.io/crew.cluster/) and [`crew.aws.batch`](https://wlandau.github.io/crew.aws.batch/), support monitors of their own.

# Tuning and auto-scaling

As explained above, `push()`, `pop()`, and `wait()` launch new workers to run tasks. The number of new workers depends on the number of tasks at the time. In addition, workers can shut themselves down as work completes. In other words, `crew` automatically raises and lowers the number of workers in response to fluctuations in the task workload.

The most useful arguments for down-scaling, in order of importance, are:

1. `seconds_idle`: shut down a worker if it spends too long waiting for a task.
2. `tasks_max`: shut down a worker after it completes a certain number of tasks. This argument also determines how frequently the controller auto-scales (see the auto-scaling section of `crew_throttle()` for details).
3. `seconds_wall`: soft wall time of a worker.

Please tune these arguments to achieve the desired balance for auto-scaling. The two extremes of auto-scaling are [`clustermq`](https://mschubert.github.io/clustermq/)-like *persistent workers* and [`future`](https://future.futureverse.org/)-like *transient workers*, and each is problematic in its own way.

1. *Persistent workers*: a persistent worker launches once, typically runs many tasks, and stays running for the entire lifetime of the controller. Persistent workers minimize overhead and quickly complete large numbers of short tasks. However, they risk spending too much time in an idle state if there are no tasks to run. Excessive idling wastes resources, which could impact your colleagues on a shared cluster or drive up costs on Amazon Web Services.
2. *Transient workers*: a transient worker terminates as soon as it completes a single task. Each subsequent task requires a new transient worker to run it. Transient workers avoid excessive idling, but frequent worker launches cause significant overhead and slow down the computation as a whole.
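As a rough illustration of these trade-offs, the sketch below configures three local controllers using the down-scaling arguments listed above. The controller names are hypothetical, and the specific numbers (30 seconds, 100 tasks, one hour) are arbitrary assumptions chosen for illustration, not recommendations. None of the controllers is started here.

```r
library(crew)

# Persistent workers: never time out from idleness and never retire
# after a fixed number of tasks.
persistent <- crew_controller_local(
  name = "persistent",
  workers = 2,
  seconds_idle = Inf,
  tasks_max = Inf
)

# Transient workers: each worker exits after a single task.
transient <- crew_controller_local(
  name = "transient",
  workers = 2,
  tasks_max = 1
)

# A middle ground (illustrative values only): workers exit after 30 idle
# seconds, after 100 tasks, or after roughly one hour of wall time.
balanced <- crew_controller_local(
  name = "balanced",
  workers = 2,
  seconds_idle = 30,
  tasks_max = 100,
  seconds_wall = 3600
)
```

A reasonable starting point is something like the middle configuration, adjusted after checking how long workers actually sit idle in your own workload.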

# Crashes and retries
 
In rare cases, a worker may exit unexpectedly before it completes its current task.
This could happen for any number of reasons: for example, the worker could run out of memory, the task could cause a segmentation fault, the [AWS Batch](https://wlandau.github.io/crew.aws.batch/) spot instance could exit because of a spike in price, etc. To troubleshoot, it is best to consult the worker log files and [`autometric` resource logs](logging.html).
 
If a worker crashes, the task will return a status of `"crash"` in `pop()` (and `collect()` and `map()`).
You can simulate a crash for yourself:
 
```r
library(crew)
controller <- crew_controller_local(name = "my_controller", crashes_max = 5L)
controller$start()

# Submit a task and wait for it to start.
controller$push(command = Sys.sleep(300), name = "my_task")
Sys.sleep(2)

# Terminate the worker in the middle of the task.
controller$launcher$terminate_workers()

# Wait for the task to resolve.
controller$wait()

# Retrieve the result.
task <- controller$pop()
task[, c("name", "result", "status", "error", "code", "controller")]
#> # A tibble: 1 × 6
#>   name    result    status error                  code controller   
#>   <chr>   <list>    <chr>  <chr>                 <int> <chr>        
#> 1 my_task <lgl [1]> crash  19 | Connection reset    19 my_controller
```

In the event of a crash like this one, you can choose to abandon the workflow and troubleshoot, or you can choose to retry the task on a different (possibly new) worker.
Simply push the task again and use the same task name.^[As of `targets` >= 1.10.0.9002, `targets` pipelines automatically retry tasks whose workers crash.]

```r
controller$push(command = Sys.sleep(300), name = "my_task")
```

The controller enforces a maximum number of retries, given by the `crashes_max` argument to `crew_controller_local()`. If a task's worker crashes more than `crashes_max` times in a row in the same controller, then a subsequent `pop()` (or `collect()` or `map()`) throws an informative error:

```r
controller$pop()
#> Error:
#> ! the crew worker of task 'my_task' crashed 6 consecutive time(s)
#> in controller 'my_controller'.
#> For details and advice, please see the crashes_max argument of
#> crew::crew_controller(), as well as
#> risks.html#crashes and
#> logging.html.
```
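The retry step can be automated with a small loop. The sketch below is a hypothetical helper (`retry_on_crash()` is not part of `crew`). It assumes the `substitute = FALSE` option of `push()` so that the command can be passed as a quoted expression. Keep `retries` below `crashes_max`, since `pop()` throws an error once that limit is exceeded.

```r
# Hypothetical helper (not part of crew): push a task under a fixed name and
# resubmit it if its worker crashes, up to `retries` additional attempts.
retry_on_crash <- function(controller, command, name, retries = 2L) {
  task <- NULL
  for (attempt in seq_len(retries + 1L)) {
    # `command` is already a quoted expression, so skip substitute().
    controller$push(command = command, substitute = FALSE, name = name)
    controller$wait(mode = "all")
    task <- controller$pop()
    # Stop as soon as the task resolves with any status other than "crash".
    if (!identical(task$status, "crash")) {
      break
    }
  }
  task
}
```

For example, `retry_on_crash(controller, quote(Sys.sleep(300)), name = "my_task")` would resubmit `my_task` up to two more times if its worker keeps crashing. Because the helper calls `wait(mode = "all")`, it is best suited to a controller with no other pending tasks.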

With multiple controllers in a [controller group](groups.html), you can configure tasks to run in a different controller if the task crashes in the original controller `crashes_max` times. For details, see the [controller group vignette](groups.html).
