Tutorial: process SafeGraph patterns files in parallel with R (mclapply & data.table)

Hi all,

Since the Dewey API doesn’t allow selection by geography, tracking a few places over time can be challenging: one must read & process all available patterns files for each time point. In my case, I’m reading weekly patterns for the whole US, which is about 250GB/yr of compressed CSVs.

For this tutorial, I downloaded each week’s pattern files into a separate directory. I don’t show the code for that part (fwiw, I used GNU parallel & curl - if there’s interest, I can make a separate tutorial on that). The following code finds those directories and then, for each directory, reads each (compressed) pattern file, subsets out selected columns, and combines the files into one compressed fst file for that directory/week.

In total, I end up with one file per week (~30x fewer files), faster loads & searches, and a ~10x reduction in disk space. An added benefit: the resulting fst files can be mapped from disk directly to R tables, allowing some subset operations to occur without reading whole files into RAM (see the sketch below).
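
As a minimal sketch of that on-disk access (assuming one of the output files produced below already exists; the path is illustrative):

## fst() maps the file and returns a proxy table; no rows are read yet
library(fst)
ft <- fst('sg-week-out/sg-week.2019-01-07.fst')
dim(ft)  ## metadata only

## read just two columns of the first 1e5 rows from disk
sub <- ft[1:1e5, c('placekey', 'raw_visit_counts')]

## the equivalent explicit form
sub <- read.fst('sg-week-out/sg-week.2019-01-07.fst',
    columns=c('placekey', 'raw_visit_counts'),
    from=1, to=1e5, as.data.table=TRUE)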

I’ve used a SLURM cluster to speed up this task (see code comments), but this example can be run on a modern 8+ core desktop workstation in about a day.

The following assumes installation of the data.table, wrapr, and fst packages.

## read SafeGraph weekly patterns
## run once to install dependencies: 
# install.packages(c('data.table', 'wrapr', 'fst'))

library(data.table)
library(wrapr)
library(fst)
## included in base R
library(parallel)

## Set cores manually
.ncore <- 8

## On SLURM use:
#.ncore <- as.numeric(Sys.getenv("SLURM_CPUS_PER_TASK"))

## configure threading for fst & data.table (used by write.fst and fread);
## modest values avoid oversubscribing cores alongside mclapply below
threads_fst(.ncore/2)
setDTthreads(.ncore/4)

## change paths to match your config
## data directory, contains one dir per week
## e.g.: 2019-01-07, 2019-01-14, 2019-01-21
.indir <- 'sg-week/'
 
## output dir, writes one file per week
.outdir <- 'sg-week-out/'

## process a subset of dirs in one go
## all of 2019:
.weeks <- dir(.indir, patt='2019-.*')

## or 2021-2022 
# .weeks <- dir(.indir, patt='202[12]-.*')

## Keep a subset of columns
.col.keep <- c('placekey', 'parent_placekey', 'iso_country_code', 'region', "poi_cbg", 
    'naics_code', 'wkt_area_sq_meters', 'opened_on', 'closed_on',
    'date_range_start',  'date_range_end', 
    "raw_visit_counts", "raw_visitor_counts", "visits_by_day", "visits_by_each_hour", 
    "visitor_home_cbgs", 'median_dwell', 'bucketed_dwell_times'
)

## for mclapply: promote warnings to errors, so lost workers
## (e.g. killed by SLURM memory limits) stop the run rather than
## silently returning partial results
options(warn=2)

## Process each dir in serial
lapply(.weeks, function(.wk) {
    cat("\n## Processing ", .wk, '\n')

    ## list of filenames in this dir
    fnl <- dir(
        paste0(.indir, .wk), 
        ## SG filenames
        pattern='core_poi-geometry-patterns',
        full.names=T
    )
    ## empty dir
    if (length(fnl)==0) return(NULL)

    ## read all files in parallel 
    .dat <- mclapply(fnl, mc.cores=.ncore, function(x) {
        cat(x, '\n'); 
        data.table::fread(
            cmd=paste('zcat', x),
            showProgress=F,
            ## drop=... also available
            select=.col.keep
        ) %.>% .[!(is.na(date_range_start) | is.na(raw_visit_counts))]  ## exclude missing
    }) %.>% rbindlist(.)  ## combine into one table

    ## write out results, max compression
    write.fst(.dat, 
        compress=100,
        ## input dir name used in output filename 
        path=sprintf('%s/sg-week.%s.fst', .outdir, .wk)
    )
    ## cleanup to reduce memory footprint
    rm(.dat)
    ## Not sure this is needed...
    gc()
})
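
To illustrate the payoff, here’s a minimal sketch of downstream use, run after the loop above (the placekey is a hypothetical placeholder):

## track a few placekeys of interest across all of 2019
.keep.pk <- c('xxx-xxx@xxx-xxx-xxx')  ## hypothetical placekey
outl <- dir(.outdir, patt='sg-week\\.2019-.*\\.fst$', full.names=T)
.traj <- rbindlist(lapply(outl, function(x)
    read.fst(x, as.data.table=TRUE)[placekey %in% .keep.pk]
))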

Notes

I expect the above will also work for monthly patterns, with changes to the column names in .col.keep and to the filename pattern passed as pattern='core_poi-geometry-patterns'.

Windows doesn’t support parallel::mclapply. You can replace it with lapply for a single-threaded version, or use parLapply (see ?parLapply).
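
Here’s a minimal sketch of a parLapply-based replacement for the mclapply call, assuming fnl, .ncore, and .col.keep from above (note the zcat call also needs replacing on Windows; fread can decompress .gz files directly if the R.utils package is installed):

cl <- makeCluster(.ncore)
## load data.table on the workers and ship over the column list
clusterEvalQ(cl, library(data.table))
clusterExport(cl, '.col.keep')
.dat <- rbindlist(parLapply(cl, fnl, function(x) {
    ## fread handles .gz directly (requires R.utils)
    fread(x, showProgress=F, select=.col.keep)[
        !(is.na(date_range_start) | is.na(raw_visit_counts))]
}))
stopCluster(cl)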

You may see modest speed improvements by enabling OpenMP-based threading in R (used by fread and write.fst, but unrelated to mclapply). I have the following chunk in my .zshrc to set the environment variable used by R based on the SLURM session:
export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
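
To check that the setting took effect, a quick sanity check from inside a fresh R session:

Sys.getenv('OMP_NUM_THREADS')
## reports the thread count data.table detected/allows
data.table::getDTthreads(verbose=TRUE)
## returns the current fst thread count
fst::threads_fst()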

