mirai - Plumber Integration

Plumber Integration

mirai may be used as an asynchronous / distributed backend for plumber pipelines.

Example usage is provided below for different types of endpoint.

Example GET Endpoint

The plumber router code is run in a daemon process itself so that it does not block the interactive process.

The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the ‘msg’ request header together with a timestamp and the process ID of the process it is run on.

library(mirai)

# Host the plumber router in a daemon of its own so the interactive process is
# not blocked. Supplying SIGINT as 'autoexit' matters: it lets the plumber
# server be interrupted and exit cleanly when the daemon is torn down.
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # supplies the promise pipe %...>%
  library(mirai)

  # 4 daemons, so 4 requests are handled simultaneously; no dispatcher is used
  # (suitable when all requests require similar compute)
  daemons(4L, dispatcher = FALSE)

  # GET /echo handler: hands the work to a daemon via mirai() and returns a
  # promise that fills in the response once the result is available
  echo_handler <- function(req, res) {
    reply <- mirai(
      {
        Sys.sleep(1L) # simulate an expensive computation
        payload <- list(
          time = format(Sys.time()),
          msg = msg,
          pid = Sys.getpid()
        )
        list(status = 200L, body = payload)
      },
      msg = req[["HEADERS"]][["msg"]] # 'msg' request header, passed by value
    )
    reply %...>% (function(out) {
      res$status <- out$status
      res$body <- out$body
    })
  }

  # build the router, register the endpoint, then serve (blocks this daemon)
  router <- pr()
  router <- pr_get(router, "/echo", echo_handler)
  pr_run(router, host = "127.0.0.1", port = 8985)
})

The API can be queried using an async HTTP client such as nanonext::ncurl_aio().

Here, all 8 requests are submitted at once, but note that the responses have differing timestamps, as only 4 can be processed at any one time (limited by the number of daemons set).

library(nanonext)
# submit all 8 async GET requests up front, each carrying its index in the
# 'msg' request header
res <- lapply(seq_len(8L), function(idx) {
  ncurl_aio(
    "http://127.0.0.1:8985/echo",
    headers = c(msg = as.character(idx))
  )
})
# collect the response bodies of all requests
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-05-28 09:21:40\"],\"msg\":[\"1\"],\"pid\":[20466]}"
#> 
#> [[2]]
#> [1] "{\"time\":[\"2024-05-28 09:21:40\"],\"msg\":[\"2\"],\"pid\":[20511]}"
#> 
#> [[3]]
#> [1] "{\"time\":[\"2024-05-28 09:21:40\"],\"msg\":[\"3\"],\"pid\":[20556]}"
#> 
#> [[4]]
#> [1] "{\"time\":[\"2024-05-28 09:21:40\"],\"msg\":[\"4\"],\"pid\":[20601]}"
#> 
#> [[5]]
#> [1] "{\"time\":[\"2024-05-28 09:21:41\"],\"msg\":[\"5\"],\"pid\":[20511]}"
#> 
#> [[6]]
#> [1] "{\"time\":[\"2024-05-28 09:21:41\"],\"msg\":[\"6\"],\"pid\":[20466]}"
#> 
#> [[7]]
#> [1] "{\"time\":[\"2024-05-28 09:21:41\"],\"msg\":[\"7\"],\"pid\":[20601]}"
#> 
#> [[8]]
#> [1] "{\"time\":[\"2024-05-28 09:21:41\"],\"msg\":[\"8\"],\"pid\":[20556]}"

# reset daemons: tears down the daemon that is running the plumber server
daemons(0L)
#> [1] 0

Example POST Endpoint

Below is a demonstration of the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.

Note that req$postBody should always be accessed in the router process and passed in as an argument to the ‘mirai’, as this is retrieved using a connection that is not serializable.

library(mirai)

# Host the plumber router in a daemon of its own. Supplying SIGINT as
# 'autoexit' matters: it lets the plumber server be interrupted and exit
# cleanly when the daemon is torn down.
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # supplies the promise pipe %...>%
  library(mirai)

  # 4 daemons, so 4 requests are handled simultaneously; dispatcher is used
  # (suitable for requests with differing compute lengths)
  daemons(4L, dispatcher = TRUE)

  # POST /echo handler: hands the work to a daemon via mirai() and returns a
  # promise that fills in the response once the result is available
  echo_handler <- function(req, res) {
    reply <- mirai(
      {
        Sys.sleep(1L) # simulate expensive computation
        payload <- list(
          time = format(Sys.time()),
          msg = jsonlite::fromJSON(data)[["msg"]],
          pid = Sys.getpid()
        )
        list(status = 200L, body = payload)
      },
      # read the body here, in the router process, and pass it by value: it is
      # retrieved using a connection that is not serializable
      data = req$postBody
    )
    reply %...>% (function(out) {
      res$status <- out$status
      res$body <- out$body
    })
  }

  # build the router, register the endpoint, then serve (blocks this daemon)
  router <- pr()
  router <- pr_post(router, "/echo", echo_handler)
  pr_run(router, host = "127.0.0.1", port = 8986)
})

Querying the endpoint produces the same set of outputs as the previous example.

library(nanonext)
# submit all 8 async POST requests up front, each sending its index as the
# 'msg' field of a JSON body
res <- lapply(seq_len(8L), function(idx) {
  ncurl_aio(
    "http://127.0.0.1:8986/echo",
    method = "POST",
    data = sprintf('{"msg":"%d"}', idx)
  )
})
# collect the response bodies of all requests
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-05-28 09:21:45\"],\"msg\":[\"1\"],\"pid\":[20739]}"
#> 
#> [[2]]
#> [1] "{\"time\":[\"2024-05-28 09:21:45\"],\"msg\":[\"2\"],\"pid\":[20742]}"
#> 
#> [[3]]
#> [1] "{\"time\":[\"2024-05-28 09:21:45\"],\"msg\":[\"3\"],\"pid\":[20751]}"
#> 
#> [[4]]
#> [1] "{\"time\":[\"2024-05-28 09:21:45\"],\"msg\":[\"4\"],\"pid\":[20744]}"
#> 
#> [[5]]
#> [1] "{\"time\":[\"2024-05-28 09:21:46\"],\"msg\":[\"5\"],\"pid\":[20742]}"
#> 
#> [[6]]
#> [1] "{\"time\":[\"2024-05-28 09:21:46\"],\"msg\":[\"6\"],\"pid\":[20744]}"
#> 
#> [[7]]
#> [1] "{\"time\":[\"2024-05-28 09:21:46\"],\"msg\":[\"7\"],\"pid\":[20739]}"
#> 
#> [[8]]
#> [1] "{\"time\":[\"2024-05-28 09:21:46\"],\"msg\":[\"8\"],\"pid\":[20751]}"

# reset daemons: tears down the daemon that is running the plumber server
daemons(0L)
#> [1] 0