Friday, December 28, 2012

Iteratee fundamentals for the beginner

Iteratees are an alternative to the prevalent imperative, blocking style of IO. While reading from a stream, if data is not available, the reading thread blocks, which is not desirable. The alternative is to register a callback that gets invoked as data becomes available. Iteratees are callback handlers and much more. If you have heard about them but are not sure what they are, then this post is for you.

Simply put, an iteratee is a consumer of data in chunks, and its counterpart, the `Enumerator`, is a producer of data. The imperative way of reading data amounts to pulling chunks out of the source. In an iteratee-based system, the producer pushes successive chunks of data into an iteratee, which finally computes a value. The iteratee's state changes as it receives chunks of data. Or, to be more precise (since a state change is a side effect and side effects don't compose): when an iteratee receives a chunk of data, it is replicated, with the new state computed from the old iteratee's state and the chunk of input it received. Also, with iteratees there is a two-way conversation between the producer and the consumer (i.e. the iteratee). The producer might hand a chunk of data to the iteratee; it might say "I'm Empty now, but hang on, I will feed you as soon as I get a chunk"; or it might say "I have run out of data (EOF), you decide what to do next".

On the other hand, the iteratee might say "I am ready for the next chunk" (Cont) or "I have had enough" (Done), or it might throw up (Error) because it is Monday. I will not talk about Error any further, because ours is an ideal world and it adds no insight to the understanding of iteratees.

So, when an iteratee gets a chunk of input, it replicates itself, transitioning from one state to another. It might transition to a `Done` state, which contains a computed value and possibly some unconsumed input. It might transition to a `Cont` state, which holds a function waiting to be invoked with the next chunk of input once it arrives. Or it might enter an `Error` state, which holds an error message and possibly the chunk of input that caused the error.

I have been talking about iteratees in the context of IO streams. For understanding's sake, let's consider Lists as our source of data, so the examples I develop will use Lists instead of streams. Once we get the idea of how iteratees behave, it should not be difficult to relate Lists to streams.

So, based on the ponderings so far, two types emerge: one is the input and the other is the state of the iteratee. We parameterize on the element type of the input because each chunk of data could represent a byte, a word, an event or anything else. So the input types are:

scala> trait Input[+E]
defined trait Input

scala> object Empty extends Input[Nothing]
defined module Empty

scala> //Producer has finished producing

scala> object EOF extends Input[Nothing]
defined module EOF

scala> //The producer has produced a chunk

scala> case class Elem[+E](e: E) extends Input[E]
defined class Elem

Next up, we define the iteratee itself along with the states it can be in after it receives a chunk of input. We parameterize the iteratee with `E` and `A`, the former being the type of input it consumes and the latter the type of value it computes. We also add a `run` method to extract the computed value. If the iteratee is already in the `Done` state, we return the value inside it. If, on the other hand, it is still in the `Cont` state, we send it an `EOF` signal to indicate that we want the value it has computed thus far.

scala> :paste
// Entering paste mode (ctrl-D to finish)

trait Iteratee[E,+A] {
  //run gets us the result computed thus far - sending an EOF to the iteratee if needed
  def run: A = this match {
    case Done(a, _) => a
    case Cont(k) => k(EOF).run
  }
}

//Done holds the computed result of type A and the input it may not have consumed
case class Done[E,+A](a: A, next: Input[E]) extends Iteratee[E,A]
//Cont holds a function which, given an input, returns a new iteratee instance (Done or Cont)
case class Cont[E,+A](k: Input[E] => Iteratee[E,A]) extends Iteratee[E,A]

// Exiting paste mode, now interpreting.

defined trait Iteratee
defined class Done
defined class Cont

We have said before that it is the job of the producer (aka the enumerator) to feed the iteratee its produce in chunks. To keep things simple, let's write an `enumerate` function instead of a full-blown enumerator. In the `enumerate` function below, the produce is held in a List.

scala> :paste
// Entering paste mode (ctrl-D to finish)

def enumerate[E,A](produce: List[E], itr: Iteratee[E,A]): Iteratee[E,A] = {
  produce match {
    //No produce - return the iteratee as it is
    case Nil => itr
    case e :: elems => itr match { //produced an elem/chunk
      case i@Done(_,_) => i //if Done - return the current iteratee
      case Cont(k) => enumerate(elems, k(Elem(e))) //not yet `Done` - continue feeding chunks of produce
    }
  }
}

// Exiting paste mode, now interpreting.

enumerate: [E, A](produce: List[E], itr: Iteratee[E,A])Iteratee[E,A]

Iteratees come in different categories. Some take a finite number of chunks and then enter the Done state; iteratees that return the head of a List, or that drop a few elements and return the rest of the List, fall in this category. Others consume the entire List and then return a result; iteratees that sum up the List elements fall in this category. Yet others never enter the Done state even after receiving an `EOF` signal; these are termed divergent iteratees. A few example iteratees are shown below.

An iteratee which returns the head from an enumerator's produce:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def head[E]: Iteratee[E, Option[E]] = {
  def step(in: Input[E]): Iteratee[E, Option[E]] = in match {
    //Got an elem - return a Done iteratee right away
    case Elem(e) => Done(Some(e), Empty)
    //Cont iteratee waiting for an input
    case Empty => Cont(step)
    //No input at all - there is no head to return
    case EOF => Done(None, EOF)
  }
  //Start out waiting for the first chunk
  Cont(step)
}

// Exiting paste mode, now interpreting.

head: [E]=> Iteratee[E,Option[E]]

scala> val v =  enumerate(List(1,2,3), head[Int])
v: Iteratee[Int,Option[Int]] = Done(Some(1),Empty$@ade4cd)

scala> val result = v.run
result: Option[Int] = Some(1)

Iteratee that computes the length of the produce of an enumerator:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def length[E]: Iteratee[E,Int] = {
  //acc carries the number of elements seen so far
  def step(acc: Int): Input[E] => Iteratee[E,Int] = {
    case Elem(e) => Cont(step(acc+1))
    case Empty => Cont(step(acc))
    case EOF => Done(acc, EOF)
  }
  Cont(step(0))
}

// Exiting paste mode, now interpreting.

length: [E]=> Iteratee[E,Int]

scala> val lenItr = enumerate(List(1,2,3,4,5,6), length[Int])
lenItr: Iteratee[Int,Int] = Cont(<function1>)

scala> val len = lenItr.run
len: Int = 6

Iteratee that will return a List containing every alternate element starting with the first:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def alternate[E]: Iteratee[E, List[E]] = {
  //flag tells whether the next element should be kept; xs holds the elements kept so far
  def step(flag: Boolean, xs: List[E]): Input[E] => Iteratee[E, List[E]] = {
    case Elem(e) if(flag) => Cont(step(false, xs ::: List(e)))
    case Elem(e) if(!flag) => Cont(step(true, xs))
    case Empty => Cont(step(flag, xs))
    case EOF => Done(xs, EOF)
  }
  Cont(step(true, Nil))
}

// Exiting paste mode, now interpreting.

alternate: [E]=> Iteratee[E,List[E]]

scala> val altItr = enumerate(List(1,2,3,4,5,6,7), alternate[Int])
altItr: Iteratee[Int,List[Int]] = Cont(<function1>)

scala> val altList = altItr.run
altList: List[Int] = List(1, 3, 5, 7)
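
The categories described earlier also mention iteratees that sum up the elements and iteratees that drop a few elements and return the rest. The following is a minimal sketch of both, not part of the original session: it restates the post's types in compact form (using `sealed` traits and `case object`s, which is my choice) and the names `IterateeSketch`, `sum` and `drop` are made up for illustration.

```scala
object IterateeSketch {
  sealed trait Input[+E]
  case object Empty extends Input[Nothing]
  case object EOF extends Input[Nothing]
  case class Elem[+E](e: E) extends Input[E]

  sealed trait Iteratee[E, +A] {
    // Extract the result, sending EOF if the iteratee is still waiting
    def run: A = this match {
      case Done(a, _) => a
      case Cont(k)    => k(EOF).run
    }
  }
  case class Done[E, +A](a: A, next: Input[E]) extends Iteratee[E, A]
  case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

  // Feed the list element by element until it is exhausted or the iteratee is Done
  def enumerate[E, A](produce: List[E], itr: Iteratee[E, A]): Iteratee[E, A] =
    produce match {
      case Nil => itr
      case e :: elems => itr match {
        case Cont(k) => enumerate(elems, k(Elem(e)))
        case done    => done
      }
    }

  // Consumes the whole input and returns the sum of the elements
  def sum: Iteratee[Int, Int] = {
    def step(acc: Int): Input[Int] => Iteratee[Int, Int] = {
      case Elem(e) => Cont(step(acc + e))
      case Empty   => Cont(step(acc))
      case EOF     => Done(acc, EOF)
    }
    Cont(step(0))
  }

  // Skips the first n elements, then collects the rest in order
  def drop[E](n: Int): Iteratee[E, List[E]] = {
    def step(left: Int, acc: List[E]): Input[E] => Iteratee[E, List[E]] = {
      case Elem(_) if left > 0 => Cont(step(left - 1, acc))
      case Elem(e)             => Cont(step(0, e :: acc))
      case Empty               => Cont(step(left, acc))
      case EOF                 => Done(acc.reverse, EOF)
    }
    Cont(step(n, Nil))
  }
}
```

With these definitions, `IterateeSketch.enumerate(List(1, 2, 3, 4), IterateeSketch.sum).run` gives 10, and feeding `List(1, 2, 3, 4, 5)` to `drop[Int](2)` gives `List(3, 4, 5)`. Both iteratees stay in the `Cont` state until `run` sends them `EOF`, just like `length` and `alternate` above.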

Conclusion: I have just shown the basics of iteratees. Frameworks like Play 2 have taken iteratees to a whole new level, combining them with scala futures and executing them asynchronously. As web applications become more and more real-time and data-centric, iteratees provide yet another tool in the developer's arsenal for scaling up web applications.

Monday, December 24, 2012

Basics of Actors in akka/scala part 2

Hot swap of akka's actors behaviour

An akka actor's `receive` method returns a partial function from Any to Unit (i.e. PartialFunction[Any, Unit]), and this behaviour can be swapped dynamically at runtime with `context.become`. Shown below is a simple example of that:

scala> class MyActor extends Actor {
     |   def receive = {
     |     case s: String => println("Current behaviour : "+ s)
     |     case pf: PartialFunction[Any, Unit] => context.become(pf)
     |   }
     | }

scala> val pf:PartialFunction[Any, Unit] = {
     |   case s => println("Behaving differently : "+ s)
     | }
pf: PartialFunction[Any,Unit] = <function1>

scala> val system =ActorSystem("system")
system: = akka://system

scala> val ma = system.actorOf(Props[MyActor])
ma: = Actor[akka://system/user/$a]

scala> ma ! "Testing"
Current behaviour : Testing

scala> ma ! pf

scala> ma ! "Testing again"
Behaving differently : Testing again

Complementing `become`, the context also provides an `unbecome` which reverts the actor's behavior to the previous one in the hotswap stack.
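
A minimal sketch of `become` paired with `unbecome`, assuming the same akka 2.1 era API used in this post and the akka actor jar on the classpath. The `Swapper` actor and its "swap"/"back" messages are hypothetical names used only for illustration.

```scala
import akka.actor._

// Hypothetical actor: "swap" pushes a new behaviour, "back" pops it again.
class Swapper extends Actor {
  def swapped: Receive = {
    case "back" => context.unbecome() // pop: return to the previous behaviour
    case msg    => println("Swapped behaviour : " + msg)
  }

  def receive = {
    // discardOld = false pushes onto the hotswap stack instead of replacing its top
    case "swap" => context.become(swapped, discardOld = false)
    case msg    => println("Original behaviour : " + msg)
  }
}

object SwapperDemo extends App {
  val system = ActorSystem("swapdemo")
  val swapper = system.actorOf(Props[Swapper])
  swapper ! "one"   // handled by the original behaviour
  swapper ! "swap"
  swapper ! "two"   // handled by the swapped behaviour
  swapper ! "back"
  swapper ! "three" // handled by the original behaviour again
}
```

The demo leaves the actor system running; shut it down the way your akka version expects once you are done.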

Router Actor

A router is just an actor that routes incoming messages to other actors. Akka has a default set of routers for various use cases. Shown below is a router which routes messages to routees in a round robin fashion.

scala> class Routee extends Actor {
     |   def receive = {
     |     case x => println(self.path + " : received : "+ x)
     |   }
     | } 
defined class Routee

scala> val routee1 = system.actorOf(Props[Routee],name="routee1")
routee1: = Actor[akka://system/user/routee1]

scala> val routee3 = system.actorOf(Props[Routee],name="route3")
routee3: = Actor[akka://system/user/route3]

scala> val routee2 = system.actorOf(Props[Routee],name="route2")
routee2: = Actor[akka://system/user/route2]

scala> import akka.routing.RoundRobinRouter
import akka.routing.RoundRobinRouter

scala> val routees = Vector[ActorRef](routee1, routee2, routee3)
routees: scala.collection.immutable.Vector[] = Vector(Actor[akka://system/user/routee1], Actor[akka://system/user/route2], Actor[akka://system/user/route3])

scala> val router = system.actorOf(Props().withRouter(RoundRobinRouter(routees = routees)))
router: = Actor[akka://system/user/$b]

scala> router ! "testing"

scala> akka://system/user/routee1 : received : testing

scala> router ! "testing"
akka://system/user/route2 : received : testing

scala> router ! "testing"
akka://system/user/route3 : received : testing

Remote actors:

As mentioned in the previous post, the akka actors library shipped with scala does not support remoting by default. We need to add additional jars from the akka distribution. The following jars need to be on the classpath:

  • akka-remote_2.10.0-RC5-2.1.0-RC6.jar
  • netty-3.5.8.Final.jar
  • protobuf-java-2.4.1.jar

The first step is to create a conf file called `application.conf` with following content:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    netty {
      hostname = ""
      port = 2552
    }
  }
}

This file should be available on the classpath. We launch two scala consoles from two terminals, making a copy of `application.conf` and changing the port number to 2553 in one of them.

class RemoteActor extends Actor {
  def receive = {
    case msg => println("Received : "+ msg)
                sender ! "Got : "+ msg
  }
}

usr@ubuntu:~/akka/remoting$ scala -cp ./:$AKKA_LIB/akka-remote_2.10.0-RC5-2.1.0-RC6.jar:$AKKA_LIB/netty-3.5.8.Final.jar:$AKKA_LIB/protobuf-java-2.4.1.jar
Welcome to Scala version 2.10.0-RC5 (Java HotSpot(TM) Server VM, Java 1.7.0).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import akka.actor._

scala> class RemoteActor extends Actor {
     |  def receive = {
     |    case msg => println("Received : "+ msg)
     |                sender ! "Got : "+ msg
     |  }
     | }
defined class RemoteActor

scala> val remoteSystem = ActorSystem("remotesystem")
remoteSystem: = akka://remotesystem

scala> val remoteActor = remoteSystem.actorOf(Props[RemoteActor],name="remoteactor")
remoteActor: = Actor[akka://remotesystem/user/remoteactor]

scala> remoteActor ! "local msg"
Received : local msg

We send a message to the actor locally. It receives the message and prints it, but the reply does not get printed because the sender (i.e. the REPL) is not an actor, so the reply goes to the dead letter actor.

Now we go to the second scala console, create an actor, look up the actor created before and send a message to it.

scala> class MyActor extends Actor {
     |    def receive = {
     |      case x: String => println("Received : "+ x)
     |      case (a: ActorRef, x: String) => a ! x
     |    }
     | }
defined class MyActor

scala> val system = ActorSystem("system")
system: = akka://system

scala> val ma = system.actorOf(Props[MyActor],name="myactor")
ma: = Actor[akka://system/user/myactor]

scala> val remoteActor = system.actorFor("akka://remotesystem@")
remoteActor: = Actor[akka://remotesystem@]

scala> ma ! (remoteActor, "testing")

scala> [INFO] [12/24/2012 14:31:21.155] [] [NettyRemoteTransport(akka://system@] RemoteClientStarted@akka://remotesystem@
Received : Got : testing

On the remote actor side we see:

scala> [INFO] [12/24/2012 14:29:43.906] [remotesystem-15] [NettyRemoteTransport(akka://remotesystem@] RemoteClientStarted@akka://system@
Received : testing

Conclusion: In this two-part series we explored the basics of scala/akka actors. Hopefully, anyone wondering where to begin with akka actors has gained enough footing to explore the rest of akka.

Friday, December 21, 2012

Basics of actors in scala/Akka part 1

Starting with Scala 2.10.0, scala actors will be replaced by akka actors. Since we are already using scala actors in production, the need arose to get familiar with akka in general and with its actors in particular. Out of curiosity, I had already tried to learn akka a few times but never quite managed to, mainly because a) the documentation is so voluminous that it scared me away each time, and b) there are so many things in it: actors, typed actors, dataflow concurrency, software transactional memory, the microkernel and what not. The lack of a step-by-step, fast-track guide showing the basics worked against my giving akka a proper shot in those earlier attempts. So, if you are in the same situation, this post is for you. I will show the basic stuff that, I hope, will help you explore the rest. I am assuming prior familiarity with scala actors here.

I am using scala-2.10.0-RC5; the lib already contains the akka actors library.

Note: the akka actors library shipped with scala does not support remote actors. I will show how to do that in a subsequent post.

An akka actor needs to extend the `akka.actor.Actor` trait and override the `receive` method (unlike scala actors, there is no `react` method). Importing the `akka.actor` package is enough for creating and testing actors locally. Actors are grouped into a named `ActorSystem`, which provides the infrastructure: thread pool, dead letter box, message dispatcher and so on. There can be more than one actor system in the same VM. The following shows how to create an actor system, define and instantiate an actor, and send a message to the actor:

scala> import akka.actor._

scala> val actorSystem = ActorSystem("actorsystem")
system: = akka://actorsystem

scala> class MyActor extends Actor {
     |   def receive = {
     |     case x => println("Received : "+ x)
     |   }
     | }
defined class MyActor

scala> val myActor = actorSystem.actorOf(Props[MyActor])
myActor: = Actor[akka://actorsystem/user/$a]

scala> myActor ! "Hi"
Received : Hi

In the above `Props` is a factory for actor configuration.

Actors should be named. They are started automatically when created and have pre-start and post-stop hooks (`preStart` and `postStop`). The example above shows how to create an actor from the system. If actors are to be created from within an actor, we should use `context.actorOf` instead. The following shows an actor with a non-default constructor which creates a child actor in response to messages.

scala> val system = ActorSystem("system")
system: = akka://system

scala> :paste
// Entering paste mode (ctrl-D to finish)

case object Create
case object Kill
case class Msg(s: String)

class MyActor(arg: String) extends Actor {
  override def preStart = println("Some arbitrary arg : "+ arg)
  var myChild: Option[ActorRef] = None

  def receive = {
    case Msg(s) => (myChild getOrElse create) ! s; println("Passed on")
    case Create => myChild getOrElse create; println("create returned")
    case Kill => myChild match {
      case Some(a) => context.stop(a)
        myChild = None
      case _ => //Do nothing
    }
  }

  //Creates the child, remembers it and returns its reference
  def create: ActorRef = {
    val child = context.actorOf(Props(new Actor {
      def receive = {
        case x => println("Got a msg from my parent : "+ x)
      }
    }), name="childactor")
    myChild = Some(child)
    child
  }

  override def postStop = println("Some code to execute after stopping...")
}


// Exiting paste mode, now interpreting.

defined module Create
defined module Kill
defined class Msg
defined class MyActor

scala> val parentActor = system.actorOf(Props(new MyActor("Some constructor args")), name="parent")
parentActor: = Actor[akka://system/user/parent]

scala> Some arbitrary arg : Some constructor args

scala> parentActor ! Create

scala> create returned

scala> parentActor ! Msg("Testing")

scala> Passed on
Got a msg from my parent : Testing

scala> parentActor ! Kill

scala> killed

scala> parentActor ! Create

scala> create returned

scala> parentActor ! Msg("Testing again")

scala> Passed on
Got a msg from my parent : Testing again

scala> parentActor ! PoisonPill

scala> Some code to execute after stopping...

scala> system.shutdown

parentActor: = Actor[akka://system/user/parent]

Actors have a parent-child relationship and parental supervision. An actor is the parent of each actor it creates. In the line above, `user` is the parent of the actor named `parent`. `user` is one of the system actors that get created when an actor system is created. User-created actors and their children are created underneath the `user` system actor. Actors can be looked up using `actorFor` on the actor system or the context. The difference between `actorOf` and `actorFor` is that the latter only returns an `ActorRef` to an existing actor and never creates one; if no actor exists at the given path, messages sent to the returned reference are passed on to the dead letter system actor.

scala> val system = ActorSystem("system")
system: = akka://system

scala> val p = system.actorFor("/user/parent")
p: = Actor[akka://system/user/parent]

scala> parentActor ! Msg("Testing")

scala> Passed on
Got a msg from my parent : Testing

scala> p ! Msg("Testing again")

scala> Passed on
Got a msg from my parent : Testing again

scala> val nonExistentActor = system.actorFor("/user/xyz")
nonExistentActor: = Actor[akka://system/user/xyz]

scala> nonExistentActor ! "This msg will be passed on to dead letter actor"

The preferred way of stopping an actor is to call `context.stop(self)` in response to some shutdown message. Shown below are some other ways of stopping an actor. One thing to notice is that in the case of the `Kill` message the post-stop hook does not get called.

scala> val a1 = system.actorOf(Props(new MyActor("cons args")), name="a1") //actor names should be unique in an actor system
Some arbitrary arg : cons args
a1: = Actor[akka://system/user/a1]

scala> val a2 = system.actorOf(Props(new MyActor("cons args")), name="a2")
Some arbitrary arg : cons args
a2: = Actor[akka://system/user/a2]

scala> val a3 = system.actorOf(Props(new MyActor("cons args")), name="a3")
Some arbitrary arg : cons args
a3: = Actor[akka://system/user/a3]

scala> a1 ! Msg("Msg to a1")

scala> Passed on
Got a msg from my parent : Msg to a1

scala> a2 ! Msg("Msg to a2")

scala> Passed on
Got a msg from my parent : Msg to a2

scala> a3 ! Msg("Msg to a3")

scala> Passed on
Got a msg from my parent : Msg to a3

scala> system.stop(a1)

scala> Some code to execute after stopping...

scala> a2 ! Kill

scala> killed

scala> a3 ! PoisonPill

scala> Some code to execute after stopping...

We talked about parental supervision above. When an actor fails, its parent decides what action should be taken. The parent can escalate the failure or handle it. It may kill all its children and restart them, or kill only the failed actor and restart it. The other options are to let the failed actor continue (resume) handling messages or to stop the failed actor permanently.

To supervise a child, an actor needs to override the `supervisorStrategy` member of the actor.

Following shows an actor that kills failed child and restarts it in response.

scala> import scala.concurrent.duration._

scala> :paste
// Entering paste mode (ctrl-D to finish)

import akka.actor.SupervisorStrategy._ //brings the Restart directive into scope

class MyActor extends Actor {
  override def supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 minute) {
    case e: Exception => Restart //Exception to restart directive
  }
  var child = context.actorOf(Props[Child])

  def receive = {
    case s: String => child ! s
    case a: ActorRef => child = sender
    case e: Exception => child ! e
  }
}

class Child extends Actor {
  def receive = {
    case s: String => println("Parent msg : "+ s)
    case e: Exception => throw new Exception("throwing up")
    case _ => println("Don't know what it is!")
  }

  override def postRestart(reason: Throwable):Unit = {
    println("I am being restarted :" + reason.getMessage)
    context.parent ! self
  }
}

// Exiting paste mode, now interpreting.

defined class MyActor
defined class Child

scala> val parent = system.actorOf(Props[MyActor])
parent: = Actor[akka://system/user/$a]

scala> parent ! "Testing"

scala> Parent msg : Testing

scala> parent ! new Exception

scala> [ERROR] [12/21/2012 16:58:27.328] [] [akka://system/user/$a/$a] throwing up
java.lang.Exception: throwing up

I am being restarted :throwing up

scala> parent ! "Testing after restart"

scala> Parent msg : Testing after restart
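
The other supervision options described above (resume, stop, escalate) map onto directives in `akka.actor.SupervisorStrategy`. The following is a minimal sketch, assuming the akka 2.1 era API used in this post; the `Supervisor` class and the choice of exception types are hypothetical and only illustrate how a decider can pick a directive per failure type.

```scala
import akka.actor._
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

// Hypothetical supervisor that picks a directive based on the failure type
class Supervisor extends Actor {
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume   // keep the child and its state, carry on
      case _: NullPointerException     => Restart  // replace the failed child with a fresh instance
      case _: IllegalArgumentException => Stop     // stop the failed child permanently
      case _: Exception                => Escalate // let this actor's own parent decide
    }

  def receive = {
    // Create a child on request and hand its reference back to the sender
    case p: Props => sender ! context.actorOf(p)
  }
}
```

`OneForOneStrategy` applies the directive to the failed child only. Swapping in `AllForOneStrategy` with the same arguments applies it to all children of the supervisor, which corresponds to the "kill all the children and restart them" option.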

Conclusion: I have just touched upon the basics. In the next post I will briefly write about hot swapping of actor behaviour, routing and remote actors.

Wednesday, December 5, 2012

Asynch concurrency: In the promised land of scala futures

The basics:

The concept of a future is not new: java added futures in 1.5, scala actors had them from the start, and the lift web framework had its own. As more and more libraries and toolkits sprang up around the scala language, many came with futures of more or less similar functionality; akka, playframework, twitter's finagle etc. each had their own implementation.

With SIP-14, the case was made that all these different implementations should be unified and made available as part of the scala core library. The result is the `scala.concurrent` package. This write-up looks at the futures and promises API, and at how it takes asynch concurrency to a whole new level by providing a higher level of abstraction.

Let's fire up the REPL and see futures in action:

scala> import concurrent.future
import concurrent.future

scala> import concurrent.ExecutionContext.Implicits.global

The first import is for the `future` method in the `concurrent` package object. The second import basically brings into scope the default execution context (i.e. thread pool) - which is used for computing chunks of code asynchronously. We can use our own custom execution context if one is available in the scope.

Computation of a future may yield a successful result or an exception and we can register callbacks to handle them appropriately.

scala> val f = future { 100 }
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise...

scala> f onComplete {
     |   case x => println("Future has returned :"+ x)
     | }
Future has returned : 100

We can attach multiple callbacks to a future, and attach them even after the future has completed; all of them will be fired. The only caveat is that there is no guarantee about the order in which they fire. Since the future may hold either a value or an exception, we pattern match on instances of the `Try` type (from `scala.util`): `Success` wraps the result and `Failure` wraps the exception.

scala> val f1 = future { 100 }
f1: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise...

scala> val f2 = future { throw new Exception("Boom") }
f2: scala.concurrent.Future[Nothing] = scala.concurrent.impl.Promise...

scala> f1 onComplete {
     |   case Success(r) => println(r)
     |   case Failure(e) => println(e)
     | }

scala> f2 onComplete {
     |   case Success(r) => println(r)
     |   case Failure(e) => println(e)
     | }
java.lang.Exception: Boom
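
The same `Success`/`Failure` matching can be pulled into a plain function, which makes it easy to reuse across callbacks. This is a minimal sketch, not the original session: it uses `Future { ... }` from the companion object because later Scala releases dropped the lowercase `future` method, and `CallbackSketch` and `describe` are made-up names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object CallbackSketch {
  // Turns the outcome of a future into a printable description
  def describe(t: Try[Int]): String = t match {
    case Success(v) => "value: " + v
    case Failure(e) => "error: " + e.getMessage
  }

  def main(args: Array[String]): Unit = {
    val ok: Future[Int]  = Future { 100 }
    val bad: Future[Int] = Future { throw new Exception("Boom") }

    // Callbacks run asynchronously on the execution context, in no guaranteed order
    ok.onComplete(t => println(describe(t)))
    bad.onComplete(t => println(describe(t)))

    // Await.ready waits for completion without rethrowing a failure
    Await.ready(ok, 5.seconds)
    Await.ready(bad, 5.seconds)
  }
}
```

Because the callbacks are scheduled asynchronously, a short-lived program may exit before they print anything; in the REPL this is not an issue.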

`onComplete` is called irrespective of success or failure. We could use `onSuccess` or `onFailure` instead, if we want.

scala> f1 onSuccess {
     |   case r => println(r)
     | }

scala> f2 onFailure {
     |   case e => println(e)
     | }
java.lang.Exception: Boom

A future has a counterpart called a `promise`. Future and promise are two sides of the same coin: futures are read-many (times), whereas promises are write-once. We can make a promise and fulfill it later; when we do so, the corresponding future gets its value (aka returns). Once a promise has been fulfilled (i.e. written to), we cannot go back on it; completing it again is illegal.

scala> import concurrent.promise
import concurrent.promise

scala> val p = promise[Int]
p: scala.concurrent.Promise[Int] = scala.concurrent.impl.Promise...

scala> val f = p.future
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise...

scala> f onSuccess {
     |   case v => println(v)
     | }

scala> p.success(200)

scala> p.success(500)
java.lang.IllegalStateException: Promise already completed.
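
When it is not known whether a promise has already been written to, `trySuccess` reports the outcome as a Boolean instead of throwing. A minimal sketch (the object and method names are hypothetical):

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object PromiseSketch {
  // Returns (did the first write win, did the second write win, value seen by the future)
  def completeTwice(): (Boolean, Boolean, Int) = {
    val p = Promise[Int]()
    val first  = p.trySuccess(200) // the promise is still empty, so this write succeeds
    val second = p.trySuccess(500) // already completed, so this write is rejected
    (first, second, Await.result(p.future, 5.seconds))
  }
}
```

Calling `PromiseSketch.completeTwice()` returns `(true, false, 200)`: only the first write takes effect, and the future keeps that value.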

Higher order functions and future specific combinators:

Apart from `map`, `flatMap` and `filter`, a future has other higher order functions that make it possible to combine futures in smart ways to achieve the task at hand.

scala> val f1 = future { 50 }
f1: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise...

scala> val f2 = future { 50 }
f2: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise...

scala> val result = for {
     |   x <- f1
     |   y <- f2
     | } yield x * y
result: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise...

scala> result map( _ * 2) filter(_ > 4000) onSuccess {
     |   case value => println(value)
     | }
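
One detail worth knowing about `filter`: when the predicate does not hold, the resulting future fails with a `NoSuchElementException`, so an `onSuccess` callback simply never fires. The sketch below makes that visible by waiting for the result and wrapping it in a `Try`; `FilterSketch` and `scaled` are made-up names, and `Future { ... }` stands in for the lowercase `future` used in the session.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FilterSketch {
  // Multiplies two futures, doubles the product and keeps it only if it exceeds limit
  def scaled(limit: Int): Try[Int] = {
    val f1 = Future { 50 }
    val f2 = Future { 50 }
    val result = for {
      x <- f1
      y <- f2
    } yield x * y
    val filtered = result.map(_ * 2).filter(_ > limit)
    // Await.result rethrows the failure of the future; Try captures it
    Try(Await.result(filtered, 5.seconds))
  }
}
```

`FilterSketch.scaled(4000)` yields `Success(5000)`, while `FilterSketch.scaled(6000)` yields a `Failure` holding the `NoSuchElementException`.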

Fall back to that future in case of failure with `fallbackTo`

scala> val blowUp = future { throw new Exception("Crashed") }
blowUp: scala.concurrent.Future[Nothing] = scala.concurrent.impl.Promise...

scala> val planB = future { "Recovered" }
planB: scala.concurrent.Future[String] = scala.concurrent.impl.Promise...

scala> blowUp fallbackTo planB onComplete {
     |   case Failure(e) => println(e)
     |   case Success(r) => println(r)
     | }
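
It is also worth seeing what `fallbackTo` does when the fallback fails as well: the combined future then fails with the exception of the first future, not the second. A minimal sketch under the same assumptions as the session above (`FallbackSketch` and `firstOr` are hypothetical names):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FallbackSketch {
  // Waits for primary, falling back to backup if primary fails
  def firstOr(primary: Future[String], backup: Future[String]): Try[String] =
    Try(Await.result(primary fallbackTo backup, 5.seconds))
}
```

For example, `FallbackSketch.firstOr(Future.failed(new Exception("Crashed")), Future.successful("Recovered"))` gives `Success("Recovered")`. If both futures fail, the `Failure` carries the exception of `primary`.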

Sequential execution with `andThen`

scala> lazy val f = future { println("First here") }
f: scala.concurrent.Future[Unit] = <lazy>

scala> f andThen {
     |   case r => println("Then here")
     | } andThen {
     |    case _ => println("At the end")
     | }

First here
Then here
At the end
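
Two properties of `andThen` are easy to miss: the callbacks run in the order they were chained, and the value of the original future passes through unchanged no matter what the callbacks return. The following sketch records the order in a thread-safe queue; the names `AndThenSketch` and `run` are made up for illustration.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Success

object AndThenSketch {
  // Returns the final value of the chain and the order in which the callbacks ran
  def run(): (Int, List[String]) = {
    val log = new ConcurrentLinkedQueue[String]()
    val f = Future { 42 }
      .andThen { case Success(v) => log.add("first saw " + v) }
      .andThen { case _ => log.add("second") }
    // Each andThen completes its future only after its callback has run
    val value = Await.result(f, 5.seconds)
    (value, log.toArray(new Array[String](0)).toList)
  }
}
```

`AndThenSketch.run()` returns `(42, List("first saw 42", "second"))`: the Boolean returned by `log.add` is discarded and the chained future still holds 42.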

Fold and reduce:

scala> val f1 = future { 100 }
f1: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise...

scala> val f2 = future { 200 }
f2: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise...

scala> val f3 = future { 300 }
f3: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise...

scala> val futures = List(f1,f2,f3)
futures: List[scala.concurrent.Future[Int]] = List...

scala> import concurrent.Future
import concurrent.Future

scala> Future.fold(futures)(0)(_ + _ ) onSuccess {
     |   case r => println(r)
     | }

scala> 600

scala> Future.reduce(futures)(_ * _ ) onSuccess {
     |   case r => println(r)
     | }

scala> 6000000
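
A closely related combinator is `Future.sequence`, which turns a list of futures into a single future of a list; ordinary collection methods can then be applied to the result. A minimal sketch, with `SequenceSketch` and `total` as hypothetical names:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceSketch {
  // Computes each value in its own future, then sums them once all have completed
  def total(values: List[Int]): Int = {
    val futures: List[Future[Int]] = values.map(v => Future { v })
    val all: Future[List[Int]] = Future.sequence(futures)
    Await.result(all.map(_.sum), 5.seconds)
  }
}
```

`SequenceSketch.total(List(100, 200, 300))` returns 600, the same result as the `Future.fold` call above. If any of the futures fails, the combined future fails too.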


We have looked at just some of the APIs scala futures offer. By themselves they are more than enough to solve many real world asynch concurrency problems, but there is more to them than what we have seen here. They will be an extremely useful tool in any developer's repertoire.

Monday, October 22, 2012

scala xml wrapper utility

XDoc is a utility wrapper over scala.xml.Elem. It simplifies xml processing. The following needs to be considered while using this utility:

1) It does not support namespaces. That can easily be incorporated, but our use case did not have any namespaces.
2) As opposed to the normal convention, it does not consider the text within an element to be a child of the containing element. Text can still be set on and retrieved from an element.

The following REPL session shows how to use it.

Creating an instance of XDoc:

scala> var d = XDoc("animals") addChild "tiger" addChild "lion" addAttr("test", "attrValue")
d: XDoc = 
<animals test="attrValue">

Querying child count:

scala> d.childCount
res13: Int = 2

Setting text :

scala> d = d.setText("Sample text")
d: XDoc = <animals test="attrValue">
  Sample text

Shows that text is not considered as a child:

scala> d.childCount
res14: Int = 2

scala> d = d addChild(XDoc("zebra").addAttr("hasStripes", "true"))
d: XDoc = <animals test="attrValue">
  Sample text
  <zebra hasStripes="true"/>

scala> d.childCount
res15: Int = 3

scala> d = d.addAttr("test2", "Value")
d: XDoc = <animals test2="Value" test="attrValue">
  Sample text
  <zebra hasStripes="true"/>

Querying attributes. Attributes are instances of the following case class:

case class XAttr(name: String, value: String)

scala> val atrs = d.attrs
atrs: List[XAttr] = List(XAttr(test,attrValue), XAttr(test2,Value))

scala> d.hasAttrs
res16: Boolean = true

scala> d.attr("test2")
res17: Option[XAttr] = Some(XAttr(test2,Value))

scala> d.attr("testxx")
res18: Option[XAttr] = None

Querying children:

scala> d.childrenByName("tiger")
res20: List[XDoc] = List(<tiger/>)

scala> d = d addChild((XDoc("tiger").addAttr("male", "true")) addChild "females")
d: XDoc = <animals test2="Value" test="attrValue">
  Sample text
  <zebra hasStripes="true"/>
  <tiger male="true">

scala> d.childrenByName("tiger")
res21: List[XDoc] = 
List(<tiger/>, <tiger male="true">

Filtering children based on a predicate:

scala> d = d filter( !="zebra")
d: XDoc = <animals test2="Value" test="attrValue">
  <tiger male="true">

Checking existence of children based on predicate:

scala> d exists(_.isChildLess == false)
res22: Boolean = true

scala> val tigerWithFemales = d filter (_.isChildLess == false)
tigerWithFemales: XDoc = 
<animals test2="Value" test="attrValue">
  <tiger male="true">

Mapping over child elements and transforming them:

scala> d = d map (_.addAttr("wild","true"))
d: XDoc = <animals test2="Value" test="attrValue">
  <tiger wild="true"/>
  <lion wild="true"/>
  <tiger wild="true" male="true">

Finding based on a predicate condition on children:

scala> val withFemale = d find(_.isChildLess == false)
withFemale: Option[XDoc] = 
Some(<tiger wild="true" male="true">

Checking if some condition holds for all children:

scala> d forall(_.attr("wild") match {
     |   case None => false
     |   case Some(x) => true
     |   })
res23: Boolean = true

Finally, we can partition the elements based on some criteria.

scala> val (d1, d2) = d partition (_.isChildLess)
d1: XDoc = 
<animals test2="Value" test="attrValue">
  <tiger wild="true"/>
  <lion wild="true"/>
d2: XDoc = 
<animals test2="Value" test="attrValue">
  <tiger wild="true" male="true">

As can be seen, XDoc can be pretty handy for dealing with xml without namespaces. It is a utility class from a bigger project and can be improved upon. In the meanwhile, the source can be found at:

Sunday, June 3, 2012

Cloud Foundry Now Supports Play!

Cloudfoundry has now incorporated support for play 2.0. Following is the link:

Have fun!

Thursday, May 17, 2012

Play2.0 and cloudfoundry database connectivity

Right at the outset, an honest confession: I am no good at shell scripting. When I tried to deploy my Play 2.0 Scala application on Cloud Foundry and connect it to the provisioned database service, I faced the difficult task of parsing a JSON string (Cloud Foundry provides the details of the services in JSON format), extracting the database credentials, and setting them as environment variables, so that when I start my Play 2.0 Netty server it can read those environment variables and connect to the database. The following snippet shows a sample of the database-related information that Cloud Foundry provides as an environment variable:


So, I had basically two options before me:

a) Write a shell script to parse the JSON string and extract the relevant data.
b) Change the Netty server start-up code itself, so that when launched it reads the VCAP_SERVICES JSON string, takes out the relevant database credentials, and connects to the provisioned database instance.

So I started on my venture to find a nice little shell script that parses a JSON string and dishes out the database settings when asked. Googling gave me some scripts, but they were not very helpful: they were either too crude or too specific to some other task, and I dared not modify them for the obvious reason I mentioned at the outset.

So I decided to follow my second option. I got the Play 2.0 source code from the GitHub repository and modified the Netty server Scala code so that it first calls another piece of Scala code (which parses the JSON and sets the database-specific details) and then does what it normally does. The following snippet shows the code that does the JSON parsing and environment variable setting.

package play.core.server

import play.api.libs.json._
import java.util.{Collections, Map => JMap}
import java.lang.reflect.Field

object VCAPJsonParser {

  // Reads VCAP_SERVICES, extracts the PostgreSQL credentials and exposes
  // them to the running JVM as environment variables.
  def init_env(): Unit = {
    val vcap_services = System.getenv("VCAP_SERVICES")
    if (vcap_services == null || "" == vcap_services) {
      println("vcap_services env value is not set!")
    } else {
      println("The VCAP_SERVICES json  : " + vcap_services)
      val services: JsValue = Json.parse(vcap_services)
      val postgresql = services \ "postgresql-9.0"
      val credentials = postgresql(0) \ "credentials"
      println("The credentials json : " + credentials)
      val dbname = (credentials \ "name").as[String]
      val hostname = (credentials \ "hostname").as[String]
      val user = (credentials \ "user").as[String]
      val password = (credentials \ "password").as[String]
      val port = (credentials \ "port").as[Int]
      val database = "jdbc:postgresql://" + hostname + ":" + port + "/" + dbname
      println("database url : " + database + ", user : " + user + ", password : " + password)
      // Assumed reconstruction: copy the current environment, add the three
      // variables that are printed below, and install the result.
      val newEnv: JMap[String, String] = new java.util.HashMap[String, String](System.getenv())
      newEnv.put("postgres_database", database)
      newEnv.put("postgres_dbuser", user)
      newEnv.put("postgres_password", password)
      setEnv(newEnv)
      println("Env set: " + System.getenv())
      println("Database url : " + System.getenv("postgres_database") + ", user : " + System.getenv("postgres_dbuser") + ", password : " + System.getenv("postgres_password"))
    }
  }

  def main(args: Array[String]): Unit = {
    init_env()
  }

  // Overwrites the JVM's cached copy of the environment through reflection.
  // The setAccessible and putAll calls are assumed; the fields are private.
  def setEnv(newenv: JMap[String, String]): Unit = {
    try {
      val processEnvironmentClass: Class[_] = Class.forName("java.lang.ProcessEnvironment")
      val theEnvironmentField: Field = processEnvironmentClass.getDeclaredField("theEnvironment")
      theEnvironmentField.setAccessible(true)
      val env: JMap[String, String] = theEnvironmentField.get(null).asInstanceOf[JMap[String, String]]
      env.putAll(newenv)
      val theCaseInsensitiveEnvironmentField: Field = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment")
      theCaseInsensitiveEnvironmentField.setAccessible(true)
      val cienv: JMap[String, String] = theCaseInsensitiveEnvironmentField.get(null).asInstanceOf[JMap[String, String]]
      cienv.putAll(newenv)
    } catch {
      case e1: NoSuchFieldException =>
        // Fallback: unwrap the unmodifiable map returned by System.getenv()
        try {
          val classes: Array[Class[_]] = classOf[Collections].getDeclaredClasses()
          val env: JMap[String, String] = System.getenv()
          for (cl <- classes) {
            if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
              val field: Field = cl.getDeclaredField("m")
              field.setAccessible(true)
              val obj = field.get(env)
              val map = obj.asInstanceOf[JMap[String, String]]
              map.clear()
              map.putAll(newenv)
            }
          }
        } catch {
          case e11: Exception => e11.printStackTrace(System.err)
        }
      case e2: Exception => e2.printStackTrace(System.err)
    }
  }
}


You can put this piece of code in the proper directory, call it from the main method of "NettyServer.scala", build the Play framework from source, and then push your application to Cloud Foundry; it will faithfully connect to your database instance.

I have done this; this is how my application was running until I wrote this post.

I was not very happy about modifying the Netty server code just to read database-specific details. Hence I rolled up my sleeves and decided to give the shell script approach another go. The following is the shell script that I ended up writing. Needless to say, there must be a more efficient way of writing it, but it does the job for now. You are welcome to modify it and make it more generic.

#!/usr/bin/env bash

# Extracts the database credentials from the VCAP_SERVICES JSON with plain
# string handling and exports them. The splitting and bookkeeping steps are
# a best-effort reconstruction and assume the JSON is on a single line.
function processVCAP_SERVICE_JSON_and_setDBenv(){
  echo "*******************"
  echo "$VCAP_SERVICES"
  echo "*******************"
  #split the JSON on commas, so every item looks like "key":"value"
  IFS=',' read -r -a items <<< "$VCAP_SERVICES"
  length=${#items[@]}
  found="n"
  counter=0
  db_settings=()
  for ((i=0; i<${length}; i++ ));
  do
    curr_item="$(echo "${items[$i]}" | tr -d '"')"
    echo "position : $i and item is : $curr_item"
    if [[ "$found" = "y" ]] || [[ "$curr_item" == credentials* ]]; then
      if [ "$found" = "n" ]; then
        #get rid of 'credentials:{' part
        curr_item="${curr_item#credentials:\{}"
        found="y"
      fi
      #Take care of password:pbd0e3f367a6c4a36bcc48a3c763a929e}}]} <--
      if [[ "$curr_item" == password* ]]; then
        curr_item="${curr_item%%\}*}"
        found="n"
      fi
      db_settings[$counter]="$curr_item"
      counter=$((counter + 1))
    fi
  done
  echo "counter is : ${counter}"
  echo "ratul your db settings are : "
  echo "${#db_settings[@]}"
  echo "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
  echo "============================================="
  for setting in "${db_settings[@]}"
  do
    set_len=${#setting}
    echo "$setting and setting length=$set_len"

    case "$setting" in
      name:*) db_name="${setting:5:$set_len}";
          echo "DB name : $db_name" ;;
      hostname:*) host_name="${setting:9:$set_len}";
          echo "DB host : $host_name" ;;
      port:*) port_num="${setting:5:$set_len}";
          echo "DB port : $port_num" ;;
      username:*) database_user="${setting:9:$set_len}";
          echo "DB user : $database_user" ;;
      password:*) database_password="${setting:9:$set_len}";
          echo "DB password : $database_password" ;;
    esac
  done
  #prepare the database_url,database_user & database_password and set them in env
  #change the URL prefix in case of MySQL
  database_url="jdbc:postgresql://${host_name}:${port_num}/${db_name}"
  export database_url=$database_url
  export database_password=$database_password
  export database_user=$database_user
}

processVCAP_SERVICE_JSON_and_setDBenv

echo "database url : $database_url"

echo "password : $database_password"

echo "user : $database_user"

exec java $JAVA_OPTS -Dhttp.port=$VCAP_APP_PORT -DapplyEvolutions.default=true -cp "`dirname $0`/lib/*" play.core.server.NettyServer `dirname $0`

Be sure to set environment variables accordingly in application.conf like so:
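
A minimal sketch of such a configuration, assuming the default datasource, the PostgreSQL JDBC driver, and the three variable names exported by the script above (adjust the names if your script exports different ones):

```
# conf/application.conf (assumed fragment)
db.default.driver=org.postgresql.Driver
db.default.url=${database_url}
db.default.user=${database_user}
db.default.password=${database_password}
```

The `${...}` substitutions are resolved from the environment when the configuration is loaded, so the variables must be exported before the server starts.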


Saturday, May 12, 2012

Deploy play 2.0 app on cloudfoundry

For the last few days I was trying to find a way to deploy my Play 2.0 application. I have already deployed my app on Heroku, but Heroku offers very limited resources for a free account: the slug size cannot exceed 100MB and there is only 5MB of shared database space. So you have to work within these constraints, and it becomes difficult to play around. In fact, to reduce my slug size I had to use a custom buildpack from GitHub. Also, if your application is not accessed for a long time, Heroku idles it out, and the next time you access your app it takes a long time for the response to come back. So you need to set up some kind of ping program to keep hitting your app at regular intervals, say every 10 minutes.

With Cloud Foundry these problems are not there. The free resources are generous enough (2GB RAM, 2GB disk space) that you can concentrate on what you are doing instead of thinking about resource constraints.


Before you can deploy your app on Cloud Foundry, you need to open an account. After registering, it takes a day or two to get your account activated. You also need either the vmc command line tool or STS (the Eclipse plugin) installed on your system; visit the link and follow the instructions there.

Once you have set up the prerequisites and received your user name and password, you are all set to deploy your app on the PaaS.

I am going to use the vmc command line tool to deploy a newly created play 2.0 application on cloudfoundry.

Follow the steps below to deploy your app.

1. Launch command prompt.

2. Type in -> play new myPlayApp

ratul@ubuntu:~$ play new myPlayApp
       _            _
 _ __ | | __ _ _  _| |
| '_ \| |/ _' | || |_|
|  __/|_|\____|\__ (_)
|_|            |__/
play! 2.0.1,

The new application will be created in /home/ratul/myPlayApp

What is the application name?
> myPlayApp

Which template do you want to use for this new application?

  1 - Create a simple Scala application
  2 - Create a simple Java application
  3 - Create an empty project

> 1

OK, application myPlayApp is created.

Have fun!

3. Open the newly created myPlayApp/app/controllers/Application.scala file in a text editor.

Change the definition of the index method like so:

  def index = Action {
    val port = System.getenv("VCAP_APP_PORT")
    val host = System.getenv("VCAP_APP_HOST")
    println("App port : "+ port+" and host: "+ host)
    Ok(views.html.index("Your new application is ready."+ "App port : "+ port+" and host: "+ host))
  }

  Note: VCAP_APP_PORT and VCAP_APP_HOST are the port and host name that Cloud Foundry assigns when it runs your app.

4. Save the Application.scala.

5. Open the myPlayApp/app/views/index.scala.html in a text editor and change the line that reads @play20.welcome(message) to @message and save it.

6. On the command prompt, cd into myPlayApp -> ratul@ubuntu:~$ cd myPlayApp/

7. Launch the play prompt by typing 'play'.

ratul@ubuntu:~/myPlayApp$ play
[info] Loading project definition from /home/ratul/myPlayApp/project
[info] Set current project to myPlayApp (in build file:/home/ratul/myPlayApp/)
       _            _
 _ __ | | __ _ _  _| |
| '_ \| |/ _' | || |_|
|  __/|_|\____|\__ (_)
|_|            |__/
play! 2.0.1,

> Type "help play" or "license" for more information.
> Type "exit" or use Ctrl+D to leave this console.

8. Next we will bundle the application. So type 'dist' on the play prompt.

[myPlayApp] $ dist
[info] Updating {file:/home/ratul/myPlayApp/}myPlayApp...
[info] Done updating.                                                                 
[info] Compiling 5 Scala sources and 1 Java source to /home/ratul/myPlayApp/target/scala-2.9.1/classes...
[info] Packaging /home/ratul/myPlayApp/target/scala-2.9.1/myPlayApp_2.9.1-1.0-SNAPSHOT.jar ...
[info] Done packaging.

Your application is ready in /home/ratul/myPlayApp/dist/

[success] Total time: 12 s, completed 12 May, 2012 2:56:34 PM

9. We want to push the application contents as a folder structure and not as a zip, so that we can push deltas later on. Hence open the folder myPlayApp/dist in your file explorer and extract the zip archive found there into the same folder.

10. Delete the archive, then go inside the myPlayApp-1.0-SNAPSHOT folder and delete the README and start files as well; we do not need them.

11. Now we are ready to push our content to Cloud Foundry. Let's log in first.

12. On the command prompt, come out of the play prompt by typing 'exit'.

13. cd dist/myPlayApp-1.0-SNAPSHOT/

14. Type 'vmc login' on the command prompt.

ratul@ubuntu:~/myPlayApp/dist/myPlayApp-1.0-SNAPSHOT$ vmc login
Attempting login to []

Password: ******************
Successfully logged into []

15. On the prompt type : vmc target

ratul@ubuntu:~/myPlayApp/dist/myPlayApp-1.0-SNAPSHOT$ vmc target
Successfully targeted to []

16. Type 'vmc push' on the prompt and answer the questions vmc asks as shown below.

ratul@ubuntu:~/myPlayApp/dist/myPlayApp-1.0-SNAPSHOT$ vmc push
Would you like to deploy from the current directory? [Yn]: y
Application Name: myPlayApp
Detected a Standalone Application, is this correct? [Yn]: y
1: java
2: node
3: node06
4: ruby18
5: ruby19
Select Runtime [java]: 1
Selected java
Start Command: java $JAVA_OPTS -Dhttp.port=$VCAP_APP_PORT -cp "`dirname $0`/lib/*" play.core.server.NettyServer `dirname $0`
Application Deployed URL [None]: myPlayApp.${target-base}   
Memory reservation (128M, 256M, 512M, 1G, 2G) [512M]: 256M
How many instances? [1]: 1
Create services to bind to 'myPlayApp'? [yN]: n
Would you like to save this configuration? [yN]: y
Manifest written to manifest.yml.
Creating Application: OK
Uploading Application:
  Checking for available resources: OK
  Processing resources: OK
  Packing application: OK
  Uploading (80K): OK  
Push Status: OK
Staging Application 'myPlayApp': OK                                                 
Starting Application 'myPlayApp': OK     


17. You can now open your application in your browser: type in the deployed URL, and we are done!

18. The browser will show something like:

Your new application is ready.App port : 61897 and host:

Monday, January 16, 2012

Delimited continuations

There have been a lot of blog posts, questions and answers about Scala's delimited continuations. I will not try to confuse the reader with yet another lame-duck explanation. Instead, I will present them as I have come to understand them and then go on to develop a basic framework in which we capture part of a computation and send that frozen computation to a remote machine, where it is executed and the result is sent back to the original machine.
If we think of a Scala program as the whole computation, then the whole continuation runs from the beginning to the end. Let's say the whole computation takes a value of type A as input and produces a value of type C as output. Also, let's assume the program consists of two functions, F1 and F2, which are called in sequence. So if F is the whole program, we can write F as follows:

F(a) = F2(F1(a)), where F1: A => B and F2: B => C

So F1 receives the initial input of type A and computes an intermediate result of type B, which is fed to F2, which computes the final result of type C. Now what if we could pause the computation halfway, after F1 has calculated its value, and freeze the rest of the computation? The rest of the program is then some other function from B => C. Let's call that function F~. There are a couple of things we need to take notice of here.

Though F~ is a function from B => C, in the examples below the B we pass to it has no effect, because the code that follows the capture point never uses the value that shift yields. The frozen state extends from the end of F1's computation (the point marked by shift) to the end of F2's computation (the boundary specified by reset). There might be additional computations after F2, but they are not part of the frozen state. The astute reader must by now have figured out where the "delimited" qualifier in Scala's continuations comes from. Also, the moment we capture (i.e. freeze) the rest of the computation, control returns to the caller of F, but the shifting/capturing/freezing must respect the contractual obligation that F returns a value of type C. So, at the point of shifting we must either return some value of type C or call F~ with some arbitrary value of type B.
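
The idea can be sketched in plain Scala with hand-written closures, without the continuations plugin. This is only an illustration under stated assumptions: the case classes A, B and C, the bodies of f1 and f2, and the names frozen and pausedF are all hypothetical stand-ins, not part of any library.

```scala
object FrozenComputationSketch {
  // Stand-in types for the A, B and C of the text (hypothetical).
  final case class A(value: Int)
  final case class B(value: Int)
  final case class C(text: String)

  val f1: A => B = a => B(a.value * 2)
  val f2: B => C = b => C("result: " + b.value)

  // The whole program F is F2 applied to the result of F1.
  val f: A => C = f1 andThen f2

  // Slot that holds the captured "rest of the computation" (F~).
  var frozen: Option[B => C] = None

  // Runs F1, stores the rest of the computation instead of running it,
  // and honours the contract of F by returning an arbitrary C.
  def pausedF(a: A): C = {
    val b = f1(a)
    // The B computed by F1 is baked in; the argument of F~ is ignored.
    frozen = Some((_: B) => f2(b))
    C("paused")
  }

  def main(args: Array[String]): Unit = {
    println(f(A(21)))                 // C(result: 42)
    println(pausedF(A(21)))           // C(paused)
    println(frozen.map(k => k(B(0)))) // Some(C(result: 42))
  }
}
```

Calling pausedF returns immediately with a placeholder C, while the stored function can be run later, or elsewhere, to finish the work.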

Now let's try out a few examples before embarking on the business of tossing around delimited continuations. Launch the REPL with the continuations plugin enabled as shown below:

scala -P:continuations:enable

scala> class A
defined class A

scala> class B
defined class B

scala> class C
defined class C

scala> import util.continuations._
import util.continuations._

var cont: (B => C) = null

def fun1(a: A): C = {
  println("I am not part of frozen state!")
  reset {
    println("reset marks the boundary where shift/freeze/capture can be done")
    shift { continuation: (B => C) =>
      cont = continuation
      println("I was realized with: "+a.hashCode)
      println("I am returning an arbitrary C")
      new C
    }
    println("I am the part of the frozen computation")
    new C
  }
  println("fun is supposed to return a C")
  new C
}

fun1: (a: A)C

Now let's invoke fun1 and capture the continuation.

scala> fun1(new A)
I am not part of frozen state!
reset marks the boundary where shift/freeze/capture can be done
I was realized with: 13264767
I am returning an arbitrary C
fun is supposed to return a C
res1: C = C@50d7c5

Now we call cont:

scala> cont(new B)

I am the part of the frozen computation
res2: C = C@3f348a

Let's look at another example:

scala> var cont: (Int => String) = null
cont: (Int) => String = null


scala> def fun2(i: Int): String = reset {
     |   shift { continuation: (Int => String) =>
     |     cont = continuation
     |     "Ratul Buragohain" //since shift will return control to caller of fun2 - but fun2 is supposed to return a String
     |   }
     |   (i to 10).toList.mkString //i was fixed when fun2 was called, so the value passed to cont later has no effect
     | }

fun2: (i: Int)String

Call fun2 to capture the continuation.

scala> fun2(0)

res7: String = Ratul Buragohain

Now call cont.

scala> cont(0)
res8: String = 012345678910

scala> cont(5)
res9: String = 012345678910

scala> cont(null.asInstanceOf[Int])
res12: String = 012345678910

scala> cont(100000000)
res15: String = 012345678910

That's the crux of scala's delimited continuation.
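
Why every call to cont above returns the same string can be seen in a small sketch that again uses hand-written closures rather than the plugin. The names ignoresArgument and usesArgument are hypothetical. The first mimics fun2, where the code after the capture point uses only i; the second shows what changes when that code uses the value handed to the continuation (with the plugin, this corresponds to binding the result of shift to a val and using it).

```scala
object ContinuationArgumentSketch {
  var cont: Int => String = null

  // Mimics fun2: the stored function ignores its argument because the
  // remaining code only refers to `i`, fixed at the time of the call.
  def ignoresArgument(i: Int): String = {
    cont = (_: Int) => (i to 10).toList.mkString
    "captured"
  }

  // Variant: the remaining code uses the value passed to the
  // continuation, so each call can yield a different result.
  def usesArgument(): String = {
    cont = (j: Int) => (j to 10).toList.mkString
    "captured"
  }

  def main(args: Array[String]): Unit = {
    ignoresArgument(0)
    println(cont(5)) // 012345678910
    usesArgument()
    println(cont(5)) // 5678910
    println(cont(9)) // 910
  }
}
```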

In the next post I will show how we can toss delimited continuations around across machines.