Spark Structured Streaming verarbeitet Datenströme mit derselben API wie Batch. Ein Code für historische und Echtzeitdaten.
Structured Streaming¶
Stream als unendliche Tabelle — neue Daten sind neue Zeilen.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, sum, count
spark = SparkSession.builder.appName("Streaming").getOrCreate()
orders = (
spark.readStream.format("kafka")
.option("subscribe", "orders").load()
.select(from_json(col("value").cast("string"), schema).alias("d"))
.select("d.*")
)
revenue = (
orders.withWatermark("order_time", "10 minutes")
.groupBy(window("order_time", "5 minutes"))
.agg(sum("amount").alias("revenue"))
)
revenue.writeStream.format("delta")
.option("checkpointLocation", "/cp/revenue")
.start("/data/revenue")
Trigger-Modi¶
- Default — Micro-Batch ASAP
- Fixed Interval — processingTime
- Once / Available-now — einmalige Verarbeitung
Zusammenfassung¶
Spark Structured Streaming ist ideal für Teams mit Spark, die Stream Processing hinzufügen möchten.
spark streamingapache sparkmicro-batchreal-time