Spark Streaming in Action

An introduction to Spark Streaming through demonstration


Presented by Sean Glover / @randonom

SimpleApp.scala



import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    // Read the file as an RDD of lines (minimum of 2 partitions) and cache it,
    // because it is scanned by two separate actions below.
    val logData = sc.textFile(logFile, 2).cache()
    // Count the lines (not occurrences) that contain each letter.
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    // Release cluster resources when the job is done.
    sc.stop()
  }
}
            
https://spark.apache.org/docs/latest/quick-start.html
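SimpleApp counts lines, not occurrences: `filter` keeps each line that contains the letter and `count` returns how many lines survived, while `cache()` lets the second `filter`/`count` reuse the lines read by the first instead of rereading the file. As a rough illustration of those semantics without a cluster, here is a sketch of the same computation over an in-memory Scala collection. The object name `LocalSimpleApp`, the helper `countContaining` and the default `README.md` path are hypothetical, and it assumes the whole file fits in memory.

```scala
import scala.io.Source

// Hypothetical local (non-Spark) equivalent of SimpleApp, for illustration only.
object LocalSimpleApp {
  // Number of lines containing the substring `s` (a line with several
  // matches still counts once), mirroring filter(...).count() in SimpleApp.
  def countContaining(lines: Seq[String], s: String): Long =
    lines.count(_.contains(s)).toLong

  def main(args: Array[String]): Unit = {
    // Hypothetical default path; pass a real file as the first argument.
    val path = if (args.nonEmpty) args(0) else "README.md"
    val source = Source.fromFile(path)
    try {
      // Materialize the lines once so both counts reuse them (the role
      // cache() plays in the Spark version).
      val lines = source.getLines().toVector
      val numAs = countContaining(lines, "a")
      val numBs = countContaining(lines, "b")
      println(s"Lines with a: $numAs, Lines with b: $numBs")
    } finally source.close()
  }
}
```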

Spark Demo: Stack Exchange Data Dump

"All community-contributed content on Stack Exchange is licensed under the Creative Commons BY-SA 3.0 license. As part of our commitment to that, we release a quarterly dump of all user-contributed data (after carefully sanitizing it to protect user private data, of course)." https://archive.org/details/stackexchange
StackOverflow.com data:

$ du stackoverflow.com*/*.xml --total --block-size=G
1G      stackoverflow.com-Badges/Badges.xml
8G      stackoverflow.com-Comments/Comments.xml
46G     stackoverflow.com-PostHistory/PostHistory.xml
1G      stackoverflow.com-PostLinks/PostLinks.xml
29G     stackoverflow.com-Posts/Posts.xml
1G      stackoverflow.com-Tags/Tags.xml
1G      stackoverflow.com-Users/Users.xml
7G      stackoverflow.com-Votes/Votes.xml
90G     total
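These are large XML files, but as far as we can tell each record is a self-closing `<row ... />` element on its own line, with the data carried in its attributes (for Posts, `Id`, `PostTypeId`, `Tags` and so on). If that layout holds, a file can be read line by line (for example with `sc.textFile`) and each line parsed independently. Below is a minimal sketch of such a line parser, assuming that layout and using a regex rather than a full XML parser. `RowParser`, `parseRow` and `unescape` are hypothetical names, and the entity decoding covers only a handful of common entities.

```scala
// Hypothetical line parser for Stack Exchange dump rows; assumes one
// <row ... /> element per line. Not a general XML parser.
object RowParser {
  // Matches name="value" attribute pairs; assumes values never contain a raw
  // double quote (it is expected to be escaped as &quot;).
  private val Attr = """(\w+)="([^"]*)"""".r

  // Attributes of a row line, or None for non-row lines such as the XML
  // declaration and the enclosing root element tags.
  def parseRow(line: String): Option[Map[String, String]] = {
    val trimmed = line.trim
    if (!trimmed.startsWith("<row ")) None
    else Some(Attr.findAllMatchIn(trimmed).map(m => m.group(1) -> unescape(m.group(2))).toMap)
  }

  // Minimal entity decoding; &amp; is decoded last so that an escaped
  // sequence like "&amp;lt;" correctly becomes the literal text "&lt;".
  def unescape(s: String): String =
    s.replace("&lt;", "<")
      .replace("&gt;", ">")
      .replace("&quot;", "\"")
      .replace("&apos;", "'")
      .replace("&#xA;", "\n")
      .replace("&#xD;", "\r")
      .replace("&amp;", "&")
}
```

In a Spark job this could be applied with something like `sc.textFile(path).flatMap(line => RowParser.parseRow(line))`, which drops the non-row lines.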
            

Confluent's Avro Support

Kafka Avro Serialization library

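// Confluent artifacts are served from Confluent's own Maven repository
resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "1.0.1"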
          
Initialization

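import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import io.confluent.kafka.serializers.KafkaAvroSerializer

val props = new Properties()

props.put("bootstrap.servers", "localhost:9092")
// Schema Registry endpoint that KafkaAvroSerializer uses to register/look up schemas
props.put("schema.registry.url", "http://localhost:8081")
props.put("value.serializer", classOf[KafkaAvroSerializer].getName)
props.put("key.serializer", classOf[KafkaAvroSerializer].getName)

// KafkaProducer is a Java class, so it must be instantiated with `new`
val producer = new KafkaProducer[Object, Object](props)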
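With `schema.registry.url` set, `KafkaAvroSerializer` registers the record's Avro schema with the Schema Registry and, per Confluent's documented wire format, writes a small header in front of the Avro binary payload instead of embedding the full schema in every message: a magic byte `0` followed by the 4-byte big-endian schema ID. The sketch below reproduces only that framing with `java.nio.ByteBuffer`, assuming the Avro-encoded bytes are already available. It neither talks to a registry nor encodes Avro, and `ConfluentFraming`, `frame` and `unframe` are hypothetical names, not part of Confluent's API.

```scala
import java.nio.ByteBuffer

// Hypothetical illustration of the Confluent message framing:
// [magic byte 0][4-byte big-endian schema ID][Avro binary payload]
object ConfluentFraming {
  val MagicByte: Byte = 0x0

  // Prefix an already Avro-encoded payload with the magic byte and schema ID.
  def frame(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] = {
    // ByteBuffer is big-endian by default, matching the 4-byte schema ID layout.
    val buf = ByteBuffer.allocate(1 + 4 + avroPayload.length)
    buf.put(MagicByte).putInt(schemaId).put(avroPayload)
    buf.array()
  }

  // Split a framed message back into (schema ID, Avro payload).
  def unframe(message: Array[Byte]): (Int, Array[Byte]) = {
    val buf = ByteBuffer.wrap(message)
    val magic = buf.get()
    require(magic == MagicByte, s"Unknown magic byte: $magic")
    val schemaId = buf.getInt()
    val payload = new Array[Byte](buf.remaining())
    buf.get(payload)
    (schemaId, payload)
  }
}
```

A consumer-side deserializer does the reverse: it reads the schema ID, fetches the matching schema from the registry, and uses it to decode the remaining bytes.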