SafeGraph Spark scripts: aggregate the social distancing raw files into county-level daily summaries

I was never able to get Jupyter running properly, but I was able to adapt the SafeGraph Spark commands into two scripts. Together they aggregate all of the raw social distancing files and generate county-level summaries by day for device count, completely-home device count, leaving-home device count, and percent leaving home. Both assume a `spark` session already exists (as it does in the pyspark shell). Sharing below in case anyone needs them. First, consolidate the raw files:

```
# consolidate raw files
sdm_spark_raw = spark.read.csv("C:\\data\\SafeGraph\\sg-social-distancing\\2020\\*\\*\\*.csv.gz", header=True, escape="\"", inferSchema=False)
sdm_spark = sdm_spark_raw.select("origin_census_block_group", "date_range_start", "date_range_end", "device_count", "completely_home_device_count", "part_time_work_behavior_devices", "full_time_work_behavior_devices")

# write.csv() creates a "consolidated" folder of part-*.csv files,
# which the second script reads via consolidated\*.csv
sdm_spark.write.csv("C:\\data\\SafeGraph\\sg-social-distancing\\consolidated", header=True, escape="\"")
```
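Before running the script above, it can be worth confirming that the wildcard pattern actually matches your download. Here is a minimal sketch using only the standard library, assuming the same `2020\<month>\<day>\<file>.csv.gz` folder layout as the path above. The `count_raw_files` helper is a hypothetical name for this example, not part of SafeGraph's tooling.

```python
from pathlib import Path


def count_raw_files(base_dir, year="2020"):
    """Count files matching <base_dir>/<year>/*/*/*.csv.gz (hypothetical helper)."""
    pattern = f"{year}/*/*/*.csv.gz"
    return sum(1 for _ in Path(base_dir).glob(pattern))


# Example call (the path is the one used in the script; change it to yours):
# count_raw_files(r"C:\data\SafeGraph\sg-social-distancing")
```

If this returns 0, the Spark read will most likely fail or come back empty, so fix the base path first.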
and then, to turn the consolidated files into daily county stats...

```
# process consolidated file(s)
sdm_df = spark.read.csv("C:\\data\\SafeGraph\\sg-social-distancing\\consolidated\\*.csv", header=True, escape="\"", inferSchema=False).toPandas()
cbg_fips_codes = spark.read.csv("C:\\data\\SafeGraph\\sg-open-census-data\\metadata\\cbg_fips_codes.csv", header=True).toPandas()
cbg_fips_codes['county_fips'] = cbg_fips_codes.state_fips + cbg_fips_codes.county_fips

# convert numerical columns
int_columns = ['device_count', 'completely_home_device_count']
for int_col in int_columns:
    sdm_df[int_col] = sdm_df[int_col].astype('int')

# datetime columns
sdm_df['date_start'] = sdm_df.date_range_start.str.slice(start=0, stop=10)  # keep YYYY-MM-DD only

import pandas as pd
sdm_df['dt'] = pd.to_datetime(sdm_df['date_start'])
sdm_df['week'] = sdm_df['dt'].dt.isocalendar().week  # ISO week number (Series.dt.week was removed in pandas 2.0)

# join county_fips for county names and states
sdm_df['county_fips'] = sdm_df.origin_census_block_group.str.slice(start=0, stop=5) # county is the first 5 digits of the CBG
sdm_df = sdm_df.merge(cbg_fips_codes, on='county_fips', how='left')

sdm_columns = ['device_count', 'completely_home_device_count']  # 'part_time_work_behavior_devices', 'full_time_work_behavior_devices'
geo_groupby = 'county_fips'

by_countyday = sdm_df.groupby([geo_groupby, 'dt', 'date_start'])[sdm_columns].sum().sort_values(by=[geo_groupby, 'dt', 'date_start'], ascending=True).reset_index()

# compute new metrics
by_countyday['leaving_home'] = by_countyday['device_count'] - by_countyday['completely_home_device_count']
by_countyday['pct_leaving_home'] = by_countyday['leaving_home'] / by_countyday['device_count'] * 100

by_countyday.to_csv('C:\\data\\SafeGraph\\processed\\dailystats.csv')
```
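To sanity-check a few rows of `dailystats.csv` by hand, the county rollup can be reproduced in plain Python. This is only a sketch of the same arithmetic, not a replacement for the script. The block group IDs and counts are made up for illustration, and the zero-device guard is an addition that the pandas version does not have.

```python
from collections import defaultdict


def county_daily_stats(rows):
    """Roll CBG-level rows up to (county_fips, date) totals.

    Each row is (origin_census_block_group, date_range_start,
    device_count, completely_home_device_count), all as strings.
    """
    totals = defaultdict(lambda: [0, 0])
    for cbg, date_range_start, devices, home in rows:
        # county is the first 5 digits of the CBG; date is the first 10 characters
        key = (cbg[:5], date_range_start[:10])
        totals[key][0] += int(devices)
        totals[key][1] += int(home)

    stats = {}
    for key, (devices, home) in sorted(totals.items()):
        leaving = devices - home
        # guard against empty counties (assumption: report NaN rather than fail)
        pct = 100 * leaving / devices if devices else float("nan")
        stats[key] = {
            "device_count": devices,
            "completely_home_device_count": home,
            "leaving_home": leaving,
            "pct_leaving_home": pct,
        }
    return stats


# Made-up rows: two block groups in one county, one in another.
rows = [
    ("010010201001", "2020-03-01T00:00:00-06:00", "100", "40"),
    ("010010201002", "2020-03-01T00:00:00-06:00", "50", "20"),
    ("010030101001", "2020-03-01T00:00:00-06:00", "80", "20"),
]
stats = county_daily_stats(rows)
print(stats[("01001", "2020-03-01")])
```

For the first county, the two block groups sum to 150 devices with 60 completely at home, so 90 devices left home.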
Please note: you will either have to mirror my local file paths or update the scripts to use your own. I am NOT a Spark expert (this is my first time!), so I won't be much help debugging your implementation. YMMV. Good luck!
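One way to make the path swap less error-prone is to define the root folder once and derive everything else from it. Here is a minimal sketch, assuming the folder layout used in the scripts above. `DATA_ROOT` and `build_paths` are hypothetical names for this example.

```python
from pathlib import PureWindowsPath


def build_paths(data_root):
    """Derive the paths the two scripts use from a single root folder."""
    root = PureWindowsPath(data_root)
    sdm = root / "sg-social-distancing"
    return {
        "raw_glob": str(sdm / "2020" / "*" / "*" / "*.csv.gz"),
        "consolidated_dir": str(sdm / "consolidated"),
        "consolidated_glob": str(sdm / "consolidated" / "*.csv"),
        "cbg_fips_codes": str(root / "sg-open-census-data" / "metadata" / "cbg_fips_codes.csv"),
        "daily_stats": str(root / "processed" / "dailystats.csv"),
    }


DATA_ROOT = r"C:\data\SafeGraph"  # change this one line to match your machine
paths = build_paths(DATA_ROOT)
print(paths["raw_glob"])
```

The values can then be passed to `spark.read.csv(...)` and `to_csv(...)` in place of the hard-coded strings. `PureWindowsPath` keeps the backslash style of the original scripts; on macOS or Linux, `PurePosixPath` would be the equivalent choice.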