
I am running a Glue ETL transformation job. This job is supposed to read data from S3 and convert it to Parquet.

Below is the Glue source; sourcePath is the S3 location of the files.

In this location we have around 100 million JSON files, all of them nested in sub-folders.

That is why I am applying an exclusion pattern: it should exclude everything that does not start with a, so I believe only the files starting with a (around 2.7 million files) will be processed.

val file_paths = Array(sourcePath)

val exclusionPattern = "\"" + sourcePath + "{[!a]}**" + "\""

glueContext
  .getSourceWithFormat(connectionType = "s3",
    options = JsonOptions(Map(
      "paths" -> file_paths, "recurse" -> true, "groupFiles" -> "inPartition", "exclusions" -> s"[$exclusionPattern]"
    )),
    format = "json",
    transformationContext = "sourceDF"
  )
  .getDynamicFrame()
  .map(transformRow, "error in row")
  .toDF()
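
For reference, the exclusions option expects a string containing a JSON list of glob patterns. With an example prefix (not my real one), the string built above comes out like this:

// Illustration only: made-up prefix, my real sourcePath is different
val sourcePath = "s3://example-bucket/data/"
val exclusionPattern = "\"" + sourcePath + "{[!a]}**" + "\""
val exclusions = s"[$exclusionPattern]"
// exclusions == """["s3://example-bucket/data/{[!a]}**"]"""
// i.e. a JSON list with a single glob that matches (and therefore excludes)
// everything whose first character after the prefix is not 'a'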

I have run this job with both the Standard and the G.2X worker type, and I keep getting this error:

#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 27788"...

And in CloudWatch I can see that driver memory utilisation hits 100%, while executor memory usage is almost nil.

When running the job I am setting spark.driver.memory=10g and spark.driver.memoryOverhead=4096 via the --conf job parameter.

These are the details from the logs:

--conf spark.hadoop.yarn.resourcemanager.connect.max-wait.ms=60000 
--conf spark.hadoop.fs.defaultFS=hdfs://ip-myip.compute.internal:1111 
--conf spark.hadoop.yarn.resourcemanager.address=ip-myip.compute.internal:1111 
--conf spark.dynamicAllocation.enabled=true 
--conf spark.shuffle.service.enabled=true 
--conf spark.dynamicAllocation.minExecutors=1 
--conf spark.dynamicAllocation.maxExecutors=4 
--conf spark.executor.memory=20g 
--conf spark.executor.cores=16 
--conf spark.driver.memory=20g 
--conf spark.default.parallelism=80 
--conf spark.sql.shuffle.partitions=80 
--conf spark.network.timeout=600 
--job-bookmark-option job-bookmark-disable 
--TempDir s3://my-location/admin 
--class com.example.ETLJob 
--enable-spark-ui true 
--enable-metrics 
--JOB_ID j_111... 
--spark-event-logs-path s3://spark-ui 
--conf spark.driver.memory=20g 
--JOB_RUN_ID jr_111... 
--conf spark.driver.memoryOverhead=4096 
--scriptLocation s3://my-location/admin/Job/ETL 
--SOURCE_DATA_LOCATION s3://xyz/ 
--job-language scala 
--DESTINATION_DATA_LOCATION s3://xyz123/ 
--JOB_NAME ETL

Any ideas what the issue could be?

Thanks

2 Answers


If you have too many files, you are probably overwhelming the driver. Try using useS3ListImplementation. This is an implementation of the Amazon S3 ListKeys operation, which splits large result sets into multiple responses.

Try adding:

"useS3ListImplementation" -> true

[1] https://aws.amazon.com/premiumsupport/knowledge-center/glue-oom-java-heap-space-error/
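
Applied to the source in the question, that would look something like this (untested sketch, everything else unchanged from the question):

glueContext
  .getSourceWithFormat(connectionType = "s3",
    options = JsonOptions(Map(
      "paths" -> file_paths,
      "recurse" -> true,
      "groupFiles" -> "inPartition",
      "exclusions" -> s"[$exclusionPattern]",
      "useS3ListImplementation" -> true // list S3 keys page by page instead of all at once
    )),
    format = "json",
    transformationContext = "sourceDF"
  )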

  • Thanks @eman. I am not seeing OOM errors, but when the job starts it just keeps running with no progress at all. CloudWatch shows memory usage of ~0.28 for both driver and executor, and that stays constant for a very long time; there is no ETL data movement or anything.
    – abhishek
    Apr 30, 2020 at 12:18
  • Also, the executor log says: WARN ApacheUtils: NoSuchMethodException was thrown when disabling normalizeUri. This indicates you are using an old version (< 4.5.8) of Apache http client. It is recommended to use http client version >= 4.5.9 to avoid the breaking change introduced in apache client 4.5.7 and the latency in exception handling. See github.com/aws/aws-sdk-java/issues/1919 for more information
    – abhishek
    Apr 30, 2020 at 12:48
  • I think you can use the Spark UI as well to see what your job is actually doing. Although this is a post-run assessment, you could try it with a smaller dataset, i.e. using push-down predicates. Benchmark the runs and see which DPU allocation is better suited to your workload. You could also spin up a development endpoint, but be careful with the DPU count as the cost can easily skyrocket.
    – Eman
    Apr 30, 2020 at 18:43
  • I am running the Spark UI, but there are no tasks created and no DAG; it is all empty. Also, I am not sure I can use push-down predicates, as the S3 file path is s3://bucket-name/uuid/sometext/2020/01/02.data.json. Thanks for your help. The only thing I don't understand is why the driver and executors are not doing anything at all; the job keeps running but no processing is happening.
    – abhishek
    Apr 30, 2020 at 19:13

As suggested by @eman, I applied all three options (groupFiles, groupSize and useS3ListImplementation), like below:

options = JsonOptions(Map(
  "path" -> sourcePath,
  "recurse" -> true,
  "groupFiles" -> "inPartition",
  "groupSize" -> 104857600, // 100 MB
  "useS3ListImplementation" -> true
))

And that is working for me. There is also an "acrossPartitions" option if the data is not arranged properly.
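
For completeness, here is roughly how these options plug into the getSourceWithFormat call from my question (a sketch; I kept the "paths"/Array form and the exclusions from the original code):

glueContext
  .getSourceWithFormat(connectionType = "s3",
    options = JsonOptions(Map(
      "paths" -> file_paths,                 // Array(sourcePath), as in the question
      "recurse" -> true,
      "groupFiles" -> "inPartition",         // group many small files together
      "groupSize" -> 104857600,              // aim for roughly 100 MB per group
      "useS3ListImplementation" -> true,     // paginate the S3 listing on the driver
      "exclusions" -> s"[$exclusionPattern]" // still only processing files starting with 'a'
    )),
    format = "json",
    transformationContext = "sourceDF"
  )
  .getDynamicFrame()
  .map(transformRow, "error in row")
  .toDF()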
