Base API and schedulers
The main worker functions are wrapped in an R6 class with the name of QSys. This provides a standardized API to the lower-level messages that are sent via ZeroMQ.
The base class itself is subclassed by scheduler classes that add the required functions for submitting and cleaning up jobs:
+ QSys
  |- Multicore
  |- LSF
  + SGE
     |- PBS
     |- Torque
  |- etc.
A pool of workers can be created using the workers() function, which instantiates an object of the corresponding QSys-derived scheduler class. See ?workers for details.
# start up a pool of three workers using the default scheduler
w = workers(n_jobs=3)
# if we make an unclean exit for whatever reason, clean up the jobs
on.exit(w$finalize())
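The scheduler that workers() uses can also be selected explicitly before the pool is created via the clustermq.scheduler option; "multicore" below is just one of the available values:
# choose the scheduler explicitly instead of relying on auto-detection
options(clustermq.scheduler = "multicore")
w = workers(n_jobs=3)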
Worker startup
For workers that are started up via a scheduler, we do not know which machine they will run on. This is why we start up every worker with a TCP/IP address of the master socket that will distribute work.
This is achieved by the call to R common to all schedulers:
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
On the master’s side, we wait until a worker connects:
# this will block until a worker is ready
msg = w$receive_data()
Common data and exports
Workers will start up without any knowledge of what they should process or how. In order to transfer initial data to the worker, we first create and serialize a list object with the following fields:
fun - the function to call with iterated data
const - the constant data each function call should receive
export - objects that will be exported to the workers' .GlobalEnv
rettype - character string specifying which data type to return, e.g. list or logical
common_seed - random seed for function calls; will be offset by job ID
token - character string to identify this data set; this is optional, and an automatically generated token will be returned if none is given
# create a reusable, serialized ZeroMQ object with the common data on the master
w$set_common_data(fun, const, export, rettype, common_seed, token)
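For illustration only, the fields could be filled in as follows before making the call above (all values here are made up):
# illustrative values for the common data fields
fun = function(x, y) x * y        # called once per set of iterated arguments
const = list(y = 2)               # passed to every call unchanged
export = list(my_offset = 10)     # placed in the workers' .GlobalEnv
rettype = "list"
common_seed = 123
token = "data_set_1"
w$set_common_data(fun, const, export, rettype, common_seed, token)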
Workers that connect to the master will send a list with a field token. This can be used to check if the worker already received the common data it is supposed to work on.
if (msg$token != <token>)
    w$send_common_data()
Iterated data
If the worker has already received the common data, we can send it a chunk of iterated arguments to work on. These are passed as a list of iterables, e.g. a data.frame with a column for each iterated argument. It also needs a column with the name " id " (i.e. id surrounded by spaces), which will be used to identify each call.
chunk = data.frame(arg1=1:5, arg2=5:1, ` id `=1:5, check.names=FALSE)
w$send_job_data(chunk)
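On the master, a larger set of arguments might be split into such chunks before being handed out; the fixed chunk size below is purely illustrative and not clustermq's actual chunking logic:
# split 20 calls into chunks of 5 rows each and send the first one
args = data.frame(arg1=1:20, arg2=20:1, ` id `=1:20, check.names=FALSE)
chunks = split(args, ceiling(seq_len(nrow(args)) / 5))
w$send_job_data(chunks[[1]])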
If the worker has finished processing, it will send a message with the field result that is a list, containing:
result - the results as a named object of the requested rettype
warnings - a list with warning messages of individual calls
errors - a list with error messages of individual calls
msg = w$receive_data()
if (!is.null(msg$result)) {
    # store result here, handle errors/warnings if required
}
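Assuming the nesting described above (a result field that itself holds result, warnings, and errors), the bookkeeping on the master might look roughly like this sketch:
# collect results and surface warnings/errors of individual calls (sketch only)
results = list()
msg = w$receive_data()
if (!is.null(msg$result)) {
    results = c(results, msg$result$result)
    for (wrn in msg$result$warnings)
        warning(wrn)
    for (err in msg$result$errors)
        warning("call failed: ", err)
}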
Custom calls
Apart from sending common and iterated data that the worker will process in chunks, it is also possible to send arbitrary calls that it will evaluate. It needs the following fields:
expr - the expression to be evaluated
env - list with all additional objects required to perform the call
ref - an identifier for the call; will default to the expression itself
w$send_call(expr, env=list(...), ref="mycall1")
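As a usage sketch, assuming the expression is passed unevaluated and any objects it refers to are supplied via env:
# ask the worker to evaluate an expression that uses an extra object n
w$send_call(quote(sum(seq_len(n))), env=list(n=10), ref="sum_n")
msg = w$receive_data()   # block until the evaluated result comes back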
Main event loop
Putting the above together in an event loop, we get what is essentially implemented in the master function.
w = workers(3)
on.exit(w$finalize())

while (we have new work to send) {
    msg = w$receive_data()

    if (!is.null(msg$result)) {
        # handle result
    }

    if (msg$token != <token>)
        w$send_common_data()
    else
        w$send_job_data(...)
}

# if proper cleanup is successful, cancel kill-on-exit
if (w$cleanup())
    on.exit()
A loop of a similar structure can be used to extend clustermq. As an example, the drake package does this using common data and custom calls only (no iterated chunks).
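A heavily simplified sketch of such an extension is shown below; it only strings together the calls introduced above (it is not drake's actual code, and the shape of the reply messages is assumed to match the earlier sections):
# one worker, common data for exports, then a single custom call
w = workers(1)
on.exit(w$finalize())

msg = w$receive_data()                     # wait for the worker to connect
w$set_common_data(fun=identity, const=list(), export=list(n=10),
                  rettype="list", common_seed=1, token="ext")
w$send_common_data()

w$send_call(quote(sum(seq_len(n))), env=list(), ref="sum_n")
msg = w$receive_data()                     # retrieve the evaluated result

if (w$cleanup())
    on.exit()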