PySpark Exercises
Log filtering
- Input data: a simplified log of a web server (i.e., a textual file). Each line of the file is associated with a URL request
- Output: the lines containing the word "google". Store the output in a local folder, for simplicity
# Input data
66.249.69.97 --[24/Sep/2014:22:25:44 +0000] "GET http://www.google.com/bot.html"
66.249.69.97 --[24/Sep/2014:22:26:44 +0000] "GET http://www.google.com/how.html"
66.249.69.97 --[24/Sep/2014:22:28:44 +0000] "GET http://dbdmg.polito.it/course.html"
71.19.157.179 --[24/Sep/2014:22:30:12 +0000] "GET http://www.google.com/faq.html"
66.249.69.97 --[24/Sep/2014:31:28:44 +0000] "GET http://dbdmg.polito.it/thesis.html"
# Output data
66.249.69.97 --[24/Sep/2014:22:25:44 +0000] "GET http://www.google.com/bot.html"
66.249.69.97 --[24/Sep/2014:22:26:44 +0000] "GET http://www.google.com/how.html"
71.19.157.179 --[24/Sep/2014:22:30:12 +0000] "GET http://www.google.com/faq.html"
Log analysis
- Input data: log of a web server (i.e., a textual file). Each line of the file is associated with a URL request
- Output: the list of distinct IP addresses associated with the connections to a Google page (i.e., connections to URLs containing the term www.google.com). Store the output in a local folder, for simplicity
# Input data
66.249.69.97 --[24/Sep/2014:22:25:44 +0000] "GET http://www.google.com/bot.html"
66.249.69.97 --[24/Sep/2014:22:26:44 +0000] "GET http://www.google.com/how.html"
66.249.69.97 --[24/Sep/2014:22:28:44 +0000] "GET http://dbdmg.polito.it/course.html"
71.19.157.179 --[24/Sep/2014:22:30:12 +0000] "GET http://www.google.com/faq.html"
66.249.69.95 --[24/Sep/2014:31:28:44 +0000] "GET http://dbdmg.polito.it/thesis.html"
66.249.69.97 --[24/Sep/2014:56:26:44 +0000] "GET http://www.google.com/how.html"
56.249.69.97 --[24/Sep/2014:56:26:44 +0000] "GET http://www.google.com/how.html"
# Output data
66.249.69.97
71.19.157.179
56.249.69.97
Maximum value
- Input data: a collection of (structured) textual csv files containing the daily value of PM10 for a set of sensors. Each line of the files has the following format:
sensorId,date,PM10 value (μg/m3)\n
- Output: report the maximum value of PM10. Print the result on the standard output (see the sketch under Solutions below)
Top-k maximum values
- Input data: a collection of (structured) textual csv files containing the daily value of PM10 for a set of sensors. Each line of the files has the following format:
sensorId,date,PM10 value (μg/m3)\n
- Output: report the top-3 maximum values of PM10. Print the result on the standard output (see the sketch under Solutions below)
Solutions
log-filtering.py
from pyspark.sql import SparkSession, DataFrame
logFile = "log-filtering-input.txt"
spark: SparkSession = SparkSession.builder.appName("log-filtering").getOrCreate()
# Read the log as a single-column DataFrame ("value"); cache it so the
# file is not re-read by each of the actions below
logData: DataFrame = spark.read.text(logFile).cache()
# Keep only the lines containing the word "google"
df = logData.filter(logData.value.contains('google'))
# Print all matching lines on the standard output
df.show(df.count())
# Store the result in a local folder as plain text
df.write.save("log-filtering-output", format="text", mode="overwrite")
spark.stop()
log-analysis.py
from pyspark.sql import SparkSession, DataFrame, functions as sf
exercise_name = "log-analysis"
logFile = f"{exercise_name}-input.txt"
spark: SparkSession = SparkSession.builder.appName(exercise_name).getOrCreate()
logData: DataFrame = spark.read.text(logFile).cache()
# Keep only the requests to URLs containing the term "www.google.com",
# as asked by the exercise (plain 'google' would also match other URLs)
df = logData.filter(logData.value.contains('www.google.com'))
# The IP address is the first whitespace-separated token of each line;
# extract it and drop the duplicates
df = df.select(sf.split(df.value, "\\s+").getItem(0).alias("IP")).distinct()
# Print the distinct IP addresses and store them in a local folder
df.show(df.count())
df.write.save(f"{exercise_name}-output", format="text", mode="overwrite")
spark.stop()
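max-value.py (sketch)
The two PM10 exercises have no solution above. Below is a minimal sketch for the maximum-value exercise, not an official solution: the input folder name "pm10-input" and the column names are assumptions, since the exercise only specifies the sensorId,date,PM10 line format.
from pyspark.sql import SparkSession, functions as sf
spark: SparkSession = SparkSession.builder.appName("max-value").getOrCreate()
# Read all the CSV files in the (assumed) input folder; the files have no
# header line, so name the columns explicitly
df = spark.read.csv("pm10-input").toDF("sensorId", "date", "pm10")
# Cast the PM10 column to float and take the global maximum
maxValue = df.agg(sf.max(sf.col("pm10").cast("float"))).first()[0]
# Print the result on the standard output, as required
print(f"Maximum PM10 value: {maxValue}")
spark.stop()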
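top-k.py (sketch)
A possible sketch for the top-3 exercise, under the same assumptions as the previous one.
from pyspark.sql import SparkSession, functions as sf
spark: SparkSession = SparkSession.builder.appName("top-k").getOrCreate()
# Same assumed input layout as in max-value.py
df = spark.read.csv("pm10-input").toDF("sensorId", "date", "pm10")
df = df.withColumn("pm10", sf.col("pm10").cast("float"))
# Sort by decreasing PM10 value and bring the first 3 rows to the driver
topK = df.orderBy(sf.col("pm10").desc()).take(3)
# Print the top-3 values on the standard output
for row in topK:
    print(row["sensorId"], row["date"], row["pm10"])
spark.stop()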
Additional exercises
- Python
- Travaux pratiques - Introduction à Spark
- Marlowess/spark-exercises (check the branches for recent PySpark versions)
- Databricks Spark DF, SQL, ML Exercise - Solutions
- areibman/pyspark_exercises (no PySpark solutions provided)
- Scala