1. Overview
This document is the reference documentation for Task, a Clojure library for asynchronous computation.
1.1. What is Task?
Tasks provide simple, functional concurrency primitives for Clojure. They represent asynchronous computation as pure values, and favour function composition over callbacks or promises.
Task promotes the use of combinators like then and compose. These let you combine asynchronous data functionally, so that asynchronous steps are just transformations on a value.
1.2. Key features
- Value-oriented: tasks are just eventual values. No more callbacks: regular deref/@ is all you need.
- Functional: tasks are composable. Task comes with a set of operations that let you compose and combine tasks in a functional manner.
- Interoperable: any Clojure future/promise can be converted into a task, and vice versa.
- Asynchronous: tasks are always asynchronous. By default, tasks run on the ForkJoinPool executor.
- Customizable: if you wish to control the execution model, you can supply your own ExecutorService.
- Performant: the library builds on the standard Java 8 Concurrency API, a powerful framework for concurrent computation.
1.3. An example
For example, a typical use case for asynchronous computation is making HTTP requests. If we want to take the result of a GET request and apply a data transformation to it, this is what we write:
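(require '[task.core :as task]
         '[org.httpkit.client :as http]
         '[cheshire.core :as cheshire]
         '[clojure.string :as str])

(def request
  (task/then
    (fn [data]
      (-> data                           ; (2)
          :body                          ; (3)
          (cheshire/parse-string true)   ; (3)
          :title                         ; (3)
          str/upper-case))
    (http/get "http://jsonplaceholder.typicode.com/posts/3")))   ; (1)

(println @request)                       ; (4)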
(1) We launch an HTTP GET request asynchronously using http-kit. The promise it returns is converted into a task automatically.
(2) We pass a function and the task to then, a function described in Composing tasks. The function comes first, then the task.
(3) We parse the JSON data using Cheshire, extract an attribute and uppercase it.
(4) We deref the task, which blocks the current thread until the task is complete.
The whole computation runs in another thread (depending on the execution model) and eventually resolves to a value, which we obtain by deref’ing it.
The snippet uses http-kit to launch a GET request asynchronously (1). The http-kit promise is automatically converted into a task. Once the promise completes, we parse the response body as JSON (cheshire/parse-string), extract our attribute from it, and uppercase the result.
All of this is executed asynchronously, so we wrap the call to then with @, which calls deref, a Clojure standard library function, to await the result in the current thread. This blocks the current thread and returns the result of the computation.
Head over to the User Guide to learn more!
1.4. Installation & Requirements
You need Java 8 to use Task, because Task uses features that were introduced in that version.
2. User Guide
2.1. Creating tasks
To create a task, use run. It evaluates its body asynchronously. It behaves much like do, except that, depending on the execution model, it may evaluate its body in another thread.
(task/run (println "hello")
(Thread/sleep 1000)
123)
This task evaluates to 123. To get its value, use deref or @ from the Clojure standard library. This blocks the current thread until the task completes.
; these are both equal
@(task/run 123) ; => 123
(deref (task/run 123)) ; => 123
See the docs on deref.
Calling (task/run 123) may result in the creation of another thread. To create an "immediate" value that doesn’t cause any unwanted execution, use now:
@(task/now 123)
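The difference between run and now can be made concrete with a minimal sketch in plain Clojure and Java 8 interop. This sketch assumes, as the feature list suggests, that a task behaves like a java.util.concurrent.CompletableFuture. run-sketch and now-sketch are hypothetical names. They are not part of Task, and this is not its implementation.

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Supplier))

;; Hypothetical stand-in for task/run: schedule the body on the
;; ForkJoinPool common pool and return a CompletableFuture immediately.
(defmacro run-sketch [& body]
  `(CompletableFuture/supplyAsync
     (reify Supplier
       (~'get [_#] ~@body))))

;; Hypothetical stand-in for task/now: an already-completed future,
;; so nothing is scheduled on any thread.
(defn now-sketch [x]
  (CompletableFuture/completedFuture x))

@(run-sketch (Thread/sleep 100) 123) ; => 123
@(now-sketch 123)                    ; => 123
```

Because clojure.core/deref works on any java.util.concurrent.Future, the same @ blocks on both. For now-sketch it returns at once, because the value is already there.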
If you want to use another executor, you have two options. The first is run-in, which accepts an explicit executor as its first argument:
(import '(java.util.concurrent Executors))

(let [pool (Executors/newFixedThreadPool 16)]
  (task/run-in pool
    (Thread/sleep 1000)
    123))
A more Clojure-like approach is to use dynamic binding. The dynamic var task/*pool* holds the default ExecutorService in scope. To rebind *pool* to the executor of your choice:
(binding [task/*pool* (Executors/newFixedThreadPool 1)]
(task/run (Thread/sleep 1000)
(println "hello!")))
Dynamic binding lets you choose the executor in a more idiomatic fashion. See also Using custom executors and the Execution model for more information about executors.
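The sketch below shows one way a dynamically bound default executor can work. It uses the same assumption that tasks behave like CompletableFutures. *pool-sketch*, run-in-sketch and run-default-sketch are hypothetical stand-ins for task/*pool*, run-in and run, not Task's own code.

```clojure
(import '(java.util.concurrent CompletableFuture Executors ForkJoinPool)
        '(java.util.function Supplier))

;; Hypothetical stand-in for task/*pool*: the default executor.
(def ^:dynamic *pool-sketch* (ForkJoinPool/commonPool))

;; Run the no-argument function f on an explicit executor.
(defn run-in-sketch [executor f]
  (CompletableFuture/supplyAsync
    (reify Supplier (get [_] (f)))
    executor))

;; Run f on whatever *pool-sketch* is bound to when the task is created.
(defn run-default-sketch [f]
  (run-in-sketch *pool-sketch* f))

(defn current-thread-name []
  (.getName (Thread/currentThread)))

;; Rebind the default to a one-thread pool and report which thread ran f.
(defn thread-name-on-fixed-pool []
  (let [pool (Executors/newFixedThreadPool 1)]
    (try
      (binding [*pool-sketch* pool]
        @(run-default-sketch current-thread-name))
      (finally (.shutdown pool)))))

(thread-name-on-fixed-pool)
;; e.g. "pool-1-thread-1" (Executors' default thread naming; numbers vary)
```

The var is read on the calling thread at the moment the task is created. That is why wrapping the call in binding is enough to redirect it.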
2.2. Composing tasks
2.2.1. Function application: then and compose
To apply a function to the value returned by a task, use then. then accepts a function and a task and produces another task. To get its result, deref it:
@(task/then clojure.string/upper-case (task/run "asdf"))
; => "ASDF"
@(task/then inc (task/run 123))
; => 124
then supports multiple arities. If you pass more than one task, you get a task that evaluates to a vector of the function applied to each task’s result:
@(task/then inc (task/run 2) (task/now 5))
; => [3 6]
If you want to apply a function that itself produces a task, use compose:
@(task/compose (fn [x]
                 (task/run
                   (Thread/sleep 1212)
                   (inc x)))
               (task/run 778))
; => 779
If you had used then, the result would have been a task inside another task, requiring a double deref.
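The difference between then and compose is the same as the difference between CompletableFuture’s thenApply and thenCompose. The sketch below assumes that correspondence. then-sketch and compose-sketch are hypothetical names, and they take the function first, as Task does.

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Function))

;; Wrap a Clojure fn as a java.util.function.Function.
(defn- ->function [f]
  (reify Function
    (apply [_ x] (f x))))

;; Hypothetical then-like combinator: f returns a plain value.
(defn then-sketch [f ^CompletableFuture cf]
  (.thenApply cf (->function f)))

;; Hypothetical compose-like combinator: f returns another future,
;; which thenCompose flattens, so a single deref is enough.
(defn compose-sketch [f ^CompletableFuture cf]
  (.thenCompose cf (->function f)))

(def one (CompletableFuture/completedFuture 1))

@(then-sketch inc one)                                                    ; => 2
@(compose-sketch (fn [x] (CompletableFuture/completedFuture (inc x))) one) ; => 2
;; The same task-producing fn passed to then-sketch nests the futures:
@@(then-sketch (fn [x] (CompletableFuture/completedFuture (inc x))) one)   ; => 2
```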
Like then, compose also supports multiple arities. If you pass more than one task, you get a task that evaluates to a vector of the results of applying the function to each task. That task completes when all the tasks complete.
@(task/compose (fn [y] (task/run (inc y))) (task/now 9) (task/run 10))
; => [10 11]
2.3. Interoperability
Standard Clojure future and promise are compatible with tasks:
@(task/then inc (future 9)) ; => 10
@(task/then inc (future 3) (task/now 4)) ; => [4 5]
(def foo (promise))
(task/then println foo)
(deliver foo "hello")
; prints "hello"
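One plausible way to turn a future or promise into a task is to wait for it on an executor thread. The sketch below assumes the CompletableFuture model. ->completable is a hypothetical helper, not Task’s conversion code.

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Supplier))

;; Hypothetical converter: anything deref-able (future, promise, delay, ...)
;; becomes a CompletableFuture that completes with its value.
(defn ->completable [d]
  (if (instance? CompletableFuture d)
    d
    (CompletableFuture/supplyAsync
      (reify Supplier
        (get [_] @d)))))   ; blocks a pool thread until d is realized

(def from-future (->completable (future (+ 4 5))))

(def greeting (promise))
(def from-promise (->completable greeting))
(deliver greeting "hello")

@from-future  ; => 9
@from-promise ; => "hello"
```

This approach has a cost: every conversion parks a pool thread until the value arrives. That is the kind of blocking discussed in Beware of blocking.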
2.4. Using custom executors
By default, *pool* is bound to a ForkJoinPool executor. If you want to use your own executor, there are two approaches:
- Use dynamic binding to set *pool* to the executor of your choice.
- Use the explicit variants of the combinators, such as run-in and then-in, which take the executor as an extra argument.
2.4.1. Using dynamic binding
Rebind *pool* like this:
(binding [task/*pool* (Executors/newCachedThreadPool)]
  (task/run (println "Hello!")
            1))
Inside the binding form, calls to the core functions use the bound executor as their default pool.
2.4.2. Using explicit executors
The core combinators, like then and compose, have explicit variants that take an ExecutorService as an extra argument:
(let [pool (Executors/newFixedThreadPool 4)]
  (task/then-in pool inc (task/run-in pool 123)))
With explicit executors you can define several thread pools and use them at different steps of a task chain, spreading execution across multiple executors:
(let [pool1 (Executors/newFixedThreadPool 4)
pool2 (Executors/newCachedThreadPool)
init (task/run-in pool1
(Thread/sleep 1000)
(println "we start in" (.getName (Thread/currentThread)))
'blaa)]
(task/then-in
pool2
(fn [x]
(println "...but we end in" (.getName (Thread/currentThread)))) init))
; prints:
; we start in pool-8-thread-1
; ...but we end in pool-9-thread-1
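The same start-on-one-pool, finish-on-another pattern can be written with plain Java 8 interop, because thenApplyAsync accepts an executor for the continuation. This is a sketch under the CompletableFuture assumption. run-across-pools is a hypothetical helper and makes no claim about how then-in is implemented.

```clojure
(import '(java.util.concurrent CompletableFuture Executors)
        '(java.util.function Function Supplier))

(defn thread-name [] (.getName (Thread/currentThread)))

;; Start on pool1, continue on pool2. Returns [start-thread end-thread].
(defn run-across-pools []
  (let [pool1 (Executors/newFixedThreadPool 4)
        pool2 (Executors/newCachedThreadPool)]
    (try
      (let [init (CompletableFuture/supplyAsync
                   (reify Supplier (get [_] [(thread-name)]))
                   pool1)]
        ;; the third argument of thenApplyAsync chooses the executor
        @(.thenApplyAsync init
           (reify Function (apply [_ acc] (conj acc (thread-name))))
           pool2))
      (finally
        (.shutdown pool1)
        (.shutdown pool2)))))

(run-across-pools)
;; e.g. ["pool-1-thread-1" "pool-2-thread-1"]; the numbers vary per JVM run
```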
2.4.3. Using for to avoid boilerplate
The for macro, like its standard library namesake, lets you skip some boilerplate when working with tasks that depend on each other:
(task/for [x (task/run 123)
y (future 123)
z (task/run (Thread/sleep 1000) 4)]
(+ x y z))
for evaluates its body once all the tasks (or futures) are complete, with each value bound to its respective name.
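A for-like macro can be sketched by nesting thenCompose calls, one per binding. let-async is a hypothetical name. Unlike Task’s for, it only accepts CompletableFutures (not Clojure futures), and it illustrates the idea rather than reproducing the library’s macro.

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Function))

;; Hypothetical for-like macro: each binding expression must produce a
;; CompletableFuture. Each step is nested inside the previous thenCompose,
;; so later expressions can refer to earlier names.
(defmacro let-async [bindings & body]
  (if (empty? bindings)
    `(CompletableFuture/completedFuture (do ~@body))
    (let [[sym expr & more] bindings]
      `(.thenCompose ~expr
         (reify Function
           (~'apply [_# ~sym]
             (let-async ~(vec more) ~@body)))))))

(defn cf [x] (CompletableFuture/completedFuture x))

@(let-async [x (cf 123)
             y (cf 123)
             z (cf 4)]
   (+ x y z)) ; => 250
```

Because each expression sits inside the previous step’s callback, a binding such as y (cf (inc x)) can use x. It also means the expressions are created one after another rather than all at once.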
2.5. Working with sequences
If you have a sequence of tasks and want to deal with their results as a whole, use sequence. It turns a sequence of tasks into a single task that evaluates to the values of each task inside it:
@(task/sequence [(task/run 123) (future 9) (task/run (Thread/sleep 100) "hello")])
; => [123 9 "hello"]
sequence completes when all the tasks complete.
Similarly, if you have a function that produces a task, you can apply it to every element of a sequence and gather the results inside one task with traverse:
@(task/traverse
  [1 2 3 4 5 6]
  (fn [x] (task/run (* 2 x))))
; => [2 4 6 8 10 12]
Using the regular map would have produced a sequence of tasks, which is not what we want.
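Under the CompletableFuture assumption, sequence maps naturally onto CompletableFuture/allOf, and traverse is just map followed by sequence. sequence-sketch, traverse-sketch and async are hypothetical helpers, not Task’s functions.

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Function Supplier))

;; Hypothetical sequence: a collection of futures -> a future of a vector.
;; allOf completes once every input has completed; if any failed, it fails too.
(defn sequence-sketch [futures]
  (let [fs (vec futures)]
    (.thenApply (CompletableFuture/allOf (into-array CompletableFuture fs))
                (reify Function
                  ;; every future is done here, so deref does not block
                  (apply [_ _ignored] (mapv deref fs))))))

;; Hypothetical traverse: apply f (which returns a future) to each element,
;; then gather the results with sequence-sketch.
(defn traverse-sketch [coll f]
  (sequence-sketch (map f coll)))

;; Small helper: run the no-argument function f on the common pool.
(defn async [f]
  (CompletableFuture/supplyAsync (reify Supplier (get [_] (f)))))

@(sequence-sketch [(async (fn [] 123))
                   (CompletableFuture/completedFuture 9)
                   (async (fn [] (Thread/sleep 100) "hello"))])
;; => [123 9 "hello"]

@(traverse-sketch [1 2 3 4 5 6] (fn [x] (async (fn [] (* 2 x)))))
;; => [2 4 6 8 10 12]
```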
2.6. Completion
To check if a task is complete, use done?:
(task/done? some-task)
(task/done? (task/run (Thread/sleep 123123) 'foo))
; => false
(task/done? (task/now 1))
; => true
To complete a task before it is done, use complete!:
(def baz (task/run (Thread/sleep 10000) 'foo))
(task/complete! baz 'bar)
@baz ; => 'bar
If the task is already complete, it does nothing.
(else (task/run (Thread/sleep 1000) 1) 2)
; => 2
2.6.1. Forcing a result
To force the result of a task, completed or not, use force!:
(def t (task/now 123))
(task/force! t 'hi)
@t ; => 'hi
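Java 8’s CompletableFuture has methods with these two behaviours: complete only affects an incomplete future, while obtrudeValue overwrites the result unconditionally. Whether complete! and force! are built on them is an assumption. The sketch only demonstrates the Java 8 semantics.

```clojure
(import '(java.util.concurrent CompletableFuture))

;; complete: sets the value only if the future is not yet done.
(def pending (CompletableFuture.))
(.complete pending 'bar)   ; => true, the value was set
@pending                   ; => bar

(def finished (CompletableFuture/completedFuture 123))
(.complete finished 'bar)  ; => false, already complete: no effect
@finished                  ; => 123

;; obtrudeValue: forcibly replaces the result, completed or not.
(.obtrudeValue finished 'hi)
@finished                  ; => hi
```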
2.6.2. Cancellation
To cancel a task, use cancel:
(def my-task (task/run (Thread/sleep 10000) 'bla))
(task/cancel my-task)
To see if the task was cancelled, use cancelled?:
(task/cancelled? my-task) ; => true
Deref’ing a cancelled task throws an exception, predictably.
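For comparison, here is how cancellation behaves on a plain CompletableFuture. This is an assumption about what backs a task, not a statement about Task’s internals. A cancelled future also counts as completed exceptionally, which matches the description of failures in the next section.

```clojure
(import '(java.util.concurrent CompletableFuture CancellationException)
        '(java.util.function Supplier))

(def slow
  (CompletableFuture/supplyAsync
    (reify Supplier (get [_] (Thread/sleep 2000) 'bla))))

(.cancel slow true)               ; => true
(.isCancelled slow)               ; => true
(.isCompletedExceptionally slow)  ; => true

;; deref goes through Future.get, which throws CancellationException.
(try
  @slow
  (catch CancellationException _ :cancelled)) ; => :cancelled
```

Cancelling only changes the future’s state. The sleeping body keeps running on its pool thread, because CompletableFuture does not use interrupts to control processing.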
2.6.3. Failures
A task is said to have failed if it was cancelled (see Cancellation) or if its evaluation threw an exception. Either way, deref’ing a failed task throws:
(def oops (task/run (throw (RuntimeException. "hey!"))))
@oops
; RuntimeException hey! task.core/fn--17494 (form-init7142405608168193525.clj:182)
failed? tells you whether a task has failed:
(task/failed? oops) ; => true
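To create a task that has already failed, use failed. To get the exception of a failed task, use failure:
(def failed-task (task/failed (RuntimeException. "argf")))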
(task/failure failed-task) ; => RuntimeException[:cause "argf"]
To force a task to fail, use fail!, the failing counterpart of force!:
(def foo (task/now "hi there"))
(task/fail! foo (IllegalStateException. "poop"))
(task/failed? foo) ; => true
(task/failure foo) ; => IllegalStateException[:cause "poop"]
Chaining a failed task with a normal task, for example with then, causes the resulting task to fail as well.
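The failure operations have direct CompletableFuture counterparts. isCompletedExceptionally plays the role of failed?. completeExceptionally creates a failed future. obtrudeException forces a failure. This mapping is an assumption, and failed-sketch is a hypothetical helper.

```clojure
(import '(java.util.concurrent CompletableFuture ExecutionException)
        '(java.util.function Function))

;; An already-failed future. Java 8 has no failedFuture factory,
;; so we complete a fresh future exceptionally.
(defn failed-sketch [^Throwable ex]
  (doto (CompletableFuture.) (.completeExceptionally ex)))

(def oops (failed-sketch (RuntimeException. "hey!")))
(.isCompletedExceptionally oops)    ; => true

;; Chaining onto a failed future fails too: inc never runs.
(def chained (.thenApply ^CompletableFuture oops
                         (reify Function (apply [_ x] (inc x)))))
(.isCompletedExceptionally chained) ; => true

;; deref goes through Future.get, which wraps the cause.
(try @chained
     (catch ExecutionException e (.getMessage (.getCause e)))) ; => "hey!"

;; Forcing a failure on an already-completed future.
(def forced (CompletableFuture/completedFuture "hi there"))
(.obtrudeException forced (IllegalStateException. "nope"))
(.isCompletedExceptionally forced)  ; => true
```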
2.7. Error handling
To recover from errors, use recover:
(def boom (task/run (/ 1 0)))
(def incremented (task/then inc boom))
Deref’ing incremented would blow up, because boom divides by zero. With recover, we can ensure that the resulting operation succeeds:
@(task/recover incremented
(fn [ex]
(println "caught exception: " (.getMessage ex))
123))
; caught exception: java.lang.ArithmeticException: Divide by zero
; => 123
The function receives the exception and returns a backup value to recover with. If you supply a non-function value instead, the task recovers with that value directly:
@(task/recover boom "hello") ; => "hello"
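recover resembles CompletableFuture’s exceptionally, which runs a function only when the future fails. The sketch below assumes that correspondence. recover-sketch is a hypothetical name, and the fn? check mimics the non-function fallback described above.

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Function Supplier))

;; Hypothetical recover: when cf fails, call f with the exception, or use
;; f itself as the fallback value when it is not a function.
(defn recover-sketch [^CompletableFuture cf f]
  (.exceptionally cf
    (reify Function
      (apply [_ ex]
        (if (fn? f) (f ex) f)))))

(def divide-by-zero
  (CompletableFuture/supplyAsync (reify Supplier (get [_] (/ 1 0)))))

;; The exception may arrive wrapped in a java.util.concurrent.CompletionException.
@(recover-sketch divide-by-zero
                 (fn [ex]
                   (println "caught exception:" (.getMessage ^Throwable ex))
                   123))
;; => 123

@(recover-sketch divide-by-zero "hello") ; => "hello"
```

A successful future passes through exceptionally untouched, so recovering is free when nothing went wrong.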
3. Advanced topics
3.1. Execution model
In Task, the execution model is intentionally kept opaque, but not hidden. By default, the user doesn’t need to worry about where (in which thread) code is executed.
What the user needs to know to get started is:
- all code is asynchronous and non-blocking, and
- deref or @ will block the current thread.
The concurrency model is left to the executor.
A JVM executor is an abstraction over thread execution. It might be a thread pool with 64 threads per core, or more, or fewer. By default, Task uses ForkJoinPool, a work-stealing scheduler that tries to make the best use of each of its threads.
The reason to default to ForkJoinPool, besides its performance, is that it makes tasks feel lightweight (somewhat like fibers, though not the same thing), because there is no one-to-one correspondence between tasks and threads. You can have several hundred tasks executing concurrently on only a handful of threads. ForkJoinPool does the heavy lifting: some tasks complete faster, some slower, and its work-stealing algorithm puts idle threads to use.
Sometimes this behaviour is not desirable, which is why the executor can be overridden in two ways:
- implicitly, by using dynamic binding, or
- explicitly, by passing an executor to task-executing functions.
Dynamic binding is the standard Clojure mechanism for dynamic scope. Here is how it works:
(def ^:dynamic foo "hello")
(println foo) ; prints "hello"
(binding [foo "hi"]
(println foo)) ; prints "hi"
Using this technique, we can freely swap the executor. The var for this is *pool*. See the example in Using custom executors.
The JVM offers several executors you can freely choose from. This document is not a guide to their details. The java.util.concurrent documentation and other resources have more examples.
In the vast majority of cases, the default ForkJoinPool executor is just fine. But the option to customize is there.
3.2. Beware of blocking
To get the value out of a task, you deref it. This blocks the executing thread. It is generally safe to do so, but choosing the wrong kind of executor might yield strange behaviour.
For example, with a fixed thread pool it might not be obvious that once you run out of threads, the whole program grinds to a halt.
Blocking inside a task blocks the thread that runs it. So if you use a kind of thread pool that can run out of threads, this is where we’d say: here be dragons.
Here is an example that will go awry.
(import '(java.util.concurrent Executors))

(binding [task/*pool* (Executors/newFixedThreadPool 1)]
  (let [nothing (task/void)]
    (task/compose
      (fn [prev]
        (println "This will never complete!")
        (task/now (+ 1 prev)))
      (task/run
        (+ 3 @nothing)))))
The task we pass to compose derefs nothing, a task that never completes (see void). The executor we’ve defined has only one thread, and that blocked task occupies it forever. No other work scheduled on this executor, including the function passed to compose, can ever run. Any other task submitted to the same executor, even one unrelated to nothing, would be stuck waiting for a free thread.
How is this relevant? This is particularly nasty inside an HTTP server, for example if you use the same thread pool both for handling incoming web requests and for database access. If your database hangs or is slow to answer queries, the pool’s threads end up blocked waiting for it. The web server, which shares the pool, then has no threads left to handle incoming requests, and it appears unresponsive!
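The starvation described above can be reproduced without Task. This demo uses a fixed pool and a Clojure promise that is never delivered (standing in for void). A timeout lets the demo terminate. starvation-demo is a hypothetical helper, and the 500 ms timeout is arbitrary.

```clojure
(import '(java.util.concurrent CompletableFuture Executors TimeUnit TimeoutException)
        '(java.util.function Supplier))

;; Returns :ok if a second job gets a thread, :starved if it cannot.
(defn starvation-demo [threads]
  (let [pool  (Executors/newFixedThreadPool threads)
        never (promise)                       ; never delivered, like void
        async (fn [f]
                (CompletableFuture/supplyAsync
                  (reify Supplier (get [_] (f)))
                  pool))]
    (try
      (async (fn [] @never))                  ; occupies one thread forever
      (let [second-job (async (fn [] :ok))]
        (try
          (.get second-job 500 TimeUnit/MILLISECONDS)
          (catch TimeoutException _ :starved)))
      (finally
        (.shutdownNow pool)))))               ; interrupts the blocked thread

(starvation-demo 1) ; => :starved
(starvation-demo 2) ; => :ok
```

With one thread, the second job sits in the queue behind the blocked one. With two threads, it runs immediately, even though the first thread is still stuck.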