Introducción
Hace unos días, caminando por ahí, vi que estaban vendiendo un HP Proliant DL360 G6 de segunda mano, para los que no saben es un servidor de alto rendimiento del año 2010.
Por mi interés por el el Data Science y BIG DATA, este juguete será util para un diplomado en el que tengo interés, el cual es dictado por la FCFM de la Universidad de Chile
Características del Jueguete:
- 2 x Xeon 5530 2.4 ghz de 4 procesadores con hyperthreading (des habilitado en esta prueba)
- 64 gb ram
- Nvidia Quadro M2000 4GB vram GDDR5
- Tarjeta Inteligente RAID con batería.
- 2 fuentes de poder independiente
- 4 discos duros SAS
Fotos del Juguete:
La prueba
Utilizaré la librería plyr, data.table, reshape2 para realizar agregaciones y doSNOW para la paralelización, el dataset será una tabla de unos 30 millones de registros similares a la base de datos de un operador movil (según recuerdo estos registros se llaman los CDR guardados en el CRCE de la plataforma de tarificación).
Básicamente la tabla tendrá un identificador de usuario, uno de tipo de trafico y la cantidad de trafico.
Generar los Datos
Generaremos 30 millones de registros CDR en una tabla que tendrá:
- identificador de usuario.
- tipo de trafico.
- cantidad de trafico.
options(stringsAsFactors = FALSE) library(reshape2) library(plyr) #Parametros set.seed(1986) #seteada la semilla a la fecha de nacimiento de mi querida polola. tipo_trafico = c("DATA","SMS","VOICE") prob_tipo_trafico = c(0.6,0.1,0.3) userids = 5e4 #simularemos n usuarios registros = 3e7 #numnero de registros CDR #valores aleatoreos del monto del trafico datos_tipo_trafico = data.frame( DATA = round(runif(registros,1,1024^2)),# sesion de datos en bytes con numero entre 1 byte y 1 mega SMS = rep(1,registros), #cada sms genera un registro VOICE = round(rexp(registros,1/120)) #largo de llamadas en segundos ) #generar los registros CDR cdr = data.frame( id_cdr = 1:registros, userid = floor(runif(registros,1,userids+1)), tipo_trafico = sample(tipo_trafico,registros,replace = T,prob = prob_tipo_trafico)) TI = proc.time() cdr$trafico = datos_tipo_trafico$DATA*(cdr$tipo_trafico=="DATA") + datos_tipo_trafico$SMS*(cdr$tipo_trafico=="SMS") + datos_tipo_trafico$VOICE*(cdr$tipo_trafico=="VOICE") print(proc.time()-TI) rm(datos_tipo_trafico) #borrar tabla de daltos aleatorios gc() #limpiar memoria no usada
El resultado es una tabla con este formato:
id_cdr | userid | tipo_trafico | trafico | |
---|---|---|---|---|
1 | 1 | 5579 | VOICE | 20 |
2 | 2 | 28374 | VOICE | 82 |
3 | 3 | 28526 | VOICE | 36 |
4 | 4 | 39179 | VOICE | 56 |
5 | 5 | 14244 | DATA | 629075 |
6 | 6 | 36779 | DATA | 690397 |
7 | 7 | 42774 | DATA | 175632 |
8 | 8 | 4276 | VOICE | 115 |
9 | 9 | 4445 | VOICE | 44 |
10 | 10 | 29458 | DATA | 946171 |
Benchmark
Crearemos una tabla de trafico agregado por usuario, esta tabla será generada con: reshape2, data.table y plyr (en un thread y multy-thread).
Algunos comentarios antes de mostrar los resultados:
- reshape2 permite realizar la agregación, pero permite muchas menos opciones que las otras librerías.
- data.table es una extensión de data.frame que permite utilizar indices.
- plyr perimite ejecutar en un thread como en paralelo
El código utilizado:
#usando reshape2 TI = proc.time() tmp = reshape2::dcast(cdr,userid ~ tipo_trafico, fun.aggregate = sum, value.var = "trafico") print(proc.time()-TI) #usando plyr 1 thread TI = proc.time() tmp = ddply(cdr,"userid",function(x) data.frame(DATA = sum(ifelse(x$tipo_trafico == "DATA",x$trafico,0)), SMS = sum(ifelse(x$tipo_trafico == "SMS",x$trafico,0)), VOICE = sum(ifelse(x$tipo_trafico == "VOICE",x$trafico,0)) ),.progress = T) print(proc.time()-TI) #usando plyr 8 thread (uno por CPU) library("doSNOW") nCPU = as.numeric(Sys.getenv("NUMBER_OF_PROCESSORS")[1]) cl = makeSOCKcluster(nCPU,outfile="cl.txt") registerDoSNOW(cl) TI = proc.time() tmp = ddply(cdr,"userid",function(x) data.frame(DATA = sum(ifelse(x$tipo_trafico == "DATA",x$trafico,0)), SMS = sum(ifelse(x$tipo_trafico == "SMS",x$trafico,0)), VOICE = sum(ifelse(x$tipo_trafico == "VOICE",x$trafico,0)) ),.parallel = T) print(proc.time()-TI) stopCluster(cl) library("doSNOW") nCPU = as.numeric(Sys.getenv("NUMBER_OF_PROCESSORS")[1]) cl = makeSOCKcluster(2,outfile="cl.txt") registerDoSNOW(cl) TI = proc.time() tmp = ddply(cdr,"userid",function(x) data.frame(DATA = sum(ifelse(x$tipo_trafico == "DATA",x$trafico,0)), SMS = sum(ifelse(x$tipo_trafico == "SMS",x$trafico,0)), VOICE = sum(ifelse(x$tipo_trafico == "VOICE",x$trafico,0)) ),.parallel = T) print(proc.time()-TI) stopCluster(cl) #usando data.table library(data.table) cdr_dt = data.table(cdr,key = "userid") TI = proc.time() tmp = cdr_dt[,list(DATA = sum(ifelse(tipo_trafico == "DATA",trafico,0)), SMS = sum(ifelse(tipo_trafico == "SMS",trafico,0)), VOICE = sum(ifelse(tipo_trafico == "VOICE",trafico,0)) ),by="userid"] print(proc.time()-TI)
Resultados
libreria | funcion | threads | segundos |
plyr | ddply | 1 | 9,18 |
plyr | ddply | 2 | 78,37 |
plyr | ddply | 8 | 146,04 |
reshape2 | dcast | 1 | 17,31 |
data.table | 1 | 12,94 |
Conclusiones
reshape2 a pesar de ser una librería especializada en este tipo de transformación, resultó ser más lenta que plyr.
plyr se perdió performance a medida que se agregan threads, esto debido a que para operaciones simples, el costo de paralelizar es mayor que no utilizar el resto de las CPU.
data.table a pesar de utilizar indices, resultó ser más lento que plyr, cuando se utilizan algoritmos complejos para situaciones simples, a veces se pierde performance.
Para el futuro….
En el futuro realizaré esas pruebas usando datasets más grandes y otro numero de usuarios a ver que resultados se obtienen.