ETL Engine best practices
When writing ETL Engine SQL, consider the following best practices:
Goal: Avoid accidental data shuffles.
In distributed systems like Spark, removing duplicates is expensive because it requires moving data across the network (Shuffle) to compare rows.
DISTINCT
The issue:
SELECT DISTINCT forces a huge shuffle of your entire dataset.
Best practice: Only use DISTINCT if you have verified that duplicates actually exist. Never use it "just in case."
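A minimal sketch of that verification step (assuming a hypothetical orders table keyed by order_id), run once as a data-quality check rather than inside the pipeline:
SELECT COUNT(*) AS total_rows, COUNT(DISTINCT order_id) AS distinct_rows FROM orders;
If the two counts match, there are no duplicates and DISTINCT would add nothing but a shuffle.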
UNION vs. UNION ALL
UNION: Appends data AND removes duplicates. (Slow: Requires a shuffle.)
UNION ALL: Appends data only. (Fast: No shuffle.)
Best practice: Use UNION ALL for a fast append. Handle duplicates later only if needed.
SELECT * FROM table_A UNION ALL SELECT * FROM table_B;
Avoid this: Triggers a heavy shuffle to check for duplicates:
SELECT * FROM table_A UNION SELECT * FROM table_B;
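If duplicates do matter downstream, one option is to append with UNION ALL and deduplicate only after filtering, so far less data is shuffled. A sketch, assuming an illustrative status filter and column list:
SELECT DISTINCT id, status FROM ( SELECT id, status FROM table_A UNION ALL SELECT id, status FROM table_B ) unioned WHERE status = 'active';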
Goal: Reduce the amount of data Spark reads from disk.
Spark uses Columnar Storage (Parquet/Delta). This means every column is stored separately on disk, so Spark can skip the columns your query never touches.
Best practice:
SELECT id, status (Reads only the column data you need.)
Avoid this:
SELECT * (Reads every single column, even unused ones.)
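As a concrete sketch (table and column names are illustrative), the pruned version touches only the two columns it needs:
SELECT id, status FROM huge_orders WHERE status = 'SHIPPED';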
Goal: Filter out rows that exist in another table (e.g., "Get customers who have never placed an order").
Best practice: Both of the following methods are safe and efficient. Spark's optimizer (Catalyst) converts NOT EXISTS into the same efficient execution plan (an Anti Join) as the explicit join syntax. Spark reads the left table (customers) and attempts to find a match in the right table (orders). If a match is found, the row is discarded; if no match is found, the row is kept. This process is highly optimized and handles NULL values safely.
Option A: LEFT ANTI JOIN (Explicit) - Think of this as efficient subtraction.
SELECT * FROM customers c LEFT ANTI JOIN orders o ON c.customer_id = o.customer_id;
Option B: NOT EXISTS (Standard SQL) - This is safer than NOT IN because it handles NULLs correctly and allows Spark to optimize.
SELECT * FROM customers c WHERE NOT EXISTS ( SELECT 1 FROM orders o WHERE c.customer_id = o.customer_id );
Avoid this: Using NOT IN with a subquery is dangerous. If the subquery contains even a single NULL value, the query can silently return zero rows, and Spark often falls back to a slow, nested-loop plan (effectively a Cartesian Product) to handle the null-aware logic.
SELECT * FROM customers WHERE customer_id NOT IN (SELECT customer_id FROM orders);
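If you must keep NOT IN, a common mitigation is to filter NULLs out of the subquery explicitly; this at least avoids the silent empty result, although NOT EXISTS remains the safer rewrite (sketch based on the same tables):
SELECT * FROM customers WHERE customer_id NOT IN ( SELECT customer_id FROM orders WHERE customer_id IS NOT NULL );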
Goal: Get the "latest," "largest," or "most recent" record for every group.
Best practice: Use ROW_NUMBER() to rank items in a single pass.
WITH RankedUpdates AS ( SELECT user_id, status, updated_at, ROW_NUMBER() OVER ( PARTITION BY user_id ORDER BY updated_at DESC ) as rn FROM user_updates ) SELECT user_id, status, updated_at FROM RankedUpdates WHERE rn = 1;
Window frames (Ascending vs. Descending)
If you are using running totals or sliding windows (e.g., SUM(...) OVER ...), the direction of your sort matters significantly for the Databricks Photon engine.
The trap: Using ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING (looking forward) often forces Spark to abandon the fast Vectorized/Photon engine and fall back to slow Java row-by-row processing.
The fix: Always try to look backwards. If you need to look forward, simply reverse your sort order and look backwards.
Example:
Fast:
ORDER BY date DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
Slow:
ORDER BY date ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
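For example, a "remaining total from this row forward" can be written with the backwards-looking frame simply by sorting descending; a sketch assuming illustrative payments columns:
SELECT user_id, event_date, SUM(amount) OVER ( PARTITION BY user_id ORDER BY event_date DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as remaining_total FROM payments;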
Avoid this: Finding the "Max Date" and then joining the table back to itself forces Spark to read the data twice and perform a heavy shuffle.
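That anti-pattern looks roughly like this (illustrative sketch; compare it with the single-pass ROW_NUMBER version above):
SELECT u.user_id, u.status, u.updated_at FROM user_updates u JOIN ( SELECT user_id, MAX(updated_at) as max_updated FROM user_updates GROUP BY user_id ) m ON u.user_id = m.user_id AND u.updated_at = m.max_updated;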
Goal: Speed up queries by eliminating data before processing it.
Best practice: Filter on columns as they exist in the source data. This allows Spark to skip reading entire files or partitions ("Partition Pruning").
Spark reads only relevant data first:
SELECT *, TO_DATE(timestamp_col) as evt_date FROM huge_event_log WHERE timestamp_col LIKE '2024-01-01%';
Avoid this: Applying filters after heavy transformations forces Spark to process the entire dataset, only to throw most of it away later.
Parses all timestamps before filtering:
SELECT * FROM ( SELECT *, TO_DATE(timestamp_col) as evt_date FROM huge_event_log ) WHERE evt_date = '2024-01-01';
Goal: Remove rows efficiently.
Rule: Use DELETE for simple logic (e.g., date ranges). Use MERGE when removing a list or joining to another table.
Why: MERGE is optimized for "Join-based" updates. DELETE with subqueries (IN or EXISTS) can be slower and harder for the optimizer to execute efficiently.
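A minimal sketch of the simple case (table and column names are illustrative), where a static date predicate lets the engine prune files without any join:
DELETE FROM huge_event_log WHERE event_date < '2023-01-01';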
Best practice: Use MERGE to join and delete in one optimized step.
Efficient join-based delete:
MERGE INTO huge_table target USING banned_users source ON target.user_id = source.user_id WHEN MATCHED THEN DELETE;
Avoid this: Using subqueries inside DELETE is often inefficient.
DELETE FROM huge_table WHERE EXISTS ( SELECT 1 FROM banned_users b WHERE huge_table.user_id = b.user_id );
Goal: Avoid expensive "Fuzzy Matching" (LIKE, %, Regex) which forces slow table scans and loops.
Case A: Joining Tables (Avoid Non-Equi Joins) - Spark is optimized for Equi-Joins (where Key A = Key B). If you use LIKE in a Join, Spark falls back to a Nested Loop Join, which is extremely slow.
Best practice: Refactor the join key using LPAD or SUBSTRING to force an exact match.
Enables fast Hash Joins by forcing '=' (pad the shorter code to match the longer ID):
SELECT * FROM huge_invoices a JOIN huge_customers b ON a.invoice_id = lpad(b.customer_code, 10, '0');
Avoid this: Forces a Nested Loop Join (timeout risk):
SELECT * FROM huge_invoices a JOIN huge_customers b ON a.invoice_id LIKE concat('%', b.customer_code);
Case B: Complex Filtering (Preprocess Logic). If you have complex logic (regex/case statements) in a WHERE clause, Spark has to evaluate it for every single row at runtime.
Best practice: Extract the logic into a clean column once using a Temporary View or Table, then filter on that simple column.
1. Preprocess: Extract logic into a clean column:
CREATE OR REPLACE TEMP VIEW CleanLogs AS SELECT *, CASE WHEN message RLIKE 'Error-[0-9]+' OR message LIKE '%CRITICAL%' THEN 'High' ELSE 'Low' END as urgency FROM huge_logs;
2. Analyze: Fast filtering on the new simple column:
SELECT * FROM CleanLogs WHERE urgency = 'High';
Goal: Update a table's structure without breaking concurrent queries or losing history.
Best practice: CREATE OR REPLACE TABLE ... (Atomic swap; zero downtime.)
CREATE OR REPLACE TABLE my_table AS SELECT * FROM ...;
Avoid this: DROP TABLE ... followed by CREATE TABLE ... (Causes downtime and "Table Not Found" errors.)
Goal: Prevent "Dead End" columns that cause UNION failures and schema evolution crashes.
When you define a table (especially using CREATE TABLE AS SELECT or CTAS) and include a column defined simply as NULL without a cast, Spark infers the data type as NullType (or VoidType). This is a "silent killer" in pipelines.
The problems it causes:
Union mismatches: If you try to UNION this table with another table where the column is populated (e.g., it is a String), Spark will crash. The engine cannot automatically reconcile the difference between a "typed" column (String) and a "void" column (NullType) in strict mode.
Schema evolution crashes: If you try to append real data to this table later, the merge will fail because NullType cannot safely evolve into StringType automatically.
Best practice: Always cast explicit NULLs to their intended future type.
CREATE TABLE good_table AS SELECT id, CAST(NULL as STRING) as status_code, CAST(NULL as INT) as retry_count FROM source_data;
Avoid this: Spark creates a "dead" column that breaks future Unions.
Table A has status_code as NullType; Table B has status_code as StringType -> CRASH:
SELECT id, NULL as status_code FROM table_A UNION ALL SELECT id, 'Active' as status_code FROM table_B;
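One hedged way to spot an accidental "dead" column after the fact is to inspect the table's schema; in recent Spark versions an untyped NULL column is reported with the data type void (the table name here is hypothetical):
DESCRIBE TABLE suspect_table;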