Sunday, April 21, 2013

pipes and io-streams

I was originally planning to release an extension library for pipes that would add an io-streams-like interface to pipes. However, this week I discovered a surprisingly elegant way to implement io-streams behavior (minus pushback) using the existing pipes API. In fact, this translation has been possible ever since pipes-2.4!

This post assumes that you are familiar with both pipes and io-streams. If not, then you can read the pipes tutorial or the io-streams tutorial, both of which I wrote!

Edit: I also just discovered that when you re-implement io-streams using pipes, pipes ties io-streams in performance. See this reddit comment for code and benchmarks.


Conversions


I'll begin with the following generic correspondence between io-streams and pipes:
{-# LANGUAGE RankNTypes #-}

import Control.Proxy

type InputStream a
    = forall p . (Proxy p) => () -> Session p IO (Maybe a)

type OutputStream a
    = forall p . (Proxy p) => Maybe a -> Session p IO ()

makeInputStream :: IO (Maybe a) -> InputStream a
makeInputStream m () = runIdentityP $ lift m

makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a
makeOutputStream f a = runIdentityP $ lift (f a)
For example:
stdin :: InputStream String
stdin = makeInputStream (fmap Just getLine)

stdout :: OutputStream String
stdout = makeOutputStream (maybe (return ()) putStrLn)
Notice how this does not convert the read and write actions to repetitive streams. Instead, you lift each action just once into the Proxy monad.

To read from or write to these streams, you just run the proxy monad:
import Prelude hiding (read)

read :: InputStream a -> IO (Maybe a)
read is = runProxy is

{- This requires `pipes` HEAD, which generalizes the type
   of `runProxyK`.

   Alternatively, you can hack around the current definition
   using:

   write ma os = runProxy (\() -> os ma)
-}
write :: Maybe a -> OutputStream a -> IO ()
write ma os = runProxyK os ma
For example:
>>> read stdin
Test<Enter>
Just "Test"
>>> write (Just "Test") stdout
Test
Now consider the following io-streams transformation:
map :: (a -> b) -> InputStream a -> IO (InputStream b)
map f is = makeInputStream $ do
    ma <- read is
    return (fmap f ma)
Here's the naive translation to pipes (later I will show the more elegant version):
import Prelude hiding (map)

map :: (a -> b) -> InputStream a -> InputStream b
map f is () = runIdentityP $ do
    ma <- is ()
    return (fmap f ma)
For example:
>>> read (map (++ "!") stdin)
Test<Enter>
Just "Test!"
The only real difference is that instead of using read is, I call is () directly.

So far I've used absolutely no pipes-specific features to implement any of this. I could have done this just as easily in any other library, including conduit.
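As a minimal sketch of that claim, the following needs no streaming library at all: an input stream is modeled as a bare IO (Maybe a) action and map becomes ordinary fmap. The names InputStreamIO, fromListIO and mapIO are hypothetical, and fromListIO stands in for stdin by popping elements off an IORef so the example can run without a terminal.

```haskell
import Data.IORef (newIORef, atomicModifyIORef')

-- Hypothetical model: an input stream is an action that yields
-- Just a value, or Nothing once the stream is exhausted.
type InputStreamIO a = IO (Maybe a)

-- Build an input stream that pops elements off a list held in an IORef.
fromListIO :: [a] -> IO (InputStreamIO a)
fromListIO xs = do
  ref <- newIORef xs
  return $ atomicModifyIORef' ref $ \ys -> case ys of
    []       -> ([], Nothing)
    (z : zs) -> (zs, Just z)

-- Transforming a stream needs no library support: fmap over the result.
mapIO :: (a -> b) -> InputStreamIO a -> InputStreamIO b
mapIO f is = fmap (fmap f) is
```

Reading twice from mapIO (++ "!") over a stream built from ["Test"] should give Just "Test!" followed by Nothing.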


Input streams


Now I will add a twist: we can reify transformations to be Consumers instead of functions of an InputStream.

For example, let's transform map to be a Consumer instead of a function:
map :: (Proxy p)
    => (a -> b)
    -> () -> Consumer p (Maybe a) IO (Maybe b)
map f () = runIdentityP $ do
    ma <- request ()
    return (fmap f ma)
I've only made two changes:
  • map is no longer a function of an InputStream
  • Instead of calling the input stream, I use request ()
Check out the final pipe type carefully (ignoring the Maybes):
() -> Consumer p a m b
An input stream transformation is a fold in disguise! These transformations fold 0 or more values from the old input stream to return a new value.

Notice how this differs from the conventional flow of information in pipes. We're not converting a stream of a values into a new stream of b values. Instead, we're folding a stream of a values into a single return value of type b.
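To see why such a Consumer is a fold, it can help to strip away effects entirely. The sketch below is a simplified model rather than the pipes Proxy type: a consumer of Maybe a values is a tree of Await nodes ending in Done, and feeding it from a list (answering Nothing once the list is exhausted) is literally a fold. Fold, Await, feed, mapF, lengthF and twoReqsF are hypothetical names for this illustration.

```haskell
-- Simplified, effect-free model of () -> Consumer p (Maybe a) m b:
-- either request the next Maybe a, or finish with a result.
data Fold a b = Await (Maybe a -> Fold a b) | Done b

-- Supply values from a list, answering Nothing once the list runs out.
feed :: [a] -> Fold a b -> b
feed _        (Done b)  = b
feed []       (Await k) = feed [] (k Nothing)
feed (x : xs) (Await k) = feed xs (k (Just x))

-- Counterpart of map: request once and transform the answer.
mapF :: (a -> b) -> Fold a (Maybe b)
mapF f = Await (\ma -> Done (fmap f ma))

-- Folds 0 or more values: keep requesting until end of input.
lengthF :: Fold a Int
lengthF = go 0
  where
    go n = Await $ \ma -> case ma of
      Nothing -> Done n
      Just _  -> go (n + 1)

-- Folds exactly two values, a counterpart of the post's twoReqs.
twoReqsF :: Fold a (Maybe [a])
twoReqsF = Await $ \m1 -> Await $ \m2 -> Done (sequence [m1, m2])
```

Under this model, feed "hello" lengthF consumes all five characters and returns 5, while feed [1, 2, 3] twoReqsF stops after two requests and returns Just [1, 2]: each fold decides for itself how many values to pull.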

How do we even connect map to stdin, though? They don't have the right types for ordinary pipe composition:
stdin 
    :: (Proxy  p)
    => () -> Session  p                IO (Maybe String)

map length
    :: (Proxy p)
    => () -> Consumer p (Maybe String) IO (Maybe Int   )
We need some way to feed the return value of stdin into the upstream input of map. This means we need to call stdin once each time that map requests a new value and feed that into the request. How can we do that using the existing pipes API?

Fortunately, this is exactly the problem that the (\>\) composition operator solves:
>>> read (stdin \>\ map length)
Test<Enter>
Just 4
Woah, slow down! What just happened?


The "request" category


The definition of (\>\) is actually really simple:
(f \>\ g) replaces all of g's requests with f.
So when we wrote (stdin \>\ map length), we replaced the single request in map with stdin as if we had written it in by hand:
-- Before
map length () = runIdentityP $ do
    ma <- request ()
    return (fmap length ma)

-- After
(stdin \>\ map length) () = runIdentityP $ do
    ma <- stdin ()
    return (fmap length ma)
In fact, this behavior means that (\>\) and request form a category. (\>\) is the composition operation, request is the identity, and they satisfy the category laws:
-- Replacing each 'request' in 'f' with 'request' gives 'f'
request \>\ f = f

-- Replacing each 'request' in 'request' with 'f' gives 'f'
f \>\ request = f

-- Substitution of 'request's is associative
(f \>\ g) \>\ h = f \>\ (g \>\ h)
Let's take a closer look at the type of (\>\) to understand what is going on:
(\>\)
    :: (Monad m, ListT p)
    => (b' -> p a' a x' x m b)
    -> (c' -> p b' b x' x m c)
    -> (c' -> p a' a x' x m c)
We can illuminate this type by type-restricting the arguments. First, let's consider the case where the first argument is a Session and the second argument is a Consumer, and see what type the compiler infers for the result:
(\>\)
    :: (Monad m, ListT p)
    => (() -> Session  p   m b)
    -> (() -> Consumer p b m c)
    -> (() -> Session  p   m c)
The compiler infers that we must get back a readable Session, just like we wanted! This is precisely what happened when we composed stdin with map:
stdin
    :: () -> Session  p                IO (Maybe String)

map length
    :: () -> Consumer p (Maybe String) IO (Maybe Int   )

stdin \>\ map length
    :: () -> Session  p                IO (Maybe Int   )

read (stdin \>\ map length) :: IO (Maybe Int)
However, there's another way we can specialize the type of (\>\). We can instead restrict both arguments to be Consumers:
(\>\)
    :: (Monad m, ListT p)
    => (() -> Consumer p a m b)
    -> (() -> Consumer p b m c)
    -> (() -> Consumer p a m c)
The compiler infers that if we compose two folds, we must get a new fold!

This means that we can compose input stream transformations using the same (\>\) operator:
map length
    :: () -> Consumer p (Maybe String) IO (Maybe Int )

map even
    :: () -> Consumer p (Maybe Int   ) IO (Maybe Bool)

map length \>\ map even
    :: () -> Consumer p (Maybe String) IO (Maybe Bool)
Then we will use the exact same operator to connect our input stream:
stdin \>\ map length \>\ map even
    :: () -> Session p IO (Maybe Bool)

read (stdin \>\ map length \>\ map even)
    :: IO (Maybe Bool)
Let's try it!
>>> read (stdin \>\ map length \>\ map even)
Test<Enter>
Just True
Now let's make things more interesting to really drive the point home:
import Control.Monad

twoReqs
    :: (Proxy p) => () -> Consumer p (Maybe a) IO (Maybe [a])
twoReqs () = runIdentityP $ do
    mas <- replicateM 2 (request ())
    return (sequence mas)
>>> read (stdin \>\ twoReqs \>\ twoReqs) :: IO (Maybe [[String]])
1<Enter>
2<Enter>
3<Enter>
4<Enter>
Just [["1","2"],["3","4"]]
>>> -- We can still read from raw 'stdin'
>>> read stdin
5<Enter>
Just "5"
In other words, the "request" category is actually the category of InputStreams and pipes already has InputStream support today!
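The substitution semantics of (\>\) fit in a few lines of plain Haskell. The following is a minimal, self-contained model assuming a simplified Client type in place of the real pipes Proxy class; Client, liftC and fromList are hypothetical names, and fromList stands in for stdin by popping lines from an IORef. Data.Void requires base 4.8 or later.

```haskell
import Prelude hiding (map, read)
import Control.Monad (ap, liftM, replicateM)
import Data.IORef (IORef, newIORef, atomicModifyIORef')
import Data.Void (Void, absurd)

-- Simplified model of a pipes Client (not the real pipes type):
-- send an a' upstream and receive an a, run an effect, or finish with r.
data Client a' a m r
  = Request a' (a -> Client a' a m r)
  | Lift (m (Client a' a m r))
  | Done r

instance Monad m => Functor (Client a' a m) where
  fmap = liftM

instance Monad m => Applicative (Client a' a m) where
  pure  = Done
  (<*>) = ap

instance Monad m => Monad (Client a' a m) where
  Request a' k >>= f = Request a' (\a -> k a >>= f)
  Lift m       >>= f = Lift (fmap (>>= f) m)
  Done r       >>= f = f r

request :: a' -> Client a' a m a
request a' = Request a' Done

liftC :: Monad m => m r -> Client a' a m r
liftC m = Lift (fmap Done m)

-- (f \>\ g) replaces every request in g with a call to f.
infixl 5 \>\
(\>\) :: Monad m
      => (b' -> Client a' a m b)
      -> (c' -> Client b' b m c)
      -> (c' -> Client a' a m c)
(f \>\ g) c' = go (g c')
  where
    go (Request b' k) = f b' >>= (go . k)
    go (Lift m)       = Lift (fmap go m)
    go (Done r)       = Done r

-- A client whose request type is Void can never request, so it can be run.
read :: Monad m => (() -> Client Void () m r) -> m r
read is = go (is ())
  where
    go (Request v _) = absurd v
    go (Lift m)      = m >>= go
    go (Done r)      = return r

-- Hypothetical stand-in for stdin that pops lines off an IORef.
fromList :: IORef [String] -> () -> Client a' a IO (Maybe String)
fromList ref () = liftC $ atomicModifyIORef' ref $ \ls -> case ls of
  []        -> ([], Nothing)
  (l : ls') -> (ls', Just l)

map :: Monad m => (a -> b) -> () -> Client () (Maybe a) m (Maybe b)
map f () = fmap (fmap f) (request ())

twoReqs :: Monad m => () -> Client () (Maybe a) m (Maybe [a])
twoReqs () = fmap sequence (replicateM 2 (request ()))
```

With the lines "1" through "5" queued, read (fromList ref \>\ twoReqs \>\ twoReqs) should return Just [["1","2"],["3","4"]] and a subsequent read (fromList ref) should return Just "5", mirroring the session above. The identity law can be spot-checked the same way by comparing fromList ref \>\ (request \>\ twoReqs) against fromList ref \>\ twoReqs.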


Output streams


Now let's do the exact same thing for OutputStreams! We'll begin with the filterOutput function from io-streams:
filterOutput
    :: (a -> Bool) -> OutputStream a -> IO (OutputStream a)
filterOutput pred os = makeOutputStream $ \ma -> case ma of
    Nothing            -> write ma os
    Just a | pred a    -> write ma os
           | otherwise -> return ()
The pipes equivalent is:
filterOutput
    :: (Proxy p)
    => (a -> Bool)
    -> Maybe a -> Producer p (Maybe a) IO ()
filterOutput pred ma = runIdentityP $ case ma of
    Nothing            -> respond ma
    Just a | pred a    -> respond ma
           | otherwise -> return ()
This time we get a new shape for our type signature:
a -> Producer p b m ()
An output stream transformation is an unfold in disguise! These transformations unfold each output destined for the old output stream into 0 or more outputs destined for the new output stream.

Let's hook this transformation up to stdout and start writing values to it ... oh wait. How do we hook it up? Well, let's look at the types of stdout and filterOutput:
filterOutput f
    :: (Proxy p) => Maybe a -> Producer p (Maybe a) IO ()

stdout
    :: (Proxy p) => Maybe a -> Session  p           IO ()
We need some way to feed the stream output of filterOutput into the argument of stdout. This means that we need to call stdout on each output of filterOutput. How can we do that using the existing pipes API?

Why, we use (/>/), the dual of (\>\)!
>>> write (Just "") (filterOutput (not . null) />/ stdout)
>>> write (Just "Test") (filterOutput (not . null) />/ stdout)
Test

The "respond" category


(/>/) also has a simple behavior:
(f />/ g) replaces all of f's responds with g.
This behavior means that (/>/) and respond form a category, too! (/>/) is the composition operation, respond is the identity, and they satisfy the category laws:
-- Replacing each 'respond' in 'f' with 'respond' gives 'f'
f />/ respond = f

-- Replacing each 'respond' in 'respond' with 'f' gives 'f'
respond />/ f = f

-- Substitution of 'respond's is associative
(f />/ g) />/ h = f />/ (g />/ h)
Let's learn more by reviewing the type of (/>/):
(/>/)
    :: (Monad m, ListT p)
    => (a -> p x' x b' b m a')
    -> (b -> p x' x c' c m b')
    -> (a -> p x' x c' c m a')
We'll specialize this type signature in two ways. First, we'll consider the case where the first argument is a Producer and the second argument is a Session and then see what type the compiler infers for the result:
(/>/)
    :: (Monad m, ListT p)
    => (a -> Producer p b m ())
    -> (b -> Session  p   m ())
    -> (a -> Session  p   m ())
The compiler infers that we must get back a writable Session, just like we wanted! This is precisely what happened when we composed filterOutput and stdout:
filterOutput (not . null)
    :: Maybe String -> Producer p (Maybe String) IO ()

stdout
    :: Maybe String -> Session  p                IO ()

filterOutput (not . null) />/ stdout
    :: Maybe String -> Session  p                IO ()

flip write (filterOutput (not . null) />/ stdout)
    :: Maybe String -> IO ()
However, there's another way we can specialize the type of (/>/). We can instead restrict both arguments to be Producers:
(/>/)
    :: (Monad m, ListT p)
    => (a -> Producer p b m ())
    -> (b -> Producer p c m ())
    -> (a -> Producer p c m ())
The compiler infers that if we compose two unfolds, we must get a new unfold!

This means that we can compose output stream transformations using (/>/). To show this, let's define the equivalent of io-streams' contramap:
contramap
    :: (Proxy p)
    => (a -> b)
    -> Maybe a -> Producer p (Maybe b) IO ()
contramap f ma = runIdentityP $ respond (fmap f ma)
We can connect contramaps using (/>/):
contramap even
    :: Maybe Int  -> Producer p (Maybe Bool  ) IO ()

contramap show
    :: Maybe Bool -> Producer p (Maybe String) IO ()

contramap even />/ contramap show
    :: Maybe Int  -> Producer p (Maybe String) IO ()
... then using the exact same operator we connect them to stdout:
contramap even />/ contramap show />/ stdout
    :: Maybe Int -> Session p IO ()

flip write (contramap even />/ contramap show />/ stdout)
    :: Maybe Int -> IO ()
Let's try it!
>>> write (Just 4) (contramap even />/ contramap show />/ stdout)
True
Now let's make things more interesting to really drive the point home:
hiBye
    :: (Proxy p)
    => Maybe String -> Producer p (Maybe String) IO ()
hiBye mstr = runIdentityP $ do
    respond $ fmap ("Hello, " ++) mstr
    respond $ fmap ("Goodbye, " ++) mstr
>>> write (Just "world") (hiBye />/ hiBye />/ stdout)
Hello, Hello, world
Goodbye, Hello, world
Hello, Goodbye, world
Goodbye, Goodbye, world
>>> -- We can still write to raw 'stdout'
>>> write (Just "Haskell") stdout
Haskell
In other words, the "respond" category is actually the category of OutputStreams and pipes already has OutputStream support today!
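The dual model for (/>/) swaps requests for responses. Again this is a sketch with a simplified Server type rather than the real pipes API; Server, liftS and toLog are hypothetical names, and toLog stands in for stdout by appending each line to an IORef so the output can be inspected.

```haskell
import Control.Monad (ap, liftM)
import Data.IORef (IORef, newIORef, modifyIORef, readIORef)
import Data.Void (Void, absurd)

-- Simplified model of a pipes Server (not the real pipes type):
-- send a b downstream and receive a b', run an effect, or finish with r.
data Server b' b m r
  = Respond b (b' -> Server b' b m r)
  | Lift (m (Server b' b m r))
  | Done r

instance Monad m => Functor (Server b' b m) where
  fmap = liftM

instance Monad m => Applicative (Server b' b m) where
  pure  = Done
  (<*>) = ap

instance Monad m => Monad (Server b' b m) where
  Respond b k >>= f = Respond b (\b' -> k b' >>= f)
  Lift m      >>= f = Lift (fmap (>>= f) m)
  Done r      >>= f = f r

respond :: b -> Server b' b m b'
respond b = Respond b Done

liftS :: Monad m => m r -> Server b' b m r
liftS m = Lift (fmap Done m)

-- (f />/ g) replaces every respond in f with a call to g.
infixr 5 />/
(/>/) :: Monad m
      => (a -> Server b' b m a')
      -> (b -> Server c' c m b')
      -> (a -> Server c' c m a')
(f />/ g) a = go (f a)
  where
    go (Respond b k) = g b >>= (go . k)
    go (Lift m)      = Lift (fmap go m)
    go (Done r)      = Done r

-- A server whose output type is Void can never respond, so it can be run.
write :: Monad m => a -> (a -> Server () Void m r) -> m r
write a os = go (os a)
  where
    go (Respond v _) = absurd v
    go (Lift m)      = m >>= go
    go (Done r)      = return r

-- Hypothetical stand-in for stdout: records each line in an IORef.
toLog :: IORef [String] -> Maybe String -> Server b' b IO ()
toLog ref = liftS . maybe (return ()) (\s -> modifyIORef ref (++ [s]))

filterOutput :: Monad m => (a -> Bool) -> Maybe a -> Server () (Maybe a) m ()
filterOutput p ma = case ma of
  Just a | not (p a) -> return ()
  _                  -> respond ma

contramap :: Monad m => (a -> b) -> Maybe a -> Server () (Maybe b) m ()
contramap f ma = respond (fmap f ma)

hiBye :: Monad m => Maybe String -> Server () (Maybe String) m ()
hiBye mstr = do
  respond (fmap ("Hello, " ++) mstr)
  respond (fmap ("Goodbye, " ++) mstr)
```

Running write (Just "world") (hiBye />/ hiBye />/ toLog ref) should leave the same four lines in the log, in the same order, as the session above.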


Unify input and output streams


We can do much more than just implement the io-streams API, though! We can generalize it in many more powerful ways than io-streams permits.

First off, I'm going to inline the InputStream and OutputStream types into the type signatures of read and write:
read
    :: (forall p . (Proxy p) => () -> Session p IO (Maybe a))
    -> IO (Maybe a)

write
    :: Maybe a
    -> (forall p . (Proxy p) => Maybe a -> Session p IO ())
    -> IO ()
Then I will generalize the types to not be Maybe-specific and flip the arguments for write:
read
    :: (forall p . (Proxy p) => () -> Session p IO a)
    -> IO a

write
    :: (forall p . (Proxy p) => a -> Session p IO ())
    -> a -> IO ()
Now, I will generalize both read and write by adding bidirectionality to them. read will now accept an argument parametrizing its request and write can now optionally receive a result.
read
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
read is = runProxyK is

write
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
write is = runProxyK is
Well, would you look at that! They both have the same type and implementation! In other words, we can unify read and write into a single function:
once
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
once is = runProxyK is
Also, once is really polymorphic over the base monad:
once
    :: (Monad m)
    => (forall p . (ListT p) => a -> Session p m b)
    -> a -> m b
once is = runProxyK is
Using this function, we can both read from InputStreams and write to OutputStreams:
>>> once (stdin \>\ map length) ()
Test<Enter>
Just 4
>>> once (contramap show />/ stdout) (Just 1)
1
Let's test the bidirectionality. I'll write a new stdin that accepts a count parameter telling how many lines to get. This time, there will be no Maybes:
stdinN :: (Proxy p) => Int -> Session p IO [String]
stdinN n = runIdentityP $ replicateM n $ lift getLine
First, briefly test it:
>>> once stdinN 3
Test<Enter>
Apple<Enter>
123<Enter>
["Test","Apple","123"]
Similarly, let's define a transformation to automate supplying values to stdinN:
automate :: (Proxy p) => () -> Client p Int [String] IO ()
automate () = runIdentityP $ do
    lift $ putStrLn "First batch:"
    xs <- request 2
    lift $ print xs
    lift $ putStrLn "Second batch:"
    ys <- request 2
    lift $ print ys
Now compose!
>>> once (stdinN \>\ automate) ()
First batch:
1<Enter>
2<Enter>
["1","2"]
Second batch:
3<Enter>
4<Enter>
["3","4"]
>>> -- Go back to using 'stdinN' unfiltered
>>> once stdinN 1
5<Enter>
["5"]
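A self-contained sketch of once and the bidirectional automate example, using the same kind of simplified Client model (not the pipes API). Two assumptions to flag: linesN is a hypothetical, scripted replacement for stdinN that takes lines from an IORef instead of the terminal, and this automate returns its two batches rather than printing them, so the result can be checked.

```haskell
import Control.Monad (ap, liftM)
import Data.IORef (IORef, newIORef, atomicModifyIORef')
import Data.Void (Void, absurd)

-- Simplified model of a pipes Client (not the real pipes type).
data Client a' a m r
  = Request a' (a -> Client a' a m r)
  | Lift (m (Client a' a m r))
  | Done r

instance Monad m => Functor (Client a' a m) where
  fmap = liftM

instance Monad m => Applicative (Client a' a m) where
  pure  = Done
  (<*>) = ap

instance Monad m => Monad (Client a' a m) where
  Request a' k >>= f = Request a' (\a -> k a >>= f)
  Lift m       >>= f = Lift (fmap (>>= f) m)
  Done r       >>= f = f r

request :: a' -> Client a' a m a
request a' = Request a' Done

liftC :: Monad m => m r -> Client a' a m r
liftC m = Lift (fmap Done m)

-- (f \>\ g) replaces every request in g with a call to f.
infixl 5 \>\
(\>\) :: Monad m
      => (b' -> Client a' a m b)
      -> (c' -> Client b' b m c)
      -> (c' -> Client a' a m c)
(f \>\ g) c' = go (g c')
  where
    go (Request b' k) = f b' >>= (go . k)
    go (Lift m)       = Lift (fmap go m)
    go (Done r)       = Done r

-- once: feed the argument in and run the closed client to completion.
once :: Monad m => (a -> Client Void () m b) -> a -> m b
once p a = go (p a)
  where
    go (Request v _) = absurd v
    go (Lift m)      = m >>= go
    go (Done r)      = return r

-- Hypothetical stand-in for stdinN: takes n scripted lines from an IORef.
linesN :: IORef [String] -> Int -> Client a' a IO [String]
linesN ref n = liftC $ atomicModifyIORef' ref $ \ls -> (drop n ls, take n ls)

-- Variant of automate that returns both batches instead of printing them.
automate :: Monad m => () -> Client Int [String] m ([String], [String])
automate () = do
  xs <- request 2
  ys <- request 2
  return (xs, ys)
```

With "1" through "5" queued, once (linesN ref \>\ automate) () should return (["1","2"],["3","4"]), and once (linesN ref) 1 should then return ["5"].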
We can similarly unify makeInputStream and makeOutputStream:
make :: (Monad m, Proxy p) => (q -> m r) -> q -> p a' a b' b m r
make f a = runIdentityP (lift (f a))
The only difference between an input stream and an output stream is whether it consumes or produces a value, and bidirectional streams blur the line even further.


Purity


None of this machinery is specific to the IO monad. Everything I've introduced is polymorphic over the base monad! We only incur IO if one of our stages uses IO. This departs greatly from io-streams, where you are trapped in the IO monad to do everything. However, I want to qualify that statement with a caveat: I expect that pipes-parse will require either IO or an ST base monad (your choice) to properly manage pushback like io-streams does. Even so, you only pay this price if you actually need it.

pipes improves on purity in two other ways. First, building input and output streams is pure when you use pipes:
-- pipes
makeInputStream  :: IO (Maybe a)       -> InputStream a
makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a

-- io-streams
makeInputStream  :: IO (Maybe a)       -> IO (InputStream  a)
makeOutputStream :: (Maybe a -> IO ()) -> IO (OutputStream a)
Consequently, io-streams must use unsafePerformIO for its stdin:
stdin :: InputStream ByteString
stdin = unsafePerformIO $
    handleToInputStream IO.stdin >>= lockingInputStream
Second, composing and transforming input streams is pure when you use pipes:
>>> -- pipes
>>> read (stdin \>\ map length \>\ map even)
...
>>> -- io-streams
>>> is <- (map even <=< map length) stdin
>>> read is
...
... and if none of the stages use IO, then the entire operation is pure!
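To illustrate the purity claim, the sketch below runs two compositions in Data.Functor.Identity, using a simplified Client model rather than the pipes API. constLine, mapC, countdown and twoBatches are hypothetical stages that never mention IO, so once and runIdentity evaluate the whole pipeline purely.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity (..))
import Data.Void (Void, absurd)

-- Simplified model of a pipes Client (not the real pipes type).
data Client a' a m r
  = Request a' (a -> Client a' a m r)
  | Lift (m (Client a' a m r))
  | Done r

instance Monad m => Functor (Client a' a m) where
  fmap = liftM

instance Monad m => Applicative (Client a' a m) where
  pure  = Done
  (<*>) = ap

instance Monad m => Monad (Client a' a m) where
  Request a' k >>= f = Request a' (\a -> k a >>= f)
  Lift m       >>= f = Lift (fmap (>>= f) m)
  Done r       >>= f = f r

request :: a' -> Client a' a m a
request a' = Request a' Done

-- (f \>\ g) replaces every request in g with a call to f.
infixl 5 \>\
(\>\) :: Monad m
      => (b' -> Client a' a m b)
      -> (c' -> Client b' b m c)
      -> (c' -> Client a' a m c)
(f \>\ g) c' = go (g c')
  where
    go (Request b' k) = f b' >>= (go . k)
    go (Lift m)       = Lift (fmap go m)
    go (Done r)       = Done r

once :: Monad m => (a -> Client Void () m b) -> a -> m b
once p a = go (p a)
  where
    go (Request v _) = absurd v
    go (Lift m)      = m >>= go
    go (Done r)      = return r

-- Hypothetical pure source: always answers with the same line.
constLine :: Monad m => String -> () -> Client a' a m (Maybe String)
constLine s () = return (Just s)

mapC :: Monad m => (a -> b) -> () -> Client () (Maybe a) m (Maybe b)
mapC f () = fmap (fmap f) (request ())

-- Hypothetical pure, parameterized source standing in for stdinN.
countdown :: Monad m => Int -> Client a' a m [Int]
countdown n = return [n, n - 1 .. 1]

twoBatches :: Monad m => () -> Client Int [Int] m ([Int], [Int])
twoBatches () = do
  xs <- request 2
  ys <- request 3
  return (xs, ys)

-- No stage mentions IO, so each pipeline runs in Identity.
pureEven :: Maybe Bool
pureEven = runIdentity (once (constLine "Test" \>\ mapC length \>\ mapC even) ())

pureBatches :: ([Int], [Int])
pureBatches = runIdentity (once (countdown \>\ twoBatches) ())
```

pureEven should evaluate to Just True and pureBatches to ([2,1],[3,2,1]), with no IO anywhere in the types.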


Extra interfaces


You can generalize the type signatures even further. Let's revisit the types of (\>\) and (/>/):
(\>\)
    :: (Monad m, ListT p)
    => (b' -> p a' a x' x m b)
    -> (c' -> p b' b x' x m c)
    -> (c' -> p a' a x' x m c)

(/>/)
    :: (Monad m, ListT p)
    => (a -> p x' x b' b m a')
    -> (b -> p x' x c' c m b')
    -> (a -> p x' x c' c m a')
So far we've been ignoring the x'/x interfaces entirely, treating them as closed interfaces. However, there's no reason we have to! If we open up this interface, then every processing stage that we connect can read and write to this streaming interface.

In a future post, I will show you how you can really elegantly use this trick to generalize Hutton-Meijer parsers to permit effects.


ListT


Readers of the pipes-3.2 announcement post know that the (\>\) and (/>/) composition operators correspond to (>=>) for the two pipes ListT monads.
This means that you can take any output stream transformation and assemble more sophisticated behaviors using do notation:
import Control.Applicative ((<$>), (<*>))

pairs
    :: (ListT p)
    => () -> Producer p (Maybe (String, String)) IO ()
pairs () = runRespondT $ do
    mstr1 <- RespondT $ hiBye (Just "world")
    mstr2 <- RespondT $ hiBye (Just "Haskell")
    return $ (,) <$> mstr1 <*> mstr2
These behave just like the list monad, non-deterministically selecting all possible paths:
>>> once (pairs />/ contramap show />/ stdout) ()
("Hello, world","Hello, Haskell")
("Hello, world","Goodbye, Haskell")
("Goodbye, world","Hello, Haskell")
("Goodbye, world","Goodbye, Haskell")
The exact same trick works for input stream transformations, too, but it's less useful because we don't send information upstream as frequently as downstream.
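The list-monad behavior can be reproduced with a small model. This sketch uses simplified types rather than the pipes ListT class or RespondT: a Producer yields values, and ListT's bind replaces each yield with the continuation, the same substitution (/>/) performs. Producer, forEach, ListT, Select, enumerate and toList are hypothetical names; running in Identity makes the enumeration pure.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity (..))

-- Simplified producer: yield a b, run an effect, or finish with r.
data Producer b m r
  = Yield b (Producer b m r)
  | Lift (m (Producer b m r))
  | Done r

instance Monad m => Functor (Producer b m) where
  fmap = liftM

instance Monad m => Applicative (Producer b m) where
  pure  = Done
  (<*>) = ap

instance Monad m => Monad (Producer b m) where
  Yield b rest >>= f = Yield b (rest >>= f)
  Lift m       >>= f = Lift (fmap (>>= f) m)
  Done r       >>= f = f r

respond :: b -> Producer b m ()
respond b = Yield b (Done ())

-- Replace every yield in p with body, the substitution (/>/) performs.
forEach :: Monad m => Producer a m r -> (a -> Producer b m ()) -> Producer b m r
forEach p body = go p
  where
    go (Yield a rest) = body a >> go rest
    go (Lift m)       = Lift (fmap go m)
    go (Done r)       = Done r

-- A ListT-style monad whose bind is forEach.
newtype ListT m a = Select { enumerate :: Producer a m () }

instance Monad m => Functor (ListT m) where
  fmap = liftM

instance Monad m => Applicative (ListT m) where
  pure a = Select (respond a)
  (<*>)  = ap

instance Monad m => Monad (ListT m) where
  Select p >>= f = Select (forEach p (enumerate . f))

hiBye :: Monad m => Maybe String -> Producer (Maybe String) m ()
hiBye mstr = do
  respond (fmap ("Hello, " ++) mstr)
  respond (fmap ("Goodbye, " ++) mstr)

pairs :: Monad m => ListT m (Maybe (String, String))
pairs = do
  mstr1 <- Select (hiBye (Just "world"))
  mstr2 <- Select (hiBye (Just "Haskell"))
  return ((,) <$> mstr1 <*> mstr2)

-- Collect everything a pure producer yields.
toList :: Producer b Identity r -> [b]
toList (Yield b rest) = b : toList rest
toList (Lift m)       = toList (runIdentity m)
toList (Done _)       = []
```

toList (enumerate pairs) should produce the four Just pairs in the same order as the session above.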


Conclusions


One of the most common questions I get is "How do I escape from the pipe monad?", and I wrote this post to demonstrate how to do so. You can use pipes to program in exactly the same style as io-streams. The only thing missing is a standard library of io-streams-like utilities.

So far, I have not touched on the issue of push-back, a feature which io-streams provides. pipes-parse will build on the ideas I've introduced in this post to provide an io-streams-style interface to parsing that includes push-back functionality. This will make it easy to dynamically add or remove processing stages throughout the parsing process, which is necessary for parsing HTTP. In fact, everything I've presented here fell out very naturally from my work on pipes-parse and I only later discovered that I had accidentally reinvented io-streams entirely within pipes.

So I will cautiously state that I believe pipes is "io-streams done right". Everything that Greg has been doing with io-streams matches quite nicely with the pre-existing "request" and "respond" pipes categories that I independently discovered within pipes. This indicates that Greg was really onto something and pipes provides the elegant theoretical foundation for his work.

3 comments:

  1. Gabriel,

    Any chance of an article for the Monad Reader, or a submission to the Haskell Symposium perhaps?

    1. Yeah, I'm aiming for a Haskell symposium submission.

    2. Excellent! I shall hope to review it!
