I’m new to Spark and I’m running a Spark job on an EMR cluster.
The job takes about one hour to finish.
In the job, there is code that reads a file from S3 (the file is about 10 MB) and writes its content into DynamoDB.
I don’t understand why it takes so long to finish this task; I was expecting it to finish much faster.
Here is the code for the Spark job:
public class PopulateCovid19Citations {

    private static final String APP_NAME = "PopulateCovid19Citations";
    private static Logger LOGGER = LogManager.getLogger(PopulateCovid19Citations.class);
    static Logger log = Logger.getLogger(PopulateCovid19Citations.class.getName());

    public static void main(String[] args) throws Exception {
        Logger.getLogger("org").setLevel(Level.ERROR);

        // Building the Spark session
        JavaSparkContext javaSparkContext = SparkConfiguration.buildSparkContext(APP_NAME);
        SparkSession sparkSession = SparkSession.builder()
                .sparkContext(javaSparkContext.sc()).getOrCreate();

        String fileLocationTest = "s3a://mybucket/WHOCovid19CitationsDatabase.csv";
        Dataset<Row> fullCitations =
                sparkSession.read().format("com.databricks.spark.csv").option("inferSchema", "true")
                        .option("header", "true").load(fileLocationTest);
        fullCitations.show(10);

        // Selecting only the relevant columns for our exercise
        Dataset<Row> citations = fullCitations.select(
                col("Title"),
                col("Authors"),
                col("Abstract"),
                col("Published Year"),
                col("Journal"),
                col("Study"),
                col("Tags"));
        citations.show(10);

        // Removing citations with null title
        Dataset<Row> filteredCitations = citations.filter(col("Title").isNotNull());

        // Building an RDD composed of DynamoDB writable items that matches the Covid19Citation table
        JavaPairRDD<Text, DynamoDBItemWritable> dynamoCitations = filteredCitations.javaRDD().mapToPair(citation -> {
            Map<String, AttributeValue> attributes = new HashMap<>();
            putStringAttribute(attributes, "citationCode", UUID.randomUUID().toString());
            putStringAttribute(attributes, "title", citation.getAs("Title"));
            putStringAttribute(attributes, "authors", citation.getAs("Authors"));
            putStringAttribute(attributes, "abstract", citation.getAs("Abstract"));
            putNumberAttribute(attributes, "publishedYear", citation.getAs("Published Year"));
            putStringAttribute(attributes, "journal", citation.getAs("Journal"));
            putStringAttribute(attributes, "study", citation.getAs("Study"));
            putStringAttribute(attributes, "tags", citation.getAs("Tags"));
            DynamoDBItemWritable dynamoDBItemWritable = new DynamoDBItemWritable();
            dynamoDBItemWritable.setItem(attributes);
            return new Tuple2<>(new Text(""), dynamoDBItemWritable);
        });

        // Writing data to the DynamoDB table
        JobConf jobConf = SparkConfiguration.buildJobConf(javaSparkContext, Boolean.FALSE);
        dynamoCitations.saveAsHadoopDataset(jobConf);

        sparkSession.stop();
    }

    private static void putStringAttribute(Map<String, AttributeValue> attributes, String key, Object fieldValue) {
        if (fieldValue != null) {
            attributes.put(key, new AttributeValue(fieldValue.toString()));
        }
    }

    private static void putNumberAttribute(Map<String, AttributeValue> attributes, String key, Object fieldValue) {
        if (fieldValue != null) {
            try {
                Integer integerFieldValue = Integer.parseInt(fieldValue.toString());
                AttributeValue attributeValue = new AttributeValue();
                attributeValue.setN(integerFieldValue.toString());
                attributes.put(key, attributeValue);
            } catch (Exception e) {
                LOGGER.info("cannot convert " + fieldValue + " to integer");
            }
        }
    }
}
And here is the configuration for building the Spark context:
public static JavaSparkContext buildSparkContext(String application) throws ClassNotFoundException {
    SparkConf conf = new SparkConf()
            .setAppName(application)
            .set("spark.executor.cores", "5")
            .set("spark.executor.memory", "42G")
            .set("spark.executor.memoryOverhead", "4G")
            .set("spark.driver.memory", "42G")
            .set("spark.driver.cores", "5")
            .set("spark.executor.instances", "25")
            .set("spark.default.parallelism", "250")
            .set("spark.dynamicAllocation.enabled", "false")
            .registerKryoClasses(new Class<?>[]{
                    Class.forName("org.apache.hadoop.io.Text"),
                    Class.forName("org.apache.hadoop.dynamodb.DynamoDBItemWritable")
            });
    return new JavaSparkContext(conf);
}
I’m using an EMR cluster with m6a.16xlarge (64 vCores, 259 GB memory) EC2 instance types.
Can somebody please help me with this?
Thanks in advance.
2 Answers
CSV schema inference is implemented as "read through the entire file and work out the type of each column". There is a bit of sampling, but it’s essentially the same thing: one or more HTTP GET requests to stream through the entire file before any computation begins.
Ideally, move off CSV to a modern file format with a well-defined schema, such as Avro. If you can’t do that, explicitly declare the schema for the CSV file in your app, as sketched below.
This may not be the sole cause of your problems, but it will be the first bottleneck in the processing.
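A minimal sketch of declaring the schema explicitly, built from the column names used in the question's select(). This assumes those seven columns are all of the columns in the CSV, in that order; if the file has more columns, every column must be listed in file order, because a user-supplied schema is matched by position:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Declare the CSV schema up front instead of using inferSchema, so Spark
// does not have to stream the whole file from S3 before the job starts.
// Column names come from the question's select(); adjust types as needed.
StructType citationSchema = new StructType()
        .add("Title", DataTypes.StringType, true)
        .add("Authors", DataTypes.StringType, true)
        .add("Abstract", DataTypes.StringType, true)
        .add("Published Year", DataTypes.IntegerType, true)
        .add("Journal", DataTypes.StringType, true)
        .add("Study", DataTypes.StringType, true)
        .add("Tags", DataTypes.StringType, true);

Dataset<Row> fullCitations = sparkSession.read()
        .schema(citationSchema)      // no inference pass over the file
        .option("header", "true")
        .csv(fileLocationTest);      // Spark's built-in CSV reader replaces com.databricks.spark.csv

With the schema declared, the read becomes a single pass over the data when an action runs, instead of an extra scan just to infer types.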
If you do iterative operations on the same data, you should use the persist() method to avoid redundant recomputation. I would recommend persisting at least the fullCitations dataset after its creation, for example as shown below. See more about persistence in the documentation: https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
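A minimal sketch of that suggestion; the choice of StorageLevel.MEMORY_AND_DISK is an assumption, pick whichever level fits your memory budget:

import org.apache.spark.storage.StorageLevel;

// Persist fullCitations right after it is created, so the subsequent show(),
// select(), filter() and the DynamoDB write reuse the cached data instead of
// re-reading and re-parsing the CSV from S3 for each action.
fullCitations.persist(StorageLevel.MEMORY_AND_DISK());

// ... rest of the job ...

fullCitations.unpersist();  // release the cached data once it is no longer needed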
Also, I would check the Spark configuration. You have a cluster with 64 vCores and 259 GB RAM, but you declared 25 executors with 5 cores and 42 GB RAM each, which requests roughly 125 cores and over 1 TB of executor memory. I am not sure you need such a large amount of resources for every executor, and it may hurt performance if your cluster does not have enough resources to start all of the executors you asked for.
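As a rough illustration only (the numbers below are assumptions sized for a single m6a.16xlarge worker, not tuned values; scale them to however many core nodes your cluster actually has), a configuration that fits within one node might look more like this:

// Sketch for ONE m6a.16xlarge worker (64 vCores, ~256 GB usable).
// 12 executors of 5 cores / 16 GB + 2 GB overhead each use about 60 cores
// and ~216 GB, leaving headroom for the driver and YARN itself.
SparkConf conf = new SparkConf()
        .setAppName(application)
        .set("spark.executor.cores", "5")
        .set("spark.executor.instances", "12")
        .set("spark.executor.memory", "16G")
        .set("spark.executor.memoryOverhead", "2G")
        .set("spark.driver.memory", "8G")
        .set("spark.driver.cores", "2")
        .set("spark.default.parallelism", "120")   // ~2x the total executor cores
        .set("spark.dynamicAllocation.enabled", "false");

The point is that the per-executor request should be something the cluster can actually schedule; otherwise part of the declared capacity simply never starts.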