15. Databricks¶
15.1. Column function¶
from pyspark.sql.functions import col
15.2. Create a datatype schema¶
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType
from pyspark.sql.functions import from_json, unix_timestamp
# Explicit schema for one Wikipedia edit record; every field is declared nullable.
# NOTE(review): presumably applied with from_json to a JSON payload - confirm against the caller.
schema = (
    StructType()
    .add("channel", StringType(), True)
    .add("comment", StringType(), True)
    .add("delta", IntegerType(), True)
    .add("flag", StringType(), True)
    # (OBJECT): Added by the server; holds IP address geocoding information
    # for an anonymous edit.
    .add(
        "geocoding",
        StructType()
        .add("city", StringType(), True)
        .add("country", StringType(), True)
        .add("countryCode2", StringType(), True)
        .add("countryCode3", StringType(), True)
        .add("stateProvince", StringType(), True)
        .add("latitude", DoubleType(), True)
        .add("longitude", DoubleType(), True),
        True,
    )
    # (BOOLEAN): Whether or not the change was made by an anonymous user.
    .add("isAnonymous", BooleanType(), True)
    .add("isNewPage", BooleanType(), True)
    .add("isRobot", BooleanType(), True)
    .add("isUnpatrolled", BooleanType(), True)
    # (STRING): Page's namespace. See https://en.wikipedia.org/wiki/Wikipedia:Namespace
    .add("namespace", StringType(), True)
    # (STRING): Printable name of the page that was edited.
    .add("page", StringType(), True)
    # (STRING): URL of the page that was edited.
    .add("pageURL", StringType(), True)
    # (STRING): Time the edit occurred, in ISO-8601 format.
    .add("timestamp", StringType(), True)
    .add("url", StringType(), True)
    # (STRING): User who made the edit, or the IP address associated with
    # the anonymous editor.
    .add("user", StringType(), True)
    .add("userURL", StringType(), True)
    .add("wikipediaURL", StringType(), True)
    # (STRING): Short name of the Wikipedia that was edited (e.g., "en" for English).
    .add("wikipedia", StringType(), True)
)
15.3. Start a Stream¶
For a Parquet source file:
# Location of the parquet files that feed the stream.
dataPath = "/mnt/training/asa/flights/2007-01-stream.parquet/"

# DDL-style schema string; adjacent literals are joined by the parser, so the
# resulting text is the same as an explicit "+" concatenation.
parquetSchema = (
    "DepartureAt timestamp, FlightDate string, DepTime string, CRSDepTime string, ArrTime string, "
    "CRSArrTime string, UniqueCarrier string, FlightNum integer, TailNum string, ActualElapsedTime string,"
    " CRSElapsedTime string, AirTime string, ArrDelay string, DepDelay string, Origin string, Dest string, "
    "Distance string, TaxiIn string, TaxiOut string, Cancelled integer, CancellationCode string, Diverted integer,"
    "CarrierDelay string, WeatherDelay string, NASDelay string, SecurityDelay string, LateAircraftDelay string"
)

# Set the shuffle partition count to sc.defaultParallelism
# (described in the original notes as matching the number of cores).
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

streamDF = (
    spark                              # the SparkSession
    .readStream                        # obtain the DataStreamReader
    .format('parquet')                 # the source files are parquet
    .schema(parquetSchema)             # schema of the parquet files
    .option("maxFilesPerTrigger", 1)   # process only 1 file per trigger
    .load(dataPath)                    # build the DataFrame from dataPath
)
15.3.1. Kafka Stream¶
from pyspark.sql.functions import col
# Set the shuffle partition count to sc.defaultParallelism.
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

kafkaServer = "server1.databricks.training:9092"  # US (Oregon)
# kafkaServer = "server2.databricks.training:9092"  # Singapore

editsDF = (
    spark.readStream                   # obtain the DataStreamReader
    .format("kafka")                   # the source format is "kafka"
    .options(**{
        # Kafka server name and port
        "kafka.bootstrap.servers": kafkaServer,
        # Subscribe to the "en" Kafka topic
        "subscribe": "en",
        # Rewind the stream to the beginning when the notebook is restarted
        "startingOffsets": "earliest",
        # Throttle Kafka's processing of the stream
        "maxOffsetsPerTrigger": 1000,
    })
    .load()                            # build the DataFrame
    .select(col("value").cast("STRING"))  # keep only "value", cast to STRING
)
15.4. Stop Streams¶
# Stop every streaming query that is currently active on this SparkSession.
for s in spark.streams.active:
    # The call must be indented under the loop; without the indent the
    # snippet raises IndentationError instead of stopping the streams.
    s.stop()
15.5. Configure Shuffle Partitions¶
Configure the shuffle partitions to match the number of cores
# Use sc.defaultParallelism as the number of partitions for shuffles.
spark.conf.set(
    "spark.sql.shuffle.partitions",
    sc.defaultParallelism,
)
15.6. Unqueue delayed events in a stream¶
Ignore any events delayed by X minutes or more (using a watermark), and aggregate the remaining events over time windows.
from pyspark.sql.functions import col, window
# Count flights per carrier and per 30-minute window of DepartureAt,
# with a 300-minute watermark declared on the event-time column.
countsDF = (
    streamDF
    .withWatermark("DepartureAt", "300 minutes")   # lateness threshold on DepartureAt
    .groupBy(
        "UniqueCarrier",
        window("DepartureAt", "30 minutes"),       # only a window duration is given
    )
    .count()
    .select(
        col("window.start").alias("start"),        # expose the window start as "start"
        "count",
        "UniqueCarrier",
    )
)
15.7. Train/Test Split in Databricks¶
# 70-30 train/test split; the fixed seed keeps the split reproducible.
trainDF, testDF = preprocessedDF.randomSplit([0.7, 0.3], seed=42)