The slurmR package provides wrappers and tools for integrating R with the HPC workload manager Slurm. There are two approaches to do so: using socket clusters, which in essence follows the workflow of R's parallel package, or using job arrays, a different implementation of the idea behind the par*apply functions in the parallel package that, at times, can be more powerful.
Another important component of slurmR is the makeSlurmCluster function. It allows users to create multi-node PSOCKCluster class objects. The implementation of this function, a wrapper of parallel::makePSOCKcluster, is very simple:
It submits a job to Slurm requesting the desired number of tasks. Each task will then return information regarding the node at which it is operating.
Once Slurm allocates the resources, the master R session (from which the job was submitted) will read in the node names returned by each task.
With the full list of node names in use, makeSlurmCluster passes the list to parallel::makePSOCKcluster, which ultimately creates the cluster class object.
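The last step can be tried without a Slurm allocation. The sketch below is not slurmR's own code: it assumes the node names have already been gathered and uses "localhost" as a stand-in for the names Slurm would return, then hands them to parallel::makePSOCKcluster in the way described above. The variable names are illustrative.

```r
library(parallel)

# Stand-in for the node names reported by the Slurm tasks (an assumption:
# on a real allocation these would be the hostnames of the compute nodes)
node_names <- rep("localhost", 2)

# One worker R session is started per entry in the vector of names
cl <- makePSOCKcluster(node_names)

# Each worker reports the machine it is running on
worker_hosts <- parSapply(cl, seq_along(node_names), function(i) {
  Sys.info()[["nodename"]]
})

stopCluster(cl)
```

Repeating a node name in the vector starts several workers on that node, which is how multiple tasks on the same machine would be represented.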
After creating the cluster object, the workflow is exactly the same as with the parallel package. Here is an example from the makeSlurmCluster manual:
library(slurmR)
library(parallel)
# Creating a cluster with 100 workers/offspring/child R sessions
cl <- makeSlurmCluster(100)
# Computing the mean of 100 random uniforms within each worker;
# for this we can use any of the functions available in the parallel package.
ans <- parSapply(cl, 1:200, function(x) mean(runif(100)))
# We simply call stopCluster as we would do with any other cluster
# object
stopCluster(cl)
Whenever Slurm_lapply
, Slurm_sapply
, or Slurm_Map
are called, a lot of things happen under the hood. What the user does not see is the way in which slurmR
sets up a job and submits it to the queue.
Just like rslurm
, slurmR
has two levels of job distribution: first, Slurm Jobs, and second, within each job via parallel::mclapply
and parallel::mcMap
(task forking). In general, the Slurm_* functions are implemented as follows:
List whatever R packages are loaded, including the path to each package.
List all the objects passed via ellipsis (...
), and, together with X
and FUN
or f
, save them at [tmp_path]/[job_name]/
as [object-name].rds
.
Write out the corresponding R script and Slurm bash file, and save them as [tmp_path]/[job_name]/00-rscript.r
, and [tmp_path]/[job_name]/01-bash.sh
respectively.
If plan = "collect"
(the default), the job will be submitted to the queue via sbatch()
, and the function will wait until it is flagged as completed by Slurm.
Once sbatch()
is called, a Job Array will be submitted in which each R job will launch up to mc.cores forked processes (second layer of parallelization).
Once it is done, the results can be collected using Slurm_collect, which happens automatically if the user sets plan = "collect".
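The steps above can be mimicked on a single machine with base R. The following is a sketch of the idea only, not slurmR's implementation: the directory, the file names (X_01.rds, res_01.rds, ...) and the run_job helper are hypothetical, and the "jobs" run one after another instead of being scheduled by Slurm.

```r
library(parallel)

# Hypothetical stand-in for [tmp_path]/[job_name]/
job_dir <- file.path(tempdir(), "example-job")
dir.create(job_dir, showWarnings = FALSE, recursive = TRUE)

X     <- 1:20
FUN   <- function(x) x^2
njobs <- 4

# First level: split the indices of X into one chunk per array job and
# save each chunk to disk, where the job can read it
chunks <- splitIndices(length(X), njobs)
for (i in seq_len(njobs)) {
  saveRDS(X[chunks[[i]]], file.path(job_dir, sprintf("X_%02d.rds", i)))
}

# What each array job would do: read its chunk, apply FUN with forked
# processes (second level), and write the result back to disk
run_job <- function(i, mc.cores = 1L) {
  x_i <- readRDS(file.path(job_dir, sprintf("X_%02d.rds", i)))
  res <- mclapply(x_i, FUN, mc.cores = mc.cores)
  saveRDS(res, file.path(job_dir, sprintf("res_%02d.rds", i)))
}

# Here the jobs run sequentially; Slurm would run them as a job array
invisible(lapply(seq_len(njobs), run_job))

# Collecting: read every result file and stitch the pieces together
ans <- unlist(
  lapply(seq_len(njobs), function(i) {
    readRDS(file.path(job_dir, sprintf("res_%02d.rds", i)))
  }),
  recursive = FALSE
)
```

Because every piece of input and output lives on disk, the collecting step does not depend on the session that created the files, which is the property that the job-array approach relies on.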
The next section discusses some advantages of submitting jobs using socket clusters versus job arrays.
While socket clusters, created via makePSOCKcluster
or, in the case of slurmR, via makeSlurmCluster
, may be more efficient in terms of data communication [1], using job arrays has some important benefits over socket clusters:
The number of workers can be much higher than with clusters created with the parallel package [2]. Users needing to work with hundreds or thousands of jobs/instances may need to use job arrays instead.
If part of the job fails due to a failure of one of the tasks in the array, the job can be easily resubmitted. The same is not necessarily true for socket clusters.
Job arrays can run independently from the main session that started the job. This means that, if for some reason the main session crashes or stops, the job arrays will continue working regardless, and what’s more, the results can be collected anyway.
We would like to implement a simulation algorithm to be run in a cluster. In this case, we have the very simple function we would like to parallelize:
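One way such a function could be written, as a sketch: the name simpi matches the calls used later in this document, while the body is an assumption based on the description that follows.

```r
# A sketch of simpi: n is the number of points to sample
simpi <- function(n) {
  # n points drawn uniformly from the unit square [0, 1] x [0, 1]
  points <- matrix(runif(n * 2), ncol = 2)
  # Share of points within distance 1 of the origin (a quarter of the unit
  # circle), multiplied by 4 to recover the area of the full circle
  mean(rowSums(points^2) <= 1) * 4
}

set.seed(1)
simpi(1e5)
```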
This simple function generates an estimate of \(\pi\). The approximation is based on the following observation:
\[ \mbox{Area} = \pi\times r^2 \implies \frac{\mbox{Area}}{r^2} = \pi \]
Since we know what \(r\) is, we just need to get an estimate of the Area to obtain an approximation of \(\pi\). A rather simple way of doing so is with Monte Carlo simulations, in particular, sampling points uniformly in the unit square. The proportion of points that fall within the unit circle, i.e. the proportion of points whose distance to the origin is smaller than the radius of the circle, has an expected value of \(\pi/4\), the area of the quarter of the circle that lies inside the square. Multiplying that proportion by 4 gives an estimate of the Area and, since \(r = 1\), of \(\pi\) (for more details, check out the Wikipedia article about this topic).
Using parallel::mclapply
, we could just type
Which estimates pi using a single node (computer).
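A sketch of what such a single-node call might look like. It assumes simpi is a Monte Carlo estimator along the lines described above; the sample sizes and mc.cores = 2 are illustrative choices, not the original values. Forking is not available on Windows, where mc.cores must be 1.

```r
library(parallel)

# Assumed definition of simpi (Monte Carlo estimate of pi from n points)
simpi <- function(n) {
  points <- matrix(runif(n * 2), ncol = 2)
  mean(rowSums(points^2) <= 1) * 4
}

# 20 replicates of 1e5 points each, spread over 2 forked processes
ans <- mclapply(rep(1e5, 20), simpi, mc.cores = 2)

# Averaging the replicates gives the final estimate
pi_hat <- mean(unlist(ans))
```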
In the case of job arrays, we can use the Slurm_lapply
function implemented in the package. Before submitting a job to the queue, we need to specify some options that are needed to create it:
tmp_path
: A path to a directory to which all computing nodes of the cluster have read+write access.
job_name
: The name of the job, passed to sbatch
via the --job-name
flag. This will also be used as the name of the folder that is created within tmp_path
.
Ultimately, all the objects saved by the job will be located in the path defined by tmp_path
/job_name
.
library(slurmR)
# Setting required parameters
opts_slurmR$set_tmp_path("/stagging/slurmr-jobs/")
opts_slurmR$set_job_name("simulating-pi")
Moreover, we can specify more options to be set as default options for all the jobs submitted for the current session. For example, we can set the default partition and account as follows:
# Optional parameters are set via set_opts
opts_slurmR$set_opts(partition="conti", account="lc_dvc")
A comprehensive list of options can be found here. To see all the current defaults, we can just print the opts_slurmR
object:
opts_slurmR
##
## Options for sbatch (Slurm workflow):
## partition : conti
## account : lc_dvc
## job-name : simulating-pi
##
## Preamble:
## n/a
##
## Other options (R workflow):
## tmp_path : /stagging/slurmr-jobs/
## cmd : sbatch
## verbose : FALSE
## debug : FALSE
##
## To get and set options for Slurm jobs creation use (see ?opts_slurmR):
##
## debug_off : function ()
## debug_on : function ()
## get_cmd : function ()
## get_debug : function ()
## get_job_name : function (check = TRUE)
## get_opts_job : function (...)
## get_opts_r : function (...)
## get_preamble : function ()
## get_tmp_path : function ()
## get_verbose : function ()
## reset : function ()
## set_job_name : function (name)
## set_opts : function (...)
## set_preamble : function (...)
## set_tmp_path : function (path = Sys.getenv("SLURMR_TMP_PATH", getwd()))
## verbose_off : function ()
## verbose_on : function ()
Once we have specified all the needed options, we can do our Slurm_lapply
call and submit the job to the queue as follows:
If plan = "wait"
, then Slurm_lapply
will return once the job is done (or failed). To collect the results we can use the Slurm_collect
function:
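The two steps just described, submitting with plan = "wait" and then collecting, might look like the sketch below. The Slurm_lapply arguments are the ones used elsewhere in this document; the body of simpi and the guard around the calls are assumptions added so that the snippet does nothing on machines without slurmR or Slurm.

```r
# Assumed definition of simpi (Monte Carlo estimate of pi from n points)
simpi <- function(n) {
  points <- matrix(runif(n * 2), ncol = 2)
  mean(rowSums(points^2) <= 1) * 4
}

# Guard (an addition for this sketch): only submit where slurmR is
# installed and the sbatch command is on the PATH
if (requireNamespace("slurmR", quietly = TRUE) && nzchar(Sys.which("sbatch"))) {
  library(slurmR)

  # Submit the job array and return once Slurm reports it as finished
  job <- Slurm_lapply(rep(1e6, 100), simpi, njobs = 10, mc.cores = 10,
                      plan = "wait")

  # Read the results back from the job's folder under tmp_path
  ans <- Slurm_collect(job)
  print(mean(unlist(ans)))
}
```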
Alternatively, we could have collected the results on the fly by telling slurmR
that the plan is to "collect"
the results:
ans <- Slurm_lapply(rep(1e6, 100), simpi, njobs=10, mc.cores=10, plan = "collect")
mean(unlist(ans))
This way Slurm_lapply
will do the Slurm_collect
call before returning.
Another way to do this is using parallel::parLapply
with a multi-node socket cluster [3]. To do this, we can use the makeSlurmCluster
function and proceed as follows:
Once we are done with the calculations, we can stop the cluster object by simply calling the stopCluster
function:
And slurmR
will kill the job (and thus the socket connections) by calling scancel
.
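Put together, the socket-cluster workflow just described might look like the following sketch. The number of workers (10) and the body of simpi are assumptions, and the guard is an addition so that nothing is submitted on machines without slurmR or Slurm.

```r
# Assumed definition of simpi (Monte Carlo estimate of pi from n points)
simpi <- function(n) {
  points <- matrix(runif(n * 2), ncol = 2)
  mean(rowSums(points^2) <= 1) * 4
}

# Guard (an addition for this sketch): only run where slurmR is
# installed and the sbatch command is on the PATH
if (requireNamespace("slurmR", quietly = TRUE) && nzchar(Sys.which("sbatch"))) {
  library(slurmR)
  library(parallel)

  # Request 10 workers (an illustrative number) from Slurm
  cl <- makeSlurmCluster(10)

  # Same call as with any other cluster object from the parallel package
  ans <- parLapply(cl, rep(1e6, 100), simpi)
  print(mean(unlist(ans)))

  # Stop the workers; slurmR then cancels the underlying Slurm job
  stopCluster(cl)
}
```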
[1] Data transfer on socket clusters is done using serialization with the serialize and unserialize functions. This way, data is sent directly through the connection. In the case of job arrays, data is sent using saveRDS and readRDS, which involves I/O on the disk.
[2] The current default configuration of R does not allow having more than 128 connections simultaneously (see ?connection). This can be changed at installation time.
[3] In general, Slurm will try to allocate multiple tasks in the same node (machine). But if no node with that many resources is available, the tasks will span multiple nodes.
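The difference described in the first note can be seen with base R alone. This is a small illustration of the two mechanisms on a toy object (the object and variable names are made up for the example), not a benchmark and not slurmR's code.

```r
x <- list(a = 1:10, b = letters)

# Socket clusters: the object is turned into a raw vector that can be
# written to a connection, and rebuilt on the other side
bytes  <- serialize(x, connection = NULL)
x_sock <- unserialize(bytes)

# Job arrays: the object makes a round trip through a file on disk
f <- tempfile(fileext = ".rds")
saveRDS(x, f)
x_disk <- readRDS(f)
unlink(f)
```

Both routes recover the same object; what changes is whether the bytes travel over a connection or through the file system.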