Your Web News in One Place

Help Webnuz

Referal links:

Sign up for GreenGeeks web hosting
April 1, 2022 10:16 pm GMT

Qu es y como crear ETL en AWS Glue Parte 3

Continuando con la tercera y final parte del tutorial, explicare el cdigo de nuestro ejemplo.

Escribiendo el cdigo

Empezaremos por incluir las siguienteslibrerias de Glue.

  • JsonOptions nos permitir especificar los paths dnde queremos que se cree nuestro archivo final.
  • DynamicFrame nos permitir crear un frame de datos de tipo spark
  • GlueContext nos permitir ejecutar nuestro job ETL en el entorno serverless
  • GlueArgParser nos permitir leer las variables de sysArgs que enviemos a nuestro job
import com.amazonaws.services.glue.util.JsonOptionsimport com.amazonaws.services.glue.{DynamicFrame, GlueContext}import com.amazonaws.services.glue.util.GlueArgParser

Definiremos nuestra GlueApp y nuestro main donde se realizar la primera ejecucin de nuestro cdigo

object GlueApp {  def main(sysArgs: Array[String]) {  ...  }}

Ahora declararemos nuestras variables

GlueContext es el punto de entrada para leer y escribir un DynamicFrame de un bucket de Amazon S3, un catlogo de datos de AWS, JDBC, etc.

Esta clase ofrece funciones de utilidades para crear objetos Caracterstica DataSource y DataSink que, a su vez, se pueden usar para leer y escribir objetos DynamicFrame.

val glueContext: GlueContext = new GlueContext(sc)

Punto de entrada principal para la funcionalidad Spark. Un SparkContext representa la conexin a un clster Spark y se puede usar para crear RDD, acumuladores y variables de difusin en ese clster.

val sc: SparkContext = new SparkContext()

Genera un identificador de la sesin Spark

val spark = glueContext.getSparkSession
object GlueApp { val glueContext: GlueContext = new GlueContext(sc) val sc: SparkContext = new SparkContext() val spark = glueContext.getSparkSession  def main(sysArgs: Array[String]) {  ...  }}

Si deseamos leer una variable de ambiente que hemos enviado en los job parameters lo podemos leer de la siguiente forma:

/* Lee el valor del job parameter enviado ejemplo: --env (key) ci (value) el valor lo leer como ci */val args = GlueArgParser.getResolvedOptions(sysArgs, Array("env"))// ejemplo// val table = s"${args("env")}_transactions" se traduce como ci_transactions

Dentro de nuestro main comenzaremos por declarar nuestra base de datos junto a nuestras tablas, esto nos permitir transformar o ejecutar consultas.

// Catlogo de datos: bases de datos y tablasval dbName = s"db-kushki-ejemplo"val tblCsv = s"transacciones" //El nombre de la tabla con la ubicacin del S3val tblDynamo = s"transactions" //El nombre de la tabla con la ubicacin de dynamo

Ahora declararemos nuestro output directory (la carpeta final donde se guardar nuestro archivo generado)

// Directorio final donde se guardar nuestro archivo dentro de un bucket S3val baseOutputDir = s"s3://${args("env")}-trx-ejemplo/"val transactionDir= s"$baseOutputDir/transaction/"

Una de las principales abstracciones de Apache Spark es el DataFrame de SparkSQL, que es similar a la construccin DataFrame que se encuentra en R y en Pandas. Un elemento DataFrame es similar a una tabla y admite operaciones de estilo funcional (map/reduce/filter/etc.) y operaciones SQL (select, project, aggregate). En este caso de nuestro ctalogo de datos crearemos un DynamicFrame de cada tabla

// Read data into a dynamic frameval trx_dyn: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblDYNAMO ).getDynamicFrame()val trx_csv: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblCSV ).getDynamicFrame()

ApplyMapping vs ResolveChoice

  • ApplyMapping: Aplica un mapeo declarativo a un DynamicFrame especificado.
  • ResolveChoice: Proporciona informacin para resolver tipos ambiguos dentro de un elemento DynamicFrame.
ApplyMappingResolveChoice
Tipos de datosEs incompatible si un tipo de dato es ambiguoSe define un solo tipo de dato
MappingEl Dataframe devuelve solo lo que se mapeaDevuelve todos los campos incluyendo al campo que se le realiz el casting
// ApplyMappingval trx_dyn_mapping=  trx_dyn.applyMapping(mappings = Seq(("id", "string", "id", "string"),("cliente", "string", "cliente", "string"),("estado", "string", "estado", "string"),("monto", "bigint", "monto", "double")), caseSensitive = false, transformationContext = "trx_dyn_mapping")// ResolveChoiceval trx_dyn_resolve= trx_dyn.resolveChoice(specs = Seq(("monto", "cast:double")))

En nuestro ejemplo es necesario resolver el problema de los tipos de datos ambiguos debido a que en nuestro archivo csv se presenta datos de tipo bigint, y en nuestra tabla de dynamo se presenta datos de tipo Number, ambos tipos de datos deben ser del mismo tipo por lo que se necesita aplicar resolveChoice , en este caso applyMapping nos devolver un problema debido a que la columna monto devolver un struct de los diferentes tipos de dato.

val trx_dyn_resolve= trx_dyn.resolveChoice(specs = Seq(("monto", "cast:double")))val trx_csv_resolve= trx_csv.resolveChoice(specs = Seq(("monto", "cast:double")))

En la siguiente seccin de cdigo procederemos a crear nuestra pseudo-tabla donde ejecutaremos sentencias SQL es importante darle un nombre simple pero distintivo

// Spark SQL on a Spark dataframeval dynDf = trx_dyn_resolve.toDF()dynDf.createOrReplaceTempView("dynamoTable")val csvDf = trx_csv_resolve.toDF()csvDf.createOrReplaceTempView("csvTable")

A continuacin realizaremos nuestra sentencia SQL con cualquier lgica de negocio que necesitemos, para nuestro ejemplo realizaremos una sentencia simple en la cual obtenga todos los registros que hagan un match

// SQL Queryval dynSqlDf = spark.sql("SELECT T1.id,T1.monto,T1.cliente,T1.estado FROM dynamoTable T1 LEFT JOIN csvTable T2 ON (T1.id=T2.id) WHERE T2.idIS NOT NULL AND (T1.monto=T2.monto AND T1.cliente=T2.cliente AND T1.estado = T2.estado)")

El runtime por detrs de AWS Glue ejecuta un proceso de ApacheSpark por lo que los DynamicFrame que retornemos se crearn en multi partes, por lo que utilizaremos coalesce(1) para juntarlos en uno solo, sin embargo esto puede ocasionar errores en grandes cantidades de datos retornados.

//Compact al run-part files into oneval dynFile = DynamicFrame(dynSqlDf, glueContext).withName("dyn_dyf").coalesce(1)

Finalmente procederemos a guardar nuestro resultado especificando un path en un bucket de S3

// Save file into S3glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> transactionDir)), format = "csv").writeDynamicFrame(dynFile)

Source Code
El script completo lo puedes encontrar aqu:
Github

Espero este tutorial te haya sido de ayuda !


Original Link: https://dev.to/davidshaek/que-es-y-como-crear-etl-en-aws-glue-parte-3-4n6n

Share this article:    Share on Facebook
View Full Article

Dev To

An online community for sharing and discovering great ideas, having debates, and making friends

More About this Source Visit Dev To