For a recent project, I was browsing some food data from Open Food Facts. I haven't had a chance to really dig into it yet, but it seems like an interesting data source and a decent dataset for getting more comfortable playing around with Spark.
Their data can be downloaded in a few different forms, including a MongoDB dump, RDF, CSV, and Android and iPhone apps. The CSV file is a bit of a beast at ~205MB at the time of writing. CSV is nice for its simplicity, but a columnar format like Parquet offers much better performance for most types of analysis. Coupled with compression, Parquet should shrink that file quite a bit.
Reading the CSV
Databricks has a nice library called spark-csv that, believe it or not, reads and writes CSVs in Spark. There is an option to infer the schema from the CSV, but when I tried that with the Open Food Facts data, some of the field types were misinterpreted. As a simple solution, I went ahead and assumed all 159 fields were strings:
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Grab the header row and build a schema that treats every field as a nullable string
val header = sc.textFile("/tmp/en.openfoodfacts.org.products.csv").first
val schema = StructType(
  header.split("\t").map(fieldName => StructField(fieldName, StringType, true)))
From there, spark-csv does all the work:
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "\t")
  .schema(schema)
  .load("/tmp/en.openfoodfacts.org.products.csv")
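A quick sanity check, assuming the load above worked, might look something like this (the column count should match the 159 string fields we declared):

// Inspect the schema and do a rough row/column count
df.printSchema()
println(s"columns: ${df.columns.length}, rows: ${df.count}")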
Once the DataFrame (df) is loaded up, writing it out as a Parquet file is straightforward. Before we do that, though, we need to pick a compression format. Spark SQL has a config that lets us choose between uncompressed, snappy, gzip, and lzo.
sqlContext.setConf("spark.sql.parquet.compression.codec","gzip")
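With the codec set, the write itself is a one-liner; the output path here is just an example:

// Write the DataFrame out as gzip-compressed Parquet (output path is illustrative)
df.write.parquet("/tmp/openfoodfacts.parquet")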
For fun, I compared the uncompressed, gzip, and snappy output sizes. The gzip Parquet yielded a 19MB file, snappy came in at 33MB, and uncompressed at 112MB. The gzip version can be found here and the code I used can be found in this gist.
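The gist has the full code, but roughly speaking the comparison boils down to writing one copy per codec and checking the on-disk size, along these lines (the output paths are illustrative, and the exact sizes will vary with the data dump you download):

import org.apache.hadoop.fs.{FileSystem, Path}

// Write one copy of the DataFrame per codec and report the on-disk size of each
val fs = FileSystem.get(sc.hadoopConfiguration)
Seq("uncompressed", "snappy", "gzip").foreach { codec =>
  sqlContext.setConf("spark.sql.parquet.compression.codec", codec)
  val out = s"/tmp/openfoodfacts_$codec.parquet"  // illustrative output path
  df.write.mode("overwrite").parquet(out)
  val mb = fs.getContentSummary(new Path(out)).getLength / 1024.0 / 1024.0
  println(f"$codec%-12s $mb%.1f MB")
}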