Sunday, April 21, 2013

pipes and io-streams

I was originally planning to release an extension library for pipes that would add an io-streams-like interface to pipes. However, this week I discovered a surprisingly elegant way to implement io-streams behavior (minus pushback) using the existing pipes API. In fact, this translation has been possible ever since pipes-2.4!

This post assumes that you are familiar with both pipes and io-streams. If not, then you can read the pipes tutorial or the io-streams tutorial, both of which I wrote!

Edit: I also just discovered that when you re-implement io-streams using pipes, then pipes ties io-streams in performance. See this reddit comment for code and benchmarks.


Conversions


I'll begin with the following generic correspondence between io-streams and pipes:
{-# LANGUAGE RankNTypes #-}

import Control.Proxy

type InputStream a
    = forall p . (Proxy p) => () -> Session p IO (Maybe a)

type OutputStream a
    = forall p . (Proxy p) => Maybe a -> Session p IO ()

makeInputStream :: IO (Maybe a) -> InputStream a
makeInputStream m () = runIdentityP $ lift m

makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a
makeOutputStream f a = runIdentityP $ lift (f a)
For example:
stdin :: InputStream String
stdin = makeInputStream (fmap Just getLine)

stdout :: OutputStream String
stdout = makeOutputStream (maybe (return ()) putStrLn)
Notice how this does not convert the read and write actions to repetitive streams. Instead, you lift each action just once into the Proxy monad.

To read or write to these sources you just run the proxy monad:
import Prelude hiding (read)

read :: InputStream a -> IO (Maybe a)
read is = runProxy is

{- This requires `pipes` HEAD, which generalizes the type
   of `runProxyK`.

   Alternatively, you can hack around the current definition
   using:

   write ma os = runProxy (\() -> os ma)
-}
write :: Maybe a -> OutputStream a -> IO ()
write ma os = runProxyK os ma
For example:
>>> read stdin
Test<Enter>
Just "Test"
>>> write (Just "Test") stdout
Test
Now consider the following io-streams transformation:
map :: (a -> b) -> InputStream a -> IO (InputStream b)
map f is = makeInputStream $ do
    ma <- read is
    return (fmap f ma)
Here's the naive translation to pipes (later I will show the more elegant version):
import Prelude hiding (map)

map :: (a -> b) -> InputStream a -> InputStream b
map f is () = runIdentityP $ do
    ma <- is ()
    return (fmap f ma)
For example:
>>> read (map (++ "!") stdin)
Test<Enter>
Just "Test!"
The only real difference is that instead of using read is I just use is () directly.

So far I've used absolutely no pipes-specific features to implement any of this. I could have done this just as easily in any other library, including conduit.


Input streams


Now I will add a twist: we can reify transformations to be Consumers instead of functions of an InputStream.

For example, let's transform map to be a Consumer instead of a function:
map :: (Proxy p)
    => (a -> b)
    -> () -> Consumer p (Maybe a) IO (Maybe b)
map f () = runIdentityP $ do
    ma <- request ()
    return (fmap f ma)
I've only made two changes:
  • map is no longer a function of an InputStream
  • Instead of calling the input stream, I use request ()
Check out the final pipe type carefully (ignoring the Maybes):
() -> Consumer p a m b
An input stream transformation is a fold in disguise! These transformations fold 0 or more values from the old input stream to return a new value.

Notice how this differs from the conventional flow of information in pipes. We're not converting a stream of as into a new stream of bs. Instead, we're folding a stream of as into a single return value of type b.

How do we even connect map to stdin, though? They don't have the right types for ordinary pipe composition:
stdin 
    :: (Proxy  p)
    => () -> Session  p                IO (Maybe String)

map length
    :: (Proxy p)
    => () -> Consumer p (Maybe String) IO (Maybe Int   )
We need some way to feed the return value of stdin into the upstream input of map. This means we need to call stdin once each time that map requests a new value and feed that into the request. How can we do that using the existing pipes API?

Fortunately, this is exactly the problem that the (\>\) composition operator solves:
>>> read (stdin \>\ map length)
Test<Enter>
Just 4
Woah, slow down! What just happened?


The "request" category


The definition of (\>\) is actually really simple:
(f \>\ g) replaces all of g's requests with f.
So when we wrote (stdin \>\ map length), we replaced the single request in map with stdin as if we had written it in by hand:
-- Before
map length () = runIdentityP $ do
    ma <- request ()
    return (fmap length ma)

-- After
(stdin \>\ map length) () = runIdentityP $ do
    ma <- stdin ()
    return (fmap length ma)
In fact, this behavior means that (\>\) and request form a category. (\>\) is the composition operation, request is the identity, and they satisfy the category laws:
-- Replacing each 'request' in 'f' with 'request' gives 'f'
request \>\ f = f

-- Replacing each 'request' in 'request' with 'f' gives 'f'
f \>\ request = f

-- Substitution of 'request's is associative
(f \>\ g) \>\ h = f \>\ (g \>\ h)
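To make the substitution reading of (\>\) concrete, here is a minimal sketch that depends only on base. It is a toy model and not the pipes implementation: it has no base monad and no downstream interface, and the names Req, runWith, double and addTwo are hypothetical. Only request and (\>\) mirror names from the post.

```haskell
-- Toy model (NOT the pipes implementation): a computation that can issue
-- requests carrying an a', receive an a in reply, and finally return an r.
data Req a' a r
    = Done r
    | Request a' (a -> Req a' a r)

instance Functor (Req a' a) where
    fmap f (Done r)       = Done (f r)
    fmap f (Request a' k) = Request a' (fmap f . k)

instance Applicative (Req a' a) where
    pure = Done
    Done f       <*> x = fmap f x
    Request a' k <*> x = Request a' (\a -> k a <*> x)

instance Monad (Req a' a) where
    Done r       >>= f = f r
    Request a' k >>= f = Request a' (\a -> k a >>= f)

-- Issue a single request and return whatever comes back
request :: a' -> Req a' a a
request a' = Request a' Done

-- (f \>\ g) replaces every 'request' in g with a call to f
(\>\) :: (b' -> Req a' a b) -> (c' -> Req b' b c) -> (c' -> Req a' a c)
(f \>\ g) c' = go (g c')
  where
    go (Done c)       = Done c
    go (Request b' k) = f b' >>= \b -> go (k b)

-- Run a computation by answering every remaining request with a pure function
runWith :: (a' -> a) -> Req a' a r -> r
runWith _      (Done r)       = r
runWith answer (Request a' k) = runWith answer (k (answer a'))

-- Hypothetical transformations: one request doubled, and two requests summed
double :: () -> Req () Int Int
double () = fmap (* 2) (request ())

addTwo :: () -> Req () Int Int
addTwo () = (+) <$> request () <*> request ()
```

In this model, runWith (const 5) ((double \>\ addTwo) ()) evaluates to 20, because each of the two requests in addTwo is replaced by double. Composing with request on either side leaves addTwo unchanged, which is the identity law above specialized to this toy type.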
Let's take a closer look at the type of (\>\) to understand what is going on:
(\>\)
    :: (Monad m, ListT p)
    => (b' -> p a' a x' x m b)
    -> (c' -> p b' b x' x m c)
    -> (c' -> p a' a x' x m c)
We can illuminate this type by type-restricting the arguments. First, let's consider the case where the first argument is a Session and the second argument is a Consumer, and see what type the compiler infers for the result:
(\>\)
    :: (Monad m, ListT p)
    => (() -> Session  p   m b)
    -> (() -> Consumer p b m c)
    -> (() -> Session  p   m c)
The compiler infers that we must get back a readable Session, just like we wanted! This is precisely what happened when we composed stdin with map:
stdin
    :: () -> Session  p                IO (Maybe String)

map length
    :: () -> Consumer p (Maybe String) IO (Maybe Int   )

stdin \>\ map length
    :: () -> Session  p                IO (Maybe Int   )

read (stdin \>\ map length) :: IO (Maybe Int)
However, there's another way we can specialize the type of (\>\). We can instead restrict both arguments to be Consumers:
(\>\)
    :: (Monad m, ListT p)
    => (() -> Consumer p a m b)
    -> (() -> Consumer p b m c)
    -> (() -> Consumer p a m c)
The compiler infers that if we compose two folds, we must get a new fold!

This means that we can compose input stream transformations using the same (\>\) operator:
map length
    :: () -> Consumer p (Maybe String) IO (Maybe Int )

map even
    :: () -> Consumer p (Maybe Int   ) IO (Maybe Bool)

map length \>\ map even
    :: () -> Consumer p (Maybe String) IO (Maybe Bool)
Then we will use the exact same operator to connect our input stream:
stdin \>\ map length \>\ map even
    :: () -> Session p IO (Maybe Bool)

read (stdin \>\ map length \>\ map even)
    :: IO (Maybe Bool)
Let's try it!
>>> read (stdin \>\ map length \>\ map even)
Test<Enter>
Just True
Now let's make things more interesting to really drive the point home:
import Control.Monad

twoReqs
    :: (Proxy p) => () -> Consumer p (Maybe a) IO (Maybe [a])
twoReqs () = runIdentityP $ do
    mas <- replicateM 2 (request ())
    return (sequence mas)
>>> read (stdin \>\ twoReqs \>\ twoReqs) :: IO (Maybe [[String]])
1<Enter>
2<Enter>
3<Enter>
4<Enter>
Just [["1","2"],["3","4"]]
>>> -- We can still read from raw 'stdin'
>>> read stdin
5<Enter>
Just "5"
In other words, the "request" category is actually the category of InputStreams and pipes already has InputStream support today!


Output streams


Now let's do the exact same thing for OutputStreams! We'll begin with the filterOutput function from io-streams:
filterOutput
    :: (a -> Bool) -> OutputStream a -> IO (OutputStream a)
filterOutput pred os = makeOutputStream $ \ma -> case ma of
    Nothing            -> write ma os
    Just a | pred a    -> write ma os
           | otherwise -> return ()
The pipes equivalent is:
filterOutput
    :: (Proxy p)
    => (a -> Bool)
    -> Maybe a -> Producer p (Maybe a) IO ()
filterOutput pred ma = runIdentityP $ case ma of
    Nothing            -> respond ma
    Just a | pred a    -> respond ma
           | otherwise -> return ()
This time we get a new shape for our type signature:
a -> Producer p b m ()
An output stream transformation is an unfold in disguise! These transformations unfold each output destined for the old output stream into 0 or more outputs destined for the new output stream.

Let's hook this transformation up to stdout and start writing values to it ... oh wait. How do we hook it up? Well, let's look at the types of stdout and filterOutput:
filterOutput f
    :: (Proxy p) => Maybe a      -> Producer p (Maybe a) IO ()

stdout
    :: (Proxy p) => Maybe String -> Session  p           IO ()
We need some way to feed the stream output of filterOutput into the argument of stdout. This means that we need to call stdout on each output of filterOutput. How can we do that using the existing pipes API?

Why, we use (/>/), the dual of (\>\)!
>>> write (Just "") (filterOutput (not . null) />/ stdout)
>>> write (Just "Test") (filterOutput (not . null) />/ stdout)
Test

The "respond" category


(/>/) also has a simple behavior:
(f />/ g) replaces all of f's responds with g.
This behavior means that (/>/) and respond form a category, too! (/>/) is the composition operation, respond is the identity, and they satisfy the category laws:
-- Replacing each 'respond' in 'f' with 'respond' gives 'f'
f />/ respond = f

-- Replacing each 'respond' in 'respond' with 'f' gives 'f'
respond />/ f = f

-- Substitution of 'respond's is associative
(f />/ g) />/ h = f />/ (g />/ h)
Let's learn more by reviewing the type of (/>/):
(/>/)
    :: (Monad m, ListT p)
    => (a -> p x' x b' b m a')
    -> (b -> p x' x c' c m b')
    -> (a -> p x' x c' c m a')
We'll specialize this type signature in two ways. First, we'll consider the case where the first argument is a Producer and the second argument is a Session and then see what type the compiler infers for the result:
(/>/)
    :: (Monad m, ListT p)
    => (a -> Producer p b m ())
    -> (b -> Session  p   m ())
    -> (a -> Session  p   m ())
The compiler infers that we must get back a writeable session, just like we wanted! This is precisely what happened when we composed filterOutput and stdout:
filterOutput (not . null)
    :: Maybe String -> Producer p (Maybe String) IO ()

stdout
    :: Maybe String -> Session  p                IO ()

filterOutput (not . null) />/ stdout
    :: Maybe String -> Session  p                IO ()

flip write (filterOutput (not . null) />/ stdout)
    :: Maybe String -> IO ()
However, there's another way we can specialize the type of (/>/). We can instead restrict both arguments to be Producers:
(/>/)
    :: (Monad m, ListT p)
    => (a -> Producer p b m ())
    -> (b -> Producer p c m ())
    -> (a -> Producer p c m ())
The compiler infers that if we compose two unfolds, we must get a new unfold!

This means that we can compose output stream transformations using (/>/). To show this, let's define the equivalent of the io-streams contramap function:
contramap
    :: (Proxy p)
    => (a -> b)
    -> Maybe a -> Producer p (Maybe b) IO ()
contramap f ma = runIdentityP $ respond (fmap f ma)
We can connect contramaps using (/>/):
contramap even
    :: Maybe Int  -> Producer p (Maybe Bool  ) IO ()

contramap show
    :: Maybe Bool -> Producer p (Maybe String) IO ()

contramap even />/ contramap show
    :: Maybe Int  -> Producer p (Maybe String) IO ()
... then using the exact same operator we connect them to stdout:
contramap even />/ contramap show />/ stdout
    :: Maybe Int -> Session p IO ()

flip write (contramap even />/ contramap show />/ stdout)
    :: Maybe Int -> IO ()
Let's try it!
>>> write (Just 4) (contramap even />/ contramap show />/ stdout)
True
Now let's make things more interesting to really drive the point home:
hiBye
    :: (Proxy p)
    => Maybe String -> Producer p (Maybe String) IO ()
hiBye mstr = runIdentityP $ do
    respond $ fmap ("Hello, " ++) mstr
    respond $ fmap ("Goodbye, " ++) mstr
>>> write (Just "world") (hiBye />/ hiBye />/ stdout)
Hello, Hello, world
Goodbye, Hello, world
Hello, Goodbye, world
Goodbye, Goodbye, world
>>> -- We can still write to raw 'stdout'
>>> write (Just "Haskell") stdout
Haskell
In other words, the "respond" category is actually the category of OutputStreams and pipes already has OutputStream support today!
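The same substitution idea can be sketched for the "respond" category using only base. This is a toy model under simplifying assumptions, not the pipes implementation: there is no base monad, nothing flows back from downstream, and the Maybe wrappers are dropped. The names Resp, outputs and keepNonEmpty are hypothetical, while respond, (/>/) and hiBye mirror the post.

```haskell
-- Toy model (NOT the pipes implementation): a computation that emits values
-- of type b downstream and finally returns an r.
data Resp b r
    = Done r
    | Respond b (Resp b r)

instance Functor (Resp b) where
    fmap f (Done r)         = Done (f r)
    fmap f (Respond b rest) = Respond b (fmap f rest)

instance Applicative (Resp b) where
    pure = Done
    Done f         <*> x = fmap f x
    Respond b rest <*> x = Respond b (rest <*> x)

instance Monad (Resp b) where
    Done r         >>= f = f r
    Respond b rest >>= f = Respond b (rest >>= f)

-- Emit a single value downstream
respond :: b -> Resp b ()
respond b = Respond b (Done ())

-- (f />/ g) replaces every 'respond' in f with a call to g
(/>/) :: (a -> Resp b ()) -> (b -> Resp c ()) -> (a -> Resp c ())
(f />/ g) a = go (f a)
  where
    go (Done r)         = Done r
    go (Respond b rest) = g b >> go rest

-- Collect everything a computation emits, in order
outputs :: Resp b r -> [b]
outputs (Done _)         = []
outputs (Respond b rest) = b : outputs rest

-- The post's 'hiBye', without the Maybe wrapper
hiBye :: String -> Resp String ()
hiBye str = do
    respond ("Hello, " ++ str)
    respond ("Goodbye, " ++ str)

-- A filter in the spirit of 'filterOutput (not . null)'
keepNonEmpty :: String -> Resp String ()
keepNonEmpty str = if null str then return () else respond str
```

Here outputs ((hiBye />/ hiBye) "world") yields the same four strings, in the same order, as the hiBye />/ hiBye />/ stdout session above, and outputs ((keepNonEmpty />/ hiBye) "") is the empty list because the filter never responds.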


Unify input and output streams


We can do much more than just implement the io-streams API, though! We can generalize it in many more powerful ways than io-streams permits.

First off, I'm going to inline the InputStream and OutputStream types into the type signatures of read and write:
read
    :: (forall p . (Proxy p) => () -> Session p IO (Maybe a))
    -> IO (Maybe a)

write
    :: Maybe a
    -> (forall p . (Proxy p) => Maybe a -> Session p IO ())
    -> IO ()
Then I will generalize the types to not be Maybe-specific and flip the arguments for write:
read
    :: (forall p . (Proxy p) => () -> Session p IO a)
    -> IO a

write
    :: (forall p . (Proxy p) => a -> Session p IO ())
    -> a -> IO ()
Now, I will generalize both read and write by adding bidirectionality to them. read will now accept an argument parametrizing its request and write can now optionally receive a result.
read
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
read is = runProxyK is

write
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
write is = runProxyK is
Well, would you look at that! They both have the same type and implementation! In other words, we can unify read and write into a single function:
once
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
once is = runProxyK is
Also, once is really polymorphic over the base monad:
once
    :: (Monad m)
    => (forall p . (ListT p) => a -> Session p m b)
    -> a -> m b
once is = runProxyK is
Using this function, we can both read from InputStreams and write to OutputStreams:
>>> once (stdin \>\ map length) ()
Test<Enter>
Just 4
>>> once (contramap show />/ stdout) (Just 1)
1
Let's test the bidirectionality. I'll write a new stdin that accepts a count parameter telling how many lines to get. This time, there will be no Maybes:
stdinN :: (Proxy p) => Int -> Session p IO [String]
stdinN n = runIdentityP $ replicateM n $ lift getLine
First, briefly test it:
>>> once stdinN 3
Test
Apple
123
["Test","Apple","123"]
Similarly, let's define a transformation to automate supplying values to stdinN:
automate :: (Proxy p) => () -> Client p Int [String] IO ()
automate () = runIdentityP $ do
    lift $ putStrLn "First batch:"
    xs <- request 2
    lift $ print xs
    lift $ putStrLn "Second batch:"
    ys <- request 2
    lift $ print ys
Now compose!
>>> once (stdinN \>\ automate) ()
First batch:
1<Enter>
2<Enter>
["1","2"]
Second batch:
3<Enter>
4<Enter>
["3","4"]
>>> -- Go back to using 'stdinN' unfiltered
>>> once stdinN 1
5<Enter>
["5"]
We can similarly unify makeInputStream and makeOutputStream:
make :: (Monad m, Proxy p) => (q -> m r) -> q -> p a' a b' b m r
make f a = runIdentityP (lift (f a))
The only difference between an input stream and an output stream is whether it consumes or produces a value and bidirectional streams blur the line even further.


Purity


None of this machinery is specific to the IO monad. Everything I've introduced is polymorphic over the base monad! We only incur IO if any of our stages use IO. This departs greatly from io-streams, where you are trapped in the IO monad to do everything. However, I want to qualify that statement with a caveat: I expect that pipes-parse will require either IO or an ST base monad (your choice) to properly manage pushback like io-streams does. Even still, you only pay this price if you actually need it.

pipes improves on purity in two other ways. First, building input and output streams is pure when you use pipes:
-- pipes
makeInputStream  :: IO (Maybe a)       -> InputStream a
makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a

-- io-streams
makeInputStream  :: IO (Maybe a)       -> IO (InputStream  a)
makeOutputStream :: (Maybe a -> IO ()) -> IO (OutputStream a)
Consequently, io-streams must use unsafePerformIO for its stdin:
stdin :: InputStream ByteString
stdin = unsafePerformIO $
    handleToInputStream IO.stdin >>= lockingInputStream
Second, composing and transforming input streams is pure when you use pipes:
>>> -- pipes
>>> read (stdin \>\ map length \>\ map even)
...
>>> -- io-streams
>>> is <- (map even <=< map length) stdin
>>> read is
...
... and if none of the stages use IO, then the entire operation is pure!


Extra interfaces


You can generalize the type signatures even further. Let's revisit the types of (\>\) and (/>/):
(\>\)
    :: (Monad m, ListT p)
    => (b' -> p a' a x' x m b)
    -> (c' -> p b' b x' x m c)
    -> (c' -> p a' a x' x m c)

(/>/)
    :: (Monad m, ListT p)
    => (a -> p x' x b' b m a')
    -> (b -> p x' x c' c m b')
    -> (a -> p x' x c' c m a')
So far we've been ignoring the x'/x interfaces entirely, treating them as closed interfaces. However, there's no reason we have to! If we open up this interface, then every processing stage that we connect can read and write to this streaming interface.

In a future post, I will show you how you can really elegantly use this trick to generalize Hutton-Meijer parsers to permit effects.


ListT


The people who read the pipes-3.2 announcement post know that the (\>\) and (/>/) composition operators correspond to (>=>) for the two pipes ListT monads.
This means that you can take any output stream transformation and assemble more sophisticated behaviors using do notation:
pairs
    :: (ListT p)
    => () -> Producer p (Maybe (String, String)) IO ()
pairs () = runRespondT $ do
    mstr1 <- RespondT $ hiBye (Just "world")
    mstr2 <- RespondT $ hiBye (Just "Haskell")
    return $ (,) <$> mstr1 <*> mstr2
These behave just like the list monad, non-deterministically selecting all possible paths:
>>> once (pairs />/ contramap show />/ stdout) ()
("Hello, world","Hello, Haskell")
("Hello, world","Goodbye, Haskell")
("Goodbye, world","Hello, Haskell")
("Goodbye, world","Goodbye, Haskell")
The exact same trick works for input stream transformations, too, but it's less useful because we don't send information upstream as frequently as downstream.
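For comparison, the ordinary list monad from base shows the same branching pattern without any effects. This is only an analogy under the assumption that each unfold is replaced by a pure list; hiByeL, pairsL and nested are hypothetical names, and (>=>) plays the role that (/>/) plays for RespondT.

```haskell
import Control.Monad ((>=>))

-- A pure stand-in for 'hiBye': every output becomes a list element
hiByeL :: String -> [String]
hiByeL str = ["Hello, " ++ str, "Goodbye, " ++ str]

-- The list-monad counterpart of 'pairs': every combination of both branches
pairsL :: [(String, String)]
pairsL = do
    str1 <- hiByeL "world"
    str2 <- hiByeL "Haskell"
    return (str1, str2)

-- Kleisli composition for lists, analogous to 'hiBye />/ hiBye'
nested :: String -> [String]
nested = hiByeL >=> hiByeL
```

Evaluating pairsL gives the same four pairs, in the same order, as the pairs session above.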


Conclusions


One of the most common questions I get is "How do I escape from the pipe monad?" and I wrote this post to demonstrate how to do so. You can use pipes to program in exactly the same style as io-streams. The only thing missing is a standard library of io-streams-like utilities.

So far, I have not touched on the issue of push-back, a feature which io-streams provides. pipes-parse will build on the ideas I've introduced in this post to provide an io-streams-style interface to parsing that includes push-back functionality. This will make it easy to dynamically add or remove processing stages throughout the parsing process, which is necessary for parsing HTTP. In fact, everything I've presented here fell out very naturally from my work on pipes-parse and I only later discovered that I had accidentally reinvented io-streams entirely within pipes.

So I will cautiously state that I believe pipes is "io-streams done right". Everything that Greg has been doing with io-streams matches quite nicely with the pre-existing "request" and "respond" pipes categories that I independently discovered within pipes. This indicates that Greg was really onto something and pipes provides the elegant theoretical foundation for his work.

Sunday, April 14, 2013

pipes-concurrency-1.0.0: Reactive programming

This post introduces the pipes-concurrency library (which you can find here), which is the renamed pipes-stm library that I previously promised. I ended up completing this much sooner than I anticipated, which is why it precedes the upcoming pipes-parse package.

Begin with the tutorial if you want to learn how to use the library. This post mainly highlights some features and compares the pipes-concurrency approach to other libraries. Also, I'll provide some bonus examples that are not in the tutorial.

Before I continue, I want to credit Eric Jones, who first began the library as pipes-stm on Github. Unfortunately, I lost all contact with him and he didn't include a LICENSE in his repository, so I had to rebuild the library from scratch because I wasn't sure if a fork would constitute copyright infringement. If he reads this and gets in touch with me and approves of the BSD license then I will add him to the LICENSE and also add him as a library author.

Reactive programming


Several people are beginning to realize that streaming libraries overlap substantially with reactive programming frameworks. pipes-concurrency provides the basic building blocks necessary to build reactive programming abstractions.

For example, let's say that I want to simulate reactive-banana's Events using pipes-concurrency:
{-# LANGUAGE RankNTypes #-}

import Control.Proxy

-- `pipes` does not need the `t` parameter from reactive-banana
type Event a = forall p . (Proxy p) => () -> Producer p a IO ()
If you want to take the union of two asynchronous streams, you spawn a mailbox, merge two streams into it using sendD, and then read out the results using recvS:
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Monad
import Control.Proxy.Concurrent
import System.Mem (performGC)

union :: Event a -> Event a -> Event a
union e1 e2 () = runIdentityP $ do
    (input, output) <- lift $ spawn Unbounded
    as <- lift $ forM [e1, e2] $ \e ->
        async $ do runProxy $ e >-> sendD input
                   performGC
    recvS output ()
    lift $ mapM_ wait as
Now we can define two event sources:
clock :: Event String
clock = fromListS (cycle ["Tick", "Tock"])
    >-> execD (threadDelay 1000000)

user :: Event String
user = stdinS
... and cleanly merge them:
main = runProxy $ union clock user
              >-> takeWhileD (/= "quit")
              >-> stdoutD
$ ./union
Tick
Tock
test<Enter>
test
Tick
Tock
quit<Enter>
$
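The merge pattern behind union (spawn one mailbox, let every source write into it concurrently, then read the merged results) can be sketched with base alone. This is an assumption-laden stand-in and not how pipes-concurrency works: it uses a plain Chan, finite lists as sources, and an explicit Nothing end marker per source instead of garbage-collection-based termination. The names unionLists and collect are hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forM_)

-- Merge several finite sources into one list by letting each source write
-- into a shared channel from its own thread.
unionLists :: [[a]] -> IO [a]
unionLists sources = do
    chan <- newChan
    forM_ sources $ \src -> forkIO $ do
        mapM_ (writeChan chan . Just) src
        writeChan chan Nothing          -- explicit end-of-source marker
    collect chan (length sources)

-- Read from the channel until every source has sent its end marker
collect :: Chan (Maybe a) -> Int -> IO [a]
collect _    0 = return []
collect chan n = do
    message <- readChan chan
    case message of
        Nothing -> collect chan (n - 1)
        Just a  -> fmap (a :) (collect chan n)
```

The interleaving between sources depends on the scheduler, but the relative order of elements from any single source is preserved, which matches the behavior of the clock and user events in the session above.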
People often tout spreadsheets as the classic example of functional-reactive programming, so why not simulate that, too? Well, a spreadsheet cell is just a non-empty stream of values:
{-# LANGUAGE PolymorphicComponents #-}

import Control.Proxy

data Cell a = Cell
    { initial :: a
    , stream  :: forall p . (Proxy p) => () -> Producer p a IO ()
    }

runCell :: (Proxy p) => Cell a -> () -> Producer p a IO ()
runCell (Cell a ga) () = runIdentityP $ do
    respond a
    ga ()
Each value in the stream represents an update to the cell's contents, either by the user:
input :: Cell String
input = Cell "" stdinS
... or some data source:
time :: Cell Int
time = Cell 0 $ \() -> evalStateP 1 $ forever $ do
    n <- get
    respond n
    lift $ threadDelay 1000000
    put (n + 1)
Spreadsheet cells are both Functors and Applicatives:
instance Functor Cell where
    fmap f (Cell x gx) = Cell (f x) (gx >-> mapD f)

instance Applicative Cell where
    pure a = Cell a (runIdentityK return)

    (Cell f0 gf) <*> (Cell x0 gx)
        = Cell (f0 x0) $ \() -> runIdentityP $ do
            (input, output) <- lift $ spawn Unbounded
            lift $ do
                a1 <- async $ runProxy $
                    gf >-> mapD Left  >-> sendD input
                a2 <- async $ runProxy $
                    gx >-> mapD Right >-> sendD input
                link2 a1 a2
                link a1
            (recvS output >-> handler) ()
      where
        handler () = evalStateP (f0, x0) $ forever $ do
            e <- request ()
            (f, x) <- get
            case e of
                Left  f' -> do
                    put (f', x)
                    respond (f' x)
                Right x' -> do
                    put (f, x')
                    respond (f x')
... so we can use Applicative style to combine spreadsheet cells, which will only update when their dependencies update:
both :: Cell (Int, String)
both = (,) <$> time <*> input

main = runProxy $ runCell both >-> printD
$ ./cell
(0,"")
(1,"")
(2,"")
test<Enter>
(2,"test")
(3,"test")
apple<Enter>
(3,"apple")
(4,"apple")
(5,"apple")
...
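The update rule in the Applicative instance can also be studied without threads. The following sketch is a pure model under a strong assumption: every update carries an explicit timestamp, so the interleaving that the mailbox decides at run time is fixed up front. TimedCell and its helpers are hypothetical names, and this is not the Cell type defined above.

```haskell
-- Pure model: an initial value plus updates tagged with increasing timestamps
data TimedCell a = TimedCell a [(Int, a)]
    deriving (Eq, Show)

instance Functor TimedCell where
    fmap f (TimedCell x updates) =
        TimedCell (f x) [(t, f u) | (t, u) <- updates]

instance Applicative TimedCell where
    pure a = TimedCell a []

    TimedCell f0 fs0 <*> TimedCell x0 xs0 = TimedCell (f0 x0) (go f0 x0 fs0 xs0)
      where
        -- Remember the latest function and argument, and emit a new result
        -- whenever either side updates (the function side wins ties)
        go f x fs xs = case (fs, xs) of
            ([], []) -> []
            ((t, f') : fs', []) -> (t, f' x) : go f' x fs' []
            ([], (s, x') : xs') -> (s, f x') : go f x' [] xs'
            ((t, f') : fs', (s, x') : xs')
                | t <= s    -> (t, f' x) : go f' x fs' xs
                | otherwise -> (s, f x') : go f x' fs xs'

-- Counterparts of 'time' and 'input' with made-up timestamps
timeCell :: TimedCell Int
timeCell = TimedCell 0 [(10, 1), (20, 2), (30, 3), (40, 4), (50, 5)]

inputCell :: TimedCell String
inputCell = TimedCell "" [(25, "test"), (35, "apple")]

bothCell :: TimedCell (Int, String)
bothCell = (,) <$> timeCell <*> inputCell
```

With these made-up timestamps, the values in bothCell follow the same sequence as the ./cell session above: the pair changes once per update of either dependency and keeps the latest value of the other.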

Simple API


pipes-concurrency has a really, really, really simple API, and the three key functions are:
spawn :: Size -> IO (Input a, Output a)

send :: Input a -> a -> STM Bool

recv :: Output a -> STM (Maybe a)
The spawn function creates a FIFO channel, send adds messages to the channel, and recv takes messages off the channel. That's it! The rest of the library consists of the following two higher-level pipes, which build on send and recv to stream messages into and out of the channel:
sendD :: Proxy p => Input a -> x -> p x a x a IO ()

recvS :: Proxy p => Output a -> () -> Producer p a IO ()
The library only has five functions total, making it very easy to learn.
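To give a feel for the shape of this API, here is a rough sketch of a mailbox built on the stm package. It is an assumption-heavy stand-in and not the pipes-concurrency implementation: the real library detects termination through the garbage collector, whereas this sketch uses an explicit seal operation. Mailbox, newMailbox, sendTo, recvFrom, seal and drain are all hypothetical names.

```haskell
import Control.Concurrent.STM

-- A FIFO queue plus a flag that marks the mailbox as closed
data Mailbox a = Mailbox
    { queue  :: TQueue a
    , sealed :: TVar Bool
    }

newMailbox :: IO (Mailbox a)
newMailbox = Mailbox <$> newTQueueIO <*> newTVarIO False

-- Like 'send': returns False when the message can no longer be delivered
sendTo :: Mailbox a -> a -> STM Bool
sendTo mailbox a = do
    closed <- readTVar (sealed mailbox)
    if closed
        then return False
        else do
            writeTQueue (queue mailbox) a
            return True

-- Like 'recv': returns Nothing once the mailbox is closed and drained,
-- and blocks (via 'retry') while it is empty but still open
recvFrom :: Mailbox a -> STM (Maybe a)
recvFrom mailbox = do
    next <- tryReadTQueue (queue mailbox)
    case next of
        Just a  -> return (Just a)
        Nothing -> do
            closed <- readTVar (sealed mailbox)
            if closed then return Nothing else retry

-- Explicitly close the mailbox (the real library infers this instead)
seal :: Mailbox a -> STM ()
seal mailbox = writeTVar (sealed mailbox) True

-- Read messages until the mailbox reports that it is finished
drain :: Mailbox a -> IO [a]
drain mailbox = do
    next <- atomically (recvFrom mailbox)
    case next of
        Nothing -> return []
        Just a  -> fmap (a :) (drain mailbox)
```

The Bool and Maybe results play the same role as in send and recv above: they tell the caller to stop cleanly instead of blocking forever.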


Deadlock safety


What distinguishes this abstraction from traditional STM channels is that send and recv hook into the garbage collection system to automatically detect and avoid deadlocks. If they detect a deadlock they just terminate cleanly instead.

Surprisingly, this works so well that it even correctly handles crazy scenarios like cyclic graphs. For example, the run-time system magically ties the knot in the following example and both pipelines successfully terminate and get garbage collected:
import Control.Concurrent.Async
import Control.Proxy
import Control.Proxy.Concurrent
import System.Mem (performGC)

main = do
    (in1, out1) <- spawn Unbounded
    (in2, out2) <- spawn Unbounded
    a1 <- async $ do runProxy $ recvS out1 >-> sendD in2
                     performGC
    a2 <- async $ do runProxy $ recvS out2 >-> sendD in1
                     performGC
    mapM_ wait [a1, a2]
I don't even know why the above example works, to be completely honest. I really only designed pipes-concurrency to avoid deadlocks for acyclic graphs and the above was just a really nice emergent behavior that fell out of the implementation. I think this is an excellent tribute to how amazing ghc is, and I want to give a big shout-out to all the people who contribute to it.

I call this "semi-automatic" reference management because you must still call the garbage collector manually after you stop using each reference, otherwise you cannot guarantee promptly releasing the reference. However, even if you forget to do this, all that happens is that it just delays stream termination until the next garbage collection cycle.

Severability


I designed the API so that if any other streaming library wants to use it I can cleanly separate out the pipes-agnostic part, consisting of spawn, send, and recv. If you want to build on this neat deadlock-free abstraction with, say, conduit or io-streams, then just let me know and I will factor those functions out into their own library.


Comparisons


People might wonder how pipes-concurrency compares to the stm-conduit and io-streams approaches for managing concurrency. Before I continue I just want to point out that I contributed some of the concurrency code to io-streams, so I am at fault for some of its current weaknesses. One of the reasons I made the pipes-concurrency functionality severable was so that io-streams could optionally merge in this same feature to fix some of the concurrency issues that I couldn't resolve the first time around.

pipes-concurrency does several things that are unique in the streaming concurrency space, including:
  • Dynamic communication graphs with semi-automatic reference management
  • Correctly handling multiple readers and writers on the same resource
  • Deadlock safety (as mentioned above)
  • Exception safety (by virtue of deadlock safety)

Conclusion


There are still more features that I haven't even mentioned, so I highly recommend you read the tutorial to learn other cool tricks you can do with the library.

For people following the pipes ecosystem, the next library coming up is pipes-parse which is getting very close to completion, although the version currently on Github is stale and doesn't reflect the current state of the project. Expect to see some very cool and unique features when it comes out, which should be within the next two weeks.

Wednesday, April 10, 2013

Defaults

Many programs require default values of some sort and normally we would consider this aspect of programming "business logic" and not give it a second thought. However, mathematics provides some surprising insight into the banal task of choosing appropriate defaults, so let's temporarily put on our theory hats and try to over-think this problem.


Patterns


Let's begin by trying to identify a common pattern unifying all default values. If I were to name a few types, most people would agree on the following default values:
  • Int: 0
  • Bool: False
  • [a]: [], the empty list
  • Text: "", the empty Text string (if you use OverloadedStrings)
Why not choose 5 as the default Int or "ridiculous" as the default Text string? What makes us gravitate towards choosing those particular values?

Well, we can discern a common pattern: all these default values seem to correspond to something "empty". But what does it really mean to be "empty"?

Well, for numbers it is obvious why we consider 0 empty. If you add 0 to any number n, you get back n, signifying that 0 must be empty:
0 + n = n
n + 0 = n
We can extend this same reasoning to the other values to justify why we consider them empty. For example, if you append "" to any Text string str, you get back str, signifying that "" added nothing:
"" `append` str = str
str `append` "" = str
Similarly, the empty list adds nothing when you concatenate it:
[] ++ xs = xs
xs ++ [] = xs
... and False does nothing when you (||) it:
False || b = b
b || False = b
The pattern is obvious: in every case we have some empty default value, which we will call mempty, and some combining function, which we will call mappend, that satisfy the following two equations:
mempty `mappend` x = x
x `mappend` mempty = x
This is a Monoid, and Haskell's Data.Monoid module defines mempty and mappend in the Monoid type class:
class Monoid m where
    mempty  :: m
    mappend :: m -> m -> m
... and also provides a convenient infix operator for mappend:
(<>) :: (Monoid a) => a -> a -> a
m1 <> m2 = m1 `mappend` m2
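The two identity equations are easy to check mechanically. The sketch below assumes a reasonably recent GHC, where (<>) and Semigroup are exported by the Prelude; leftIdentity, rightIdentity and isNeutral are hypothetical helper names.

```haskell
import Data.Monoid (Any (..), Sum (..))

-- mempty <> x = x
leftIdentity :: (Eq m, Monoid m) => m -> Bool
leftIdentity x = (mempty <> x) == x

-- x <> mempty = x
rightIdentity :: (Eq m, Monoid m) => m -> Bool
rightIdentity x = (x <> mempty) == x

-- Does a candidate default behave as "empty" for every sample value?
isNeutral :: (Eq m, Semigroup m) => m -> [m] -> Bool
isNeutral candidate samples =
    all (\x -> candidate <> x == x && x <> candidate == x) samples
```

For example, isNeutral (Sum 0) [Sum 1, Sum 2] holds, while isNeutral "ridiculous" ["a"] does not, which is one way to see why "ridiculous" is a poor default string.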

Emptiness


Not all types have a unique Monoid instance. For example, Bool has two separate Monoid instances and we must use the Any or All newtypes from Data.Monoid to distinguish which one we prefer.

The Any monoid corresponds to the one we chose above, where False is the empty value and (||) is the combining operation:
newtype Any = Any { getAny :: Bool }

instance Monoid Any where
    mempty = Any False
    (Any b1) `mappend` (Any b2) = Any (b1 || b2)
However, there is a dual monoid where True is the "empty" value and (&&) is the combining operation:
newtype All = All { getAll :: Bool }

instance Monoid All where
    mempty = All True
    (All b1) `mappend` (All b2) = All (b1 && b2)
Similarly, numbers have two separate Monoid instances, and we use the Sum or Product monoids to distinguish which one we prefer.

newtype Sum a = Sum { getSum :: a }

instance (Num a) => Monoid (Sum a) where
    mempty = Sum 0
    (Sum n1) `mappend` (Sum n2) = Sum (n1 + n2)


newtype Product a = Product { getProduct :: a }

instance (Num a) => Monoid (Product a) where
    mempty = Product 1
    (Product n1) `mappend` (Product n2) = Product (n1 * n2)
Monoids teach a valuable lesson: there is no such thing as an intrinsically empty value. Values are only empty with respect to a specific combining operation. We can choose more exotic default values to be mempty, but if we must select an unusual mappend to justify their emptiness then that suggests that we chose the wrong defaults.
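One quick way to see that emptiness depends on the combining operation is to fold an empty list through each newtype. This is a small sketch using Data.Monoid from base; the function names are hypothetical.

```haskell
import Data.Monoid (All (..), Any (..), Product (..), Sum (..))

-- The same empty input produces a different "default" per monoid
anyOf :: [Bool] -> Bool
anyOf = getAny . foldMap Any

allOf :: [Bool] -> Bool
allOf = getAll . foldMap All

total :: [Int] -> Int
total = getSum . foldMap Sum

productOf :: [Int] -> Int
productOf = getProduct . foldMap Product
```

On the empty list, anyOf gives False, allOf gives True, total gives 0 and productOf gives 1: four different defaults, each justified by its own combining operation.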

So the next time you find yourself needing a Default type class, chances are that you actually should use the Monoid type class instead. The Monoid class forces us to demonstrate why our default is truly empty by also providing the associated combining operator. This basic sanity check discourages us from defining "ridiculous" defaults.
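As an illustration of using Monoid where a Default class might otherwise appear, here is a sketch of a settings record whose default is mempty. The Settings type and its fields are hypothetical. Note also that recent versions of GHC require a Semigroup instance alongside every Monoid instance, with (<>) defined there, so the sketch is written in that style instead of defining mappend directly.

```haskell
import Data.Monoid (Any (..), Sum (..))

-- Hypothetical program settings, built from fields that are already monoids
data Settings = Settings
    { verbose      :: Any
    , extraRetries :: Sum Int
    , searchPath   :: [FilePath]
    } deriving (Eq, Show)

-- Combine settings field by field
instance Semigroup Settings where
    Settings v1 r1 p1 <> Settings v2 r2 p2 =
        Settings (v1 <> v2) (r1 <> r2) (p1 <> p2)

-- The default is the value that changes nothing when combined
instance Monoid Settings where
    mempty = Settings mempty mempty mempty

-- Merge any number of partial settings; no settings at all gives the default
fromFlags :: [Settings] -> Settings
fromFlags = mconcat
```

With this design the default is not an arbitrary choice: fromFlags [] is mempty, and combining mempty with any other Settings value leaves that value unchanged.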