This is my Blog.

It's full of opinions on a wide range of stuff.

Scalaz Task - the missing documentation

Before starting, let us consider the semantics of Future[A] for a moment: the monadic context being provided is that of asynchronous processing: the enclosed A may or may not exist yet, and if it doesn't exist, the computation to produce A is either not completed or it has failed. This is all very reasonable in isolation. Now then, the problem with this is that what one will often observe - especially with relative newcomers to functional programming - is that this "failed" state of Future gets abused to provide some semantic meaning within the given application code. Typically, if one was working with a computation that could fail at a higher level application layer, one might model it as Throwable \/ A in a pure function, such that said computation could then be lifted into some asynchronous context simply by wrapping it in a future: Future[Throwable \/ A]. Whilst much cleaner, this usually provides a secondary complication in that one is then stacking monads all over the application code, and you end up needing monad transformers to declutter things and make it reasonable to work with. The downside here of course is that monad transformers (such as scalaz.EitherT come with a layer of cognitive difficulty that many engineers find problematic to overcome).

In this frame, Scalaz brings scalaz.concurrent.Task to the table to try and make asynchronous operations that might have application-layer failures easier to work with, without the need for monad transformers, and still maintaining all the useful Scalaz typeclass utilities and combinators. In addition to just being a convenience data type, its important to understand the Task is a trampolined construct which means that its not using up stack in the same way a regular chain of functions would (this is an important point and becomes relevant later)

With the preamble out of the way, the next section describes the set of public combinators you will likely be using with Task, and gives examples of how to get started.

Task Combinators and Usage

There are a range of useful functions for task, some on the companion object and others on instances themselves.

Object functions

Typically speaking, when using Task one does not use the constructor to create new instances - whilst that is possible, at that point the user code needs to guarantee the exception safety of the supplied Future[Throwable \/ A], so its typically just easier to use one of the combinators below:

  • Task.apply: if you have a computation that you want to evaluate asynchronously, use apply. This is much like scala.concurrent.Future.apply, in that it takes an implicit ExecutorService reference for which the computation will later be run with. I say "later be run with", as, unlike the std library Future computations are not run at the moment they are called, they are lazy. Consider the following example:
scala> Task { println("foo") }
res0: scalaz.concurrent.Task[Unit] = scalaz.concurrent.Task@22147686

// dangerous - this is as bad as Option.get, included for completeness.

scala> res0.attemptRun
res2: scalaz.\/[Throwable,Unit] = \/-(())

In this example, note that Task.apply was used to suck the println operation into a task, which then just got allocated into res0. Nothing actually happens until calling one of run, attemptRun (which are both blocking) or runAsync (which is as the name suggests, is asynchronous). This is a really important detail, as your program won't even execute if you don't call one of these methods.

  • if you have a strict value that you simply want to lift into a Task[A], then is your friend. Example:
scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task

scala> val i: Task[Int] =
i: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@6f721236
  • imagine you have a Throwable instance from some function that you want to lift into a Task, then you can use Failed tasks can also be chained together in a convenient fashion using the or combinator:
scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task

scala> val i: Task[Int] = Exception("example")) or
i: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@4266d38d

res0: Int = 1

scala> Exception("boom")).run
java.lang.Exception: boom
    at .<init>(<console>:9)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  • Task.delay: this function might look similar to in terms of its type signature, but its semantic is very different. Consider the implementation: suspend(now(a)). You can see that it first lifts the value into a task using, and then Task.suspend's the resulting task (see below for description of suspend)

  • Task.suspend: suspend is a kind of interesting beast in that in that its purpose is to execute the supplied f: => Task[A] in a new trampolined loop. As mentioned in the introduction, Task is a trampolined structure, so this concept of suspending a computation is all to do where the computation is trampolined too. Use cases for this are when you would like to create a Task in a lazy fashion, which can often be useful when you have different types of recursive functions.

  • Task.fork: Given the supplied task, ensure that it runs on a separate logical thread when it comes to executing this task. How exactly this is executed at the end of the world depends on the supplied ExecutorService.

  • Task.async: Sometimes when working with callback-based APIs you really wish that you had a reasonable monadic API, and the Task.async function can help with this. Here's an example of wrapping Java's AsyncHttpClient using the async combinator below, which typically would give you code like this:

