- "continuous" events (what reactive-banana calls Behaviors)
- broadcasts
Behaviors
One of the major deficiencies of the pipes-concurrency-1.0 API was the requirement that every event be handled by downstream listeners. This is the sort of interaction that reactive-banana would call an Event, where there is a one-to-one correspondence between production of events and consumption of events.
However, this model breaks down when you have two things updating at different rates. For example, if you have a mouse and a monitor, you don't want either one tied to the other's update cycle. You can't have the monitor respond to every mouse event because it cannot refresh fast enough. Similarly, you cannot have the mouse's event loop wait on the monitor's refresh rate because then you will develop an ever-increasing backlog of events.
pipes-concurrency solves this by providing a new option for the spawn command: the Latest constructor. This replaces the internal buffer with a single value that simply tracks the "latest" value received from upstream, discarding old values immediately. This completely decouples the input and output ends, which no longer need to match each other's pace:

    main = do
        (input, output) <- spawn (Latest 0)

        -- Update at any rate
        a1 <- async $ do runProxy $ events >-> sendD input
                         performGC

        -- Poll the latest value
        a2 <- async $ do runProxy $ recvS output >-> handler
                         performGC

        mapM_ wait [a1, a2]

The most fascinating part of this addition is how unintrusive it is. This only required adding 7 lines of code (1 for the new constructor, and 6 for the additional case branch for spawn), yet all of the advanced features of the library (like termination detection and deadlock safety) still work correctly with this new Latest constructor.
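To make the "one constructor plus one case branch" idea concrete, here is a minimal sketch of how a spawn-like function might choose its storage. It is not the library's source: `spawnSketch`, the two-option `Buffer`, and the simplified `Input`/`Output` wrappers are hypothetical stand-ins built directly on the stm package, and they leave out termination detection entirely.

```haskell
import Control.Concurrent.STM

-- Simplified stand-in for the library's buffer type (two options only).
data Buffer a
    = Unbounded   -- queue every message
    | Latest a    -- keep only the newest message, starting from this one

-- Simplified mailbox ends (hypothetical; no termination reporting).
newtype Input  a = Input  { send :: a -> STM () }
newtype Output a = Output { recv :: STM a }

-- Hypothetical spawn: each buffer option decides how messages are stored,
-- but both branches hand back the same pair of ends.
spawnSketch :: Buffer a -> IO (Input a, Output a)
spawnSketch buffer = case buffer of
    Unbounded -> do
        q <- newTQueueIO
        return (Input (writeTQueue q), Output (readTQueue q))
    Latest a -> do
        tvar <- newTVarIO a
        -- Writing overwrites the old value; reading does not remove it.
        return (Input (writeTVar tvar), Output (readTVar tvar))

-- Send three messages, then receive twice, using only the shared interface.
sendThreeRecvTwo :: Buffer Int -> IO [Int]
sendThreeRecvTwo buffer = do
    (input, output) <- spawnSketch buffer
    mapM_ (atomically . send input) [1, 2, 3]
    x <- atomically (recv output)
    y <- atomically (recv output)
    return [x, y]
```

In this sketch `sendThreeRecvTwo Unbounded` returns `[1,2]` (each message consumed once, in order), while `sendThreeRecvTwo (Latest 0)` returns `[3,3]` (only the newest value survives, and reading it twice is fine). The calling code is identical in both cases.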
This will most likely appeal to people interested in functional reactive programming (FRP), because it flies in the face of conventional thinking where Events and Behaviors are typically treated very differently. pipes-concurrency suggests that these two concepts may actually be more similar than people suspect when viewed through the appropriate lens.
What differentiates pipes-concurrency from typical reactive programming abstractions is that it does not reify the streams as the central component for non-deterministic concurrency. In fact, the core machinery of pipes-concurrency is entirely pipes-independent, with the pipes utilities just forming a convenient interface. Rather, the central abstraction is the mailbox and how it coordinates inbound messages with outbound messages. When you view reactive programming through the lens of mailboxes instead of streaming processes then Behaviors and Events differ only very slightly in their implementations and they both support the same interface.
Broadcasts
Broadcasts are a neat little feature that arose out of a discussion with Florian Hofmann. Now Inputs are Monoids and if you want to broadcast to multiple mailboxes, just mconcat their Inputs and send messages to the combined Input:

    import Control.Monad
    import Control.Concurrent.Async
    import Control.Proxy
    import Control.Proxy.Concurrent
    import Data.Monoid

    main = do
        -- Subscribers
        (inputs, outputs) <- fmap unzip $ replicateM 10 $ spawn Unbounded

        -- Broadcast to all subscribers
        a <- async $ do runProxy $ stdinS >-> sendD (mconcat inputs)
                        performGC

        -- Receive broadcasted messages
        as <- forM outputs $ \o ->
              async $ do runProxy $ recvS o >-> stdoutD
                         performGC

        mapM_ wait (a:as)

This shows how handy and simple-to-use the Monoid class is.
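For readers wondering why mconcat is enough to broadcast, the following is a rough sketch of what a Monoid instance for a send-only handle could look like. The `Input` newtype here is a simplified stand-in rather than the library's definition, `subscriber` and `broadcastDemo` are hypothetical helpers, and the choice to report success when any recipient accepts the message is an assumption. It also assumes a GHC recent enough that Semigroup is in the Prelude.

```haskell
import Control.Concurrent.STM
import Control.Monad (replicateM)

-- Simplified stand-in for a send-only mailbox end: a send action that
-- reports whether the message was accepted.
newtype Input a = Input { send :: a -> STM Bool }

-- Combining two Inputs sends to both of them.
-- Assumption: the combined send succeeds if either one accepted.
instance Semigroup (Input a) where
    i1 <> i2 = Input $ \a -> do
        ok1 <- send i1 a
        ok2 <- send i2 a
        return (ok1 || ok2)

-- The empty Input has no recipients, so it accepts nothing.
instance Monoid (Input a) where
    mempty = Input (\_ -> return False)

-- Hypothetical subscriber that records every message it is sent.
subscriber :: IO (Input a, TVar [a])
subscriber = do
    tvar <- newTVarIO []
    return (Input (\a -> modifyTVar' tvar (a :) >> return True), tvar)

-- Send one message to three subscribers through a single combined Input,
-- then collect what each subscriber saw.
broadcastDemo :: IO [[String]]
broadcastDemo = do
    subs <- replicateM 3 subscriber
    let everyone = mconcat (map fst subs)
    _ <- atomically (send everyone "hello")
    mapM (readTVarIO . snd) subs
```

Here `broadcastDemo` returns `[["hello"],["hello"],["hello"]]`: one send on the combined Input reaches every subscriber within a single STM transaction.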
There are new type class instances for the Output type as well, such as Monad and Alternative! This is a great example of how the mailboxes make a more useful and theoretically interesting interface for concurrency than the streams.
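As an illustration of what an Alternative instance buys you on the receiving side, here is a small sketch in which `<|>` merges two receive-only handles. The `Output` newtype, `fromQueue`, and `mergeDemo` are hypothetical simplifications built on STM's `retry` and `orElse`, not the library's definitions, and only Functor, Applicative, and Alternative are shown.

```haskell
import Control.Applicative (Alternative (..))
import Control.Concurrent.STM

-- Simplified stand-in for a receive-only mailbox end.
-- Nothing signals that no more messages will arrive.
newtype Output a = Output { recv :: STM (Maybe a) }

instance Functor Output where
    fmap f (Output m) = Output (fmap (fmap f) m)

instance Applicative Output where
    pure a = Output (return (Just a))
    Output mf <*> Output mx = Output $ do
        f <- mf
        case f of
            Nothing -> return Nothing
            Just g  -> fmap (fmap g) mx

-- `empty` never delivers anything; `<|>` takes from the left end if it has
-- a message ready and otherwise falls back to the right end.
instance Alternative Output where
    empty = Output retry
    Output x <|> Output y = Output (x `orElse` y)

-- Hypothetical helper: view a queue as an Output.
fromQueue :: TQueue a -> Output a
fromQueue q = Output (fmap Just (readTQueue q))

-- Only the second queue holds a message, so the merged Output delivers it.
mergeDemo :: IO (Maybe String)
mergeDemo = do
    q1 <- newTQueueIO
    q2 <- newTQueueIO
    atomically (writeTQueue q2 "from q2")
    let merged = fromQueue q1 <|> fromQueue q2
    atomically (recv (fmap (++ "!") merged))
```

In this sketch `mergeDemo` returns `Just "from q2!"`. The receive on the empty first queue retries, `orElse` falls through to the second queue, and `fmap` transforms the message on the way out.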
Conclusions
This update adds some cool new features, but is very unlikely to break code. The main backwards-incompatible change was renaming the Size type to Buffer (since Size does not make sense for Latest), but other than that most existing code should work without any modifications.
Very cool! I am porting some code I wrote with the Java Esper CEP library to process financial market data, and it looks like pipes-concurrency will be a great solution.
Awesome! I'm glad it helps.