Read, Enrich and Transform Data with AWS Glue Service

By: Maria Zakourdaev   |   Updated: 2019-03-14   |   Comments (3)   |   Related: Amazon AWS


Problem

In the first part of this tip series we looked at how to map and view JSON files with the Glue Data Catalog. In this second part, we will look at how to read, enrich and transform the data using an AWS Glue job.

Solution

Please read the first tip about mapping and viewing JSON files in the Glue Data Catalog: Import JSON files to AWS RDS SQL Server database using Glue service.

In this part, we will build an AWS Glue job that reads, enriches and transforms that data.

Read, Enrich and Transform Data with AWS Glue Service

In this part, we will create an AWS Glue job that uses an S3 bucket as the source and an AWS SQL Server RDS database as the target. We will use a JSON lookup file to enrich our data during the AWS Glue transformation. The job will use the job bookmarking feature to pick up every new file that lands in the S3 source bucket.

Let's start the job wizard and configure the job properties:

configure job properties

We will enter the job name and an IAM role that has permissions to the S3 buckets and to our AWS RDS database. I chose Python as the ETL language.

If you want to track processed files and load only new ones, make sure to enable the job bookmark. Note that job bookmarking identifies only new files and will not reload modified files.

Advanced Properties for AWS Glue Job

Bookmarks are maintained per job. If you delete the job, the bookmark is deleted as well, and a newly created job will start processing all files in the bucket.

When you run a job manually, you will be prompted to either keep the job bookmark enabled or disable it and process all files.
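If you start the job outside the console, the same choice can be passed as a run argument. Below is a minimal sketch using boto3, not part of this tip's setup - the job name is a hypothetical placeholder; the --job-bookmark-option argument accepts job-bookmark-enable, job-bookmark-disable or job-bookmark-pause.

import boto3

glue = boto3.client("glue")

# Start the job with bookmarking disabled so it reprocesses every file in the bucket.
# "my-glue-job" is a placeholder - replace it with your own job name.
response = glue.start_job_run(
    JobName="my-glue-job",
    Arguments={"--job-bookmark-option": "job-bookmark-disable"}
)
print(response["JobRunId"])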

Under job parameters you can change the number of concurrent DPUs per job run to influence how fast the job runs, define how many concurrent runs of the job are allowed, set the job timeout, and adjust many other settings. In this example I haven't changed any of these parameters.
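For reference, the same settings can be supplied when creating the job through the API. The sketch below is only an illustration with hypothetical names for the job, IAM role and script location; it is not the job built in this tip.

import boto3

glue = boto3.client("glue")

# All names and paths below are placeholders.
glue.create_job(
    Name="my-glue-job",
    Role="MyGlueServiceRole",
    Command={"Name": "glueetl", "ScriptLocation": "s3://my-scripts/flights_job.py"},
    MaxCapacity=10.0,                               # DPUs allocated to each run
    Timeout=120,                                    # job timeout in minutes
    ExecutionProperty={"MaxConcurrentRuns": 1},     # concurrent runs allowed
    DefaultArguments={"--job-bookmark-option": "job-bookmark-enable"}
)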

Add the SQL Server destination connection (read Serverless ETL using AWS Glue for RDS databases for a step-by-step tutorial on how to add a JDBC database connection). The S3 source connection will be created in our script.

aws connections
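As a side note, the JDBC connection can also be created through the API instead of the console. Below is a hedged boto3 sketch - the endpoint, credentials, subnet and security group values are placeholders; only the connection name TestDataLakeDB matches what the script references later.

import boto3

glue = boto3.client("glue")

glue.create_connection(
    ConnectionInput={
        "Name": "TestDataLakeDB",                 # referenced later as catalog_connection
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:sqlserver://my-rds-endpoint:1433;databaseName=datalakedb",
            "USERNAME": "glue_user",              # placeholder credentials
            "PASSWORD": "********"
        },
        "PhysicalConnectionRequirements": {
            "SubnetId": "subnet-0123456789abcdef0",
            "SecurityGroupIdList": ["sg-0123456789abcdef0"]
        }
    }
)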

After you press "Save job and edit script", you will be taken to the Python script editor.

Here is a script that meets our requirements. Let's walk through it line by line:

Create a dynamic frame from the Glue Data Catalog database datalakedb, table aws_glue_maria - this table was built over the S3 bucket (see part 1 of this tip).

flights_data = glueContext.create_dynamic_frame.from_catalog(database = "datalakedb", table_name = "aws_glue_maria", transformation_ctx = "datasource0")

The file looks as follows:

json file contents

Create a second dynamic frame from the carriers_json table in the Glue Data Catalog - the lookup file is located on S3. Follow the same steps as in part 1 to add more tables/lookups to the Glue Data Catalog. I will use this file to enrich our dataset. The file looks as follows:

json file contents

carriers_data = glueContext.create_dynamic_frame.from_catalog(database = "datalakedb", table_name = "carriers_json", transformation_ctx = "datasource1")

I will join the two datasets using the Join.apply operator, which takes the arguments (frame1, frame2, joinKeyFrame1, joinKeyFrame2):

joined_data = Join.apply(flights_data,carriers_data,'carrier','carrier')
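Join.apply performs an equality (inner) join on the given keys. If you need more control over the join - a different join type, or explicit column pruning - the same step can be done on plain Spark DataFrames. The sketch below is just an alternative illustration, not part of the final script.

from awsglue.dynamicframe import DynamicFrame

# Convert both dynamic frames to Spark DataFrames.
flights_df = flights_data.toDF()
carriers_df = carriers_data.toDF()

# Inner join on the carrier code, bringing over only carrier_name from the lookup file.
joined_df = flights_df.join(carriers_df.select("carrier", "carrier_name"), on="carrier", how="inner")

# Convert back to a DynamicFrame so the remaining Glue transforms can be applied.
joined_data = DynamicFrame.fromDF(joined_df, glueContext, "joined_data")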

Build the column mapping. I want both the carrier column and the new carrier_name column from the lookup file:

applymapping1 = ApplyMapping.apply(frame = joined_data, mappings = [("year", "int", "year", "int"), ("month", "int", "month", "int"), ("day", "int", "day", "int"), ("dep_time", "int", "dep_time", "int"), ("dep_delay", "int", "dep_delay", "int"), ("arr_time", "int", "arr_time", "int"), ("arr_delay", "int", "arr_delay", "int"),("carrier", "string", "carrier", "string"), ("carrier_name", "string", "carrier_name", "string"), ("tailnum", "string", "tailnum", "string"), ("flight", "int", "flight", "int"), ("origin", "string", "origin", "string"), ("dest", "string", "dest", "string"), ("air_time", "int", "air_time", "int"), ("distance", "int", "distance", "int"), ("hour", "int", "hour", "int"), ("minute", "int", "minute", "int"), ("part_col", "string", "part_col", "string")], transformation_ctx = "applymapping1")

Finally, write the output dynamic frame to the target SQL Server table - it will be created if it does not exist. The frame being written here, dropnullfields3, comes from the ResolveChoice and DropNullFields steps included in the full script below.

datalake_dest = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "TestDataLakeDB", connection_options = {"dbtable": "flights_enriched", "database": "datalakedb"}, transformation_ctx = "datalake_dest")

Here is the full script the Glue job will execute.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Source: flights data from the S3-backed catalog table created in part 1.
flights_data = glueContext.create_dynamic_frame.from_catalog(database = "datalakedb", table_name = "aws_glue_maria", transformation_ctx = "datasource0")
# Lookup: carrier names from the carriers_json catalog table.
carriers_data = glueContext.create_dynamic_frame.from_catalog(database = "datalakedb", table_name = "carriers_json", transformation_ctx = "datasource1")
# Enrich the flights data with the carrier name by joining on the carrier code.
joined_data = Join.apply(flights_data,carriers_data,'carrier','carrier')

# Map source columns to target columns, keeping both carrier and carrier_name.
applymapping1 = ApplyMapping.apply(frame = joined_data, mappings = [("year", "int", "year", "int"), ("month", "int", "month", "int"), ("day", "int", "day", "int"), ("dep_time", "int", "dep_time", "int"), ("dep_delay", "int", "dep_delay", "int"), ("arr_time", "int", "arr_time", "int"), ("arr_delay", "int", "arr_delay", "int"),("carrier", "string", "carrier", "string"), ("carrier_name", "string", "carrier_name", "string"), ("tailnum", "string", "tailnum", "string"), ("flight", "int", "flight", "int"), ("origin", "string", "origin", "string"), ("dest", "string", "dest", "string"), ("air_time", "int", "air_time", "int"), ("distance", "int", "distance", "int"), ("hour", "int", "hour", "int"), ("minute", "int", "minute", "int"), ("part_col", "string", "part_col", "string")], transformation_ctx = "applymapping1")

# Resolve any ambiguous column types and drop null fields before loading.
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

# Target: write to the flights_enriched table in SQL Server using the TestDataLakeDB JDBC connection.
datalake_dest = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "TestDataLakeDB", connection_options = {"dbtable": "flights_enriched", "database": "datalakedb"}, transformation_ctx = "datalake_dest")
job.commit()

Save the script and run the job.
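If you kick the job off outside the console, you can also poll its status from code. The snippet below is a simple hypothetical sketch with boto3; the job name is a placeholder.

import time
import boto3

glue = boto3.client("glue")

# Start the run and wait for a terminal state. "my-glue-job" is a placeholder name.
run = glue.start_job_run(JobName="my-glue-job")
while True:
    job_run = glue.get_job_run(JobName="my-glue-job", RunId=run["JobRunId"])
    state = job_run["JobRun"]["JobRunState"]
    print(state)
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(30)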

Here are the results. This is how my JSON flights data looks:

json file contents

And here is how the carriers lookup file looks:

json file contents

And here is the resulting data in the SQL Server table after the job has finished:

data in table
Next Steps

Check out the first part of this tip: Import JSON files to AWS RDS SQL Server database using Glue service.





About the author
MSSQLTips author Maria Zakourdaev has been working with SQL Server for more than 20 years. She also manages other database technologies such as MySQL, PostgreSQL, Redis, Redshift, Couchbase and Elasticsearch.

This author pledges the content of this article is based on professional experience and not AI generated.

View all my tips


Article Last Updated: 2019-03-14

Comments For This Article




Thursday, August 15, 2024 - 9:25:24 AM - sharath Back To Top (92454)
Hi,
I am trying to find a resource that uses the from_options function and connects to SQL Server to extract data using a query.
Is there such an example? I have tried a JDBC connection and extracted the data, but the performance is not really great, so I wanted to know about dynamic frame usage with from_options.

Wednesday, May 29, 2019 - 2:27:44 PM - Jason H Back To Top (81250)

re: "Can I import data from S3 to SQL in EC2 instance (not RDS) "

Yes, any supported JDBC source that is accessible from your AWS VPC is a supported destination. You can set up an AWS Glue database connection from within the Glue console (or API) by entering your DB details, and then use it within your Glue ETL jobs.


Friday, April 12, 2019 - 10:48:35 AM - Umit Kavala Back To Top (79544)

Can I import data from S3 to SQL in EC2 instance (not RDS)?














