Sunday, April 14, 2013

pipes-concurrency-1.0.0: Reactive programming

This post introduces the pipes-concurrency library (available on Hackage), which is the renamed pipes-stm library that I previously promised. I finished it much sooner than I anticipated, which is why it precedes the upcoming pipes-parse package.

Begin with the tutorial if you want to learn how to use the library. This post mainly highlights some features and compares the pipes-concurrency approach to other libraries. Also, I'll provide some bonus examples that are not in the tutorial.

Before I continue, I want to credit Eric Jones, who first began the library as pipes-stm on GitHub. Unfortunately, I lost all contact with him and he didn't include a LICENSE in his repository, so I had to rebuild the library from scratch because I wasn't sure if a fork would constitute copyright infringement. If he reads this, gets in touch with me, and approves of the BSD license, then I will add him to the LICENSE and credit him as a library author.

Reactive programming

Several people are beginning to realize that streaming libraries overlap substantially with reactive programming frameworks. pipes-concurrency provides the basic building blocks necessary to build reactive programming abstractions.

For example, let's say that I want to simulate reactive-banana's Events using pipes-concurrency:
{-# LANGUAGE RankNTypes #-}

import Control.Proxy

-- `pipes` does not need the `t` parameter from reactive-banana
type Event a = forall p . (Proxy p) => () -> Producer p a IO ()
If you want to take the union of two asynchronous streams, you spawn a mailbox, merge two streams into it using sendD, and then read out the results using recvS:
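import Control.Concurrent (threadDelay)  -- used by the event sources below
import Control.Concurrent.Async
import Control.Monad
import Control.Proxy.Concurrent
import System.Mem (performGC)

union :: Event a -> Event a -> Event a
union e1 e2 () = runIdentityP $ do
    -- One shared mailbox collects the output of both event streams
    (input, output) <- lift $ spawn Unbounded
    as <- lift $ forM [e1, e2] $ \e ->
        -- Collect garbage when a stream ends so its reference is released promptly
        async $ do runProxy $ e >-> sendD input
                   performGC
    recvS output ()
    lift $ mapM_ wait as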
Now we can define two event sources:
clock :: Event String
clock = fromListS (cycle ["Tick", "Tock"])
    >-> execD (threadDelay 1000000)

user :: Event String
user = stdinS
... and cleanly merge them:
main = runProxy $ union clock user
              >-> takeWhileD (/= "quit")
              >-> stdoutD
$ ./union
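To see the merge step in isolation, here is a rough sketch that uses only `base`. It is not how pipes-concurrency is implemented: a plain `Chan` stands in for the mailbox, finite lists stand in for the event streams, and `mergeLists` is a hypothetical name I made up for illustration.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- | Merge two finite lists by giving each one its own writer thread and
-- letting both threads write into a single shared channel.
-- (Hypothetical helper; a `Chan` plays the role of the mailbox.)
mergeLists :: [a] -> [a] -> IO [a]
mergeLists xs ys = do
    chan <- newChan
    -- One writer thread per source, analogous to one `sendD` per event stream
    forM_ [xs, ys] $ \source ->
        forkIO $ mapM_ (writeChan chan) source
    -- A single reader drains the channel, analogous to `recvS`
    replicateM (length xs + length ys) (readChan chan)
```

The interleaving between the two sources depends on the scheduler, but each source's own elements keep their relative order because the channel is FIFO. Unlike the real mailbox, this toy version only works for finite inputs because the reader needs to know how many messages to expect.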
People often tout spreadsheets as the classic example of functional-reactive programming, so why not simulate that, too? Well, a spreadsheet cell is just a non-empty stream of values:
{-# LANGUAGE PolymorphicComponents #-}

import Control.Proxy

data Cell a = Cell
    { initial :: a
    , stream  :: forall p . (Proxy p) => () -> Producer p a IO ()
    }

runCell :: (Proxy p) => Cell a -> () -> Producer p a IO ()
runCell (Cell a ga) () = runIdentityP $ do
    respond a
    ga ()
Each value in the stream represents an update to the cell's contents, either by the user:
input :: Cell String
input = Cell "" stdinS
... or some data source:
time :: Cell Int
time = Cell 0 $ \() -> evalStateP 1 $ forever $ do
    n <- get
    respond n
    lift $ threadDelay 1000000
    put (n + 1)
Spreadsheet cells are both Functors and Applicatives:
instance Functor Cell where
    fmap f (Cell x gx) = Cell (f x) (gx >-> mapD f)

instance Applicative Cell where
    pure a = Cell a (runIdentityK return)

    (Cell f0 gf) <*> (Cell x0 gx)
        = Cell (f0 x0) $ \() -> runIdentityP $ do
            (input, output) <- lift $ spawn Unbounded
            lift $ do
                a1 <- async $ runProxy $
                    gf >-> mapD Left  >-> sendD input
                a2 <- async $ runProxy $
                    gx >-> mapD Right >-> sendD input
                link2 a1 a2
                link a1
            (recvS output >-> handler) ()
      where
        -- Remember the latest function and argument; re-apply on every update
        handler () = evalStateP (f0, x0) $ forever $ do
            e <- request ()
            (f, x) <- get
            case e of
                Left  f' -> do
                    put (f', x)
                    respond (f' x)
                Right x' -> do
                    put (f, x')
                    respond (f x')
... so we can use Applicative style to combine spreadsheet cells, which will only update when their dependencies update:
both :: Cell (Int, String)
both = (,) <$> time <*> input

main = runProxy $ runCell both >-> printD
$ ./cell
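If the concurrency obscures what the `handler` in the Applicative instance is doing, a pure model may help. The sketch below is my own simplification, not part of the library: `combineUpdates` is a hypothetical name, and a list of `Either` values stands in for the merged stream of updates coming out of the mailbox.

```haskell
-- | Pure model of the update logic: keep the latest function and the latest
-- argument, and emit a new result every time either one changes.
-- (Hypothetical helper; a list replaces the stream of updates.)
combineUpdates :: (a -> b) -> a -> [Either (a -> b) a] -> [b]
combineUpdates = go
  where
    go _ _ []              = []
    -- The function cell changed: apply the new function to the current argument
    go _ x (Left  f' : es) = f' x : go f' x es
    -- The argument cell changed: apply the current function to the new argument
    go f _ (Right x' : es) = f x' : go f x' es
```

For example, `combineUpdates (+ 1) 0 [Right 5, Left (* 2), Right 3]` evaluates to `[6, 10, 6]`. Note that this only models the updates; the initial value `f0 x0` is emitted separately by `runCell`.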

Simple API

pipes-concurrency has a really, really, really simple API, and the three key functions are:
spawn :: Size -> IO (Input a, Output a)

send :: Input a -> a -> STM Bool

recv :: Output a -> STM (Maybe a)
The spawn function creates a FIFO channel, send adds messages to the channel, and recv takes messages off the channel. That's it! The rest of the library consists of two higher-level pipes that build on send and recv to stream messages into and out of the channel:
sendD :: Proxy p => Input a -> x -> p x a x a IO ()

recvS :: Proxy p => Output a -> () -> Producer p a IO ()
The library only has five functions total, making it very easy to learn.
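To get a feel for the shape of `send` and `recv`, here is a toy mailbox built directly on the `stm` package. Treat it as a sketch under loud assumptions: every name in it (`Mailbox`, `newMailbox`, `sendTo`, `recvFrom`, `seal`) is hypothetical, and it replaces the library's garbage-collection hooks with a manual `seal` call, so it does not reproduce the real implementation.

```haskell
import Control.Concurrent.STM

-- | Toy stand-in for a mailbox: an unbounded queue plus a "sealed" flag.
-- (Hypothetical type, not the library's.)
data Mailbox a = Mailbox
    { queue  :: TQueue a
    , sealed :: TVar Bool
    }

newMailbox :: IO (Mailbox a)
newMailbox = Mailbox <$> newTQueueIO <*> newTVarIO False

-- | Enqueue a message. Returns False if the mailbox was already sealed,
-- mirroring the `STM Bool` result of `send`.
sendTo :: Mailbox a -> a -> STM Bool
sendTo mb a = do
    closed <- readTVar (sealed mb)
    if closed
        then return False
        else do
            writeTQueue (queue mb) a
            return True

-- | Dequeue a message. Returns Nothing once the mailbox is sealed and empty,
-- mirroring the `STM (Maybe a)` result of `recv`; otherwise it blocks.
recvFrom :: Mailbox a -> STM (Maybe a)
recvFrom mb = do
    next <- tryReadTQueue (queue mb)
    case next of
        Just a  -> return (Just a)
        Nothing -> do
            closed <- readTVar (sealed mb)
            if closed then return Nothing else retry

-- | Manually mark the mailbox as finished (the toy replacement for GC hooks).
seal :: Mailbox a -> STM ()
seal mb = writeTVar (sealed mb) True
```

Messages sent before `seal` can still be received afterwards; only once the queue is drained does `recvFrom` return `Nothing`, which is what lets a reader loop terminate cleanly.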

Deadlock safety

What distinguishes this abstraction from traditional STM channels is that send and recv hook into the garbage collection system to automatically detect and avoid deadlocks. If they detect a deadlock they just terminate cleanly instead.
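For background, GHC's runtime already performs a form of garbage-collection-driven deadlock detection for ordinary `MVar`s, and the snippet below shows it using only `base`. I include it as an analogy for the general idea; the post does not say that pipes-concurrency relies on this exact exception, and `waitOnOrphan` is a hypothetical name.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar)
import Control.Exception (BlockedIndefinitelyOnMVar, try)

-- | Block on an MVar that no other thread holds a reference to.
-- When the runtime notices that nothing can ever fill it, it raises
-- BlockedIndefinitelyOnMVar in the blocked thread, which `try` captures.
waitOnOrphan :: IO (Either BlockedIndefinitelyOnMVar ())
waitOnOrphan = do
    orphan <- newEmptyMVar :: IO (MVar ())
    try (takeMVar orphan)
```

Instead of hanging forever, the call comes back with a `Left` value carrying the exception, so the caller can shut down cleanly.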

Surprisingly, this works so well that it even correctly handles crazy scenarios like cyclic graphs. For example, the run-time system magically ties the knot in the following example and both pipelines successfully terminate and get garbage collected:
import Control.Concurrent.Async
import Control.Proxy
import Control.Proxy.Concurrent
import System.Mem (performGC)

main = do
    (in1, out1) <- spawn Unbounded
    (in2, out2) <- spawn Unbounded
    -- Each pipeline reads from one mailbox and writes to the other, forming a cycle
    a1 <- async $ do runProxy $ recvS out1 >-> sendD in2
                     performGC
    a2 <- async $ do runProxy $ recvS out2 >-> sendD in1
                     performGC
    mapM_ wait [a1, a2]
I don't even know why the above example works, to be completely honest. I really only designed pipes-concurrency to avoid deadlocks for acyclic graphs and the above was just a really nice emergent behavior that fell out of the implementation. I think this is an excellent tribute to how amazing ghc is, and I want to give a big shout-out to all the people who contribute to it.

I call this "semi-automatic" reference management because you must still call the garbage collector manually after you stop using each reference; otherwise you cannot guarantee that the reference is released promptly. However, even if you forget to do this, the only consequence is that stream termination is delayed until the next garbage collection cycle.
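The effect of the manual collection can be pictured with a finalizer attached to a throwaway token. This is a sketch under the assumption that release is observed through a finalizer; the post does not spell out the mechanism, and `finalizerRanAfterGC` is a hypothetical name. It uses only `base`.

```haskell
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)
import Data.IORef (mkWeakIORef, newIORef, readIORef)
import System.Mem (performGC)
import System.Timeout (timeout)

-- | Attach a finalizer to a token, stop using the token, force a collection,
-- and report whether the finalizer signalled within one second.
finalizerRanAfterGC :: IO Bool
finalizerRanAfterGC = do
    done  <- newEmptyMVar
    token <- newIORef ()
    -- The finalizer runs some time after the token becomes unreachable
    _ <- mkWeakIORef token (tryPutMVar done () >> return ())
    readIORef token   -- last use of the token
    performGC         -- without this, the finalizer waits for a later GC
    signalled <- timeout 1000000 (takeMVar done)
    return (signalled == Just ())
```

Dropping the `performGC` line does not break anything; it only means the finalizer fires whenever the runtime next decides to collect, which matches the delayed-termination behavior described above.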


I designed the API so that if any other streaming library wants to use it I can cleanly separate out the pipes-agnostic part, consisting of spawn, send, and recv. If you want to build on this neat deadlock-free abstraction with, say, conduit or io-streams, then just let me know and I will factor those functions out into their own library.


People might wonder how pipes-concurrency compares to the stm-conduit and io-streams approaches for managing concurrency. Before I continue I just want to point out that I contributed some of the concurrency code to io-streams, so I am at fault for some of its current weaknesses. One of the reasons I made the pipes-concurrency functionality severable was so that io-streams could optionally merge in this same feature to fix some of the concurrency issues that I couldn't resolve the first time around.

pipes-concurrency does several things that are unique in the streaming concurrency space, including:
  • Dynamic communication graphs with semi-automatic reference management
  • Correctly handling multiple readers and writers on the same resource
  • Deadlock safety (as mentioned above)
  • Exception safety (by virtue of deadlock safety)


There are still more features that I haven't even mentioned, so I highly recommend you read the tutorial to learn other cool tricks you can do with the library.

For people following the pipes ecosystem, the next library coming up is pipes-parse, which is getting very close to completion, although the version currently on GitHub is stale and doesn't reflect the current state of the project. Expect to see some very cool and unique features when it comes out, which should be within the next two weeks.


  1. I see this pattern used regularly. Why isn't it wrapped in a handy shortcut?

    > run pipe = async (runProxy pipe >> performGC)

    1. There are two main reasons:

      * To avoid an `async` dependency

      * There may be more run functions than just `runProxy` (e.g. `runEitherK`, `evalStateK`), especially if using `pipes-safe`.
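For readers who still want a shortcut, one option that sidesteps both objections is a helper that takes an arbitrary `IO` action, leaving the choice of run function and of threading library to the caller. This is only a sketch: `thenCollect` is a hypothetical name and is not part of pipes-concurrency.

```haskell
import Control.Exception (finally)
import System.Mem (performGC)

-- | Run any action and then force a garbage collection, whether the action
-- returns normally or throws. (Hypothetical helper, depends only on base.)
thenCollect :: IO a -> IO a
thenCollect action = action `finally` performGC
```

With it, the pattern from the question could be written as `async (thenCollect (runProxy pipe))`. Note one difference from `runProxy pipe >> performGC`: because of `finally`, the collection also runs when the pipeline throws an exception.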