Unlocking Efficiency and Performance: Navigating the Spark 3 and EMR 6 Upgrade Journey at Slack

Slack Data Engineering recently migrated its data workloads from AWS EMR 5 (Spark 2/Hive 2 processing engine) to EMR 6 (Spark 3 processing engine). In this blog, we will share our migration journey, challenges, and the performance gains we observed in the process. This blog aims to assist Data Engineers, Data Infrastructure Engineers, and Product Managers who may be considering migrating to EMR 6/Spark 3.

In Data Engineering, our primary goal is to support internal teams, such as Product Engineering, Machine Learning, and Data Science, by providing essential datasets and a reliable data infrastructure to facilitate the creation of their own datasets. We ensure the reliability and timeliness of critical billing and usage data for our customers. Maintaining Landing Time SLAs (Service Level Agreements) serves as a measure to keep up these promises.

Over time, the rapid growth of our data volume gradually led to violations of our critical data pipelines' SLAs. As we sought alternatives to Spark 2 and Hive 2, Spark 3 emerged as a compelling solution for all our data processing needs, notably because of its Adaptive Query Execution (AQE) feature, which could improve performance for some of our skewed datasets. We embarked on this EMR 6/Spark 3 migration because of the enhanced performance, improved security (with updated log4j libraries), and the potential for significant cost savings.

This year-long project consisted of two major phases:

  • Phase 1: Upgrade EMR from 5.3x to 6.x.
  • Phase 2: Upgrade from Hive 2.x/Spark 2.x to Spark 3.x.

Migration journey

Current landscape

We at Slack Data Engineering use a federated AWS EMR cluster model to handle all data analytics requirements. The data that lives in the data warehouse is physically stored in S3, and its metadata is stored in the Hive Metastore schema on an RDS database. SQL handles most of our use cases. Additionally, we rely on Scala/PySpark for certain complex workloads. We use Apache Airflow to orchestrate our workflows and have designed custom Spark Airflow operators for submitting SparkSQL, PySpark, and Scala jobs to the EMR cluster via the Livy Batches API using authenticated HTTP requests.
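For illustration, here is a minimal sketch of what a Livy batch submission looks like; the Livy endpoint, authentication, job artifact paths, and Spark settings below are placeholders, and our real operators layer authentication and retry handling on top of this call.

```python
# Minimal sketch of submitting a Spark job through the Livy Batches API.
# Host, paths, class names, and configs are placeholders, not our real values.
import requests

LIVY_URL = "https://emr-cluster.example.com:8998"  # hypothetical Livy endpoint

payload = {
    "file": "s3://example-bucket/jars/spark_jobs.jar",   # job artifact to run
    "className": "com.example.jobs.DailyAggregation",
    "args": ["--ds", "2023-01-01"],
    "conf": {"spark.sql.shuffle.partitions": "2000"},
    "name": "daily_aggregation",
}

# POST /batches creates a new batch session; Livy returns its id and state.
resp = requests.post(f"{LIVY_URL}/batches", json=payload, timeout=30)
resp.raise_for_status()
batch = resp.json()
print(batch["id"], batch["state"])
```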

Here is an example of our hierarchical custom Airflow Spark operators:

BaseAirflowOperator → SparkBaseAirflowOperator → CustomPySparkAirflowOperator or CustomSparkSqlAirflowOperator

Here is an example of how we use CustomSparkSqlAirflowOperator to schedule an Airflow task:

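As a rough illustration (the operator arguments and import path shown here are simplifications, not our exact interface), scheduling a SparkSQL task might look like the following:

```python
# Illustrative only: a simplified version of how a SparkSQL task could be
# scheduled with a custom operator. Argument names and paths are assumptions.
from datetime import datetime

from airflow import DAG
# Hypothetical import path for the custom operator described above.
from data_eng.operators.spark import CustomSparkSqlAirflowOperator

with DAG(
    dag_id="example_daily_rollup",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    daily_rollup = CustomSparkSqlAirflowOperator(
        task_id="daily_rollup",
        sql="sql/daily_rollup.sql",  # SparkSQL script submitted to EMR via Livy
    )
```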
Below is a pictorial representation of all the components working together:

Our data warehouse infrastructure comprises over 60 EMR clusters, catering to the needs of over 40 teams and supporting thousands of Airflow Directed Acyclic Graphs (DAGs). Prior to this migration, all workloads were executed on EMR 5.36, Spark 2.4.8, and Hive 2.3.9.

Migration challenges

As the majority of our workloads were managed by Hive 2, transitioning to Hive 3 in EMR 6 was the preferred choice for our internal customers because of the minimal changes required in the codebase. However, we opted to consolidate into a single compute engine, Spark 3. This strategic decision was made to leverage Spark 3's Adaptive Query Execution (AQE) feature, develop expertise in Spark 3 across our teams, and fine-tune Hadoop clusters exclusively for Spark operations for efficiency.

Given the scale of this migration, a phased approach was essential. Thus, we decided to support both AWS EMR 5 and EMR 6 versions until the migration was complete, allowing us to transition workloads without disrupting roadmaps for existing teams.

However, maintaining two different cluster setups (Hive 2.x/Spark 2.x in EMR 5.x and Spark 3.x in EMR 6) presented several challenges for us:

  • How do we support the same Hive catalog across Spark 2/Spark 3 workloads?
  • How do we provision different versions of EMR clusters?
  • How do we control cost?
  • How do we support different versions of our job libraries across these clusters?
  • How do we submit and route jobs across these different versions of clusters?

Pre-migration planning

Hive catalog migration

How do we support the same Hive catalog across Spark 2/Spark 3 workloads?

We needed to use the same Hive Metastore catalog for our workloads across EMR 5/Spark 2 and EMR 6/Spark 3, as the migration of our pipelines from Spark 2 to Spark 3 would take multiple quarters. We solved this problem by migrating our existing HMS 2.3.0 catalog to an HMS 3.1.0 catalog using the Hive Schema Tool. We executed the following commands on the EMR 5 master host connected to the catalog database.
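A sketch of what this looks like is shown below; the schematool path and source version are assumptions based on the setup described above, not the exact commands we ran.

```python
# Sketch of driving the Hive Schema Tool from the EMR 5 master host.
# The binary path and version argument are assumptions for illustration.
import subprocess

SCHEMATOOL = "/usr/lib/hive/bin/schematool"  # assumed Hive install location on EMR

# Inspect the current metastore schema version before changing anything.
subprocess.run([SCHEMATOOL, "-dbType", "mysql", "-info"], check=True)

# Upgrade the metastore schema in place from the Hive 2.3.0 layout.
subprocess.run(
    [SCHEMATOOL, "-dbType", "mysql", "-upgradeSchemaFrom", "2.3.0"],
    check=True,
)
```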

 

Before the migration we took backups of our Hive Metastore database, and we also took some downtime on job processing during the migration for the schema upgrade.

Post schema upgrade, both our EMR 5 and EMR 6 clusters could talk to the same upgraded HMS 3 catalog DB, as it was backward compatible with Hive 2 and Spark 2 applications.

EMR cluster provisioning

How do we provision different versions of EMR clusters? How do we control cost?

We use EMR's golang SDK to launch EMR clusters via the RunJobFlow API. This API accepts a JSON-based launch configuration for an EMR cluster. We maintain a base JSON config for all clusters and override custom parameters like InstanceFleets capacity and Release Label at the cluster configuration level. We created specific EMR 6 configurations for new EMR 6 clusters with auto-scaling enabled and low minimum capacity to keep costs under control. During the course of the migration, we created more such EMR 6 cluster configurations for each new cluster. We regulated the capacity and overall cluster usage costs by progressively reducing EMR 5 fleet size and increasing EMR 6 fleets based on utilization.
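We use the Go SDK in production, but the same RunJobFlow call is easier to sketch with boto3; the release label, instance types, capacities, and scaling limits below are placeholders rather than our actual base config.

```python
# Illustrative boto3 equivalent of the RunJobFlow launch described above
# (we use the golang SDK in production). All values are placeholders.
import boto3

emr = boto3.client("emr", region_name="us-east-1")

response = emr.run_job_flow(
    Name="data-eng-emr6-example",
    ReleaseLabel="emr-6.9.0",                    # EMR 6 release, overridden per cluster
    Applications=[{"Name": "Spark"}, {"Name": "Livy"}],
    Instances={
        "InstanceFleets": [
            {
                "InstanceFleetType": "MASTER",
                "TargetOnDemandCapacity": 1,
                "InstanceTypeConfigs": [{"InstanceType": "m5.2xlarge"}],
            },
            {
                "InstanceFleetType": "CORE",
                "TargetOnDemandCapacity": 2,     # low minimum capacity
                "InstanceTypeConfigs": [{"InstanceType": "r5.4xlarge"}],
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    ManagedScalingPolicy={                       # auto-scaling keeps cost in check
        "ComputeLimits": {
            "UnitType": "InstanceFleetUnits",
            "MinimumCapacityUnits": 2,
            "MaximumCapacityUnits": 40,
        }
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])
```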

Job builds across different Spark versions

How do we support different versions of our job libraries across these clusters?

We use Bazel as the primary tool to build our codebase. Using Bazel, we implemented parallel build streams for Spark JARs across versions 2.x and 3.x. We propagated all ongoing config changes to both Spark 2 and Spark 3 JARs for consistency. Enabling the build --config=spark3 flag in the .bazelrc file allowed building local JARs with the required version for testing. In our Airflow pipelines, as we migrated jobs to EMR 6, the Airflow operator would pick Spark 3 JARs automatically based on the flag approach described below.

Airflow operator enhancements

How do we submit and route jobs across these different versions of clusters?

We enhanced our custom Airflow Spark operator to route jobs to different versions of clusters by using a boolean flag. This flag provided the convenience of submitting jobs to either the pre-migration or post-migration cluster with a simple toggle.

Additionally, we introduced four logical groups of Spark config sizing options (SMALL, DEFAULT, LARGE, and EXTRA_LARGE) embedded in the Airflow Spark operator. Each option has its own executor memory, driver memory, and executor ranges. Sizing options helped some of our end users migrate existing Hive jobs with minimal understanding of Spark configurations.

This is an example of our enhanced CustomSparkSqlAirflowOperator:
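The snippet below is a simplified approximation of that enhanced interface; the argument names (use_emr6, sizing), the preset values, and the import path are assumptions, shown only to illustrate how a single toggle routes a task between the EMR 5 and EMR 6 clusters.

```python
# Simplified approximation of the enhanced operator. Argument names, preset
# values, and the import path are assumptions, not the exact interface.
from data_eng.operators.spark import CustomSparkSqlAirflowOperator  # hypothetical path

# Each sizing preset bundles executor/driver settings so users don't hand-tune Spark.
SIZING_PRESETS = {
    "SMALL":       {"spark.executor.memory": "4g",  "spark.driver.memory": "4g"},
    "DEFAULT":     {"spark.executor.memory": "8g",  "spark.driver.memory": "8g"},
    "LARGE":       {"spark.executor.memory": "16g", "spark.driver.memory": "16g"},
    "EXTRA_LARGE": {"spark.executor.memory": "32g", "spark.driver.memory": "32g"},
}

billing_rollup = CustomSparkSqlAirflowOperator(
    task_id="billing_rollup",
    sql="sql/billing_rollup.sql",
    use_emr6=True,     # boolean flag: route this job to the EMR 6 / Spark 3 cluster
    sizing="LARGE",    # picks the LARGE preset above
)
```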

Code changes

In most cases, the existing Hive and Spark 2 code ran fine in Spark 3. There were a few cases where we had to make changes to the code to make it Spark 3 compatible.

One example of a code change from Hive to Spark 3 is the use of a salting function for skewed joins. While some code used cumbersome subqueries to generate salt keys, other code used RAND() in the joining key as a workaround for handling skew. While RAND() in the joining key works in Hive, it throws an error in Spark 3: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate, or Window. We removed all skew-handling code and let Spark 3's Adaptive Query Execution (AQE) take care of the data skew. More about AQE in the 'Migration gain and impact' section.
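To make this concrete, here is a sketch with made-up table and column names: the old Hive-style query folded RAND() into the join key, which Spark 3 rejects as nondeterministic, while the replacement is a plain join with AQE's skew handling enabled.

```python
# Sketch (made-up table/column names) of the skew-handling cleanup.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("skew_join_example")
    .config("spark.sql.adaptive.enabled", "true")            # AQE on (default in Spark 3.2+)
    .config("spark.sql.adaptive.skewJoin.enabled", "true")   # let AQE split skewed partitions
    .enableHiveSupport()
    .getOrCreate()
)

# Before (Hive): RAND() folded into the join condition to spread a hot key.
# Spark 3 raises AnalysisException here, since join conditions must be deterministic:
#   ... ON a.user_id = b.user_id AND a.salt = FLOOR(RAND() * 16) ...

# After (Spark 3): a plain join; AQE detects skewed partitions at runtime and splits them.
result = spark.sql("""
    SELECT a.user_id, b.team_id
    FROM events a
    JOIN memberships b
      ON a.user_id = b.user_id
""")
```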

Additionally, Spark 3 threw errors for certain data type casting scenarios that worked well in Spark 2. We had to change the default value of some Spark 3 configurations. One example is setting spark.sql.storeAssignmentPolicy to 'Legacy' instead of the default Spark 3 value 'ANSI'.

We faced a few instances where the Spark 3 job inferred the schema from the Hive Metastore but failed to consolidate schemas, erroring with java.lang.StackOverflowError. This occurred because of a lack of synchronization between the underlying Parquet data and the Hive Metastore schema. By setting spark.sql.hive.convertMetastoreParquet to false, we successfully resolved the issue.
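Both of these are ordinary Spark conf overrides; a sketch of setting them at session build time (they can equally be supplied through spark-defaults or at job submission) looks like this:

```python
# Sketch: overriding the two Spark 3 defaults mentioned above at session creation.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("spark3_compat_overrides")
    # Keep Spark 2-style casting on INSERT instead of the stricter ANSI default.
    .config("spark.sql.storeAssignmentPolicy", "LEGACY")
    # Read Parquet-backed Hive tables through the Hive SerDe, using the
    # metastore schema rather than Spark's own Parquet schema inference.
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .enableHiveSupport()
    .getOrCreate()
)
```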

Post-migration data validation

We compared two tables:

  • prod_table_hive2_or_spark2 (EMR 5 table)
  • test_table_spark3 (EMR 6 table)

We aimed for an exact data match between the tables rather than relying on sampling, particularly because some of our data, such as customer billing data, is mission-critical.

We used config files and macros to allow our SQL script to read from the production schema and write to the test schema in the test environment. This helped us populate the exact prod data in the test schema using Spark 3 for easy comparison. We then ran EXCEPT and COUNT SQL queries between prod_table_hive2_or_spark2 and test_table_spark3 in Trino to speed up the validation process.
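The checks themselves are simple; below is a sketch of the count and EXCEPT comparisons run against Trino, with the connection details and schema prefixes as placeholders (our real runs go through an in-house framework).

```python
# Sketch of the validation queries run against Trino (pip install trino).
# Connection details and schema names are placeholders.
import trino

conn = trino.dbapi.connect(
    host="trino.example.com", port=8080, user="data-eng",
    catalog="hive", schema="test",
)
cur = conn.cursor()

# 1. Row counts should match exactly.
cur.execute("SELECT count(*) FROM prod.prod_table_hive2_or_spark2")
prod_count = cur.fetchone()[0]
cur.execute("SELECT count(*) FROM test.test_table_spark3")
test_count = cur.fetchone()[0]
assert prod_count == test_count, f"count mismatch: {prod_count} vs {test_count}"

# 2. EXCEPT should return zero rows for an exact match (run in both directions).
cur.execute("""
    SELECT * FROM prod.prod_table_hive2_or_spark2
    EXCEPT
    SELECT * FROM test.test_table_spark3
""")
assert cur.fetchone() is None, "rows in prod output missing from the Spark 3 output"
```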

In case of a mismatch in the EXCEPT or COUNT query output, we used our in-house Python framework with the Trino engine for detailed analysis. We continuously monitored the post-migration production runtime of our pipelines using Airflow metadata DB tables and tuned pipelines as required.

There were a few sources of uncertainty in the validation process. For example:

  • When the code relied on the current timestamp, it caused differences between production and development runs. We excluded timestamp-related columns while validating these tables.
  • Random rows appeared when there was no tie-breaking ORDER BY clause in the code to resolve ties. We fixed the code to include a tie-breaking ORDER BY clause going forward.
  • Discrepancies appeared in the behavior of certain built-in functions between Hive and Spark. For instance, functions like GREATEST, which returns the greatest value from a list of arguments, exhibit different behavior when one of the arguments is NULL. We made code changes to adhere to the correct business logic (see the sketch after this list).
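As a small sketch of the GREATEST divergence: Spark 3 skips NULL arguments, while Hive 2 returns NULL as soon as any argument is NULL.

```python
# Sketch of the GREATEST NULL-handling difference between Spark 3 and Hive 2.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("greatest_null_example").getOrCreate()

spark.sql("SELECT GREATEST(10, NULL, 5) AS g").show()
# Spark 3:  g = 10    (NULL arguments are skipped)
# Hive 2:   g = NULL  (any NULL argument makes the result NULL)
```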

Migration gain and impact

After the migration, we observed substantial runtime performance improvements across the majority of our pipeline tasks. Most of our Airflow tasks showed improvements ranging from 30% to 60%, with some jobs experiencing an impressive 90% gain in runtime efficiency. We used Airflow metadata DB tables (the duration column in the task_instance table) to get runtime comparison numbers. Here is an example of how the runtime of one of our critical tasks improved significantly post-migration:

The EMR 6 EMRFS S3-optimized committer fixed the problem of incomplete writes and misleading SUCCESS statuses for some of our Spark jobs that handled text-based input and output formats. It also improves application performance by avoiding list and rename operations performed in S3 during job and task commit phases. Prior to EMR 6.4.0, this feature was only available for the Apache Parquet file format. From EMR 6.4.0 it was extended to all common formats, including Parquet, ORC, and text-based formats (including CSV and JSON).

As anticipated, we noticed several Adaptive Query Execution (AQE) improvements in the query execution plan. One of the key improvements was dynamically optimizing skew joins. This helped us remove several lines of skew-handling logic from our codebase and replace them with a simple join condition between the keys. Below is an example which shows the AQE (skew=true) hint in the query plan.

Another improvement was in dynamically coalescing shuffle partitions. This feature simplified the tuning of the shuffle partition number by picking the right shuffle partition number at runtime. We only had to provide a large enough initial shuffle partition number using the spark.sql.adaptive.coalescePartitions.initialPartitionNum configuration. Below is a query plan which shows the partition count going from 3000 to 348 using AQE.
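For reference, a sketch of the AQE settings relevant to both improvements above is shown below; the values are illustrative, and skew-join handling and partition coalescing are already enabled by default in recent Spark 3 releases.

```python
# Sketch of the AQE settings behind the skew-join and partition-coalescing
# improvements described above. Values are illustrative.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("aqe_example")
    .config("spark.sql.adaptive.enabled", "true")
    # Split skewed partitions at runtime instead of hand-rolled salting logic.
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # Start with a generous partition count and let AQE coalesce it at runtime
    # (e.g. 3000 down to a few hundred, as in the plan described above).
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "3000")
    .getOrCreate()
)
```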

Conclusion

The migration to EMR 6 has resulted in significant improvements in the runtime performance, efficiency, and reliability of our data processing pipelines.

AQE improvements, such as dynamically optimizing skew joins and coalescing shuffle partitions, have simplified query optimization and reduced the need for manual intervention in tuning parameters. The S3-optimized committer has addressed issues related to incomplete writes and misleading statuses in Spark jobs, leading to improved stability. The entire migration process described here ran quite smoothly and did not cause any incidents at any of the steps! We improved our pipeline codebase along the way, making it easier for new engineers to onboard onto a clean foundation and work exclusively on the Spark 3 engine. The migration has also laid the foundation for a more scalable lakehouse, with modern table formats like Iceberg and Hudi available in EMR 6. We recommend that data organizations invest in such long-term modernization initiatives, as they bring efficiencies across the board.

Interested in joining our Data Engineering team? Apply now