Thursday, June 6, 2013

pipes-concurrency-1.2.0: Behaviors and broadcasts

pipes-concurrency-1.2.0 adds two cool new features:
  • "continuous" events (what reactive-banana calls Behaviors)
  • broadcasts
Both of them are very interesting both practically and theoretically.


One of the major deficiencies of the pipes-concurrency-1.0 API was the requirement that every event was handled by downstream listeners. This is the sort of interaction that reactive-banana would call an Event where there is a one-to-one correspondence between production of events and consumption of events.

However, this model breaks down when you have two things updating at a different rate. For example, if you have a mouse and a monitor you don't want either one tied to the other's update cycle. You can't have the monitor respond to every mouse event because it cannot refresh fast enough. Similarly, you cannot have the mouse's event loop track the monitor's refresh rate because then you will develop an ever-increasing backlog of events.

pipes-concurrency solves this by providing a new option for the spawn command: the Latest constructor. This replaces the internal buffer with a single value that simply tracks the "latest" value received from upstream, discarding old values immediately. This completely decouples the input and output ends which no longer need to match each others' pace:
main = do
    (input, output) <- spawn (Latest 0)
    -- Update at any rate
    a1 <- async $ do runProxy $ events >-> sendD input
    -- Poll the latest value
    a2 <- async $ do runProxy $ recvS output >-> handler
    mapM_ wait [a1, a2]
The most fascinating part of this addition is how unintrusive it is. This only required adding 7 lines of code (1 for the new constructor, and 6 for the additional case branch for spawn), yet all of the advanced features of the library (like termination detection and deadlock safety) still work correctly with this new Latest constructor.

This will most likely interest people interested in functional reactive programming (FRP), because it flies in the face of conventional thinking where Events and Behaviors are typically treated very differently. pipes-concurrency suggests that these two concepts may actually be more similar than people suspect when viewed through the appropriate lens.

What differentiates pipes-concurrency from typical reactive programming abstractions is that it does not reify the streams as the central component for non-deterministic concurrency. In fact, the core machinery of pipes-concurrency is entirely pipes-independent, with the pipes utilities just forming a convenient interface. Rather, the central abstraction is the mailbox and how it coordinates inbound messages with outbound messages. When you view reactive programming through the lens of mailboxes instead of streaming processes then Behaviors and Events differ only very slightly in their implementations and they both support the same interface.


Broadcasts are a neat little feature that arose out of a discussion with Florian Hofmann. Now Inputs are Monoids and if you want to broadcast to multiple mailboxes, just mconcat their Inputs and send messages to the combined Input:
import Control.Monad
import Control.Concurrent.Async
import Control.Proxy
import Control.Proxy.Concurrent
import Data.Monoid

main = do
    -- Subscribers
    (inputs, outputs) <-
        fmap unzip $ replicateM 10 $ spawn Unbounded

    -- Broadcast to all subscribers
    a  <- async $ do runProxy $ stdinS >-> sendD (mconcat inputs)

    -- Receive broadcasted messages
    as <- forM outputs $ \o ->
          async $ do runProxy $ recvS o >-> stdoutD

    mapM_ wait (a:as)
This shows how handy and simple-to-use the Monoid class is.

There are also new type class instances for the Output type as well, such as Monad and Alternative! This is a great example of how the mailboxes make a more useful and theoretically interesting interface for concurrency than the streams.


This update adds some cool new features, but is very unlikely to break code. The main backwards-incompatible change was renaming the Size type to Buffer (since Size does not make sense for Latest), but other than that most existing code should work without any modifications.


  1. Very cool! I am porting some code I wrote with the Java Esper CEP library to process financial market data, and it looks like pipes-concurrency will be a great solution.