asyncHttp.prepareGet("", new AsyncCompletionHandler[Response] {
  def onComplete(r: Response) = ...
  def onError(e: Throwable) = ...

However, this can be transformed into a convenient monadic API using Task.async:

def get(s: String) = 
  Task.async(k => asyncHttp.prepareGet(s, toHandler(k)))

def toHandler[A](k: (Throwable \/ A)) = new AsyncCompletionHandler[A] {
  def onComplete(r: Response) = k(...)
  def onError(e: Throwable) = k(...)

// usage:
get("").flatMap(response => ...)
  • Task.gatherUnordered: More often than not, you may have a sequence of Task or have a situation where you would like tasks to be executed in parralell. In this case, gatherUnordered is exactly what you want. Here's an example:
scala> import scalaz._, Scalaz._
import scalaz._
import Scalaz._

scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task

scala> val tasks = (1 |->  5).map(n => Task { Thread.sleep(100); n })
tasks: List[scalaz.concurrent.Task[Int]] = List(scalaz.concurrent.Task@22fef829, scalaz.concurrent.Task@4e707a3f, scalaz.concurrent.Task@3b3e8bb0, scalaz.concurrent.Task@4db4e786, scalaz.concurrent.Task@18de71a9)

scala> Task.gatherUnordered(tasks).run
res0: List[Int] = List(1, 3, 4, 2, 5)

Notice that when run the tasks are all executed in parallel, and the resulting list of Ints are not always ordered properly. This is wildly useful in all manner of circumstances, and for folks still thinking about the Scala std library Future, you can think of this as being similar to Future.traverse or Future.sequence.

  • Task.reduceUnordered: In Scalaz 7.0 gatherUnordered was the only available way to run multiple tasks and collect results, and its output collection was concretely List[A]. As there are plenty of other cases where you would want to return some other type constructor rather than List, in these cases you can use reduceUnordered. This is best illustrated by the updated implementation of gatherUnordered:
reduceUnordered[A, List[A]](tasks, exceptionCancels)

Pretty simply, with the output M being explicitly specified as List, this resolves a Reducer[List] from the companion object of Reducer, where Reducer is a convenience for composing operations on arbitrary monoids. See the implementation for more details as its a little out of scope for this blog post.

  • Task.unsafeStart: Given that some users struggle with the delta between the runtime semantics of Task[A] and the Future[A] in the standard library, those users can choose to use unsafeStart. When used, this function will take a thread from the relevant ExecutorService and run the enclosing function immediately. This function is only available in Scalaz 7.1+, and should rarely, if ever, be used with extreme care..

These are the functions available for creating Task instances, which are accompanied by a range convenient functions on the instances themselves.

Instance Functions

  • onFinish: upon completion of this task the supplied function Option[Throwable] => Task[Unit] will be executed. One could use this for cleaning up resources or executing some scheduled side-effect.

  • timed: if you want your task to take no longer than a specified amount of time, then the timed function is for you. Here's an example:

scala> Task { Thread.sleep(1000); "one" }.timed(100)
res3: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@5c85a402

    at scalaz.concurrent.Future$$anonfun$timed$1$$anon$
    at java.util.concurrent.Executors$
  • handleWith: if you want to explicitly handle or maybe even ignore certain types of application error, you can use the handleWith function to alter the contents of your task based on the received error:
scala> val exe = Task(10 / 0)
exe: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@5fb90bbb

scala> exe.handleWith {
     | case e: ArithmeticException =>
     | }
res10: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@14b7c7d0

res11: Int = 0

A word on Nodeterminism[M[_]]

In Scalaz 7.1, some useful functions were also added to the Nodeterminism abstraction, specifically related to the parallel execution of tasks. Consider the following example:

scala> import scalaz.concurrent.Task, scalaz.Nondeterminism
import scalaz.concurrent.Task
import scalaz.Nondeterminism

scala> val sb = new StringBuffer
sb: StringBuffer =

scala> val t1 = Task.fork { Thread.sleep(1000); sb.append("a") ;"a") }
t1: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@539d15a4

scala> val t2 = Task.fork { Thread.sleep(800); sb.append("b") ;"b") }
t2: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@42e69075

scala> val t3 = Task.fork { Thread.sleep(200); sb.append("c") ;"c") }
t3: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@1e0467b1

