
Exercices PySpark

  1. Log filtering

    • Input data: a simplified log of a web server (i.e., a textual file). Each line of the file is associated with a URL request
    • Output: the lines containing the word "google". Store the output in a local folder, for simplicity

      # Input data
      66.249.69.97 --[24/Sep/2014:22:25:44 +0000] "GET http://www.google.com/bot.html"
      66.249.69.97 --[24/Sep/2014:22:26:44 +0000] "GET http://www.google.com/how.html"
      66.249.69.97 --[24/Sep/2014:22:28:44 +0000] "GET http://dbdmg.polito.it/course.html"
      71.19.157.179 --[24/Sep/2014:22:30:12 +0000] "GET http://www.google.com/faq.html"
      66.249.69.97 --[24/Sep/2014:31:28:44 +0000] "GET http://dbdmg.polito.it/thesis.html"
      
      # Output data
      66.249.69.97 --[24/Sep/2014:22:25:44 +0000] "GET http://www.google.com/bot.html"
      66.249.69.97 --[24/Sep/2014:22:26:44 +0000] "GET http://www.google.com/how.html"
      71.19.157.179 --[24/Sep/2014:22:30:12 +0000] "GET http://www.google.com/faq.html"
      
  2. Log analysis

    • Input data: log of a web server (i.e., a textual file). Each line of the file is associated with a URL request
    • Output: the list of distinct IP addresses associated with connections to a Google page (i.e., connections to URLs containing the term www.google.com). Store the output in a local folder, for simplicity

      # Input data
      
      66.249.69.97 --[24/Sep/2014:22:25:44 +0000] "GET http://www.google.com/bot.html"
      66.249.69.97 --[24/Sep/2014:22:26:44 +0000] "GET http://www.google.com/how.html"
      66.249.69.97 --[24/Sep/2014:22:28:44 +0000] "GET http://dbdmg.polito.it/course.html"
      71.19.157.179 --[24/Sep/2014:22:30:12 +0000] "GET http://www.google.com/faq.html"
      66.249.69.95 --[24/Sep/2014:31:28:44 +0000] "GET http://dbdmg.polito.it/thesis.html"
      66.249.69.97 --[24/Sep/2014:56:26:44 +0000] "GET http://www.google.com/how.html"
      56.249.69.97 --[24/Sep/2014:56:26:44 +0000] "GET http://www.google.com/how.html"
      
      # Output data
      66.249.69.97
      71.19.157.179
      56.249.69.97
      
  3. Maximum value

    • Input data: a collection of (structured) textual csv files containing the daily value of PM10 for a set of sensors. Each line of the files has the following format: sensorId,date,PM10 value (μg/m3)\n
    • Output: report the maximum value of PM10. Print the result on the standard output

      # Input data
      
      sensorId,date,PM10 value (μg/m3)
      s1,2016-01-01,20.5
      s2,2016-01-01,30.1
      s1,2016-01-02,60.2
      s2,2016-01-02,20.4
      s1,2016-01-03,55.5
      s2,2016-01-03,52.5
      
      # Output data
      60.2
      
  4. Top-k maximum values

    • Input data: a collection of (structured) textual csv files containing the daily value of PM10 for a set of sensors. Each line of the files has the following format:
      sensorId,date,PM10 value (μg/m3)\n
    • Output: report the top-3 maximum values of PM10. Print the result on the standard output.

      # Input data
      
      sensorId,date,PM10 value (μg/m3)
      s1,2016-01-01,20.5
      s2,2016-01-01,30.1
      s1,2016-01-02,60.2
      s2,2016-01-02,20.4
      s1,2016-01-03,55.5
      s2,2016-01-03,52.5
      
      # Output data
      60.2
      55.5
      52.5
      
Solutions
log-filtering.py
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame

logFile = "log-filtering-input.txt"
spark: SparkSession = SparkSession.builder.appName("log-filtering").getOrCreate()
logData: DataFrame = spark.read.text(logFile).cache()

# Keep only the lines that contain the string 'google'
df = logData.filter(logData.value.contains('google'))

# Show all matching lines (show() prints only 20 rows by default)
df.show(df.count())

df.write.save("log-filtering-output", format="text", mode="overwrite")

spark.stop()
log-analysis.py
from pyspark.sql import SparkSession, DataFrame, functions as sf

exercise_name = "log-analysis"
logFile = f"{exercise_name}-input.txt"
spark: SparkSession = SparkSession.builder.appName(exercise_name).getOrCreate()
logData: DataFrame = spark.read.text(logFile).cache()

# Keep only the lines associated with a connection to a Google page
df = logData.filter(logData.value.contains('google'))
# The IP address is the first whitespace-separated field of each line
df = df.select(sf.split(df.value, "\\s+").getItem(0).alias("IP")).distinct()

df.show(df.count())
df.write.save(f"{exercise_name}-output", format="text", mode="overwrite")

spark.stop()

Additional exercises