2015-06-29

What is R?

This presentation reviews R packages that can turn R into an ETL processing engine.

Core packages for ETL

Extraction from databases

Extraction is performed via native drivers or over a JDBC or ODBC connection.

Some of the supported databases:

Oracle, MySQL / MariaDB, PostgreSQL, SQLite, SQL Server, Teradata, Informix,
DB2, SAP HANA, SAP Adaptive Server (Sybase), Access, Vertica, 
Cassandra, MongoDB, HBase, Hive, and more...

The core packages for extraction are DBI, RJDBC and RODBC; each of them unifies calls to the particular drivers.

Extraction examples DBI

library(RPostgreSQL)
DBIconn <- dbConnect(PostgreSQL(), 
                     host="192.168.56.101", 
                     port="5432", 
                     dbname="rdb", 
                     user="ruser", 
                     password="userpassr")
dbGetQuery(DBIconn, "select * from prices limit 1")
#   prod_code month_code price
# 1        10     201506  21.5

Extraction examples RJDBC

library(RSQLServer)
# http://sqlblog.com/blogs/jamie_thomson/archive/2012/03/27/adventureworks2012-now-available-to-all-on-sql-azure.aspx
RJDBCconn <- dbConnect(SQLServer(), 
                       server="mhknbn2kdz.database.windows.net", 
                       database="AdventureWorks2012",
                       properties=list(user="sqlfamily", 
                                       password="sqlf@m1ly"))
dbGetQuery(RJDBCconn, "select top 1 ProductID, ProductNumber, Name from production.product;")
#   ProductID ProductNumber            Name
# 1         1       AR-5381 Adjustable Race

Extraction examples RODBC

library(RODBC)
RODBCconn <- odbcConnect(dsn="psql_rdb", # predefined dsn
                         uid="ruser",
                         pwd="userpassr")
sqlQuery(RODBCconn, "select * from prices limit 1")
#   prod_code month_code price
# 1        10     201506  21.5

Extraction from non-databases
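
Flat files are the most common non-database source. As one illustration, the csv sales extracts used in the working example later can be read with data.table's fread (a minimal sketch; the file name sales_20150630.csv is hypothetical):

library(data.table)
# read a single csv extract into a data.table
sales <- fread("sales_20150630.csv")
# read and stack many extracts at once
all.csv <- list.files(pattern = "\\.csv$")
sales <- rbindlist(lapply(all.csv, fread))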

Transformation

data.table

  • Great syntax for faster development
    FROM[ where, select|update, group ][ ... ]
  • Best performance and scalability
  • CPU and memory efficient

dplyr

  • Pipe-like syntax
    FROM %>% fun() %>% ...
  • Great performance
  • Can operate on remote data stores

Transformation examples

#    time_code prod_code quantity
# 1:  20150630        10      125
# 2:  20150630        12       24
# 3:  20150630        12       78
# 4:  20150631        10      131
# 5:  20150631        12      104
# 6:  20150701        10      119
# 7:  20150701        11      194
# 8:  20150701        10      102
# 9:  20150702        12       97
  • Filter the dataset on prod_code and select time_code and quantity
  • Select the sum of quantity grouped by prod_code
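
If you want to follow along, the DT used below can be recreated from the printed output above (a minimal sketch; integer column types are assumed):

library(data.table)
DT <- data.table(
    time_code = c(20150630L, 20150630L, 20150630L, 20150631L, 20150631L,
                  20150701L, 20150701L, 20150701L, 20150702L),
    prod_code = c(10L, 12L, 12L, 10L, 12L, 10L, 11L, 10L, 12L),
    quantity  = c(125L, 24L, 78L, 131L, 104L, 119L, 194L, 102L, 97L)
)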

data.table

library(data.table)
# thanks to auto index feature
# any subsequent filter on prod_code will be blazingly fast
DT[prod_code %in% 10:11, .(time_code, quantity)]
#    time_code quantity
# 1:  20150630      125
# 2:  20150631      131
# 3:  20150701      119
# 4:  20150701      194
# 5:  20150701      102
DT[, sum(quantity), prod_code]
#    prod_code  V1
# 1:        10 477
# 2:        12 303
# 3:        11 194

dplyr

library(dplyr)
DF <- as.data.frame(DT)
DF %>% 
    filter(prod_code %in% 10:11) %>% 
    select(time_code, quantity)
#   time_code quantity
# 1  20150630      125
# 2  20150631      131
# 3  20150701      119
# 4  20150701      194
# 5  20150701      102
DF %>% 
    group_by(prod_code) %>% 
    summarize(quantity = sum(quantity)) %>% 
    select(prod_code, quantity)
# Source: local data frame [3 x 2]
# 
#   prod_code quantity
# 1        10      477
# 2        11      194
# 3        12      303

Loading

The DT object is your dataset in the R session.

  • DBI
dbWriteTable(DBIconn, "output_dbi", DT, append=TRUE, row.names=FALSE)
  • RJDBC
# the community db used above is not writable
# dbWriteTable(RJDBCconn, "output_jdbc", DT, append=TRUE, row.names=FALSE)
  • RODBC
sqlSave(RODBCconn, DT, "output_odbc", append=TRUE, rownames=FALSE)

Higher level helper packages

These are fully optional, as they usually just simplify the code/logic you need to handle when working with the core ETL packages.

Database helpers

  • dplyr: data manipulation on remote DBI-compliant databases (see the sketch after this list)
  • ETLUtils: unify DBI, RJDBC, RODBC interface to R's ff object
  • dwtools: unify DBI, RJDBC, RODBC interface
  • db.r: schema exploration
  • RODBCext: Parameterized queries extension for RODBC
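
As an illustration of dplyr's remote-database capability, a minimal sketch using the dplyr 0.4-era src_postgres() API and the connection details from the DBI example above:

library(dplyr)
# connect to the postgres database used in the DBI example
pg <- src_postgres(dbname = "rdb", host = "192.168.56.101", port = 5432,
                   user = "ruser", password = "userpassr")
# the pipeline is translated to SQL and executed in the database
tbl(pg, "prices") %>%
    filter(month_code == 201506L) %>%
    select(prod_code, price) %>%
    collect() # fetch the result into R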

Transformation helpers

  • sqldf: query data.frames using SQL statements (see the sketch after this list)
  • statar: Stata-like data manipulation
  • splitstackshape: higher level wrappers for split, stack and shape
  • dwtools: batch join, EAV processing, MDX-like queries
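
For example, the aggregation from the transformation examples above could be written in SQL with sqldf (a sketch, reusing the DF data.frame defined earlier):

library(sqldf)
# same result as the data.table/dplyr group-by examples, expressed in SQL
sqldf("select prod_code, sum(quantity) as quantity
       from DF
       group by prod_code")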

Processing helpers

It is nice to monitor your processing. There are multiple logging packages.

  • logging: logging to console, file, sentry server
  • futile.logger: logging to files, console (see the sketch below)
  • logR: transactional logging to databases and email notifications
  • loggr: logging to files, console
  • logr: logging to files
  • dtq: detailed auditing of data.table queries
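
As a quick taste of one of them, a minimal futile.logger sketch (the file name etl.log is just an example):

library(futile.logger)
# send log records to a file instead of the console
flog.appender(appender.file("etl.log"))
flog.info("etl start")
flog.warn("no new files found in %s", getwd())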

You can easily create your own logging as base R has good functions for that.

Working example

sources:

  • sales quantity data dimensioned by product and time (day) - multiple csv files
  • product prices data dimensioned by product and time (month) - postgres native driver

targets:

  • sales data including sales value - postgres odbc connection
  • rename source files with a loaded-timestamp suffix - multiple csv files

Using core ETL packages

etl <- function(sales.dir = getwd()){
    all.csv <- list.files(path = sales.dir, pattern = "\\.csv$", full.names = TRUE)
    if(length(all.csv)==0L) return(FALSE) # no new files to load
    joinkey <- c("prod_code","month_code")
    # Extract: read all csv files and derive month_code from time_code
    sales <- rbindlist(lapply(all.csv, fread))[, month_code := substr(time_code, 1L, 6L)]
    setkeyv(sales, joinkey)
    prices <- setDT(dbGetQuery(DBIconn, "SELECT * FROM prices"), 
                    key = joinkey)
    # Transform: join prices and compute sales value
    sales[prices, value := quantity * price]
    # Load: append to target table and mark source files as loaded
    sqlSave(RODBCconn, sales, "sales", append=TRUE, rownames=FALSE)
    file.rename(all.csv, 
                paste(all.csv, paste0("loaded", as.character(Sys.time(), "%Y%m%d%H%M%S")), sep="."))
}
etl()
# disconnect from database if not using it anymore
dbDisconnect(DBIconn)
odbcClose(RODBCconn)

Enhancements and helpers

  • You can parallelize your code using the built-in parallel package or the foreach package. This is mostly useful for time-consuming Extract or Load steps, because Transform is blazingly fast in R. The simplest usage of the parallel package is to change lapply calls into mclapply.
sales <- setkeyv(rbindlist(lapply(all.csv, fread)), joinkey)
# turns to
sales <- setkeyv(rbindlist(mclapply(all.csv, fread)), joinkey)

A longer list of packages useful for parallelization is available in the CRAN Task View: High-Performance and Parallel Computing with R.

  • Use helpers/wrappers - when you repeat the same sequence of function calls in multiple places, wrap the sequence into a single new function and call that instead.
batch_csv_load <- function(csv.files, key = NULL){
    setkeyv(rbindlist(lapply(csv.files, fread)), cols = key)
}
# an example
sales <- setkeyv(rbindlist(lapply(all.csv, fread)), joinkey)
# turns to
sales <- batch_csv_load(all.csv, joinkey)

  • Log processing details - add the desired status checks to your code and redirect their output to a file or database. Below is the most basic example of writing logs to csv.
write.log <- function(event, file="log.csv"){
    write.table(data.table(timestamp = Sys.time(), event = event), 
                file=file,
                sep=",",
                col.names=!file.exists(file), 
                row.names=FALSE, 
                append=file.exists(file))
}
# before your code
write.log(event = "etl start")
# after your code
write.log(event = "etl end")

Best practices

Use your favorite IDE

Organize code into R packages

  • well defined organization of code and metadata
  • easy version management
  • easy dependency management
  • standard documentation
  • easy extraction of docs to html, pdf
  • standard vignettes (tutorials)
  • easy deployment
  • easy integration of unit tests
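
A minimal way to start such a package with devtools (a sketch; the package name myetl is just an example):

library(devtools)
# create a new package skeleton in the working directory
create("myetl")
# then document and install it
document("myetl")
install("myetl")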

Use source control

I recommend git; R integrates with git very well.
Git is lightweight and powerful.

Install package from shell:

git clone https://github.com/Rdatatable/data.table.git
R CMD INSTALL data.table

Install package from R:

library(devtools)
install_github("Rdatatable/data.table")

Unit tests

Include unit tests in the package's tests directory.
Some helper frameworks:

  • testthat: An R package to make testing fun
# /tests/testthat/test-script.R
context("my tests of script")
test_that("my tests of script", {
  a <- function() 1
  b <- 1
  expect_identical(a(), b, info = "dummy test")
})
  • unitizer: unit tests based on stored results of R expressions
unitize("script.R")

Continuous integration

A simple setup example would be a github repo plus travis-ci. Each commit to the github repo will automatically build your package from source and run your unit tests against that build.
You can also run the package check locally at any time.

Check from shell:

R CMD check data.table

Check from R:

library(devtools)
check("data.table")

Support

Bonus round

Business Intelligence

Directly answer business questions for the end user or app:

  • shiny: Interactive web application framework, including shinydashboard
  • rapier: Turn your R code into a web API
  • opencpu: The OpenCPU framework exposes a web API interfacing R, Latex and Pandoc
  • rpivotTable: An R wrapper for the great (javascript) library pivottable
  • rmarkdown: automation of reports to various formats: markdown / html / word / pdf (see the sketch after this list)
  • commonmark: wrapper around cmark markdown C library
  • pander: An R Pandoc Writer
  • DeployR: Add the power of R Analytics to any Application, Dashboard, Backend, etc.
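
For instance, rendering a report from R is a one-liner with rmarkdown (a sketch; report.Rmd is a hypothetical file name):

library(rmarkdown)
# knit report.Rmd and render it to html
render("report.Rmd", output_format = "html_document")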

Thanks to the audience.