Friday, April 4, 2014

How to sail through the storm with phantom types

The Storm framework is a handy tool when it comes to handling real-time, infinite streams of data. Credits go to Nathan and his team! In a nutshell, Storm models a flow-based programming paradigm in which processes are modelled by two kinds of Java classes, spouts and bolts, where a spout denotes a data-generating process and a bolt denotes a data-processing/transforming process. To illustrate the idea, let's consider the typical word count example in Storm. For conciseness, we adopt the Scala DSL syntax from ScalaStorm (more credits go to Evan Chan).
  class RandomSentenceSpout extends StormSpout(outputFields = List("sentence")) {
    val sentences = List("the cow jumped over the moon",
                         "an apple a day keeps the doctor away",
                         "four score and seven years ago",
                         "snow white and the seven dwarfs",
                         "i am at two with nature")
    def nextTuple = {
      Thread sleep 100
      emit (sentences(Random.nextInt(sentences.length)))
    }
  }
Viewing a Storm topology as a flow of data processes, a spout is a data-generating process. For example, the RandomSentenceSpout defined above operates by executing the method nextTuple, which randomly picks one sentence out of the predefined set and passes it to the next process in the pipeline.
Let's move on to the next item in the pipeline, the SplitSentence bolt.
  class SplitSentence extends StormBolt(outputFields = List("word")) {
    def execute(t: Tuple) = t matchSeq {
      case Seq(sentence: String) => sentence split " " foreach
        { word => using anchor t emit (word) }
      t ack
    }
  }
Unlike a spout, a bolt takes the output of the previous process in the pipeline as input. Given the input, a bolt invokes its execute() method to compute the output, which is in turn passed to the next process in the pipeline. In this example, the SplitSentence bolt splits the incoming sentences (from the RandomSentenceSpout) on spaces and passes the split words to the next process, the WordCount bolt.
  class WordCount extends StormBolt(List("word", "count")) {
    var counts: Map[String, Int] = _
    setup {
      counts = new HashMap[String, Int]().withDefaultValue(0)
    }
    def execute(t: Tuple) = t matchSeq {
      case Seq(word: String) =>
        counts(word) += 1
        using anchor t emit (word, counts(word))
        t ack
    }
  }
The WordCount bolt is the last process in this word count topology. It simply counts the occurrences of the words it collects. Putting all three pieces into a topology, we have:
object WordCountTopology {

  class RandomSentenceSpout extends StormSpout(outputFields = List("sentence")) {
    val sentences = List("the cow jumped over the moon",
                         "an apple a day keeps the doctor away",
                         "four score and seven years ago",
                         "snow white and the seven dwarfs",
                         "i am at two with nature")
    def nextTuple = {
      Thread sleep 100
      emit (sentences(Random.nextInt(sentences.length)))
    }
  }

  class SplitSentence extends StormBolt(outputFields = List("word")) {
    def execute(t: Tuple) = t matchSeq {
      case Seq(sentence: String) => sentence split " " foreach
        { word => using anchor t emit (word) }
      t ack
    }
  }

  class WordCount extends StormBolt(List("word", "count")) {
    var counts: Map[String, Int] = _
    setup {
      counts = new HashMap[String, Int]().withDefaultValue(0)
    }
    def execute(t: Tuple) = t matchSeq {
      case Seq(word: String) =>
        counts(word) += 1
        using anchor t emit (word, counts(word))
        t ack
    }
  }
  
  def main(args: Array[String]) = {
    val builder = new TopologyBuilder

    builder.setSpout("randsentence", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentence, 8)
        .shuffleGrouping("randsentence")
    builder.setBolt("count", new WordCount, 12)
        .fieldsGrouping("split", new Fields("word"))

    val conf = new Config
    conf.setDebug(true)
    conf.setMaxTaskParallelism(3)

    val cluster = new LocalCluster
    cluster.submitTopology("word-count", conf, builder.createTopology)
    Thread sleep 10000
    cluster.shutdown
  }
}
Note that the Storm topology is assembled in the main() method, where the TopologyBuilder connects the data processes by chaining them together.

Note that processes exchange data via the Tuple data type, an ordered sequence in which each item is named. For example, in the definition of RandomSentenceSpout, we see
outputFields = List("sentence")
And in the definition of SplitSentence, we use
outputFields = List("word")
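As a quick illustration of what these names buy us, a downstream component can read fields by name. The inspect helper below is a hypothetical sketch of ours; getStringByField and getIntegerByField are standard Storm Tuple accessors.
  def inspect(t: Tuple) = {
    val word  = t.getStringByField("word")     // named in outputFields
    val count = t.getIntegerByField("count")
    println(word + " -> " + count)
  }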

When a bolt is "connected" to its upstream process, an input declaration has to be made. In the case of the SplitSentence bolt, the distribution of the inputs from RandomSentenceSpout does not matter, hence shuffleGrouping() is used.

In the case of the WordCount bolt, however, the input assignment is crucial: the same word must always go to the same bolt instance, otherwise the counting is meaningless. Thus, fieldsGrouping() is used.
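For intuition, fieldsGrouping() behaves roughly like hash-partitioning on the named field. The sketch below is our own illustration, not Storm's actual implementation.
  // Hypothetical helper: equal words always map to the same task index.
  def routeByField(word: String, numTasks: Int): Int =
    math.abs(word.hashCode) % numTasks

  routeByField("moon", 12)  // "moon" is always counted by the same task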

Everything is neat and tidy, except that we spot two immediate issues.
Issue number 1:
The connections between spouts and bolts are dynamically typed, i.e. if there is a type mismatch, it will only be discovered at run-time.
For instance, consider a slight adjustment to the definition of SplitSentence:
  class SplitSentence extends StormBolt(outputFields = List("word")) {
    def execute(t: Tuple) = t matchSeq {
      case Seq(sentence: String) => sentence split " " foreach
        { word => using anchor t emit (word.toList) }
      t ack
    }
  }

As a result, the bolt now emits a List[Char] instead of a String, and the compiler happily accepts it. The type mismatch between the output of SplitSentence and the input of WordCount will only be raised as an error at run-time. This is unacceptable: we are dealing with large sets of data, and mismatches of this kind can be hard to trace at run-time.
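The essence of the problem can be distilled into a few lines of plain Scala. This is a sketch of the failure mode, not Storm code:
  // The "wire" between processes is untyped (Any), so the compiler accepts
  // both payloads; the bad one only surfaces in the pattern match at run-time.
  def count(payload: Any) = payload match {
    case word: String => println("counting " + word)
    case other        => sys.error("unexpected tuple: " + other)
  }

  count("storm")          // fine
  count("storm".toList)   // compiles, but fails at run-time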
Issue number 2:
The construction of a topology requires at least one spout, followed by zero or more bolts. It does not make sense to add a bolt to an empty topology; however, the TopologyBuilder does not enforce this constraint statically.
    val builder = new TopologyBuilder
 
    builder.setSpout("randsentence", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentence, 8)
        .shuffleGrouping("randsentence")
    builder.setBolt("count", new WordCount, 12)
        .fieldsGrouping("split", new Fields("word"))
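For instance, nothing stops us from setting bolts without ever setting a spout. The following sketch (reusing the classes defined earlier) compiles, and the mistake only surfaces when the topology is submitted and run:
    val bad = new TopologyBuilder
    bad.setBolt("split", new SplitSentence, 8)
        .shuffleGrouping("randsentence")  // refers to a spout that was never set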

To solve the above issues, we adopt a well-known technique from the FP world: phantom types. In a nutshell, a phantom type is a data type whose type parameters are not all mentioned in its data constructors.
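Before applying the idea to Storm, here is a minimal, self-contained illustration. The Door example is our own, not part of the Storm code:
  sealed trait Open
  sealed trait Closed

  // S is phantom: it appears in the type, but no field has type S.
  case class Door[S](name: String)

  def open(d: Door[Closed]): Door[Open]  = Door[Open](d.name)
  def close(d: Door[Open]): Door[Closed] = Door[Closed](d.name)

  val front = open(Door[Closed]("front"))  // front : Door[Open]
  // open(front)                           // rejected: open expects Door[Closed]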

We follow the style of phantom type encoding described here.
First of all, we introduce two phantom types to capture the input and output types of spouts and bolts.
  trait StormSpoutT[Out]{def spout:StormSpout}
  trait StormBoltT[In,Out]{def bolt:StormBolt}

Now we are able to declare the input and output types of the spout and the bolts:
  case class RandomSentenceSpoutT (spout: RandomSentenceSpout) extends StormSpoutT[String]
  class RandomSentenceSpout extends StormSpout(outputFields = List("sentence")) {
    val sentences = List("the cow jumped over the moon",
                         "an apple a day keeps the doctor away",
                         "four score and seven years ago",
                         "snow white and the seven dwarfs",
                         "i am at two with nature")
    def nextTuple = {
      Thread sleep 100
      emit (sentences(Random.nextInt(sentences.length)))
    }
  }

  case class SplitSentenceT (bolt:SplitSentence) extends StormBoltT[String,String]
  class SplitSentence extends StormBolt(outputFields = List("word")) {
    def execute(t: Tuple) = t matchSeq {
      case Seq(sentence: String) => sentence split " " foreach
        { word => using anchor t emit (word) }
      t ack
    }
  }


  case class WordCountT (bolt:WordCount) extends StormBoltT[String,(String,Int)]
  class WordCount extends StormBolt(List("word", "count")) {
    var counts: Map[String, Int] = _
    setup {
      counts = new HashMap[String, Int]().withDefaultValue(0)
    }
    def execute(t: Tuple) = t matchSeq {
      case Seq(word: String) =>
        counts(word) += 1
        using anchor t emit (word, counts(word))
        t ack
    }
  }
Next, we define three possible states of a topology under construction, an ordering among the three states, and a phantom type representing the state of a topology:
  abstract class TopEmpty
  abstract class TopWithSpout extends TopEmpty
  abstract class TopWithBolt extends TopWithSpout

  case class TopologyBuilderT[+State,+Out](builder:TopologyBuilder,output_name:String) {
    def createTopology = builder.createTopology
  }
In addition, we define two combinators for topology construction, which enforce the matching constraint between the output type of one process and the input type of the following process. Furthermore, as the topology is being constructed, we also keep track of its state. For example, when a topology is initiated it is TopEmpty; once a spout is added, it becomes TopWithSpout. Adding a bolt to an empty topology is not allowed by the type system.
  def addSpout[Out](top:TopologyBuilderT[TopEmpty,_])
    ( spout_name:String
    , ts:StormSpoutT[Out]
    , threadMax:Int) : TopologyBuilderT[TopWithSpout,Out] = 
    top match {
      case TopologyBuilderT(builder,_) => {
        builder.setSpout(spout_name, ts.spout, threadMax)
        TopologyBuilderT[TopWithSpout,Out](builder,spout_name)   
      }
    }
  
  def addBolt[In,Out,State <: TopWithSpout](top:TopologyBuilderT[State,In])
    ( bolt_name:String
    , tb:StormBoltT[In,Out]
    , threadMax:Int)
    ( inDecl: BoltDeclarer => BoltDeclarer ) : TopologyBuilderT[TopWithBolt,Out] = 
    top match {
      case TopologyBuilderT(builder,output_name) => {
        val i = builder.setBolt(bolt_name,tb.bolt, threadMax)
        inDecl(i)
        TopologyBuilderT[TopWithBolt,Out](builder,bolt_name)   
      }
    } 
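With these combinators, both issues are caught at compile time. Below is a small sketch of what the types now rule out. Note the explicit TopEmpty ascription: without it, Scala would infer Nothing for State, which vacuously satisfies any bound.
  val empty: TopologyBuilderT[TopEmpty, Nothing] =
    new TopologyBuilderT(new TopologyBuilder, "")

  // Rejected at compile time: addBolt requires State <: TopWithSpout,
  // but the topology is still TopEmpty.
  //   addBolt(empty)("split", new SplitSentenceT(new SplitSentence), 8)(
  //     _.shuffleGrouping("randsentence"))

  // Accepted: a spout first; its Out (String) then constrains the In of
  // any bolt attached next.
  val spouted = addSpout(empty)("randsentence",
    new RandomSentenceSpoutT(new RandomSentenceSpout), 5)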
Putting everything together, we have a typeful version of the word count topology.
object WordCountTopologyP {

  trait StormSpoutT[Out]{def spout:StormSpout}

  trait StormBoltT[In,Out]{def bolt:StormBolt}


  abstract class TopEmpty
  abstract class TopWithSpout extends TopEmpty
  abstract class TopWithBolt extends TopWithSpout


  case class TopologyBuilderT[+State,+Out](builder:TopologyBuilder,output_name:String) {
    def createTopology = builder.createTopology
  }


  case class RandomSentenceSpoutT (spout: RandomSentenceSpout) extends StormSpoutT[String]
  class RandomSentenceSpout extends StormSpout(outputFields = List("sentence")) {
    val sentences = List("the cow jumped over the moon",
                         "an apple a day keeps the doctor away",
                         "four score and seven years ago",
                         "snow white and the seven dwarfs",
                         "i am at two with nature")
    def nextTuple = {
      Thread sleep 100
      emit (sentences(Random.nextInt(sentences.length)))
    }
  }

  case class SplitSentenceT (bolt:SplitSentence) extends StormBoltT[String,String]
  class SplitSentence extends StormBolt(outputFields = List("word")) {
    def execute(t: Tuple) = t matchSeq {
      case Seq(sentence: String) => sentence split " " foreach
        { word => using anchor t emit (word) }
      t ack
    }
  }


  case class WordCountT (bolt:WordCount) extends StormBoltT[String,(String,Int)]
  class WordCount extends StormBolt(List("word", "count")) {
    var counts: Map[String, Int] = _
    setup {
      counts = new HashMap[String, Int]().withDefaultValue(0)
    }
    def execute(t: Tuple) = t matchSeq {
      case Seq(word: String) =>
        counts(word) += 1
        using anchor t emit (word, counts(word))
        t ack
    }
  }


  def addSpout[Out](top:TopologyBuilderT[TopEmpty,_])
    ( spout_name:String
    , ts:StormSpoutT[Out]
    , threadMax:Int) : TopologyBuilderT[TopWithSpout,Out] = 
    top match {
      case TopologyBuilderT(builder,_) => {
        builder.setSpout(spout_name, ts.spout, threadMax)
        TopologyBuilderT[TopWithSpout,Out](builder,spout_name)   
      }
    }

  
  def addBolt[In,Out,State <: TopWithSpout](top:TopologyBuilderT[State,In])
    ( bolt_name:String
    , tb:StormBoltT[In,Out]
    , threadMax:Int)
    ( inDecl: BoltDeclarer => BoltDeclarer ) : TopologyBuilderT[TopWithBolt,Out] = 
    top match {
      case TopologyBuilderT(builder,output_name) => {
        val i = builder.setBolt(bolt_name,tb.bolt, threadMax)
        inDecl(i)
        TopologyBuilderT[TopWithBolt,Out](builder,bolt_name)   
      }
    } 
    

  def main(args: Array[String]) = {
    val builderT = addBolt(
                    addBolt(addSpout(new TopologyBuilderT(new TopologyBuilder,""))("randsentence", new RandomSentenceSpoutT(new RandomSentenceSpout), 8)) 
                      ("split", new SplitSentenceT(new SplitSentence), 8)( _.shuffleGrouping("randsentence"))
                  ) ("count", new WordCountT(new WordCount), 12)( _.fieldsGrouping("split", new Fields("word")))


    val conf = new Config
    conf.setDebug(true)
    conf.setMaxTaskParallelism(3)

    val cluster = new LocalCluster
    cluster.submitTopology("word-count", conf, builderT.createTopology)
    Thread sleep 10000
    cluster.shutdown
  }
}
There is still another problem: the topology construction is unreadable.
    val builderT = addBolt(
                    addBolt(addSpout(new TopologyBuilderT(new TopologyBuilder,""))("randsentence", new RandomSentenceSpoutT(new RandomSentenceSpout), 8)) 
                      ("split", new SplitSentenceT(new SplitSentence), 8)( _.shuffleGrouping("randsentence"))
                  ) ("count", new WordCountT(new WordCount), 12)( _.fieldsGrouping("split", new Fields("word")))
This is because the addBolt() and addSpout() methods are not infix. To fix this problem, we use the type class/Scala implicit style of phantom type encoding described here.
object WordCountTopologyT {

  trait StormSpoutT[Out]{def spout:StormSpout}

  trait StormBoltT[In,Out]{def bolt:StormBolt}


  abstract class TopEmpty
  abstract class TopWithSpout extends TopEmpty
  abstract class TopWithBolt extends TopWithSpout


  case class TopologyBuilderT[+State,Out](builder:TopologyBuilder,output_name:String) {
    def createTopology = builder.createTopology

    def init : TopologyBuilderT[TopEmpty,Out] = 
      new TopologyBuilderT(builder,output_name)

    def >> [NextOut]
      ( spout_name:String
      , ts:StormSpoutT[NextOut]
      , threadMax:Int)
      (implicit evS: State <:< TopEmpty) : TopologyBuilderT[TopWithSpout,NextOut] = {
        builder.setSpout(spout_name, ts.spout, threadMax)
        new TopologyBuilderT[TopWithSpout,NextOut](builder, spout_name)
      }

    def >>> [NextOut]
      ( bolt_name:String
      , tb:StormBoltT[Out,NextOut]
      , threadMax:Int)
      ( inDecl: BoltDeclarer => BoltDeclarer )
      (implicit evS: State <:< TopWithSpout) : TopologyBuilderT[TopWithBolt,NextOut] = {
        val bd = builder.setBolt(bolt_name, tb.bolt, threadMax)
        inDecl(bd)
        new TopologyBuilderT[TopWithBolt,NextOut](builder, bolt_name)
      }
  }


  case class RandomSentenceSpoutT (spout: RandomSentenceSpout) extends StormSpoutT[String]
  class RandomSentenceSpout extends StormSpout(outputFields = List("sentence")) {
    val sentences = List("the cow jumped over the moon",
                         "an apple a day keeps the doctor away",
                         "four score and seven years ago",
                         "snow white and the seven dwarfs",
                         "i am at two with nature")
    def nextTuple = {
      Thread sleep 100
      emit (sentences(Random.nextInt(sentences.length)))
    }
  }

  case class SplitSentenceT (bolt:SplitSentence) extends StormBoltT[String,String]
  class SplitSentence extends StormBolt(outputFields = List("word")) {
    def execute(t: Tuple) = t matchSeq {
      case Seq(sentence: String) => sentence split " " foreach
        { word => using anchor t emit (word) }
      t ack
    }
  }


  case class WordCountT (bolt:WordCount) extends StormBoltT[String,(String,Int)]
  class WordCount extends StormBolt(List("word", "count")) {
    var counts: Map[String, Int] = _
    setup {
      counts = new HashMap[String, Int]().withDefaultValue(0)
    }
    def execute(t: Tuple) = t matchSeq {
      case Seq(word: String) =>
        counts(word) += 1
        using anchor t emit (word, counts(word))
        t ack
    }
  }


  def main(args: Array[String]) = {
    val builderT = (new TopologyBuilderT(new TopologyBuilder,"")).init
                   .>> ("randsentence", new RandomSentenceSpoutT(new RandomSentenceSpout), 8) 
                   .>>> ("split", new SplitSentenceT(new SplitSentence), 8)( _.shuffleGrouping("randsentence")) 
                   .>>> ("count", new WordCountT(new WordCount), 12)( _.fieldsGrouping("split", new Fields("word")))

    val conf = new Config
    conf.setDebug(true)
    conf.setMaxTaskParallelism(3)

    val cluster = new LocalCluster
    cluster.submitTopology("word-count", conf, builderT.createTopology)
    Thread sleep 10000
    cluster.shutdown
  }
}
The difference is that we migrate the addSpout and addBolt combinators into the TopologyBuilderT class so that they can be used infix; they are renamed to >> and >>> respectively.
To enforce the constraints among the phantom type variables, we use Scala's implicit arguments (similar to Haskell's type class evidence construction); see the implicit evS parameters in the definitions of >> and >>> above.
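The evidence pattern can also be seen in isolation. onlyAfterSpout below is a hypothetical helper, not part of the library:
  // The implicit <:< value exists only when State is a subtype of the
  // required phantom state, so the method is disabled for all other states.
  def onlyAfterSpout[State](implicit ev: State <:< TopWithSpout) = "ok"

  // onlyAfterSpout[TopWithBolt]   // accepted: TopWithBolt <: TopWithSpout
  // onlyAfterSpout[TopEmpty]      // rejected: no such evidence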
One observation is that this formulation is still a bit verbose, though arguably the phantom types and the definition of TopologyBuilderT could be packaged as a common library, while the definitions of RandomSentenceSpoutT, SplitSentenceT and WordCountT can be treated as type annotations.
We believe there should be a way of automatically inferring these annotations from the bodies of RandomSentenceSpout, SplitSentence and WordCount. Maybe the new Scala macros will be one option.
The TODOs include:
  1. Support one spout feeding multiple bolts, and multiple spouts feeding one bolt.
  2. Use Scala macros to infer the spout and bolt type annotations/declarations.

The full source can be found here.
Related work: http://www.infosun.fim.uni-passau.de/publications/docs/MAPREDUCE2011.pdf
