Scalaz Task - the missing documentation
Before starting, let us consider the semantics of Future[A] for a moment: the monadic context being provided is that of asynchronous processing: the enclosed A may or may not exist yet, and if it doesn't exist, the computation to produce A has either not completed or has failed. This is all very reasonable in isolation. The problem is that what one will often observe - especially with relative newcomers to functional programming - is that this "failed" state of Future gets abused to provide some semantic meaning within the given application code. Typically, if one were working with a computation that could fail at a higher application layer, one might model it as Throwable \/ A in a pure function, such that said computation could then be lifted into an asynchronous context simply by wrapping it in a future: Future[Throwable \/ A]. Whilst much cleaner, this usually introduces a secondary complication: one is then stacking monads all over the application code, and ends up needing monad transformers to declutter things and make them reasonable to work with. The downside here, of course, is that monad transformers (such as scalaz.EitherT) come with a layer of cognitive difficulty that many engineers find problematic to overcome.
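To make the stacking problem concrete, here is a minimal sketch of what working with Future[Throwable \/ A] tends to look like without a transformer (lookupUser and lookupOrders are hypothetical domain functions, not part of any real API):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz.{\/, -\/, \/-}

// hypothetical domain functions carrying errors in the "left" channel
def lookupUser(id: Long): Future[Throwable \/ String] = ???
def lookupOrders(user: String): Future[Throwable \/ List[Int]] = ???

// without a transformer, every step needs to manually thread the
// error channel through the Future layer
def orders(id: Long): Future[Throwable \/ List[Int]] =
  lookupUser(id).flatMap {
    case -\/(err)  => Future.successful(-\/(err))
    case \/-(user) => lookupOrders(user)
  }
```

Every additional step repeats that pattern match, which is exactly the boilerplate EitherT is designed to remove - at the cost of the extra conceptual layer.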
In this frame, Scalaz brings scalaz.concurrent.Task to the table to try and make asynchronous operations that might have application-layer failures easier to work with, without the need for monad transformers, whilst still maintaining all the useful Scalaz typeclass utilities and combinators. In addition to being a convenience data type, it's important to understand that Task is a trampolined construct, which means it does not consume stack the way a regular chain of function calls would (this is an important point and becomes relevant later).
With the preamble out of the way, the next section describes the set of public combinators you will likely be using with Task, and gives examples of how to get started.
Task Combinators and Usage
There are a range of useful functions for Task: some on the companion object, and others on instances themselves.
Object functions
Typically speaking, when using Task one does not use the constructor to create new instances - whilst that is possible, the user code then needs to guarantee the exception safety of the supplied Future[Throwable \/ A], so it's typically just easier to use one of the combinators below:
Task.apply: if you have a computation that you want to evaluate asynchronously, use apply. This is much like scala.concurrent.Future.apply, in that it takes an implicit ExecutorService reference with which the computation will later be run. I say "later be run", as, unlike the standard library Future, computations are not run at the moment they are created; they are lazy. Consider the following example:
scala> Task { println("foo") }
res0: scalaz.concurrent.Task[Unit] = scalaz.concurrent.Task@22147686
// dangerous - this is as bad as Option.get, included for completeness.
scala> res0.run
foo
scala> res0.attemptRun
foo
res2: scalaz.\/[Throwable,Unit] = \/-(())
In this example, note that Task.apply was used to suck the println operation into a task, which then just got allocated into res0. Nothing actually happens until calling one of run, attemptRun (which are both blocking) or runAsync (which, as the name suggests, is asynchronous). This is a really important detail, as your program won't even execute if you don't call one of these methods.
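For completeness, runAsync takes a callback receiving Throwable \/ A, so the non-blocking counterpart of attemptRun looks roughly like this:

```scala
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task

val t = Task { println("foo"); 42 }

// runAsync registers a callback and returns immediately;
// the callback receives either the error or the result
t.runAsync {
  case \/-(n) => println(s"got $n")
  case -\/(e) => println(s"failed: ${e.getMessage}")
}
```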
Task.now: if you have a strict value that you simply want to lift into a Task[A], then Task.now is your friend. Example:
scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task
scala> val i: Task[Int] = Task.now(1)
i: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@6f721236
Task.fail: imagine you have a Throwable instance from some function that you want to lift into a Task, then you can use Task.fail. Failed tasks can also be chained together in a convenient fashion using the or combinator:
scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task
scala> val i: Task[Int] = Task.fail(new Exception("example")) or Task.now(1)
i: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@4266d38d
scala> i.run
res0: Int = 1
scala> Task.fail(new Exception("boom")).run
java.lang.Exception: boom
at .<init>(<console>:9)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Task.delay: this function might look similar to Task.now in terms of its type signature, but its semantics are very different. Consider the implementation: suspend(now(a)). You can see that it first lifts the value into a task using Task.now, and then suspends the resulting task with Task.suspend (see below for a description of suspend).
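The practical difference is evaluation strategy: Task.now evaluates its argument immediately, whilst Task.delay defers it until the task is run (and re-evaluates it on every run). A quick sketch:

```scala
import scalaz.concurrent.Task

var count = 0

val eager = Task.now { count += 1; count }   // side effect happens right here
val later = Task.delay { count += 1; count } // side effect deferred until run

println(count) // 1 - only the Task.now argument has been evaluated
later.run
later.run      // delay re-evaluates its thunk on every run
println(count) // 3
```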
Task.suspend: suspend is a kind of interesting beast, in that its purpose is to execute the supplied f: => Task[A] in a new trampolined loop. As mentioned in the introduction, Task is a trampolined structure, so this concept of suspending a computation is all to do with where the computation is trampolined to. Use cases for this are when you would like to create a Task in a lazy fashion, which can often be useful when you have different types of recursive functions.
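As a sketch of the recursive use case, here is a (made-up) countdown written with Task.suspend, so that constructing the chain of tasks happens inside the trampoline rather than on the call stack:

```scala
import scalaz.concurrent.Task

// without suspend, the recursive call would be evaluated eagerly while
// building the task; suspend defers it into the trampolined loop
def countdown(n: Int): Task[Int] =
  if (n <= 0) Task.now(0)
  else Task.suspend(countdown(n - 1))

countdown(100000).run // completes without a StackOverflowError
```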
Task.fork: given the supplied task, ensure that it runs on a separate logical thread when it comes to executing this task. How exactly this is executed at the end of the world depends on the supplied ExecutorService.
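For example, to push a blocking operation onto a specific pool (ioPool here is a hypothetical pool dedicated to blocking work):

```scala
import java.util.concurrent.Executors
import scalaz.concurrent.Task

// hypothetical dedicated pool for blocking I/O
val ioPool = Executors.newFixedThreadPool(4)

// the body will be evaluated on ioPool rather than the default pool
val blocking: Task[String] =
  Task.fork(Task.delay { Thread.sleep(100); "done" })(ioPool)

blocking.run
```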
Task.async: sometimes when working with callback-based APIs you really wish that you had a reasonable monadic API, and the Task.async function can help with this. Here's an example of wrapping Java's AsyncHttpClient using the async combinator. The raw API typically gives you code like this:
asyncHttp.prepareGet("http://google.com", new AsyncCompletionHandler[Response] {
def onComplete(r: Response) = ...
def onError(e: Throwable) = ...
})
However, this can be transformed into a convenient monadic API using Task.async
:
def get(s: String) =
Task.async(k => asyncHttp.prepareGet(s, toHandler(k)))
def toHandler[A](k: (Throwable \/ A) => Unit) = new AsyncCompletionHandler[A] {
def onComplete(r: Response) = k(...)
def onError(e: Throwable) = k(...)
}
// usage:
get("http://google.com").flatMap(response => ...)
Task.gatherUnordered: more often than not you may have a sequence of Task, or a situation where you would like tasks to be executed in parallel. In this case, gatherUnordered is exactly what you want. Here's an example:
scala> import scalaz._, Scalaz._
import scalaz._
import Scalaz._
scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task
scala> val tasks = (1 |-> 5).map(n => Task { Thread.sleep(100); n })
tasks: List[scalaz.concurrent.Task[Int]] = List(scalaz.concurrent.Task@22fef829, scalaz.concurrent.Task@4e707a3f, scalaz.concurrent.Task@3b3e8bb0, scalaz.concurrent.Task@4db4e786, scalaz.concurrent.Task@18de71a9)
scala> Task.gatherUnordered(tasks).run
res0: List[Int] = List(1, 3, 4, 2, 5)
Notice that when run, the tasks are all executed in parallel, and the resulting list of Ints is not necessarily in order. This is wildly useful in all manner of circumstances, and for folks still thinking about the Scala std library Future, you can think of this as being similar to Future.traverse or Future.sequence.
Task.reduceUnordered: in Scalaz 7.0, gatherUnordered was the only available way to run multiple tasks and collect results, and its output collection was concretely List[A]. As there are plenty of other cases where you would want some other type constructor rather than List, you can use reduceUnordered. This is best illustrated by the updated implementation of gatherUnordered:
reduceUnordered[A, List[A]](tasks, exceptionCancels)
Put simply: with the output M being explicitly specified as List, this resolves a Reducer instance from the companion object of Reducer, where Reducer is a convenience for composing operations over arbitrary monoids. See the implementation for more details, as it's a little out of scope for this blog post.
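As a sketch (hedging on the exact Reducer constructors available in your Scalaz version), collecting results into a Vector might look like this, supplying a reducer built from a unit function and the Vector monoid:

```scala
import scalaz.{Reducer, UnitReducer}
import scalaz.std.vector._ // Monoid[Vector[Int]]
import scalaz.concurrent.Task

val tasks = (1 to 5).map(n => Task { Thread.sleep(100); n })

// fold each Int into a Vector[Int] via its Monoid
implicit val vecReducer: Reducer[Int, Vector[Int]] =
  UnitReducer((i: Int) => Vector(i))

// results arrive in completion order, collected into a Vector
Task.reduceUnordered[Int, Vector[Int]](tasks).run
```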
Task.unsafeStart: given that some users struggle with the delta between the runtime semantics of Task[A] and the Future[A] in the standard library, those users can choose to use unsafeStart. When used, this function will take a thread from the relevant ExecutorService and run the enclosed computation immediately. This function is only available in Scalaz 7.1+, and should rarely, if ever, be used, and then only with extreme care.
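A sketch of the eager semantics (note the deliberately alarming name, and that this assumes the Scalaz 7.1+ API):

```scala
import scalaz.concurrent.Task

// unlike Task.apply, evaluation begins immediately on the pool,
// before run is ever called - std-library-Future-style semantics
val t = Task.unsafeStart { Thread.sleep(100); println("already running"); 1 }

t.run // merely waits for the already-started computation
```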
These are the functions available for creating Task instances, which are accompanied by a range of convenient functions on the instances themselves.
Instance Functions
onFinish: upon completion of this task, the supplied function Option[Throwable] => Task[Unit] will be executed. One could use this for cleaning up resources or executing some scheduled side-effect.
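For instance (a sketch; releaseConnection stands in for whatever cleanup your application needs):

```scala
import scalaz.concurrent.Task

// hypothetical cleanup action
def releaseConnection: Task[Unit] = Task.delay(println("released"))

// the handler sees Some(error) on failure and None on success,
// and runs in both cases
val t = Task { "payload" }.onFinish {
  case Some(err) => Task.delay(println(s"failed: $err")).flatMap(_ => releaseConnection)
  case None      => releaseConnection
}

t.run
```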
timed: if you want your task to take no longer than a specified amount of time, then the timed function is for you. Here's an example:
scala> Task { Thread.sleep(1000); "one" }.timed(100)
res3: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@5c85a402
scala> res3.run
java.util.concurrent.TimeoutException
at scalaz.concurrent.Future$$anonfun$timed$1$$anon$2.run(Future.scala:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
handleWith: if you want to explicitly handle, or maybe even ignore, certain types of application error, you can use the handleWith function to alter the contents of your task based on the received error:
scala> val exe = Task(10 / 0)
exe: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@5fb90bbb
scala> exe.handleWith {
| case e: ArithmeticException => Task.now(0)
| }
res10: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@14b7c7d0
scala> res10.run
res11: Int = 0
A word on Nondeterminism[M[_]]
In Scalaz 7.1, some useful functions were also added to the Nondeterminism abstraction, specifically related to the parallel execution of tasks. Consider the following example:
scala> import scalaz.concurrent.Task, scalaz.Nondeterminism
import scalaz.concurrent.Task
import scalaz.Nondeterminism
scala> val sb = new StringBuffer
sb: StringBuffer =
scala> val t1 = Task.fork { Thread.sleep(1000); sb.append("a") ; Task.now("a") }
t1: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@539d15a4
scala> val t2 = Task.fork { Thread.sleep(800); sb.append("b") ; Task.now("b") }
t2: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@42e69075
scala> val t3 = Task.fork { Thread.sleep(200); sb.append("c") ; Task.now("c") }
t3: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@1e0467b1
scala> val t4 = Task.fork { Thread.sleep(400); sb.append("d") ; Task.now("d") }
t4: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@3f7da7fe
scala> val t5 = Task.fork { sb.append("e") ; Task.now("e") }
t5: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@1206a4db
scala> val t6 = Task.fork { Thread.sleep(600); sb.append("f") ; Task.now("f") }
t6: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@55470723
scala> val r = Nondeterminism[Task].nmap6(t1, t2, t3, t4, t5, t6)(List(_,_,_,_,_,_))
r: scalaz.concurrent.Task[List[String]] = scalaz.concurrent.Task@5ae9139
scala> r.run
res0: List[String] = List(a, b, c, d, e, f)
You might be wondering why it is even useful to use Nondeterminism over the plain Task combinators. Well, sometimes it's useful to abstract over a given input (much like you would do generally), and only care about the operations you might perform with a task, without being bound to Task explicitly.
Task Concurrency
Whilst the world of Task is generally quite lovely, there are some important implementation details to be aware of when getting a handle on what runs on which thread pool when actually executing something. Let's look at some examples to try and explain the various cases (the following examples use run just for simplicity, and so that you can try them in the REPL):
Task(e)(p).run
The above takes the task and runs the computation on thread pool p. This is probably the simplest case. If you do not supply p as an implicit or explicit argument, then by default Task uses scalaz.concurrent.Strategy.DefaultExecutorService, a fixed thread pool whose size is set to the number of processors on the host machine.
Task.fork(Task(e)(p)).run
Which then expands to:
Task(
Task(e)(p)
)(DefaultExecutorService).flatMap(x => x).run
In this case the evaluation of the expression Task(e)(p)
will occur on DefaultExecutorService
and the evaluation of the expression e
will occur on p
.
Task.fork(
Task(example(arg))
)(p)
Which then expands to:
Task(
Task(
example(arg)
)(DefaultExecutorService)
)(p)
This is the other way around, where Task(example(arg))
will end up being evaluated on p
, but example(arg)
will be evaluated on DefaultExecutorService
.
You might be wondering how on earth one can then get a computation to run on precisely the pool you want, without having to constantly pass around the explicit reference to the desired pool. For this you can use scalaz.Kleisli
, and “inject” the reference to the executor. Consider the following:
scala> import java.util.concurrent.{ExecutorService,Executors}
import java.util.concurrent.{ExecutorService, Executors}
scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task
scala> import scalaz.Kleisli
import scalaz.Kleisli
scala> type Delegated[+A] = Kleisli[Task, ExecutorService, A]
defined type alias Delegated
scala> def delegate: Delegated[ExecutorService] = Kleisli(e => Task.now(e))
delegate: Delegated[java.util.concurrent.ExecutorService]
scala> implicit def delegateTaskToPool[A](a: Task[A]): Delegated[A] = Kleisli(x => a)
delegateTaskToPool: [A](a: scalaz.concurrent.Task[A])Delegated[A]
scala> val exe = for {
| p <- delegate
| b <- Task("x")(p)
| c <- Task("y")(p)
| } yield c
exe: scalaz.Kleisli[scalaz.concurrent.Task,java.util.concurrent.ExecutorService,String] = scalaz.KleisliFunctions$$anon$17@1774bf5c
scala> exe.run(Executors.newFixedThreadPool(1))
res0: scalaz.concurrent.Task[String] = scalaz.concurrent.Task@252cffe7
scala> res0.run
res1: String = y
This gives us a simple way to parameterise a set of functions to execute on the same thread pool, if required.