Concurrency with Cats

And other difficult questions

Jakub Kozłowski - Scala Developer, Ocado Technology December 6, 2018 | Wrocław, Szewska 8, 5. floor, Church Room

Slides: git.io/fp1Ag

Agenda

  1. Applicative, Monad, Parallel
  2. Parallel + IO
  3. Race conditions, cancelation
  4. Forking, fibers
  5. What thread am I running on?

Typeclass recap: Functor

Applicative

But wait, there's more!

Why?

Some types don't have `pure`

Apply doesn't have to be sequential

What's the order of operations here?

Could be anything

Some applicatives support parallel composition

Some applicatives are Monads!!!

Inherently sequential

Real monads

What's the order of operations here?

Sequential (left to right)

How to enforce parallel execution?

We'll get to that

Parallel applicatives can't have FlatMap

Applicatives that don't have flatMap

  • Validated
  • ZipList
  • cats.effect.IO.Par
  • ...

How to enforce parallel execution?

cats.Parallel

/**
 * Some types that form a FlatMap,
 * are also capable of forming an Apply
 * that supports parallel composition.
 */

What is Parallel[M, F]?

  • A pair of Monad[M], Applicative[F]
  • A bi-directional mapping between them

How it's used

How it works

How Parallel.parProduct works

  1. Convert both sequential M[_] to parallel F[_]
  2. Apply product using the Apply[F]
  3. Convert back to M[_]

Tagless style?

Two type parameters :(

cats-par

github.com/ChristopherDavenport/cats-par
"io.chrisdavenport" %% "cats-par" % "0.2.0"
import cats.temp.par._

Tagless style!

IO + Parallel

IO has an instance: Parallel[IO, IO.Par]

Requires implicit cats.effect.ContextShift[IO]

ContextShift[F]

Purely functional Executor

Getting an instance

Usage

Handling concurrency with cats-effect

  • Cancelation
  • Races
  • Forking, fibers
  • What thread am I on?
  • What ExecutionContext to use?

Cancelation

Happens only after asynchronous boundaries*, which can be achieved by:

  • Building an asynchronous IO with IO.cancelable, IO.async, IO.bracket (!)
  • Using IO.cancelBoundary or IO.shift

* - plus runtime-inserted cancel boundaries every n steps in the run loop (as of 1.1.0)

Races

Manual logical fork - start

Fiber - pure "green thread"

What thread am I on?

Unless you're creating async boundaries, the same one all the time

ContextShift/Async/Timer/start/parMapN insert async boundaries

What ExecutionContext to use?

Source

Links

Slides: https://git.io/fp1Ag

Code: https://git.io/fp01Ao

Thank you!

Get in touch

@kubukoz | kubukoz@gmail.com