You can create isolated Python virtual environments to package multiple Python libraries for a PySpark job. Here is an example of how you can package Great Expectations and profile a set of sample data.
- Access to EMR Serverless
- Docker with the BuildKit backend
- An S3 bucket in `us-east-1` and an IAM role to run your EMR Serverless jobs
**Important:** This example is intended to be run in the `us-east-1` region, as it reads data from the New York City Taxi dataset in the Registry of Open Data. If your EMR Serverless application is in a different region, you must configure networking.
**Important:** The default Dockerfile is configured to use `linux/amd64`. If you are using Graviton, update it to use `linux/arm64` or pass `--platform linux/arm64` to the `docker build` command. See the EMR Serverless architecture options for more detail.

If you are using EMR 7.x, you must use Amazon Linux 2023 as the base image instead of Amazon Linux 2. A sample file is provided in `Dockerfile.al2023`.
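For example (a sketch only, assuming a Graviton/ARM64 application or an EMR 7.x application; adjust the platform and Dockerfile to your setup), the `docker build` command used in step 1 below would change like this:

```bash
# Build the virtualenv archive for ARM64 (Graviton) instead of the default linux/amd64
DOCKER_BUILDKIT=1 docker build --platform linux/arm64 --output . .

# Or build from the Amazon Linux 2023 sample Dockerfile for EMR 7.x
DOCKER_BUILDKIT=1 docker build -f Dockerfile.al2023 --output . .
```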
Set the following variables according to your environment.
export S3_BUCKET=<YOUR_S3_BUCKET_NAME>
export APPLICATION_ID=<EMR_SERVERLESS_APPLICATION_ID>
export JOB_ROLE_ARN=<EMR_SERVERLESS_IAM_ROLE>
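If you want to double-check those values before running anything (an optional step, not part of the original walkthrough), the EMR Serverless CLI can describe the application, which also shows its architecture (X86_64 vs. ARM64) for the Dockerfile note above:

```bash
# Describe the application: state, release label, and architecture
aws emr-serverless get-application --application-id $APPLICATION_ID
```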
The example below builds a virtual environment with the necessary dependencies to use Great Expectations to profile a limited set of data from the New York City Taxi and Limousine trip data.

All the commands below should be executed in this directory (`examples/pyspark/dependencies`).
- Build your virtualenv archive
This command builds the included `Dockerfile` and exports the resulting `pyspark_ge.tar.gz` file to your local filesystem.
# Enable BuildKit backend
DOCKER_BUILDKIT=1 docker build --output . .
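# Copy the archive to S3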
aws s3 cp pyspark_ge.tar.gz s3://${S3_BUCKET}/artifacts/pyspark/
- Copy your code
There's a sample `ge_profile.py` script included here.
aws s3 cp ge_profile.py s3://${S3_BUCKET}/code/pyspark/
- Run your job
  - `entryPoint` should point to your script on S3
  - `entryPointArguments` defines the output location of the Great Expectations profiler
  - The virtualenv archive is added via the `spark.archives` configuration (equivalent to the `--archives` flag)
  - The driver and executor Python paths are configured via the various `--conf spark.emr-serverless.*` parameters
aws emr-serverless start-job-run \
--application-id $APPLICATION_ID \
--execution-role-arn $JOB_ROLE_ARN \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/ge_profile.py",
"entryPointArguments": ["s3://'${S3_BUCKET}'/tmp/ge-profile"],
"sparkSubmitParameters": "--conf spark.archives=s3://'${S3_BUCKET}'/artifacts/pyspark/pyspark_ge.tar.gz#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://'${S3_BUCKET}'/logs/"
}
}
}'
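`start-job-run` returns a `jobRunId` for the submitted job. As a convenience (not part of the original steps), you can poll the run's state with `get-job-run` while you wait:

```bash
# Replace with the jobRunId returned by start-job-run
export JOB_RUN_ID=<YOUR_JOB_RUN_ID>

# Poll the job state (e.g. SCHEDULED, RUNNING, SUCCESS, FAILED)
aws emr-serverless get-job-run \
    --application-id $APPLICATION_ID \
    --job-run-id $JOB_RUN_ID \
    --query 'jobRun.state' \
    --output text
```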
When the job finishes, it will write a `part-00000` file out to `s3://${S3_BUCKET}/tmp/ge-profile`.
- Copy and view the output
aws s3 cp s3://${S3_BUCKET}/tmp/ge-profile/part-00000 ./ge.html
open ./ge.html
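`open` is the macOS command for viewing the file in your default browser; on Linux, `xdg-open ./ge.html` (assuming `xdg-utils` is installed) does the same thing.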
Sometimes you need to pull in Java dependencies like Kafka or PostgreSQL libraries. As of release label `emr-6.7.0`, you can use either `spark.jars.packages` or the `--packages` flag in your `sparkSubmitParameters` as shown below. Be sure to create your application within a VPC so that it can download the necessary dependencies.
# First create an application with release label emr-6.7.0 and a network configuration
aws emr-serverless create-application \
--release-label emr-6.7.0 \
--type SPARK \
--name spark-packages \
--network-configuration '{
"subnetIds": ["subnet-abcdef01234567890", "subnet-abcdef01234567891"],
"securityGroupIds": ["sg-abcdef01234567893"]
}'
# Then submit a job (replacing the application id, arn, and your code/packages)
aws emr-serverless start-job-run \
--name pg-query \
--application-id $APPLICATION_ID \
--execution-role-arn $JOB_ROLE_ARN \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/pg_query.py",
"sparkSubmitParameters": "--packages org.postgresql:postgresql:42.4.0"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://'${S3_BUCKET}'/logs/"
}
}
}'
While `--packages` lets you easily specify additional dependencies for your job, these dependencies are not cached between job runs. In other words, each job run will need to re-fetch the dependencies, potentially leading to increased startup time. To mitigate this, and to create reproducible builds, you can create a dependency uberjar and upload it to S3. This approach can also be used with EMR release label `emr-6.6.0`.
To do this, we'll create a `pom.xml` that specifies our dependencies and use a Maven Docker container to build the uberjar. In this example, we'll package `org.postgresql:postgresql:42.4.0` and use the example script in `./pg_query.py` to query a Postgres database.
**Tip:** The code in `pg_query.py` is for demonstration purposes only; never store credentials directly in your code. 😁
- Build an uberjar with your dependencies
# Enable BuildKit backend
DOCKER_BUILDKIT=1 docker build -f Dockerfile.jars --output . .
This creates an `uber-jars-1.0-SNAPSHOT.jar` file locally that you will copy to S3 in the next step.
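Optionally (not part of the original steps), you can verify that the PostgreSQL driver was bundled into the shaded jar before uploading it:

```bash
# List the jar contents and confirm the PostgreSQL driver classes are included
unzip -l uber-jars-1.0-SNAPSHOT.jar | grep -i postgresql
```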
- Copy your code and jar
aws s3 cp pg_query.py s3://${S3_BUCKET}/code/pyspark/
aws s3 cp uber-jars-1.0-SNAPSHOT.jar s3://${S3_BUCKET}/code/pyspark/jars/
- Set the following variables according to your environment.
export S3_BUCKET=<YOUR_S3_BUCKET_NAME>
export APPLICATION_ID=<EMR_SERVERLESS_APPLICATION_ID>
export JOB_ROLE_ARN=<EMR_SERVERLESS_IAM_ROLE>
- Start your job with `--jars`
aws emr-serverless start-job-run \
--name pg-query \
--application-id $APPLICATION_ID \
--execution-role-arn $JOB_ROLE_ARN \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/pg_query.py",
"sparkSubmitParameters": "--jars s3://'${S3_BUCKET}'/code/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://'${S3_BUCKET}'/logs/"
}
}
}'
- See the output of your job!
Once your job finishes, you can copy the output locally to view the stdout.
export JOB_RUN_ID=<YOUR_JOB_RUN_ID>
aws s3 cp s3://${S3_BUCKET}/logs/applications/${APPLICATION_ID}/jobs/${JOB_RUN_ID}/SPARK_DRIVER/stdout.gz - | gunzip
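The `JOB_RUN_ID` above is the `jobRunId` returned by `start-job-run`. If you no longer have that response handy, one way (a convenience beyond the original steps) to find it, and to confirm the run succeeded before pulling logs, is to list the application's recent job runs:

```bash
# List recent job runs for the application, including their id and state
aws emr-serverless list-job-runs --application-id $APPLICATION_ID
```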