
In today's data-driven world, drone manufacturers need to process vast amounts of data from production lines, component inventories, flight tests, quality inspections, and real-time telemetry.

The Apache Iceberg Architecture

Apache Iceberg uses a sophisticated three-layer architecture that delivers ACID transactions, schema evolution, and high performance at scale:

The Data Layer

At the foundation sits the data layer containing the actual drone manufacturing data stored in open file formats:

# Creating a drone manufacturing table with Parquet files

spark.sql("""

    CREATE TABLE drone_catalog.production.manufactured_drones (

        drone_id STRING,

        model STRING,

        serial_number STRING,

        manufacture_date TIMESTAMP,

        weight_grams DOUBLE,

        battery_capacity_mah INT,

        flight_time_minutes INT,

        quality_score INT,

        status STRING

    ) USING iceberg

    PARTITIONED BY (months(manufacture_date))

""")

Datafiles store the actual drone manufacturing records in formats such as Parquet and ORC (columnar) or Avro (row-based). Parquet is generally preferred for analytics because its columnar layout and per-row-group statistics let engines skip irrelevant columns and rows efficiently.

Delete Files track deleted records using two approaches:

- Positional Delete Files: Identify rows by file path and position ("delete row #234 in drone_components.parquet")

- Equality Delete Files: Identify rows by value ("delete where component_id = 'PROP-X22'")
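As a rough illustration of how a reader might apply the two kinds of delete files while scanning one datafile, here is a minimal pure-Python sketch. It is not Iceberg's implementation: the function name `apply_deletes`, the in-memory row layout, and the sample values are assumptions made for the example.

```python
def apply_deletes(rows, positional_deletes, equality_deletes):
    """Return the rows of one datafile that survive the given deletes.

    rows: list of dicts in file order (position = list index).
    positional_deletes: set of row positions deleted in this file.
    equality_deletes: list of dicts; a row is deleted when it matches
        every column/value pair of any one of them.
    """
    live = []
    for pos, row in enumerate(rows):
        if pos in positional_deletes:
            continue  # removed by a positional delete
        if any(all(row.get(c) == v for c, v in d.items()) for d in equality_deletes):
            continue  # removed by an equality delete
        live.append(row)
    return live


# Hypothetical contents of a single datafile
components = [
    {"component_id": "PROP-X22", "supplier_id": "SUP-A"},
    {"component_id": "MOTOR-7", "supplier_id": "SUP-B"},
    {"component_id": "ESC-3", "supplier_id": "SUP-A"},
]

surviving = apply_deletes(
    components,
    positional_deletes={2},                           # "delete row #2 in this file"
    equality_deletes=[{"component_id": "PROP-X22"}],  # "delete where component_id = 'PROP-X22'"
)
print([r["component_id"] for r in surviving])  # prints ['MOTOR-7']
```

Positional deletes are cheap to apply because they only need the row index, while equality deletes must be compared against every row that is read.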

# Setting merge-on-read (MOR) mode for drone inventory with frequent updates (row-level deletes require Iceberg format version 2)

spark.sql("""

    ALTER TABLE drone_catalog.inventory.components 

    SET TBLPROPERTIES (

        'write.update.mode'='merge-on-read',

        'write.delete.mode'='merge-on-read' 

    )""")



# Delete a defective component batch

spark.sql("""

    DELETE FROM drone_catalog.inventory.components 

    WHERE supplier_id = 'SUP-A' AND quality_check = 'FAILED'

""")

The Metadata Layer

The metadata layer maintains a tree structure that enables Iceberg's advanced features:

Manifest Files track subsets of datafiles with statistics about column values, enabling efficient file pruning during queries. For example, a manifest file might track all drone test flight data for a specific month with min/max values for altitude and flight duration.

Manifest Lists represent snapshots of the table at specific points in time, containing references to manifest files and their statistics. This enables efficient time travel to previous states of your drone production data.

Metadata Files define the table structure including schema, partitioning specs, and snapshot history. Each table update creates a new metadata file, maintaining a complete lineage of changes.

Puffin Files store additional statistics and indexes, such as sketches for approximate distinct counts, that accelerate specific query types like counting unique components across your drone manufacturing process.

The Catalog Layer

The catalog maps table names to metadata locations, supporting atomic updates critical for concurrent operations:

# Configuring AWS Glue catalog for drone manufacturing

from pyspark.sql import SparkSession

spark = SparkSession.builder \

    .appName("DroneManufacturing") \

    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \

    .config("spark.sql.catalog.drone_catalog", "org.apache.iceberg.spark.SparkCatalog") \

    .config("spark.sql.catalog.drone_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \

    .config("spark.sql.catalog.drone_catalog.warehouse", "s3://drone-factory/warehouse") \

    .config("spark.sql.catalog.drone_catalog.client.region", "us-east-1") \

    .getOrCreate()

Iceberg supports multiple catalog implementations including Hadoop, Hive Metastore, AWS Glue, Nessie, REST, and JDBC catalogs, each with different capabilities for versioning and multi-table transactions.

The Lifecycle of Queries

Write Operations

When writing drone production data, Iceberg ensures consistency through a multi-step process:

1. Consult the catalog for current table state

2. Write new datafiles for drone records

3. Create manifest files tracking these datafiles with statistics

4. Create manifest list pointing to the manifests

5. Create a new metadata file referencing the manifest list

6. Atomically update the catalog to point to the new metadata file
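The six steps above amount to an optimistic commit: all files are written first, and only the final pointer swap has to be atomic. The following is a minimal sketch of that idea, not Iceberg's code. The `InMemoryCatalog` class, the `commit` helper, and the metadata file names are hypothetical stand-ins for a real catalog and real files.

```python
import threading


class InMemoryCatalog:
    """Hypothetical catalog: maps a table name to its current metadata file."""

    def __init__(self):
        self._pointers = {}
        self._lock = threading.Lock()

    def current(self, table):
        return self._pointers.get(table)

    def compare_and_swap(self, table, expected, new):
        """Atomically move the pointer only if nobody else has moved it."""
        with self._lock:
            if self._pointers.get(table) != expected:
                return False
            self._pointers[table] = new
            return True


def commit(catalog, table, write_metadata, max_retries=3):
    """Read current state, write new files, then swap the catalog pointer."""
    for _ in range(max_retries):
        base = catalog.current(table)        # step 1
        new_metadata = write_metadata(base)  # steps 2-5: files are written first
        if catalog.compare_and_swap(table, base, new_metadata):  # step 6
            return new_metadata
        # Another writer committed in the meantime: retry on top of the new state
    raise RuntimeError("commit failed after retries")


catalog = InMemoryCatalog()
version = {"n": 0}


def write_metadata(base):
    # Stand-in for writing datafiles, manifests, manifest list, and metadata file
    version["n"] += 1
    return f"metadata/v{version['n']}.metadata.json"


first = commit(catalog, "production.manufactured_drones", write_metadata)
second = commit(catalog, "production.manufactured_drones", write_metadata)
print(first, second)
```

A writer holding a stale view of the table fails the swap and retries, which is why readers never observe a half-written commit.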

# MERGE operation to update drone status based on inspections

spark.sql("""

    MERGE INTO drone_catalog.production.manufactured_drones t

    USING drone_catalog.quality.inspections s

    ON t.drone_id = s.drone_id

    WHEN MATCHED AND s.result = 'FAIL' AND s.severity_if_failed = 'CRITICAL' THEN

        UPDATE SET t.quality_score = 0, t.status = 'REJECTED'

    WHEN MATCHED AND s.result = 'PASS' THEN

        UPDATE SET t.status = 'APPROVED'

""")

Read Operations

Reading drone data takes advantage of Iceberg's metadata for optimal performance:

1. Consult the catalog for current metadata location

2. Read the metadata file for schema and manifest list

3. Use manifest list statistics to prune irrelevant manifest files

4. Read relevant manifest files and use their statistics to prune datafiles

5. Read only necessary datafiles

6. Process and return results
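To make the pruning in steps 3 and 4 concrete, here is a simplified sketch of scan planning with min/max statistics. The `DataFile` and `Manifest` classes, the file names, and the date ranges are all assumptions for illustration; real manifest lists summarize partition values rather than arbitrary columns, so treat this as a model of the idea only.

```python
from dataclasses import dataclass


@dataclass
class DataFile:
    path: str
    min_test_date: str  # ISO dates compare correctly as strings
    max_test_date: str


@dataclass
class Manifest:
    path: str
    files: list

    @property
    def max_test_date(self):
        return max(f.max_test_date for f in self.files)


def plan_scan(manifests, lower_bound):
    """Return paths of datafiles that may hold rows with test_date >= lower_bound."""
    selected = []
    for manifest in manifests:
        if manifest.max_test_date < lower_bound:
            continue  # step 3: skip the whole manifest without opening it
        for data_file in manifest.files:
            if data_file.max_test_date >= lower_bound:  # step 4: prune datafiles
                selected.append(data_file.path)
    return selected


manifests = [
    Manifest("m-march.avro", [DataFile("f1.parquet", "2025-03-01", "2025-03-31")]),
    Manifest("m-spring.avro", [
        DataFile("f2.parquet", "2025-03-20", "2025-03-30"),
        DataFile("f3.parquet", "2025-04-01", "2025-04-30"),
    ]),
]

print(plan_scan(manifests, "2025-04-01"))  # prints ['f3.parquet']
```

Only one of the three datafiles is opened, and the first manifest is never read at all.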

# Analyze drone test flight performance with efficient pruning

spark.sql("""

    SELECT 

        d.model,

        COUNT(t.flight_id) as total_tests,

        AVG(t.flight_duration_minutes) as avg_flight_time,

        MAX(t.max_altitude_meters) as max_altitude,

        COUNT(CASE WHEN t.test_result = 'FAIL' THEN 1 END) as failed_tests

    FROM drone_catalog.production.test_flights t

    JOIN drone_catalog.production.manufactured_drones d ON t.drone_id = d.drone_id

    WHERE t.test_date >= '2025-04-01'

    GROUP BY d.model

    ORDER BY failed_tests DESC

""").show()

Performance Optimization

Compaction

As streaming drone telemetry data creates many small files, compaction is essential for maintaining performance:

# Compacting small drone telemetry files with Z-order

spark.sql("""

    CALL drone_catalog.system.rewrite_data_files(

        table => 'telemetry.flight_data', 

        strategy => 'sort', 

        sort_order => 'zorder(latitude, longitude, altitude)'

    )

""")
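Z-ordering sorts rows by a key built from interleaving the bits of several columns, so rows that are close in all of those columns tend to end up in the same files. The sketch below shows the interleaving for two integer columns only. It is an illustration of the general technique, not Iceberg's implementation, and the `z_value` helper and the grid cells are hypothetical.

```python
def z_value(x, y, bits=16):
    """Interleave the low `bits` bits of x and y into a single sort key."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)      # x occupies the even bit positions
        z |= ((y >> i) & 1) << (2 * i + 1)  # y occupies the odd bit positions
    return z


# Hypothetical grid cells (x, y), e.g. quantized longitude and latitude
cells = [(0, 0), (3, 3), (1, 0), (0, 1), (2, 2), (1, 1)]
ordered = sorted(cells, key=lambda c: z_value(*c))
print(ordered)  # prints [(0, 0), (1, 0), (0, 1), (1, 1), (2, 2), (3, 3)]
```

Because neighbouring cells get neighbouring keys, min/max statistics on each column stay narrow per file, which helps pruning on any of the Z-ordered columns.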

Hidden Partitioning

A key Iceberg innovation is hidden partitioning, which simplifies querying while maintaining performance:

# Creating flight logs with hidden partitioning

spark.sql("""

    CREATE TABLE drone_catalog.operations.flight_logs (

        flight_uuid STRING,

        drone_serial STRING,

        flight_start_ts TIMESTAMP,

        flight_duration_minutes FLOAT,

        max_altitude FLOAT,

        max_speed FLOAT

    ) USING iceberg

    PARTITIONED BY (months(flight_start_ts), bucket(16, drone_serial))

""")



# Users can query naturally without knowing the partitioning

spark.sql("""

    SELECT flight_uuid, flight_duration_minutes

    FROM drone_catalog.operations.flight_logs

    WHERE flight_start_ts >= '2025-04-01 00:00:00'

      AND flight_start_ts < '2025-05-01 00:00:00'

      AND drone_serial = 'SN10045678'

""").show()
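Hidden partitioning works because the engine can apply the same transform to the query's predicate that was applied to the data. As a small sketch of that reasoning, the month transform maps a timestamp to the number of months since January 1970, and a timestamp range maps to a range of month partitions. The helper names below are assumptions for the example.

```python
from datetime import datetime, timedelta


def months_transform(ts):
    """Months since 1970-01, the value a month transform produces."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def partitions_for_range(start, end_exclusive):
    """Month partitions that can contain rows with start <= ts < end_exclusive."""
    last = end_exclusive - timedelta(microseconds=1)
    return list(range(months_transform(start), months_transform(last) + 1))


april = partitions_for_range(datetime(2025, 4, 1), datetime(2025, 5, 1))
print(april)  # prints [663]
```

The query filters on `flight_start_ts` alone, yet only a single month partition needs to be scanned.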

Bucketing for Scalable Performance

As drone manufacturing data grows into billions of records, bucketing becomes crucial for maintaining query performance:

Bucketing is a partitioning transform that uses a hash function to distribute data across a fixed number of buckets. Unlike range-based partitioning, which can produce skewed partitions (some very large, some very small) when values cluster, hashing spreads distinct key values roughly evenly across buckets. All rows for a given key still land in the same bucket, so a single very hot key can still make one bucket larger than the rest.
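A minimal sketch of the idea, under a stated simplification: it uses CRC32 from the Python standard library as a stand-in hash, whereas the Iceberg specification defines bucketing with a 32-bit Murmur3 hash. The `bucket` helper and the supplier IDs are hypothetical.

```python
import zlib
from collections import Counter


def bucket(value, num_buckets=16):
    """Map a string to a bucket number in [0, num_buckets).

    Stand-in hash: CRC32. Iceberg's spec uses 32-bit Murmur3 instead.
    """
    return zlib.crc32(value.encode("utf-8")) % num_buckets


supplier_ids = [f"SUP-{i:04d}" for i in range(1000)]
counts = Counter(bucket(s) for s in supplier_ids)

# Rows per bucket when each supplier contributes one row
print(sorted(counts.items()))

# Every row for the same supplier maps to the same bucket
print(bucket("SUP-0001") == bucket("SUP-0001"))  # prints True
```

The same key always hashes to the same bucket, which is what lets two tables bucketed identically on `supplier_id` be joined bucket by bucket.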

# Creating a drone parts inventory table with bucketing

spark.sql("""

    CREATE TABLE drone_catalog.inventory.parts (

        part_id STRING,

        part_name STRING,

        supplier_id STRING,

        category STRING,

        stock_quantity INT,

        unit_cost DOUBLE,

        last_updated TIMESTAMP

    ) USING iceberg

    PARTITIONED BY (bucket(16, supplier_id))

""")



# Create a complementary suppliers table with the same bucketing

spark.sql("""

    CREATE TABLE drone_catalog.inventory.suppliers (

        supplier_id STRING,

        supplier_name STRING,

        country STRING,

        lead_time_days INT,

        quality_score DOUBLE

    ) USING iceberg

    PARTITIONED BY (bucket(16, supplier_id))

""")

Bucketing offers several performance benefits as data scales:

1. Optimized joins: When drone parts and suppliers are bucketed identically on the join column, engines that support storage-partitioned joins (such as recent Spark releases, when the feature is enabled) can join matching buckets directly, dramatically reducing shuffle operations

2. Reduced data skew: For high-cardinality columns such as supplier_id, hashing spreads distinct values evenly across buckets instead of concentrating neighbouring values in one partition; a supplier that provides far more parts than the others still maps to a single bucket

3. Better parallelism: With evenly sized partitions, worker nodes process similar amounts of data, maximizing resource utilization

4. Improved write distribution: Prevents hotspots during bulk insertions by spreading writes evenly across storage

For massive drone manufacturing datasets, configuring bucketing on join keys that appear frequently in analytics (supplier_id, drone_model, component_type) can yield query performance improvements on the order of 2-10x compared to unbucketed tables, depending on data volume, join patterns, and engine support.

Copy-on-Write vs. Merge-on-Read

Iceberg offers two strategies for handling updates to drone manufacturing data:

- Copy-on-Write: Rewrites entire files containing updated rows (faster reads, slower writes)

- Merge-on-Read: Writes only changed records to separate files (faster writes, marginally slower reads)
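The write-side difference between the two strategies can be sketched with a toy model that counts how many rows each one has to write for a single-row update. The functions and the 1,000-row file below are hypothetical and ignore real file formats entirely.

```python
def copy_on_write(data_file, row_index, new_row):
    """Rewrite the whole file with one row replaced."""
    rewritten = list(data_file)
    rewritten[row_index] = new_row
    return {"data_files": [rewritten], "delete_files": []}


def merge_on_read(data_file, row_index, new_row):
    """Leave the file alone; record a positional delete plus the new row."""
    return {"data_files": [[new_row]], "delete_files": [[row_index]]}


def rows_written(result):
    """Total entries written across new datafiles and delete files."""
    return (sum(len(f) for f in result["data_files"])
            + sum(len(f) for f in result["delete_files"]))


inventory_file = [{"part_id": f"P-{i}", "stock_quantity": 10} for i in range(1000)]
update = {"part_id": "P-42", "stock_quantity": 9}

cow = copy_on_write(inventory_file, 42, update)
mor = merge_on_read(inventory_file, 42, update)
print(rows_written(cow), rows_written(mor))  # prints 1000 2
```

Merge-on-read defers the cost to readers, who must reconcile delete files on every scan until compaction rewrites the data.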

# Configure component inventory for frequent updates using MOR

spark.sql("""

    ALTER TABLE drone_catalog.inventory.components 

    SET TBLPROPERTIES (

        'write.update.mode'='merge-on-read',

        'write.delete.mode'='merge-on-read' 

    )

""")

Advanced Features

Metadata Tables

Iceberg exposes internal metadata as queryable tables, providing unprecedented visibility into your drone manufacturing data:

# Monitoring file growth for drone telemetry

# (assumes the table is partitioned by months(timestamp), which Iceberg exposes as partition.timestamp_month)

spark.sql("""

    SELECT

        partition.timestamp_month as month,

        COUNT(*) as file_count,

        SUM(file_size_in_bytes) as total_bytes,

        AVG(file_size_in_bytes) as avg_file_size

    FROM drone_catalog.telemetry.flight_data.files

    GROUP BY partition.timestamp_month

    ORDER BY month DESC

""").show()

Key metadata tables include `history`, `snapshots`, `files`, `manifests`, and `all_data_files`, enabling operations teams to monitor table health and performance.

Isolation with Branches and Tags

Iceberg's branching capabilities enable the Write-Audit-Publish pattern for ensuring data quality:

# Create branch for quality checking incoming drone inspection data

spark.sql("""

    ALTER TABLE drone_catalog.quality.inspection_results

    CREATE BRANCH quality_check_branch

""")



# Load test data into the branch (writes target a branch through the branch_<name> identifier)

spark.sql("""

    INSERT INTO drone_catalog.quality.inspection_results.branch_quality_check_branch

    VALUES

    ('INSP-1001', 'DRONE-X1', '2025-04-15 10:30:00', 'VISUAL', 'PASS', NULL),

    ('INSP-1002', 'DRONE-X2', '2025-04-15 11:15:00', 'COMPONENT', 'FAIL', 'Motor failure')

""")



# After validation, publish by fast-forwarding main to the audited branch

spark.sql("""

    CALL drone_catalog.system.fast_forward(

        'quality.inspection_results', 'main', 'quality_check_branch'

    )

""")
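Conceptually, the Write-Audit-Publish flow comes down to moving named pointers between snapshots. The toy model below sketches that under simplifying assumptions: the `TableRefs` class, integer snapshot IDs, and method names are hypothetical and only mimic the behaviour.

```python
class TableRefs:
    """Hypothetical model of a table's named references (branches)."""

    def __init__(self, main_snapshot):
        self.refs = {"main": main_snapshot}
        self.parent = {main_snapshot: None}
        self._next_id = main_snapshot + 1

    def create_branch(self, name, from_ref="main"):
        self.refs[name] = self.refs[from_ref]

    def commit(self, branch):
        """Add a new snapshot on top of the branch's current one."""
        snapshot = self._next_id
        self._next_id += 1
        self.parent[snapshot] = self.refs[branch]
        self.refs[branch] = snapshot
        return snapshot

    def is_ancestor(self, ancestor, snapshot):
        while snapshot is not None:
            if snapshot == ancestor:
                return True
            snapshot = self.parent[snapshot]
        return False

    def fast_forward(self, target, source):
        """Publish: move target to source's snapshot if target is an ancestor."""
        if not self.is_ancestor(self.refs[target], self.refs[source]):
            raise ValueError("target has diverged; cannot fast-forward")
        self.refs[target] = self.refs[source]


table = TableRefs(main_snapshot=1)
table.create_branch("quality_check_branch")
audited = table.commit("quality_check_branch")      # write
main_during_audit = table.refs["main"]              # audit: main is unchanged
table.fast_forward("main", "quality_check_branch")  # publish
print(main_during_audit, table.refs["main"])        # prints 1 2
```

Readers of `main` see none of the new data until the pointer moves, and publishing copies no data.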

Time Travel and Rollback

Iceberg provides immutable snapshots enabling historical analysis and recovery from errors:

# Analyze drone quality before a manufacturing process change

spark.sql("""

    SELECT 

        model,

        COUNT(*) as drones_produced,

        AVG(quality_score) as avg_quality

    FROM drone_catalog.production.manufactured_drones

    TIMESTAMP AS OF '2025-03-14 23:59:59'

    WHERE manufacture_date >= '2025-03-01' AND manufacture_date < '2025-03-15'

    GROUP BY model

""").show()

# Rolling back after detecting a data quality issue

spark.sql("""

    CALL drone_catalog.system.rollback_to_timestamp(

        'production.manufactured_drones',

        TIMESTAMP '2025-04-14 16:30:00'

    )

""")
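Both time travel and timestamp-based rollback need to resolve a timestamp to a snapshot: the latest one committed at or before the requested time. A minimal sketch of that lookup follows; the snapshot log, its IDs, and the `snapshot_as_of` helper are hypothetical.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical snapshot log: (committed_at, snapshot_id), oldest first
snapshot_log = [
    (datetime(2025, 3, 1, 8, 0), 101),
    (datetime(2025, 3, 14, 18, 30), 102),
    (datetime(2025, 4, 14, 17, 0), 103),
]


def snapshot_as_of(log, ts):
    """Return the id of the latest snapshot committed at or before ts."""
    times = [committed_at for committed_at, _ in log]
    idx = bisect_right(times, ts)
    if idx == 0:
        raise ValueError("no snapshot exists at or before the requested time")
    return log[idx - 1][1]


# Time travel: which snapshot answers a query as of 2025-03-14 23:59:59?
print(snapshot_as_of(snapshot_log, datetime(2025, 3, 14, 23, 59, 59)))  # prints 102

# Rollback target for 2025-04-14 16:30:00 (before the bad 17:00 commit)
print(snapshot_as_of(snapshot_log, datetime(2025, 4, 14, 16, 30)))  # prints 102
```

In this model, a rollback makes snapshot 102 current again, while snapshot 103 remains in the history until it is expired.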

Streaming with Apache Iceberg

Streaming data architectures with Apache Iceberg typically follow one of several design patterns:

1. Direct streaming ingestion: Real-time data from manufacturing equipment, IoT sensors, and quality control systems flows directly to Iceberg tables.

2. Lambda architecture: Combines batch and streaming paths where real-time drone telemetry flows through a "speed layer" for immediate analysis, while periodic batch processes consolidate historical data.

3. Kappa architecture: Uses only streaming processes for both real-time and historical analytics, simplifying the overall architecture.

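# Reading the data from a message broker and streaming to Iceberg

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

spark = SparkSession.builder \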
    .appName("DroneTelemStreamingToIceberg") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.drone_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.drone_catalog.type", "hadoop") \
    .config("spark.sql.catalog.drone_catalog.warehouse", "s3://drone-lakehouse/warehouse") \
    .getOrCreate()

# Define schema for drone telemetry
telemetry_schema = StructType([
    StructField("drone_id", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("altitude", DoubleType(), True),
    StructField("speed", DoubleType(), True),
    StructField("battery_level", DoubleType(), True),
    StructField("motor_temperature", DoubleType(), True)
])

# Read from Kafka
telemetry_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "drone-telemetry") \
    .load()

# Parse JSON telemetry data
parsed_telemetry = telemetry_stream \
    .select(from_json(col("value").cast("string"), telemetry_schema).alias("data")) \
    .select("data.*")

# Write to Iceberg table
query = parsed_telemetry.writeStream \
    .foreachBatch(lambda batch_df, batch_id: 
        batch_df.writeTo("drone_catalog.production.drone_telemetry").append()
    ) \
    .option("checkpointLocation", "s3://drone-lakehouse/checkpoints/telemetry") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()

Reading data from Iceberg and writing to a message broker:

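# Create a streaming DataFrame from the Iceberg table.
# Reads new data incrementally as it's added to the table.
# "stream-from-timestamp" expects milliseconds since the Unix epoch, passed as a string
# (the start time is interpreted as UTC here).
from datetime import datetime, timezone

start_ms = int(datetime(2025, 4, 15, tzinfo=timezone.utc).timestamp() * 1000)

df_stream = spark.readStream \
    .format("iceberg") \
    .option("stream-from-timestamp", str(start_ms)) \
    .load("drone_catalog.production.drone_telemetry")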

# Filter for anomalies in motor temperature
anomalies = df_stream.filter(col("motor_temperature") > 85)

# Write anomalies to an in-memory table of critical alerts (useful for debugging)
query = anomalies.writeStream \
    .format("memory") \
    .queryName("critical_motor_temp_alerts") \
    .outputMode("append") \
    .start()

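# Write anomalies to another system.
# The Kafka sink needs a string or binary "value" column and a checkpoint location
# (the checkpoint path below is an example).
queryKafka = anomalies \
    .selectExpr("drone_id AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "drone-alerts") \
    .option("checkpointLocation", "s3://drone-lakehouse/checkpoints/drone-alerts") \
    .start()

# Block until either streaming query stops
spark.streams.awaitAnyTermination()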

Governance and Security

Iceberg supports a comprehensive security model across multiple layers:

Storage-Level Security

Controls access to the underlying drone manufacturing data files:

S3 bucket policy for drone data

{

  "Version": "2012-10-17",

  "Statement": [

    {

      "Sid": "ProdDataAccess",

      "Effect": "Allow",

      "Principal": {

        "AWS": "arn:aws:iam::123456789012:role/DroneProductionAnalyst"

      },

      "Action": [

        "s3:GetObject",

        "s3:ListBucket"

      ],

      "Resource": [

        "arn:aws:s3:::drone-manufacturing-data",

        "arn:aws:s3:::drone-manufacturing-data/production/*"

      ]

    }

  ]

}

Semantic Layer Governance

Creates business-friendly views with appropriate access controls:

# Creating view of drone quality data

spark.sql("""

CREATE OR REPLACE VIEW drone_catalog.quality.failure_analysis_secure AS

SELECT 

    date_trunc('month', inspection_date) AS month,

    drone_model,

    component_id,

    COUNT(*) AS total_inspections,

    SUM(CASE WHEN pass_inspection = false THEN 1 ELSE 0 END) AS failures,

    ROUND(SUM(CASE WHEN pass_inspection = false THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS failure_rate

FROM drone_catalog.quality.inspection_results

WHERE inspection_date >= date_add(current_date(), -365)

GROUP BY date_trunc('month', inspection_date), drone_model, component_id

""")

Catalog-Level Security

Controls access to table metadata through AWS Lake Formation, Nessie permissions, or other catalog-specific security features.

Migrating to Apache Iceberg

Two primary strategies exist for migrating drone manufacturing data to Iceberg:

In-Place Migration

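Creates Iceberg metadata over the existing datafiles without copying them. The snapshot procedure shown here leaves the source table untouched, which makes it suitable for testing; the migrate procedure replaces the source table with an Iceberg table once you are ready to cut over:

# In-place migration of drone assembly metrics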

spark.sql("""

    CALL spark_catalog.system.snapshot(

        source_table => 'drone_factory.production.assembly_metrics',

        table => 'drone_catalog.production.assembly_metrics',

        properties => map('format-version', '2')

    )

""")

Shadow Migration

Creates new Iceberg tables and copies data from source tables, allowing for transformation during migration:

# Shadow migration with transformation

spark.sql("""

    CREATE TABLE drone_catalog.manufacturing.assembly_metrics

    USING iceberg

    PARTITIONED BY (days(assembly_date))

    AS

    SELECT

        assembly_id,

        drone_id,

        drone_model,

        assembly_date,

        -- Extract technician information from combined field

        split(technician_info, ':')[0] as technician_id,

        split(technician_info, ':')[1] as technician_name,

        -- Convert seconds to minutes for better usability

        assembly_time_seconds / 60 as assembly_time_minutes,

        quality_score,

        current_timestamp() as migrated_at

    FROM drone_factory.manufacturing.assembly_metrics

""")

Use Cases

Predictive Maintenance with ML

Apache Iceberg's time travel capabilities enable reproducible machine learning for drone component failure prediction.

Iceberg provides critical capabilities for ML workflows in the data lake, including reproducible feature engineering, model training, and prediction serving.

Key Components:

- Time travel for reproducible ML pipelines

- Feature tables stored as Iceberg tables

- Incremental feature computation using metadata tables



# Create an Iceberg table to store drone telemetry data

spark.sql("""
    CREATE TABLE IF NOT EXISTS drone_catalog.operations.drone_telemetry (
        event_id STRING,
        drone_id STRING,
        model STRING,
        timestamp TIMESTAMP,
        flight_hours DOUBLE,
        battery_cycles INT,
        avg_motor_temperature DOUBLE,
        max_motor_temperature DOUBLE,
        vibration_level DOUBLE,
        failure_occurred BOOLEAN,
        component_replaced STRING
    ) USING iceberg
    PARTITIONED BY (months(timestamp))
""")


# Use time travel to create a reproducible training dataset.
# This ensures the same training data can be regenerated even as the table evolves.

training_data = spark.sql("""
    SELECT
        t.drone_id,
        t.model,
        t.component_replaced AS component_type,
        m.manufacture_date,
        t.timestamp AS failure_date,
        datediff(t.timestamp, m.manufacture_date) AS lifespan_days,
        SUM(t.flight_hours) OVER (PARTITION BY t.drone_id) AS total_flight_hours,
        AVG(t.avg_motor_temperature) OVER (PARTITION BY t.drone_id) AS avg_temp,
        MAX(t.max_motor_temperature) OVER (PARTITION BY t.drone_id) AS max_temp,
        AVG(t.vibration_level) OVER (PARTITION BY t.drone_id) AS avg_vibration
    FROM drone_catalog.operations.drone_telemetry TIMESTAMP AS OF '2025-04-01 00:00:00' AS t
    JOIN drone_catalog.manufacturing.drone_components m
        ON t.drone_id = m.drone_id
    WHERE t.component_replaced IS NOT NULL
    AND t.failure_occurred = TRUE
""").cache()  # Cache it for performance

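# Features
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, when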

feature_cols = ["total_flight_hours", "avg_temp", "max_temp", "avg_vibration"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Define the model
rf = RandomForestRegressor(
    featuresCol="features", 
    labelCol="lifespan_days",
    numTrees=100, 
    maxDepth=10
)

# Build the pipeline
pipeline = Pipeline(stages=[assembler, rf])

# Train the model
model = pipeline.fit(training_data)

# Save the model for reproducibility
model.write().overwrite().save("s3://drone-lakehouse/models/component_lifespan_predictor")

# Get current component data

current_components = spark.sql("""
    SELECT
        c.drone_id,
        c.component_id,
        c.component_type,
        c.model,
        c.installation_date,
        datediff(current_date(), c.installation_date) AS current_age_days,
        SUM(t.flight_hours) OVER (PARTITION BY c.drone_id) AS total_flight_hours,
        AVG(t.avg_motor_temperature) OVER (PARTITION BY c.drone_id) AS avg_temp,
        MAX(t.max_motor_temperature) OVER (PARTITION BY c.drone_id) AS max_temp,
        AVG(t.vibration_level) OVER (PARTITION BY c.drone_id) AS avg_vibration
    FROM drone_catalog.manufacturing.drone_components c
    JOIN drone_catalog.operations.drone_telemetry t
        ON c.drone_id = t.drone_id
    WHERE c.status = 'ACTIVE'
""")

# Apply model to make predictions
predictions = model.transform(current_components)

# Write predictions to Iceberg

predictions.select(
    "drone_id", 
    "component_id", 
    "component_type", 
    "model",
    "current_age_days",
    col("prediction").alias("predicted_lifespan_days"),
    when(col("current_age_days") >= col("prediction"), "HIGH")
    .when(col("current_age_days") >= 0.8 * col("prediction"), "MEDIUM")
    .otherwise("LOW").alias("failure_risk")
).writeTo("drone_catalog.maintenance.component_lifespan_predictions").append()

# Create maintenance schedule based on predictions
spark.sql("""
    CREATE OR REPLACE TABLE drone_catalog.maintenance.scheduled_maintenance AS
    SELECT 
        drone_id,
        component_id,
        component_type,
        failure_risk,
        CASE 
            WHEN failure_risk = 'HIGH' THEN current_date() + INTERVAL 7 DAYS
            WHEN failure_risk = 'MEDIUM' THEN current_date() + INTERVAL 30 DAYS
            ELSE NULL
        END AS scheduled_maintenance_date
    FROM drone_catalog.maintenance.component_lifespan_predictions
    WHERE failure_risk IN ('HIGH', 'MEDIUM')
""")

Change Data Capture

Iceberg enables efficient change tracking for downstream systems:

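# Create a changelog view of component inventory changes between the two latest snapshots (assumes at least two exist)

snapshots = spark.sql("""

    SELECT snapshot_id FROM drone_catalog.inventory.components.snapshots

    ORDER BY committed_at DESC LIMIT 2

""").collect()

current_snapshot_id, previous_snapshot_id = snapshots[0][0], snapshots[1][0]

spark.sql(f"""

    CALL drone_catalog.system.create_changelog_view(

        table => 'inventory.components',

        options => map('start-snapshot-id', '{previous_snapshot_id}', 'end-snapshot-id', '{current_snapshot_id}'),

        changelog_view => 'component_changes'

    )

""")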



# Process only changes to update supply chain systems

spark.sql("""

    MERGE INTO drone_catalog.analytics.supplier_inventory_summary target

    USING (

        SELECT

            supplier_id,

            category,

            COUNT(*) AS total_parts,

            SUM(unit_cost * quantity_in_stock) AS total_value,

            AVG(unit_cost) AS average_unit_cost,

            MAX(last_updated) AS last_updated

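        FROM drone_catalog.inventory.components

        -- Recompute summaries only for suppliers that appear in the change view

        WHERE supplier_id IN (SELECT supplier_id FROM component_changes)

        GROUP BY supplier_id, category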

    ) source

    ON target.supplier_id = source.supplier_id AND target.category = source.category

    WHEN MATCHED THEN UPDATE SET *

    WHEN NOT MATCHED THEN INSERT *

""")

Apache Iceberg provides a unified platform that delivers warehouse-like performance with lake-like flexibility. From streaming telemetry data to predictive maintenance models, Iceberg's architecture supports diverse analytical needs while ensuring data consistency, evolution, and governance.

Citations:

• Apache Iceberg - The Definitive Guide

• Jack Vanlightly Blogs - https://jack-vanlightly.com/analyses/2024/7/30/understanding-apache-icebergs-consistency-model-part1