task

License: Apache 2.0

Welcome to the official Task user guide!

1. Overview

This document is the reference documentation for Task, a Clojure library for asynchronous computation.

1.1. What is Task?

Task provides simple, functional concurrency primitives for Clojure. Tasks represent asynchronous computations as pure values and favour function composition over callbacks or promises.

Task promotes the use of combinators like then and compose. These let you combine asynchronous data functionally, so that asynchronous steps are just transformations on a value.

1.2. Key features

  • Value-oriented: tasks are just eventual values. No more callbacks: regular deref/@ is all you need.

  • Functional: tasks are composable. Tasks come with a set of operations that lets you compose and combine them in a functional manner.

  • Interoperable: Any Clojure future/promise can be converted into a task, and vice versa.

  • Asynchronous: tasks are always asynchronous. Tasks default to the ForkJoinPool executor.

  • Customizable: If you wish to control the execution model, you can define your own ExecutorService.

  • Performant: The library leverages the standard Java 8 Concurrency API, a powerful framework for concurrent computation.

1.3. An example

A typical use case for asynchronous computation is making HTTP requests. If we want to take the value of a GET request and apply a data transformation to it, this is what we need to do.

(def request
  (task/then
   (fn [data] (-> data                                        (2)
                  :body                                       (3)
                  (cheshire/parse-string true)                (3)
                  :title                                      (3)
                  str/upper-case))
   (http/get "http://jsonplaceholder.typicode.com/posts/3"))) (1)

(println @request)                                            (4)
  1. We launch an HTTP GET request asynchronously using http-kit. The promise it returns is converted into a task automatically.

  2. We pass the task and a function to then, a function described in Composing tasks. The function comes first and then comes the task.

  3. We parse the JSON data using Cheshire, extract an attribute and uppercase it.

  4. We deref the value of the task, which blocks the current thread until the task is complete.

The whole expression executes in another thread (depending on the execution model) and eventually resolves to a value when it is deref'd.

The snippet uses http-kit to launch a GET request asynchronously (1). The http-kit promise is automatically converted into a task. Once the promise completes, we parse the JSON string into Clojure data (cheshire/parse-string), extract our attribute from it, and uppercase the result.

All of this is executed asynchronously, so we apply @ to the task to await its result in the current thread. @ is shorthand for deref, a Clojure standard library function. This blocks the current thread and returns the result of the computation.

Head over to the User Guide to learn more!

1.4. Installation & Requirements

You need Java 8 to use Task, because Task uses features that were introduced in that version.

2. User Guide

2.1. Creating tasks

To create a task, use run. It evaluates its body asynchronously. It behaves more or less like do, except that, depending on the execution model, it may evaluate its body in another thread.

(task/run (println "hello")
          (Thread/sleep 1000)
          123)

This task evaluates to 123. To get the value of a task, use deref or @ from the Clojure standard library. This blocks the current thread.

; these are both equal
@(task/run 123) ; => 123

(deref (task/run 123)) ; => 123

See the docs on deref.

Calling (run 123) may result in the creation of another thread. To create an "immediate" value that doesn’t cause any unwanted execution, use now:

@(task/now 123)

To create an empty task, use (void), which produces a task that never completes.

If you want to use another executor, you have two options. You can use run-in which accepts an explicit Executor parameter.

(let [pool (Executors/newFixedThreadPool 16)]
  (task/run-in pool
    (Thread/sleep 1000)
    123))

A more Clojure-like approach is to use dynamic binding. The namespace variable task/*pool* holds the default ExecutorService in scope. Rebinding *pool* to the executor of your choosing is done as follows:

(binding [task/*pool* (Executors/newFixedThreadPool 1)]
  (task/run (Thread/sleep 1000)
            (println "hello!")))

This approach, dynamic binding, lets you do executor assignment in a much more idiomatic fashion. See also Using custom executors and the Execution model for more information about executors.

2.2. Composing tasks

2.2.1. Function application: then and compose

If you want to apply a function to the value returned by a task, use then. then accepts a function and a task and produces another task. To get its result, you need to deref it:

@(task/then clojure.string/upper-case (task/run "asdf"))
; => ASDF

@(task/then inc (task/run 123))
; => 124

then supports multiple arities. If you pass more than one task, you get a task that returns a vector of the results.

@(task/then inc (task/run 2) (task/now 5))
; => [3 6]

If you want to apply a function that produces another task, use compose:

@(task/compose (fn [x] (task/run
                         (Thread/sleep 1212)
                         (inc x)))
               (task/run 778))

If you had used then the result would have been a task inside another task, requiring a double deref.
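The difference between mapping and flat-mapping can be seen on Java's own CompletableFuture, where thenApply and thenCompose play roles similar to then and compose. The sketch below uses CompletableFuture through interop rather than the Task API, on the assumption (not confirmed by this guide) that tasks behave in a comparable way. The helpers jfn and inc-async are hypothetical names introduced for illustration.

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Function))

;; Hypothetical helper: wraps a Clojure fn as a java.util.function.Function.
(defn jfn [f]
  (reify Function
    (apply [_ x] (f x))))

;; A function that itself returns an asynchronous value.
(defn inc-async [x]
  (CompletableFuture/completedFuture (inc x)))

(def start (CompletableFuture/completedFuture 778))

;; Mapping (analogous to then): the result is a future inside a future.
(def nested (.thenApply ^CompletableFuture start (jfn inc-async)))

;; Flat-mapping (analogous to compose): the inner future is unwrapped.
(def flat (.thenCompose ^CompletableFuture start (jfn inc-async)))

(println @@nested) ; two derefs are needed to reach the number
(println @flat)    ; one deref is enough
```

Both expressions reach the same number, but only the flat-mapped version can be chained further without unwrapping by hand.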

Like then, compose also supports multiple arities. If you pass more than one task, you get a task holding a vector of the function applied to the value of each task. The vector task completes when all tasks complete.

@(task/compose (fn [y] (task/run (inc y))) (task/now 9) (task/run 10))
; => [10 11]

2.3. Interoperability

Standard Clojure future and promise are compatible with tasks:

@(task/then inc (future 9)) ; => 10

@(task/then inc (future 3) (task/now 4)) ; => [4 5]

(def foo (promise))

(task/then println foo)

(deliver foo "hello")

; prints "hello"

2.4. Using custom executors

By default, the *pool* variable is bound to the default executor known as ForkJoinPool. If you want to use your own executor, there are two approaches:

  1. Use dynamic binding to set *pool* to the executor of your choice.

  2. Use the explicit variants of the combinators which accept an executor as the third parameter. These are:

2.4.1. Using dynamic binding

Rebind *pool* like this:

(binding [task/*pool* (Executors/newCachedThreadPool)]
  (task/run (println "Hello!")
            1))

Within the scope of the binding, calls to the core functions use the bound executor as the default pool.

2.4.2. Using explicit executors

All core combinators like then and compose accept a third parameter as the ExecutorService in their explicit variants:

(let [pool (Executors/newFixedThreadPool 4)]
  (task/then-in inc (task/run-in pool 123) pool))

This approach can be used to define multiple thread pools and use them in the chain of a task execution, spreading execution across multiple executors.

(let [pool1 (Executors/newFixedThreadPool 4)
      pool2 (Executors/newCachedThreadPool)
      init (task/run-in pool1
                        (Thread/sleep 1000)
                        (println "we start in" (.getName (Thread/currentThread)))
                        'blaa)]
  (task/then-in
    pool2
    (fn [x]
      (println "...but we end in" (.getName (Thread/currentThread)))) init))
; prints:
; we start in pool-8-thread-1
; ...but we end in pool-9-thread-1

2.4.3. Using for to avoid boilerplate

The for macro, like its standard library namesake, lets you skip some boilerplate when working with tasks that depend on each other:

(task/for [x (task/run 123)
           y (future 123)
           z (task/run (Thread/sleep 1000) 4)]
          (+ x y z))

for evaluates its body once all futures are complete, and the values of each future are bound to their respective bindings.

2.5. Working with sequences

If you have a sequence of tasks and you want to deal with the results as a whole, you can turn the sequence of tasks into a single task that evaluates to the values of each task inside it using sequence:

(task/sequence [(task/run 123) (future 9) (task/run (Thread/sleep 100) "hello")])
; => [123 9 "hello"]

sequence completes when all the tasks complete.

Similarly, if you have a function that produces a task, you can apply it to a sequence and gather the results inside one task using traverse:

(task/traverse
  [1 2 3 4 5 6]
  (fn [x] (task/run (* 2 x))))
; => [2 4 6 8 10 12]

Using regular map would have just produced a sequence of tasks, not what we want.
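To make the difference concrete, here is a minimal sketch using Java's CompletableFuture through interop instead of the Task API. It assumes (this guide does not confirm it) that traverse behaves like "map, then gather". The helpers jfn, async and double-async are hypothetical names.

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Function Supplier))

;; Hypothetical helpers wrapping Clojure fns as Java functional interfaces.
(defn jfn [f] (reify Function (apply [_ x] (f x))))
(defn async [f]
  (CompletableFuture/supplyAsync (reify Supplier (get [_] (f)))))

(defn double-async [x]
  (async (fn [] (* 2 x))))

;; Plain mapv gives a vector of futures, not a future of a vector.
(def futures (mapv double-async [1 2 3 4 5 6]))

;; Gathering: allOf completes once every future has completed, after
;; which the derefs inside the callback return immediately.
(def gathered
  (.thenApply (CompletableFuture/allOf (into-array CompletableFuture futures))
              (jfn (fn [_] (mapv deref futures)))))

(println (every? #(instance? CompletableFuture %) futures)) ; prints true
(println @gathered) ; prints [2 4 6 8 10 12]
```

The gathering step waits for completion without blocking a pool thread, which is the property you want from a combinator of this kind.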

2.6. Completion

To check if a task is complete, use done?:

(task/done? some-task)

(task/done? (task/run (Thread/sleep 123123) 'foo))
; => false

(task/done? (task/now 1))

To complete a task before it is done, use complete!:

(def baz (task/run (Thread/sleep 10000) 'foo))

(task/complete! baz 'bar)

@baz ; => 'bar

If the task is already complete, it does nothing.

To get a value anyway if the task isn’t complete, use else:

(else (task/run (Thread/sleep 1000) 1) 2)

; => 2

2.6.1. Forcing a result

To force the result of a task, completed or not, use force!:

(def t (task/now 123))

(task/force! t 'hi)

@t ; => 'hi
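The distinction between completing and forcing has a direct counterpart on Java's CompletableFuture, where complete only takes effect on an unfinished future and obtrudeValue overwrites the value regardless. The sketch below uses those methods through interop. Whether the Task functions delegate to them is an assumption, not something this guide states.

```clojure
(import '(java.util.concurrent CompletableFuture))

(def pending  (CompletableFuture.))                    ; not completed yet
(def finished (CompletableFuture/completedFuture 123)) ; already completed

;; complete returns true only if this call actually set the value.
(def first-try  (.complete ^CompletableFuture pending 'bar))
(def second-try (.complete ^CompletableFuture finished 'bar))

;; obtrudeValue replaces the value even on a completed future.
(def forced (CompletableFuture/completedFuture 123))
(.obtrudeValue ^CompletableFuture forced 'hi)

;; getNow returns a fallback when no value is available yet.
(def fallback (.getNow (CompletableFuture.) 2))

(println first-try second-try)      ; prints true false
(println @pending @finished @forced) ; prints bar 123 hi
(println fallback)                   ; prints 2
```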

2.6.2. Cancellation

To cancel a task, use cancel:

(def my-task (task/run (Thread/sleep 10000) 'bla))

(task/cancel my-task)

To see if the task was cancelled, use cancelled?:

(cancelled? my-task) ; => true

Using deref on a cancelled task blows up, predictably.

2.6.3. Failures

A task is said to have failed if its evaluation produced an exception. This includes cancelled tasks (see Cancellation) and any task that throws an exception when deref'd:

(def oops (task/run (throw (RuntimeException. "hey!"))))

@oops

; RuntimeException hey!  task.core/fn--17494 (form-init7142405608168193525.clj:182)

failed? will tell you if a task has failed:

(task/failed? oops) ; => true

To create a failed task with some exception, use failed:

(def failed-task (task/failed (RuntimeException. "argf")))

To get the exception that caused the failure, use failure:

(task/failure failed-task) ; => RuntimeException[:cause "argf"]

To force a task to fail, in the same way that force! forces a value, use fail!:

(def foo (task/now "hi there"))

(task/fail! foo (IllegalStateException. "poop"))

(task/failed? foo) ; => true

(task/failure foo) ; => IllegalStateException[:cause "poop"]

Chaining a failed task to a normal task will cause the resulting task to fail.
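Failure propagation through a chain can be observed on a plain CompletableFuture, which is used here through interop as a stand-in for tasks. That tasks propagate failures in the same way is an assumption based on the sentence above, and jfn is a hypothetical helper.

```clojure
(import '(java.util.concurrent CompletableFuture ExecutionException)
        '(java.util.function Function))

;; Hypothetical helper: wraps a Clojure fn as a java.util.function.Function.
(defn jfn [f] (reify Function (apply [_ x] (f x))))

;; A future that has already failed.
(def broken
  (doto (CompletableFuture.)
    (.completeExceptionally (IllegalStateException. "boom"))))

;; The chained step never runs; the failure is passed along instead.
(def chained (.thenApply ^CompletableFuture broken (jfn inc)))

(println (.isCompletedExceptionally ^CompletableFuture chained)) ; prints true

;; deref rethrows the failure wrapped in an ExecutionException.
(def outcome
  (try
    @chained
    (catch ExecutionException e
      (.getCause e))))

(println (class outcome)) ; prints java.lang.IllegalStateException
```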

2.7. Error handling

To recover from errors, use recover:

(def boom (task/run (/ 1 0)))

(def incremented (task/then inc boom))

Dereferencing incremented will blow up. To ensure that the resulting operation succeeds, wrap it in recover:

@(recover incremented
          (fn [ex]
            (println "caught exception: " (.getMessage ex))
            123))

; caught exception: java.lang.ArithmeticException: Divide by zero
; => 123

So you can recover from potential failures using a backup value from the function. Supplying a non-function will just recover with that value:

@(recover boom "hello") ; => "hello"
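The same recovery pattern exists on Java's CompletableFuture as exceptionally. The sketch below uses it through interop and also shows that the handler of a chained step may receive a wrapper exception, so it walks down to the root cause. It is an illustration under the assumption that recover behaves similarly. jfn and root-cause are hypothetical helpers.

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Function Supplier))

;; Hypothetical helper: wraps a Clojure fn as a java.util.function.Function.
(defn jfn [f] (reify Function (apply [_ x] (f x))))

;; Hypothetical helper: follows getCause down to the original exception.
(defn root-cause [^Throwable ex]
  (if-let [cause (.getCause ex)]
    (recur cause)
    ex))

(def seen (atom nil)) ; records what the handler received

(def boom
  (CompletableFuture/supplyAsync
    (reify Supplier (get [_] (/ 1 0)))))

(def recovered
  (-> ^CompletableFuture boom
      (.thenApply (jfn inc))                  ; skipped because boom failed
      (.exceptionally (jfn (fn [ex]
                             (reset! seen (root-cause ex))
                             123)))))         ; backup value

(println @recovered)    ; prints 123
(println (class @seen)) ; prints java.lang.ArithmeticException
```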

3. Advanced topics

3.1. Execution model

In the task model, the execution model is intentionally kept opaque, but not hidden. This means that by default, the user doesn’t need to worry about where — in which thread — code is executed.

What the user needs to know, to get started, is:

  • all code is asynchronous and non-blocking, and

  • deref or @ will block the current thread.

The concurrency model is left to the executor.

A JVM executor is an abstraction for thread execution. This can be a thread pool with 64 threads per core. Or more. Or less. By default, task uses ForkJoinPool, a work-stealing scheduler, that attempts to find the best use for each of its threads.

The reason to default to the ForkJoinPool, in addition to it being performant, is that it makes tasks appear lightweight (a bit like fibers, though not the same thing), as the user does not need to think in terms of a one-to-one correspondence between tasks and threads. This means that you can have several hundred tasks executing concurrently on only a handful of threads! The ForkJoinPool handles the heavy lifting here: some tasks will complete faster, some slower, so the work-stealing algorithm puts idle threads to use.

There are cases where such behaviour is not desirable, which is why the executor can be overridden in two ways:

  • implicitly by using dynamic binding, or

  • explicitly by using parameters to task-executing functions.

Dynamic binding is a standard Clojure method for doing dynamic scope. This is how it works.

(def ^:dynamic foo "hello")

(println foo) ; prints "hello"

(binding [foo "hi"]
  (println foo)) ; prints "hi"

Using this technique lets us freely swap the executor. The var for this is *pool*. See the example in Using custom executors.

The JVM offers several executors you can freely choose from. This document is not a guide to their details; the reader is left to explore the Java documentation or the Internet for more examples.

In most cases, the default ForkJoinPool executor is just fine. But the option to customize is there.

3.2. Beware of blocking

To get the value out of a task, you deref it. This blocks the executing thread. It is generally safe to do so, but choosing the wrong kind of executor might yield strange behaviour.

For example, when using a fixed thread pool, it might not be obvious that once you run out of threads, the whole program will grind to a halt.

Blocking inside a task will block that task. So if you have a special kind of thread pool that may run out of threads, this is the part where we’d say, here be dragons.

Here is an example that will go awry.

(binding [task/*pool* (Executors/newFixedThreadPool 1)]
  (let [nothing (task/void)]
    (task/compose
      (fn [prev]
        (println "This will never complete!")
        (+ 1 prev))
      (task/run
        (+ 3 @nothing)))))

The task that we pass to compose tries to deref nothing, which is a task that will never complete (see void). The executor we’ve defined has only one thread in its pool, and that thread is now blocked forever, so there is no thread left to run the function passed to compose.

How is this relevant? This is particularly nasty if you do it inside an HTTP server, e.g. when you use the same thread pool for handling incoming web requests and for database access. If your database hangs and takes a long time to answer queries, the web server, which shares the thread pool, cannot spawn new threads to handle incoming requests and will appear unresponsive!
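The starvation effect can be reproduced without the Task API, using a single-threaded executor and CompletableFuture through interop. In this sketch the outer job waits on an inner job submitted to the same pool. A timeout is used so that the example terminates instead of hanging. supply-in is a hypothetical helper.

```clojure
(import '(java.util.concurrent CompletableFuture Executor Executors ExecutorService)
        '(java.util.function Supplier))

;; Hypothetical helper: runs f asynchronously on the given executor.
(defn supply-in [^Executor pool f]
  (CompletableFuture/supplyAsync (reify Supplier (get [_] (f))) pool))

(def pool (Executors/newFixedThreadPool 1))

(def outer
  (supply-in pool
    (fn []
      ;; The inner job is queued on the same pool, but the only thread
      ;; is busy running this outer job, so the inner job cannot start.
      (let [inner (supply-in pool (fn [] :inner-done))]
        (deref inner 200 :starved)))))

(def result @outer)
(println result) ; prints :starved

(.shutdownNow ^ExecutorService pool)
```

Without the timeout, the outer job would wait forever on a job that can never be scheduled. Giving blocking work its own pool avoids this.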