scala> val t4 = Task.fork { Thread.sleep(400); sb.append("d") ;"d") }
t4: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@3f7da7fe

scala> val t5 = Task.fork { sb.append("e") ;"e") }
t5: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@1206a4db

scala> val t6 = Task.fork { Thread.sleep(600); sb.append("f") ;"f") }
t6: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@55470723

scala> val r = Nondeterminism[Task].nmap6(t1, t2, t3, t4, t5, t6)(List(_,_,_,_,_,_))
r: scalaz.concurrent.Task[List[String]] = scalaz.concurrent.Task@5ae9139

res0: List[String] = List(a, b, c, d, e, f)

You might be wondering why this is even useful to use Nondeterminism over the plain Task combinators? Well, sometimes its useful to abstract over a given input (much like you would do generally), and only care about the certain operations that you might do with a task, without specifically being bound to Task explicitly.

Task Concurrency

Whilst the world of task is generally quite lovely, there are some important implementation details to be aware of when getting a handle on what runs on which thread pools when actually executing something. Let's look at some examples to try and explain the various cases (the following example use run just for simplicity and to obviate things if you want to try them in the REPL):


The above takes the task and runs the computation on thread pool p. This is probably the simplest case. If you do not supply p as an implicit or explicit argument then by default Task uses scalaz.concurrent.Strategy.DefaultExecutorService, which is a fixed thread pool which dynamically fixes its upper bound to the number of processors in the host machine.


Which then expands too:

)(DefaultExecutorService).flatMap(x => x).run

In this case the evaluation of the expression Task(e)(p) will occur on DefaultExecutorService and the evaluation of the expression e will occur on p.


Which then expands too:


This is the other way around, where Task(example(arg)) will end up being evaluated on p, but example(arg) will be evaluated on DefaultExecutorService.

You might be wondering how on earth one can then get a computation to run on precisely the pool you want, without having to constantly pass around the explicit reference to the desired pool. For this you can use scalaz.Kleisli, and "inject" the reference to the executor. Consider the following:

scala> import java.util.concurrent.{ExecutorService,Executors}
import java.util.concurrent.{ExecutorService, Executors}

scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task

scala> import scalaz.Kleisli
import scalaz.Kleisli

scala> type Delegated[+A] = Kleisli[Task, ExecutorService, A]
defined type alias Delegated

scala> def delegate: Delegated[ExecutorService] = Kleisli(e =>
delegate: Delegated[java.util.concurrent.ExecutorService]

scala> implicit def delegateTaskToPool[A](a: Task[A]): Delegated[A] = Kleisli(x => a)
delegateTaskToPool: [A](a: scalaz.concurrent.Task[A])Delegated[A]

scala> val exe = for {
     |   p <- delegate
     |   b <- Task("x")(p)
     |   c <- Task("y")(p)
     | } yield c
exe: scalaz.Kleisli[scalaz.concurrent.Task,java.util.concurrent.ExecutorService,String] = scalaz.KleisliFunctions$$anon$17@1774bf5c

res0: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@252cffe7

res1: String = y

This gives us a simple way to paramatise a set of functions to execute on the same thread pool if required.

blog comments powered by Disqus