
ETL Engine best practices

When writing ETL Engine SQL, consider the following best practices:

Goal: Avoid accidental data shuffles.

In distributed systems like Spark, removing duplicates is expensive because it requires moving data across the network (Shuffle) to compare rows.

DISTINCT

  • The issue: SELECT DISTINCT forces a huge shuffle of your entire dataset.

  • Best practice: Only use DISTINCT if you have verified that duplicates actually exist. Never use it "just in case."
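
A quick way to confirm whether duplicates actually exist before paying for the shuffle (a one-off check during development; orders and order_id are hypothetical names):

    SELECT order_id, COUNT(*) AS cnt
    FROM orders
    GROUP BY order_id
    HAVING COUNT(*) > 1;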

UNION vs. UNION ALL

UNION: Appends data AND removes duplicates. (Slow: Requires Shuffle).

UNION ALL: Appends data as-is, without removing duplicates. (Fast: No Shuffle).

  • Best practice: Use UNION ALL for the fast append. Handle duplicates later only if needed (see the sketch after this list).

    SELECT * FROM table_A
    UNION ALL
    SELECT * FROM table_B;
  • Avoid this: Triggers a heavy shuffle to check for duplicates

    SELECT * FROM table_A
    UNION
    SELECT * FROM table_B;
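
If duplicates are later confirmed, handle them in one explicit downstream step instead of defaulting to UNION. A minimal sketch, assuming a hypothetical business key id and an updated_at timestamp:

    WITH combined AS (
        SELECT * FROM table_A
        UNION ALL
        SELECT * FROM table_B
    ),
    ranked AS (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) AS rn
        FROM combined
    )
    SELECT * FROM ranked   -- drop the rn helper column afterwards if not needed
    WHERE rn = 1;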

Goal: Reduce the amount of data Spark reads from the disk.

Spark uses Columnar Storage (Parquet/Delta). This means every column is stored separately on the hard drive.

  • Best practice: SELECT id, status (Reads only the columns you need from disk).

  • Avoid this: SELECT * (Reads every column, even the unused ones).
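
A minimal illustration of column pruning, assuming a hypothetical events table:

    -- Only the id and status column chunks are read from Parquet/Delta
    SELECT id, status
    FROM events
    WHERE status = 'FAILED';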

Goal: Filter out rows that exist in another table (e.g., "Get customers who have never placed an order").

  • Best practice: Both of the following methods are safe and efficient. Spark's optimizer (Catalyst) converts NOT EXISTS into the same efficient execution plan (an Anti Join) as the explicit join syntax. Spark reads the left table (customers) and looks for a match in the right table (orders): if a match is found, the row is discarded; if no match is found, the row is kept. This plan is highly optimized and handles NULL values safely.

    • Option A: LEFT ANTI JOIN (Explicit) - Think of this as efficient subtraction.

      SELECT * FROM customers c
      LEFT ANTI JOIN orders o
      ON c.customer_id = o.customer_id;
    • Option B: NOT EXISTS (Standard SQL) - This is safer than NOT IN because it handles NULLs correctly and allows Spark to optimize.

      SELECT * FROM customers c
      WHERE NOT EXISTS (
          SELECT 1 FROM orders o
          WHERE c.customer_id = o.customer_id
      );
  • Avoid this: Using NOT IN with a subquery is dangerous. If the subquery can return even a single NULL value, the comparison becomes three-valued NULL logic, and Spark often falls back to a slow nested-loop (Cartesian-product-style) plan to handle it.

    SELECT * FROM customers
    WHERE customer_id NOT IN (SELECT customer_id FROM orders);

Goal: Get the "latest," "largest," or "most recent" record for every group.

  • Best practice: Use ROW_NUMBER() to rank items in a single pass.

    WITH RankedUpdates AS (
        SELECT
            user_id, status, updated_at,
            ROW_NUMBER() OVER (
                PARTITION BY user_id
                ORDER BY updated_at DESC
            ) as rn
        FROM user_updates
    )
    SELECT user_id, status, updated_at
    FROM RankedUpdates
    WHERE rn = 1;

    Window frames (Ascending vs. Descending)

    If you are using running totals or sliding windows (e.g., SUM(...) OVER ...), the direction of your sort matters significantly for the Databricks Photon engine.

    • The trap: Using ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING (looking forward) often forces Spark to abandon the fast Vectorized/Photon engine and fall back to slow Java row-by-row processing.

    • The fix: Always try to look backwards. If you need to look forward, simply reverse your sort order and look backwards.

      Example:

      • Fast: ORDER BY date DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

      • Slow: ORDER BY date ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
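
      For example, a running "amount remaining" per account can be computed backwards instead of forwards. The table and column names below (transactions, account_id, txn_date, amount) are illustrative, and with ROWS framing, ties on txn_date may shift individual row values:

        -- Slow: forward-looking frame risks falling off the vectorized engine
        SELECT
            account_id, txn_date, amount,
            SUM(amount) OVER (
                PARTITION BY account_id
                ORDER BY txn_date ASC
                ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
            ) AS amount_remaining
        FROM transactions;

        -- Fast: same result, computed by reversing the sort and looking backwards
        SELECT
            account_id, txn_date, amount,
            SUM(amount) OVER (
                PARTITION BY account_id
                ORDER BY txn_date DESC
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            ) AS amount_remaining
        FROM transactions;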

  • Avoid this: Finding the "Max Date" and then joining the table back to itself forces Spark to read the data twice and perform a heavy shuffle.
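
    A sketch of that anti-pattern, using the same user_updates table as above:

    SELECT u.user_id, u.status, u.updated_at
    FROM user_updates u
    JOIN (
        -- first pass over the data: find the latest timestamp per user
        SELECT user_id, MAX(updated_at) AS max_updated_at
        FROM user_updates
        GROUP BY user_id
    ) latest
      ON u.user_id = latest.user_id
     AND u.updated_at = latest.max_updated_at;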

Goal: Speed up queries by eliminating data before processing it.

  • Best practice: Filter on columns as they exist in the source data. This allows Spark to skip reading entire files or partitions ("Partition Pruning").

    Spark reads only relevant data first:

    SELECT *, TO_DATE(timestamp_col) as evt_date
    FROM huge_event_log
    WHERE timestamp_col LIKE '2024-01-01%';
  • Avoid this: Applying filters after heavy transformations forces Spark to process the entire dataset, only to throw most of it away later.

    Parses all timestamps before filtering:

    SELECT * FROM (
      SELECT *, TO_DATE(timestamp_col) as evt_date
      FROM huge_event_log
    )
    WHERE evt_date = '2024-01-01';

Goal: Remove rows efficiently.

Rule: Use DELETE for simple logic (e.g., date ranges). Use MERGE when the rows to remove come from a list or another table.

Why: MERGE is optimized for "Join-based" updates. DELETE with subqueries (IN or EXISTS) can be slower and harder for the optimizer to execute efficiently.

  • Best practice: Use MERGE to join and delete in one optimized step.

    Efficient join based delete:

    MERGE INTO huge_table target
    USING banned_users source
    ON target.user_id = source.user_id
    WHEN MATCHED THEN DELETE;
  • Avoid this: Using subqueries inside DELETE is often inefficient.

    DELETE FROM huge_table
    WHERE EXISTS (
       SELECT 1 FROM banned_users b
       WHERE huge_table.user_id = b.user_id
    );

Goal: Avoid expensive "Fuzzy Matching" (LIKE with % wildcards, regex), which forces slow table scans and nested loops.

  • Case A: Joining Tables (Avoid Non-Equi Joins) - Spark is optimized for Equi-Joins (where Key A = Key B). If you use LIKE in a Join, Spark falls back to a Nested Loop Join, which is extremely slow.

    • Best practice: Refactor the join key using LPAD or SUBSTRING to force an exact match.

      Enables fast Hash Joins by forcing '=':

      SELECT * FROM huge_invoices a
      JOIN huge_customers b
      -- Pad the shorter code to match the longer ID
      ON a.invoice_id = lpad(b.customer_code, 10, '0');
    • Avoid this: Forces a Nested Loop Join (Timeout Risk)

      SELECT * FROM huge_invoices a
      JOIN huge_customers b
      ON a.invoice_id LIKE concat('%', b.customer_code);
  • Case B: Complex Filtering (Preprocess Logic). If you have complex logic (regex/case statements) in a WHERE clause, Spark has to evaluate it for every single row at runtime.

    • Best practice: Extract the logic into a clean column once using a Temporary View or Table, then filter on that simple column.

      • 1. Preprocess: Extract logic into a clean column:

        CREATE OR REPLACE TEMP VIEW CleanLogs AS
        SELECT *,
          CASE
            WHEN message RLIKE 'Error-[0-9]+' OR message LIKE '%CRITICAL%' THEN 'High'
            ELSE 'Low'
          END as urgency
        FROM huge_logs;
      • 2. Analyze: Fast filtering on the new simple column:

        SELECT * FROM CleanLogs WHERE urgency = 'High'; 

Goal: Update a table's structure without breaking concurrent queries or losing history.

  • Best practice: CREATE OR REPLACE TABLE ... (Atomic swap; zero downtime). See the sketch after this list.

    CREATE OR REPLACE TABLE my_table
    AS SELECT * FROM ...;
  • Avoid this: DROP TABLE ... followed by CREATE TABLE ... (Causes downtime and "Table Not Found" errors).
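
For example, a minimal sketch of the atomic swap when adding a column (my_table, source_data, and the new column name are illustrative):

    CREATE OR REPLACE TABLE my_table AS
    SELECT
        *,
        CAST(NULL AS STRING) AS new_status_col   -- new column, typed explicitly
    FROM source_data;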

Goal: Prevent "Dead End" columns that cause UNION failures and schema evolution crashes.

When you define a table (especially with CREATE TABLE AS SELECT, i.e. CTAS) and include a column written simply as NULL without a cast, Spark infers its data type as NullType (or VoidType). This is a "silent killer" in pipelines.

The problems it causes:

  • Union mismatches: If you try to UNION this table with another table where the column is populated (e.g., it is a String), Spark will crash. The engine cannot automatically reconcile the difference between a "typed" column (String) and a "void" column (NullType) in strict mode.

  • Schema evolution crashes: If you try to append real data to this table later, the merge will fail because NullType cannot safely evolve into StringType automatically.

  • Best practice: Always cast explicit NULLs to their intended future type.

    CREATE TABLE good_table AS
    SELECT
        id,
        CAST(NULL as STRING) as status_code,
        CAST(NULL as INT) as retry_count
    FROM source_data;
  • Avoid this: Spark creates a "dead" column that breaks future Unions.

    -- Table A infers status_code as NullType
    SELECT id, NULL as status_code FROM table_A
    UNION ALL
    -- Table B has status_code as StringType -> CRASH
    SELECT id, 'Active' as status_code FROM table_B;