Multy-Core Performance en R

Introducción

Hace unos días, caminando por ahí, vi que estaban vendiendo un HP Proliant DL360 G6 de segunda mano, para los que no saben es un servidor de alto rendimiento del año 2010.

Por mi interés por el el Data Science y BIG DATA, este juguete será util para un diplomado en el que tengo interés, el cual es dictado por la FCFM de la Universidad de Chile

Características del Jueguete:

  • 2 x Xeon 5530 2.4 ghz de 4 procesadores con hyperthreading (des habilitado en esta prueba)
  • 64 gb ram
  • Nvidia Quadro M2000 4GB vram GDDR5
  • Tarjeta Inteligente RAID con batería.
  • 2 fuentes de poder independiente
  • 4 discos duros SAS

Fotos del Juguete:

La prueba

Utilizaré la librería plyr, data.table, reshape2 para realizar agregaciones y doSNOW para la paralelización, el dataset será una tabla de unos 30 millones de registros similares a la base de datos de un operador movil (según recuerdo estos registros se llaman los CDR guardados en el CRCE de la plataforma de tarificación).

Básicamente la tabla tendrá un identificador de usuario, uno de tipo de trafico y la cantidad de trafico.

Generar los Datos

Generaremos 30 millones de registros CDR en una tabla que tendrá:

  • identificador de usuario.
  • tipo de trafico.
  • cantidad de trafico.

options(stringsAsFactors = FALSE)
library(reshape2)
library(plyr)

#Parametros
set.seed(1986) #seteada la semilla a la fecha de nacimiento de mi querida polola.
tipo_trafico = c("DATA","SMS","VOICE")
prob_tipo_trafico = c(0.6,0.1,0.3)
userids = 5e4 #simularemos n usuarios
registros = 3e7 #numnero de registros CDR

#valores aleatoreos del monto del trafico
datos_tipo_trafico = 
 data.frame(
 DATA = round(runif(registros,1,1024^2)),# sesion de datos en bytes con numero entre 1 byte y 1 mega
 SMS = rep(1,registros), #cada sms genera un registro
 VOICE = round(rexp(registros,1/120)) #largo de llamadas en segundos
 )


#generar los registros CDR
cdr = data.frame(
 id_cdr = 1:registros,
 userid = floor(runif(registros,1,userids+1)),
 tipo_trafico = sample(tipo_trafico,registros,replace = T,prob = prob_tipo_trafico))
TI = proc.time()
cdr$trafico = datos_tipo_trafico$DATA*(cdr$tipo_trafico=="DATA") + datos_tipo_trafico$SMS*(cdr$tipo_trafico=="SMS") + datos_tipo_trafico$VOICE*(cdr$tipo_trafico=="VOICE")
print(proc.time()-TI)
rm(datos_tipo_trafico) #borrar tabla de daltos aleatorios
gc() #limpiar memoria no usada

El resultado es una tabla con este formato:

id_cdr userid tipo_trafico trafico
1 1 5579 VOICE 20
2 2 28374 VOICE 82
3 3 28526 VOICE 36
4 4 39179 VOICE 56
5 5 14244 DATA 629075
6 6 36779 DATA 690397
7 7 42774 DATA 175632
8 8 4276 VOICE 115
9 9 4445 VOICE 44
10 10 29458 DATA 946171

Benchmark

Crearemos una tabla de trafico agregado por usuario, esta tabla será generada con: reshape2, data.table y plyr (en un thread y multy-thread).

Algunos comentarios antes de mostrar los resultados:

  • reshape2 permite realizar la agregación, pero permite muchas menos opciones que las otras librerías.
  • data.table es una extensión de data.frame que permite utilizar indices.
  • plyr perimite ejecutar en un thread como en paralelo

El código utilizado:

#usando reshape2
TI = proc.time()
tmp = reshape2::dcast(cdr,userid ~ tipo_trafico, fun.aggregate = sum, value.var = "trafico")
print(proc.time()-TI)

#usando plyr 1 thread
TI = proc.time()
tmp = ddply(cdr,"userid",function(x) data.frame(DATA = sum(ifelse(x$tipo_trafico == "DATA",x$trafico,0)),
 SMS = sum(ifelse(x$tipo_trafico == "SMS",x$trafico,0)),
 VOICE = sum(ifelse(x$tipo_trafico == "VOICE",x$trafico,0))
 ),.progress = T)
print(proc.time()-TI)

#usando plyr 8 thread (uno por CPU)
library("doSNOW")
nCPU = as.numeric(Sys.getenv("NUMBER_OF_PROCESSORS")[1])
cl = makeSOCKcluster(nCPU,outfile="cl.txt")
registerDoSNOW(cl)
TI = proc.time()
tmp = ddply(cdr,"userid",function(x) data.frame(DATA = sum(ifelse(x$tipo_trafico == "DATA",x$trafico,0)),
 SMS = sum(ifelse(x$tipo_trafico == "SMS",x$trafico,0)),
 VOICE = sum(ifelse(x$tipo_trafico == "VOICE",x$trafico,0))
),.parallel = T)
print(proc.time()-TI)
stopCluster(cl)

library("doSNOW")
nCPU = as.numeric(Sys.getenv("NUMBER_OF_PROCESSORS")[1])
cl = makeSOCKcluster(2,outfile="cl.txt")
registerDoSNOW(cl)
TI = proc.time()
tmp = ddply(cdr,"userid",function(x) data.frame(DATA = sum(ifelse(x$tipo_trafico == "DATA",x$trafico,0)),
 SMS = sum(ifelse(x$tipo_trafico == "SMS",x$trafico,0)),
 VOICE = sum(ifelse(x$tipo_trafico == "VOICE",x$trafico,0))
),.parallel = T)
print(proc.time()-TI)
stopCluster(cl)

#usando data.table
library(data.table)
cdr_dt = data.table(cdr,key = "userid")
TI = proc.time()
tmp = cdr_dt[,list(DATA = sum(ifelse(tipo_trafico == "DATA",trafico,0)),
 SMS = sum(ifelse(tipo_trafico == "SMS",trafico,0)),
 VOICE = sum(ifelse(tipo_trafico == "VOICE",trafico,0))
 ),by="userid"]
print(proc.time()-TI)

Resultados

libreria funcion threads segundos
plyr ddply 1 9,18
plyr ddply 2 78,37
plyr ddply 8 146,04
reshape2 dcast 1 17,31
data.table 1 12,94

Conclusiones

reshape2 a pesar de ser una librería especializada en este tipo de transformación, resultó ser más lenta que plyr.

plyr se perdió performance a medida que se agregan threads, esto debido a que para operaciones simples, el costo de paralelizar es mayor que no utilizar el resto de las CPU.

data.table a pesar de utilizar indices, resultó ser más lento que plyr, cuando se utilizan algoritmos complejos para situaciones simples, a veces se pierde performance.

Para el futuro….

En el futuro realizaré esas pruebas usando datasets más grandes y otro numero de usuarios a ver que resultados se obtienen.

Print Friendly, PDF & Email