Light loggers are often deployed in bulk. To speed up processing, you can easily convert a standard serial {dplyr} based workflow into a parallel one using the {multidplyr} package. To demonstrate this I’ll use a fake dataset with two loggers (based on the included demo data).
For this exercise you will need the {skytrackr} and {multidplyr} libraries loaded. I load the included demo dataset cc876 into two different data frames, renaming the logger in one of them, and then merge both data frames. This creates a dataset with two loggers (albeit containing the same information), suitable to demonstrate parallel processing.
library(skytrackr)
library(dplyr)
library(multidplyr)
# creating a fake dataset
# by duplicating data and
# renaming the logger
df1 <- skytrackr::cc876
df1$logger <- "CC888"
df2 <- skytrackr::cc876
df <- bind_rows(df1, df2)
Next I’ll define the cluster setup using the new_cluster() function. In general you can safely use all but one of the cores on your local machine (n - 1); to detect the number of cores on your machine you can use parallel::detectCores(). Most modern computers have at least four (4) cores, but with only two loggers to process any extra cores would go unused, so here I force the cluster size to two (2). If more data is presented it will be distributed over the available cores.
# detect number of cores automatically
# n <- parallel::detectCores() - 1
# in this case I force them to two (2)
n <- 2
# create a new cluster
cluster <- new_cluster(n)
Since every CPU processes its share of the data in isolation, you need to explicitly specify the libraries to load on each node of the cluster. In this case, we provide the cluster with the {skytrackr} library (and its dependencies).
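This is done with the {multidplyr} cluster_library() function:
# load the {skytrackr} library (and its dependencies)
# on every node of the cluster
cluster_library(cluster, "skytrackr")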
With the cluster details specified I can now partition the data for distribution to the different CPUs. The partitioning of the data is done by the standard {dplyr} group_by() function, followed by the {multidplyr} partition() function using the cluster specification as an argument. This transforms the tibble data frame into a partitioned data frame, or party_df.
# split tasks by logger
# across cluster partitions
df_logger <- df |>
  group_by(logger) |>
  partition(cluster)
print(df_logger)
#> Source: party_df [17,280 x 6]
#> Groups: logger
#> Shards: 2 [8,640--8,640 rows]
#>
#> # A data frame: 17,280 × 6
#> logger date_time date hour measurement value
#> <chr> <dttm> <date> <dbl> <chr> <dbl>
#> 1 CC876 2021-08-02 00:04:10 2021-08-02 0.0694 lux 0.08
#> 2 CC876 2021-08-02 00:09:10 2021-08-02 0.153 lux 0.08
#> 3 CC876 2021-08-02 00:14:10 2021-08-02 0.236 lux 0.08
#> 4 CC876 2021-08-02 00:19:10 2021-08-02 0.319 lux 0.08
#> 5 CC876 2021-08-02 00:24:10 2021-08-02 0.403 lux 0.08
#> 6 CC876 2021-08-02 00:29:10 2021-08-02 0.486 lux 0.08
#> # ℹ 17,274 more rows
Finally, with the setup complete, we can call the main parallel routine. It follows the workflow described in the main README, with a few exceptions: the generation of the mask, the definition of the step-selection function, and the setting of the random seed must all happen inside the do() statement. Because the parallel sessions do not have access to shared memory, the mask, step-selection function and random seed have to be defined for each run (logger) separately.
# run the analysis in parallel
# on the cluster (local or remote)
locations <- df_logger |>
  group_by(logger) |>
  do({
    # set seed per parallel unit
    set.seed(1)
    # define land mask with a bounding box
    # and an off-shore buffer (in km); in addition
    # you can specify the resolution of the resulting raster
    mask <- stk_mask(
      bbox = c(-20, -40, 60, 60), # xmin, ymin, xmax, ymax
      buffer = 150, # in km
      resolution = 0.5 # map grid in degrees
    )
    # define a step selection distribution
    ssf <- function(x, shape = 0.9, scale = 100, tolerance = 1500){
      # normalize over expected range with km increments
      norm <- sum(stats::dgamma(1:tolerance, shape = shape, scale = scale))
      prob <- stats::dgamma(x, shape = shape, scale = scale) / norm
      return(prob)
    }
    # within do() the current group's data is referred to as `.`
    skytrackr(
      .,
      mask = mask,
      step_selection = ssf,
      plot = FALSE,
      verbose = FALSE,
      start_location = c(51.08, 3.73),
      tolerance = 1500, # in km
      scale = log(c(0.00001, 50)),
      range = c(0.09, 148),
      control = list(
        sampler = 'DEzs',
        settings = list(
          burnin = 250,
          iterations = 3000,
          message = FALSE
        )
      )
    )
  })
The output of the parallel run is a party_df data frame, which is incompatible with {skytrackr} functions. A simple conversion back to a regular data frame is possible by calling the as.data.frame() function (dropping the {multidplyr} ancillary data), making the data compatible with, for example, stk_map().
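For example, a minimal sketch of this conversion (assuming stk_map() accepts the resulting data frame as its first argument):
# collect the results from the cluster workers and
# drop the {multidplyr} ancillary data
locations <- as.data.frame(locations)

# the results are now compatible with {skytrackr}
# functions such as stk_map()
stk_map(locations)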