Using Athena Views As A Source In Glue
Whilst working with AWS Glue recently, I noticed that I was unable to use a view created in Athena as a source for an ETL job in the same way that I could use a cataloged table.
The error I received was this:
An error occurred while calling o73.getCatalogSource. No classification or connection in mydatabase.v_my_view
Rather than try to recreate the view with a new PySpark job, I used the Athena JDBC driver as a custom JAR in a Glue job so that I could query the view I wanted to use.
This blog post contains my notes on how this works.
Drivers
Create or reuse an existing S3 bucket to store the Athena JDBC driver JAR file. The JAR files are available to download from AWS. I used the latest version, which at the time of writing was the JDBC Driver with AWS SDK, AthenaJDBC42_2.0.27.1000.jar (compatible with JDBC 4.2, requires JDK 8.0 or later).
IAM
The Glue job will need not only Glue service privileges but also IAM privileges to access the S3 buckets and the AWS Athena service.
For Athena, the following statement would grant Glue full permissions.
{
    "Sid": "VisualEditor1",
    "Effect": "Allow",
    "Action": [
        "logs:CreateLogStream",
        "logs:AssociateKmsKey",
        "athena:*",
        "logs:CreateLogGroup",
        "logs:PutLogEvents"
    ],
    "Resource": [
        "arn:aws:athena:*:youraccount:workgroup/*",
        "arn:aws:athena:*:youraccount:datacatalog/*",
        "arn:aws:logs:*:*:/aws-glue/*"
    ]
}
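The role will also need access to the S3 buckets involved: read access to the bucket holding the driver JAR, and read/write access to the buckets used for temporary and output data. A minimal sketch of such a statement follows; the Sid and bucket names are placeholders of mine, not from the original policy.

```json
{
    "Sid": "S3AccessForGlueJob",
    "Effect": "Allow",
    "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:ListBucket"
    ],
    "Resource": [
        "arn:aws:s3:::yours3bucket",
        "arn:aws:s3:::yours3bucket/*"
    ]
}
```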
Create Glue ETL Job
My use case for the Glue job was to query the view I had and save the results into Parquet format to speed up future queries against the same data.
The following code allows you to query an Athena view as a source for a data frame.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

athena_view_dataframe = (
    glueContext.read.format("jdbc")
    .option("driver", "com.simba.athena.jdbc.Driver")
    .option("AwsCredentialsProviderClass", "com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider")
    .option("url", "jdbc:awsathena://athena.eu-west-1.amazonaws.com:443")
    .option("dbtable", "AwsDataCatalog.yourathenadatabase.yourathenaview")
    .option("S3OutputLocation", "s3://yours3bucket/temp")
    .load()
)

athena_view_dataframe.printSchema()
The key things to be aware of in this code snippet are:
.option("driver", "com.simba.athena.jdbc.Driver")
This tells Glue which class within the JDBC driver to use.
.option("AwsCredentialsProviderClass","com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider")
This uses the IAM role assigned to the Glue job to authenticate to Athena. You can use other authentication methods, such as an AWS access key or federated authentication, but I think IAM makes the most sense for an ETL job that will most likely run on a schedule or be triggered by an event.
.option("url", "jdbc:awsathena://athena.eu-west-1.amazonaws.com:443")
I am using Athena in Ireland (eu-west-1); if you are using a different region, update this accordingly.
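The endpoint follows the same pattern for every region, so it can be built programmatically. A quick sketch; the helper name is mine, not part of the driver:

```python
def athena_jdbc_url(region: str) -> str:
    # Build the Athena JDBC endpoint URL for a given AWS region.
    return f"jdbc:awsathena://athena.{region}.amazonaws.com:443"

print(athena_jdbc_url("eu-west-1"))
# jdbc:awsathena://athena.eu-west-1.amazonaws.com:443
```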
.option("dbtable", "AwsDataCatalog.yourathenadatabase.yourathenaview")
The fully qualified name of the view in your Athena catalog, in the format AwsDataCatalog.Database.View. For example, take this query run in Athena.
SELECT * FROM "AwsDataCatalog"."vehicles"."v_electric_cars";
You would set the dbtable option to this:
.option("dbtable", "AwsDataCatalog.vehicles.v_electric_cars")
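The mapping from the quoted Athena identifiers to the dbtable value is just the three name parts joined by dots. As a small sketch (the helper is hypothetical, not part of the driver):

```python
def athena_dbtable(database: str, view: str, catalog: str = "AwsDataCatalog") -> str:
    # Build the fully qualified dbtable value for the Athena JDBC driver.
    return f"{catalog}.{database}.{view}"

print(athena_dbtable("vehicles", "v_electric_cars"))
# AwsDataCatalog.vehicles.v_electric_cars
```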
The last option tells Glue which S3 location to use as temporary storage to store the data returned from Athena.
.option("S3OutputLocation","s3://yours3bucket/temp")
At this point you can test that it works. When running the job, you need to tell Glue the location of the Athena JDBC driver JAR file that was uploaded to S3.
If you are working in the AWS Glue Console the parameter to set can be found under Job Details --> Advanced --> Dependent JARs path.
The parameter needs to be set to the full path and filename of the JAR file, for example s3://yours3bucket/jdbc-drivers/AthenaJDBC42_2.0.27.1000.jar
Setting this in the console ensures that the correct argument is passed into the Glue job.
--extra-jars s3://yours3bucket/jdbc-drivers/AthenaJDBC42_2.0.27.1000.jar
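If you create or update jobs with a script rather than the console, the same argument can be built programmatically. A small sketch; the helper and its validation are mine, not part of the Glue API:

```python
def extra_jars_argument(jar_s3_path: str) -> dict:
    # Build the --extra-jars job argument for a Glue job, checking that
    # a full s3:// path to a JAR file was supplied.
    if not (jar_s3_path.startswith("s3://") and jar_s3_path.endswith(".jar")):
        raise ValueError("expected a full s3:// path to a .jar file")
    return {"--extra-jars": jar_s3_path}

print(extra_jars_argument("s3://yours3bucket/jdbc-drivers/AthenaJDBC42_2.0.27.1000.jar"))
```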
The final code, including the conversion to Parquet format, looked like this.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

athena_view_dataframe = (
    glueContext.read.format("jdbc")
    .option("driver", "com.simba.athena.jdbc.Driver")
    .option("AwsCredentialsProviderClass", "com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider")
    .option("url", "jdbc:awsathena://athena.eu-west-1.amazonaws.com:443")
    .option("dbtable", "AwsDataCatalog.vehicles.v_electric_cars")
    .option("S3OutputLocation", "s3://yours3bucket/temp")
    .load()
)

athena_view_dataframe.printSchema()

athena_view_datasource = DynamicFrame.fromDF(athena_view_dataframe, glueContext, "athena_view_source")

pq_output = glueContext.write_dynamic_frame.from_options(
    frame=athena_view_datasource,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://yourotherS3Bucket/",
        "partitionKeys": [],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="ParquetConversion",
)

job.commit()
Original Link: https://dev.to/aws-builders/using-athena-views-as-a-source-in-glue-k09