I was originally planning to release an extension library for pipes that would add an io-streams-like interface to pipes. However, this week I discovered a surprisingly elegant way to implement io-streams behavior (minus pushback) using the existing pipes API. In fact, this translation has been possible ever since pipes-2.4!
This post assumes that you are familiar with both pipes and io-streams. If not, then you can read the pipes tutorial or the io-streams tutorial, both of which I wrote!
Edit: I also just discovered that when you re-implement io-streams using pipes, pipes ties io-streams in performance. See this reddit comment for code and benchmarks.
Conversions
I'll begin with the following generic correspondence between io-streams and pipes:
{-# LANGUAGE RankNTypes #-}
import Control.Proxy
type InputStream a
    = forall p . (Proxy p) => () -> Session p IO (Maybe a)
type OutputStream a
    = forall p . (Proxy p) => Maybe a -> Session p IO ()
makeInputStream :: IO (Maybe a) -> InputStream a
makeInputStream m () = runIdentityP $ lift m
makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a
makeOutputStream f a = runIdentityP $ lift (f a)
For example:
stdin :: InputStream String
stdin = makeInputStream (fmap Just getLine)
stdout :: OutputStream String
stdout = makeOutputStream (maybe (return ()) putStrLn)
Notice how this does not convert the read and write actions to repetitive streams. Instead, you lift each action just once into the Proxy monad.
To read from or write to these streams, you just run the Proxy monad:
import Prelude hiding (read)
read :: InputStream a -> IO (Maybe a)
read is = runProxy is
{- This requires `pipes` HEAD, which generalizes the type
of `runProxyK`.
Alternatively, you can hack around the current definition
using:
write ma os = runProxy (\() -> os ma)
-}
write :: Maybe a -> OutputStream a -> IO ()
write ma os = runProxyK os ma
For example:
>>> read stdin
Test<Enter>
Just "Test"
>>> write (Just "Test") stdout
Test
Now consider the following io-streams transformation:
map :: (a -> b) -> InputStream a -> IO (InputStream b)
map f is = makeInputStream $ do
    ma <- read is
    return (fmap f ma)
Here's the naive translation to pipes (later I will show the more elegant version):
import Prelude hiding (map)
map :: (a -> b) -> InputStream a -> InputStream b
map f is () = runIdentityP $ do
    ma <- is ()
    return (fmap f ma)
For example:
>>> read (map (++ "!") stdin)
Test<Enter>
Just "Test!"
The only real difference is that instead of using read is, I just use is () directly.
So far I've used absolutely no pipes-specific features to implement any of this. I could have done this just as easily in any other library, including conduit.
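To illustrate that nothing so far depends on pipes, here is a minimal sketch in plain Haskell that models an input stream as a bare IO (Maybe a) action. It is neither the io-streams nor the pipes API: InputStreamA, readA, mapA, and listStream are hypothetical names, and listStream pops pre-loaded lines from an IORef as a stand-in for stdin so the example runs without a terminal.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Hypothetical plain-IO model: an input stream is just an action that
-- yields the next element, or Nothing once the stream is exhausted.
type InputStreamA a = IO (Maybe a)

-- Reading is simply running the action once.
readA :: InputStreamA a -> IO (Maybe a)
readA is = is

-- The naive 'map': run the old stream once per read and transform the result.
mapA :: (a -> b) -> InputStreamA a -> InputStreamA b
mapA f is = do
    ma <- is
    return (fmap f ma)

-- Hypothetical stand-in for 'stdin': pops pre-loaded lines from an IORef.
listStream :: IORef [String] -> InputStreamA String
listStream ref = do
    ls <- readIORef ref
    case ls of
        []       -> return Nothing
        (l:rest) -> do
            writeIORef ref rest
            return (Just l)
```

Loaded into GHCi with an IORef holding ["Test"], readA (mapA (++ "!") (listStream ref)) returns Just "Test!", mirroring the session above.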
Input streams
Now I will add a twist: we can reify transformations as Consumers instead of functions of an InputStream. For example, let's transform map into a Consumer instead of a function:
map :: (Proxy p)
    => (a -> b)
    -> () -> Consumer p (Maybe a) IO (Maybe b)
map f () = runIdentityP $ do
    ma <- request ()
    return (fmap f ma)
I've only made two changes:
- map is no longer a function of an InputStream
- Instead of calling the input stream, I use request ()
Check out the final pipe type carefully (ignoring the Maybes):
() -> Consumer p a m b
An input stream transformation is a fold in disguise! These transformations fold 0 or more values from the old input stream to return a new value.
Notice how this differs from the conventional flow of information in pipes. We're not converting a stream of a values into a new stream of b values. Instead, we're folding a stream of a values into a single return value of type b.
How do we even connect map to stdin, though? They don't have the right types for ordinary pipe composition:
stdin
    :: (Proxy p)
    => () -> Session p IO (Maybe String)
map length
    :: (Proxy p)
    => () -> Consumer p (Maybe String) IO (Maybe Int)
We need some way to feed the return value of stdin into the upstream input of map. This means we need to call stdin once each time that map requests a new value and feed that into the request. How can we do that using the existing pipes API?
Fortunately, this is exactly the problem that the (\>\) composition operator solves:
>>> read (stdin \>\ map length)
Test<Enter>
Just 4
Woah, slow down! What just happened?
The "request" category
The definition of (\>\) is actually really simple:
(f \>\ g) replaces all of g's requests with f.
So when we wrote (stdin \>\ map length), we replaced the single request in map with stdin as if we had written it in by hand:
-- Before
map length () = runIdentityP $ do
    ma <- request ()
    return (fmap length ma)
-- After
(stdin \>\ map length) () = runIdentityP $ do
    ma <- stdin ()
    return (fmap length ma)
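The substitution rule can be modeled without pipes. The following is a minimal sketch, assuming a hand-rolled free monad Client that stands in for the pipes-3 Proxy types (it is not the library's actual representation): Request carries the request argument and a continuation for the reply, Lift embeds base-monad effects, and (\>\) walks g and replaces each Request with a call to f. fakeStdin, mapF, and readC are hypothetical helpers; fakeStdin reads from an IORef instead of the terminal, and a Void request type plays the role of a closed Session.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Void (Void, absurd)

-- Hypothetical free-monad model of a pipes-style client: it can send a
-- request of type req, receive a reply of type resp, run effects in m,
-- and finally return r.
data Client req resp m r
    = Request req (resp -> Client req resp m r)
    | Lift (m (Client req resp m r))
    | Done r

instance Functor m => Functor (Client req resp m) where
    fmap f (Request q k) = Request q (fmap f . k)
    fmap f (Lift m)      = Lift (fmap (fmap f) m)
    fmap f (Done r)      = Done (f r)

instance Functor m => Applicative (Client req resp m) where
    pure = Done
    cf <*> cx = cf >>= \f -> fmap f cx

instance Functor m => Monad (Client req resp m) where
    Request q k >>= f = Request q (\resp -> k resp >>= f)
    Lift m      >>= f = Lift (fmap (>>= f) m)
    Done r      >>= f = f r

request :: req -> Client req resp m resp
request q = Request q Done

liftC :: Functor m => m r -> Client req resp m r
liftC m = Lift (fmap Done m)

-- (f \>\ g): replace every request in g with a call to f.
(\>\) :: Functor m
      => (b' -> Client a' a m b)
      -> (c' -> Client b' b m c)
      -> (c' -> Client a' a m c)
(f \>\ g) c' = go (g c')
  where
    go (Request b' k) = f b' >>= go . k
    go (Lift m)       = Lift (fmap go m)
    go (Done r)       = Done r

-- A closed session (request type Void) never requests, so it can be run.
readC :: Monad m => (() -> Client Void resp m r) -> m r
readC is = run (is ())
  where
    run (Request v _) = absurd v
    run (Lift m)      = m >>= run
    run (Done r)      = return r

-- Hypothetical stand-in for 'stdin': pops pre-loaded lines from an IORef.
fakeStdin :: IORef [String] -> () -> Client Void x IO (Maybe String)
fakeStdin ref () = liftC $ do
    ls <- readIORef ref
    case ls of
        []       -> return Nothing
        (l:rest) -> writeIORef ref rest >> return (Just l)

-- 'map' as a fold: request one value from upstream and transform it.
mapF :: Functor m => (a -> b) -> () -> Client () (Maybe a) m (Maybe b)
mapF f () = fmap (fmap f) (request ())
```

With the IORef holding ["Test"], readC (fakeStdin ref \>\ mapF length) returns Just 4, matching the GHCi session earlier.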
In fact, this behavior means that (\>\) and request form a category. (\>\) is the composition operation, request is the identity, and they satisfy the category laws:
-- Replacing each 'request' in 'f' with 'request' gives 'f'
request \>\ f = f
-- Replacing each 'request' in 'request' with 'f' gives 'f'
f \>\ request = f
-- Substitution of 'request's is associative
(f \>\ g) \>\ h = f \>\ (g \>\ h)
Let's take a closer look at the type of (\>\) to understand what is going on:
(\>\)
    :: (Monad m, ListT p)
    => (b' -> p a' a x' x m b)
    -> (c' -> p b' b x' x m c)
    -> (c' -> p a' a x' x m c)
We can illuminate this type by type-restricting the arguments. First, let's consider the case where the first argument is a Session and the second argument is a Consumer, and see what type the compiler infers for the result:
(\>\)
    :: (Monad m, ListT p)
    => (() -> Session p m b)
    -> (() -> Consumer p b m c)
    -> (() -> Session p m c)
The compiler infers that we must get back a readable Session, just like we wanted! This is precisely what happened when we composed stdin with map:
stdin
    :: () -> Session p IO (Maybe String)
map length
    :: () -> Consumer p (Maybe String) IO (Maybe Int)
stdin \>\ map length
    :: () -> Session p IO (Maybe Int)
read (stdin \>\ map length) :: IO (Maybe Int)
However, there's another way we can specialize the type of (\>\). We can instead restrict both arguments to be Consumers:
(\>\)
    :: (Monad m, ListT p)
    => (() -> Consumer p a m b)
    -> (() -> Consumer p b m c)
    -> (() -> Consumer p a m c)
The compiler infers that if we compose two folds, we must get a new fold!
This means that we can compose input stream transformations using the same (\>\) operator:
map length
    :: () -> Consumer p (Maybe String) IO (Maybe Int)
map even
    :: () -> Consumer p (Maybe Int) IO (Maybe Bool)
map length \>\ map even
    :: () -> Consumer p (Maybe String) IO (Maybe Bool)
Then we use the exact same operator to connect our input stream:
stdin \>\ map length \>\ map even
    :: () -> Session p IO (Maybe Bool)
read (stdin \>\ map length \>\ map even)
    :: IO (Maybe Bool)
Let's try it!
>>> read (stdin \>\ map length \>\ map even)
Test<Enter>
Just True
Now let's make things more interesting to really drive the point home:
import Control.Monad
twoReqs
    :: (Proxy p) => () -> Consumer p (Maybe a) IO (Maybe [a])
twoReqs () = runIdentityP $ do
    mas <- replicateM 2 (request ())
    return (sequence mas)
>>> read (stdin \>\ twoReqs \>\ twoReqs) :: IO (Maybe [[String]])
1<Enter>
2<Enter>
3<Enter>
4<Enter>
Just [["1","2"],["3","4"]]
>>> -- We can still read from raw 'stdin'
>>> read stdin
5<Enter>
Just "5"
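To check the twoReqs behavior and the category laws on concrete input, here is an even smaller pure sketch. It assumes no base-monad effects at all: a Fold i r (a hypothetical type) either requests an i or is Done, (\>\) substitutes f for every request in g, and runFold (also hypothetical) feeds list elements as Just x, then Nothing, and returns the leftover input. This is a simplification of the pipes types, not their implementation.

```haskell
import Control.Monad (ap, replicateM)

-- Hypothetical pure model of a fold: it repeatedly requests an input of
-- type i and eventually returns a result of type r (no base monad).
data Fold i r
    = Request (i -> Fold i r)
    | Done r

instance Functor (Fold i) where
    fmap f (Request k) = Request (fmap f . k)
    fmap f (Done r)    = Done (f r)

instance Applicative (Fold i) where
    pure  = Done
    (<*>) = ap

instance Monad (Fold i) where
    Request k >>= f = Request (\i -> k i >>= f)
    Done r    >>= f = f r

requestF :: Fold i i
requestF = Request Done

-- Replace every request in the right-hand fold with the left-hand fold.
(\>\) :: Fold i b -> Fold b c -> Fold i c
f \>\ (Request k) = f >>= \b -> f \>\ k b
_ \>\ (Done c)    = Done c

-- Feed list elements as Just x, then Nothing once the list runs out.
-- Returns the result together with the unconsumed input.
runFold :: [a] -> Fold (Maybe a) r -> (r, [a])
runFold xs     (Done r)    = (r, xs)
runFold []     (Request k) = runFold [] (k Nothing)
runFold (x:xs) (Request k) = runFold xs (k (Just x))

twoReqs :: Fold (Maybe a) (Maybe [a])
twoReqs = do
    mas <- replicateM 2 requestF
    return (sequence mas)
```

For instance, runFold ["1","2","3","4","5"] (twoReqs \>\ twoReqs) evaluates to (Just [["1","2"],["3","4"]],["5"]); the leftover "5" is what a later raw read would see.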
In other words, the "request" category is actually the category of InputStreams, and pipes already has InputStream support today!
Output streams
Now let's do the exact same thing for OutputStreams! We'll begin with the filterOutput function from io-streams:
filterOutput
    :: (a -> Bool) -> OutputStream a -> IO (OutputStream a)
filterOutput pred os = makeOutputStream $ \ma -> case ma of
    Nothing -> write ma os
    Just a | pred a    -> write ma os
           | otherwise -> return ()
The pipes equivalent is:
filterOutput
    :: (Proxy p)
    => (a -> Bool)
    -> Maybe a -> Producer p (Maybe a) IO ()
filterOutput pred ma = runIdentityP $ case ma of
    Nothing -> respond ma
    Just a | pred a    -> respond ma
           | otherwise -> return ()
This time we get a new shape for our type signature:
a -> Producer p b m ()
An output stream transformation is an unfold in disguise! These transformations unfold each output destined for the old output stream into 0 or more outputs destined for the new output stream.
Let's hook this transformation up to stdout and start writing values to it ... oh wait. How do we hook it up? Well, let's look at the types of stdout and filterOutput:
filterOutput f
    :: (Proxy p) => Maybe a -> Producer p (Maybe a) IO ()
stdout
    :: (Proxy p) => Maybe String -> Session p IO ()
We need some way to feed the stream output of filterOutput into the argument of stdout. This means that we need to call stdout on each output of filterOutput. How can we do that using the existing pipes API?
Why, we use (/>/), the dual of (\>\)!
>>> write (Just "") (filterOutput (not . null) />/ stdout)
>>> write (Just "Test") (filterOutput (not . null) />/ stdout)
Test
The "respond" category
(/>/) also has a simple behavior:
(f />/ g) replaces all of f's responds with g.
This behavior means that (/>/) and respond form a category, too! (/>/) is the composition operation, respond is the identity, and they satisfy the category laws:
-- Replacing each 'respond' in 'f' with 'respond' gives 'f'
f />/ respond = f
-- Replacing each 'respond' in 'respond' with 'f' gives 'f'
respond />/ f = f
-- Substitution of 'respond's is associative
(f />/ g) />/ h = f />/ (g />/ h)
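The dual substitution can be sketched the same way. This minimal model assumes a hypothetical Producer free monad whose respond always receives () from downstream (pipes-3 allows a general reply type), with (/>/) replacing each Respond in f by a call to g. write runs a closed session, and fakeStdout is a hypothetical stand-in for stdout that appends lines to an IORef so the output can be inspected.

```haskell
import Data.IORef (IORef, newIORef, readIORef, modifyIORef)
import Data.Void (Void, absurd)

-- Hypothetical model of a pipes-style producer: it can respond with values
-- of type out, run effects in m, and return r. Replies are fixed to ().
data Producer out m r
    = Respond out (Producer out m r)
    | Lift (m (Producer out m r))
    | Done r

instance Functor m => Functor (Producer out m) where
    fmap f (Respond o k) = Respond o (fmap f k)
    fmap f (Lift m)      = Lift (fmap (fmap f) m)
    fmap f (Done r)      = Done (f r)

instance Functor m => Applicative (Producer out m) where
    pure = Done
    pf <*> px = pf >>= \f -> fmap f px

instance Functor m => Monad (Producer out m) where
    Respond o k >>= f = Respond o (k >>= f)
    Lift m      >>= f = Lift (fmap (>>= f) m)
    Done r      >>= f = f r

respond :: out -> Producer out m ()
respond o = Respond o (Done ())

-- (f />/ g): replace every respond in f with a call to g.
(/>/) :: Functor m
      => (a -> Producer b m r)
      -> (b -> Producer c m ())
      -> (a -> Producer c m r)
(f />/ g) a = go (f a)
  where
    go (Respond b k) = g b >> go k
    go (Lift m)      = Lift (fmap go m)
    go (Done r)      = Done r

-- A session never responds, so it can be run directly in the base monad.
write :: Monad m => (a -> Producer Void m r) -> a -> m r
write os a = run (os a)
  where
    run (Respond v _) = absurd v
    run (Lift m)      = m >>= run
    run (Done r)      = return r

-- Hypothetical stand-in for 'stdout': records each line in an IORef.
fakeStdout :: IORef [String] -> Maybe String -> Producer Void IO ()
fakeStdout ref ms = Lift (fmap Done record)
  where
    record = maybe (return ()) (\s -> modifyIORef ref (++ [s])) ms

filterOutput :: (a -> Bool) -> Maybe a -> Producer (Maybe a) m ()
filterOutput p ma = case ma of
    Nothing -> respond ma
    Just a | p a       -> respond ma
           | otherwise -> Done ()

contramap :: (a -> b) -> Maybe a -> Producer (Maybe b) m ()
contramap f ma = respond (fmap f ma)

hiBye :: Functor m => Maybe String -> Producer (Maybe String) m ()
hiBye ms = do
    respond (fmap ("Hello, " ++) ms)
    respond (fmap ("Goodbye, " ++) ms)
```

Writing Just "world" through hiBye />/ hiBye />/ fakeStdout ref leaves in ref the same four lines as the session later in this section, in the same order.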
Let's learn more by reviewing the type of (/>/):
(/>/)
    :: (Monad m, ListT p)
    => (a -> p x' x b' b m a')
    -> (b -> p x' x c' c m b')
    -> (a -> p x' x c' c m a')
We'll specialize this type signature in two ways. First, we'll consider the case where the first argument is a Producer and the second argument is a Session, and then see what type the compiler infers for the result:
(/>/)
    :: (Monad m, ListT p)
    => (a -> Producer p b m ())
    -> (b -> Session p m ())
    -> (a -> Session p m ())
The compiler infers that we must get back a writable Session, just like we wanted! This is precisely what happened when we composed filterOutput and stdout:
filterOutput (not . null)
    :: Maybe String -> Producer p (Maybe String) IO ()
stdout
    :: Maybe String -> Session p IO ()
filterOutput (not . null) />/ stdout
    :: Maybe String -> Session p IO ()
flip write (filterOutput (not . null) />/ stdout)
    :: Maybe String -> IO ()
However, there's another way we can specialize the type of (/>/). We can instead restrict both arguments to be Producers:
(/>/)
    :: (Monad m, ListT p)
    => (a -> Producer p b m ())
    -> (b -> Producer p c m ())
    -> (a -> Producer p c m ())
The compiler infers that if we compose two unfolds, we must get a new unfold!
This means that we can compose output stream transformations using (/>/). To show this, let's define the equivalent of io-streams' contramap:
contramap
    :: (Proxy p)
    => (a -> b)
    -> Maybe a -> Producer p (Maybe b) IO ()
contramap f ma = runIdentityP $ respond (fmap f ma)
We can connect contramap stages using (/>/):
contramap even
    :: Maybe Int -> Producer p (Maybe Bool) IO ()
contramap show
    :: Maybe Bool -> Producer p (Maybe String) IO ()
contramap even />/ contramap show
    :: Maybe Int -> Producer p (Maybe String) IO ()
... then, using the exact same operator, we connect them to stdout:
contramap even />/ contramap show />/ stdout
    :: Maybe Int -> Session p IO ()
flip write (contramap even />/ contramap show />/ stdout)
    :: Maybe Int -> IO ()
Let's try it!
>>> write (Just 4) (contramap even />/ contramap show />/ stdout)
True
Now let's make things more interesting to really drive the point home:
hiBye
    :: (Proxy p)
    => Maybe String -> Producer p (Maybe String) IO ()
hiBye mstr = runIdentityP $ do
    respond $ fmap ("Hello, " ++) mstr
    respond $ fmap ("Goodbye, " ++) mstr
>>> write (Just "world") (hiBye />/ hiBye />/ stdout)
Hello, Hello, world
Goodbye, Hello, world
Hello, Goodbye, world
Goodbye, Goodbye, world
>>> -- We can still write to raw 'stdout'
>>> write (Just "Haskell") stdout
Haskell
In other words, the "respond" category is actually the category of OutputStreams, and pipes already has OutputStream support today!
Unify input and output streams
We can do much more than just implement the io-streams API, though! We can generalize it in many more powerful ways than io-streams permits.
First off, I'm going to inline the InputStream and OutputStream types into the type signatures of read and write:
read
    :: (forall p . (Proxy p) => () -> Session p IO (Maybe a))
    -> IO (Maybe a)
write
    :: Maybe a
    -> (forall p . (Proxy p) => Maybe a -> Session p IO ())
    -> IO ()
Then I will generalize the types to not be Maybe-specific and flip the arguments for write:
read
    :: (forall p . (Proxy p) => () -> Session p IO a)
    -> IO a
write
    :: (forall p . (Proxy p) => a -> Session p IO ())
    -> a -> IO ()
Now, I will generalize both read and write by adding bidirectionality to them. read will now accept an argument parameterizing its request, and write can now optionally receive a result.
read
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
read is = runProxyK is
write
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
write is = runProxyK is
Well, would you look at that! They both have the same type and implementation! In other words, we can unify read and write into a single function:
once
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
once is = runProxyK is
Also, once is actually polymorphic over the base monad:
once
    :: (Monad m)
    => (forall p . (ListT p) => a -> Session p m b)
    -> a -> m b
once is = runProxyK is
Using this function, we can both read from InputStreams and write to OutputStreams:
>>> once (stdin \>\ map length) ()
Test<Enter>
Just 4
>>> once (contramap show />/ stdout) (Just 1)
1
Let's test the bidirectionality. I'll write a new stdin that accepts a count parameter telling it how many lines to get. This time, there will be no Maybes:
stdinN :: (Proxy p) => Int -> Session p IO [String]
stdinN n = runIdentityP $ replicateM n $ lift getLine
First, briefly test it:
>>> once stdinN 3
Test<Enter>
Apple<Enter>
123<Enter>
["Test","Apple","123"]
Similarly, let's define a transformation to automate supplying values to stdinN:
automate :: (Proxy p) => () -> Client p Int [String] IO ()
automate () = runIdentityP $ do
    lift $ putStrLn "First batch:"
    xs <- request 2
    lift $ print xs
    lift $ putStrLn "Second batch:"
    ys <- request 2
    lift $ print ys
Now compose!
>>> once (stdinN \>\ automate) ()
First batch:
1<Enter>
2<Enter>
["1","2"]
Second batch:
3<Enter>
4<Enter>
["3","4"]
>>> -- Go back to using 'stdinN' unfiltered
>>> once stdinN 1
5<Enter>
["5"]
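The bidirectional case only needs each request to carry a value. Below is a minimal sketch, assuming a hypothetical Client free monad as a stand-in for the pipes types: stdinN pops n lines from an IORef instead of the terminal, automate appends its messages to a log IORef instead of printing, and once runs a closed client (request type Void) in its base monad.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef, modifyIORef)
import Data.Void (Void, absurd)

-- Hypothetical free-monad model of a bidirectional client: it sends a
-- request of type req and receives a reply of type resp.
data Client req resp m r
    = Request req (resp -> Client req resp m r)
    | Lift (m (Client req resp m r))
    | Done r

instance Functor m => Functor (Client req resp m) where
    fmap f (Request q k) = Request q (fmap f . k)
    fmap f (Lift m)      = Lift (fmap (fmap f) m)
    fmap f (Done r)      = Done (f r)

instance Functor m => Applicative (Client req resp m) where
    pure = Done
    cf <*> cx = cf >>= \f -> fmap f cx

instance Functor m => Monad (Client req resp m) where
    Request q k >>= f = Request q (\resp -> k resp >>= f)
    Lift m      >>= f = Lift (fmap (>>= f) m)
    Done r      >>= f = f r

request :: req -> Client req resp m resp
request q = Request q Done

liftC :: Functor m => m r -> Client req resp m r
liftC m = Lift (fmap Done m)

-- Replace every request in g with a call to f, passing the request value.
(\>\) :: Functor m
      => (b' -> Client a' a m b)
      -> (c' -> Client b' b m c)
      -> (c' -> Client a' a m c)
(f \>\ g) c' = go (g c')
  where
    go (Request b' k) = f b' >>= go . k
    go (Lift m)       = Lift (fmap go m)
    go (Done r)       = Done r

-- Run a closed client (it can never request) in its base monad.
once :: Monad m => (a -> Client Void x m b) -> a -> m b
once client a = run (client a)
  where
    run (Request v _) = absurd v
    run (Lift m)      = m >>= run
    run (Done r)      = return r

-- Hypothetical 'stdinN': takes n pre-loaded lines from an IORef.
stdinN :: IORef [String] -> Int -> Client Void x IO [String]
stdinN ref n = liftC $ do
    ls <- readIORef ref
    let (now, later) = splitAt n ls
    writeIORef ref later
    return now

-- Hypothetical 'automate': logs to an IORef instead of printing.
automate :: IORef [String] -> () -> Client Int [String] IO ()
automate logRef () = do
    say "First batch:"
    xs <- request 2
    say (show xs)
    say "Second batch:"
    ys <- request 2
    say (show ys)
  where
    say s = liftC (modifyIORef logRef (++ [s]))
```

Feeding the input ["1","2","3","4","5"], once (stdinN inRef \>\ automate logRef) () logs two batches of two lines each, and a following once (stdinN inRef) 1 returns ["5"].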
We can similarly unify makeInputStream and makeOutputStream:
make :: (Monad m, Proxy p) => (q -> m r) -> q -> p a' a b' b m r
make f a = runIdentityP (lift (f a))
The only difference between an input stream and an output stream is whether it consumes or produces a value, and bidirectional streams blur the line even further.
Purity
None of this machinery is specific to the IO monad. Everything I've introduced is polymorphic over the base monad! We only incur IO if any of our stages use IO. This departs greatly from io-streams, where you are trapped in the IO monad to do everything. However, I want to qualify that statement with a caveat: I expect that pipes-parse will require either IO or an ST base monad (your choice) to properly manage pushback like io-streams does. Even so, you only pay this price if you actually need it.
pipes improves on purity in two other ways. First, building input and output streams is pure when you use pipes:
-- pipes
makeInputStream :: IO (Maybe a) -> InputStream a
makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a
-- io-streams
makeInputStream :: IO (Maybe a) -> IO (InputStream a)
makeOutputStream :: (Maybe a -> IO ()) -> IO (OutputStream a)
Consequently, io-streams must use unsafePerformIO for its stdin:
stdin :: InputStream ByteString
stdin = unsafePerformIO $
    handleToInputStream IO.stdin >>= lockingInputStream
Second, composing and transforming input streams is pure when you use pipes:
>>> -- pipes
>>> read (stdin \>\ map length \>\ map even)
...
>>> -- io-streams
>>> is <- (map even <=< map length) stdin
>>> read is
...
... and if none of the stages use IO, then the entire operation is pure!
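Base-monad polymorphism is easy to see in a model as well. This minimal sketch uses a hypothetical Client free monad whose instances only require Functor m, and runs the composed pipeline in Identity from Data.Functor.Identity, so no IO appears anywhere. pureSource, mapF, and pureResult are hypothetical names; pureSource answers every read with the same value.

```haskell
import Data.Functor.Identity (Identity (..))
import Data.Void (Void, absurd)

-- Hypothetical free-monad model of a client, standing in for the pipes types.
data Client req resp m r
    = Request req (resp -> Client req resp m r)
    | Lift (m (Client req resp m r))
    | Done r

instance Functor m => Functor (Client req resp m) where
    fmap f (Request q k) = Request q (fmap f . k)
    fmap f (Lift m)      = Lift (fmap (fmap f) m)
    fmap f (Done r)      = Done (f r)

instance Functor m => Applicative (Client req resp m) where
    pure = Done
    cf <*> cx = cf >>= \f -> fmap f cx

instance Functor m => Monad (Client req resp m) where
    Request q k >>= f = Request q (\resp -> k resp >>= f)
    Lift m      >>= f = Lift (fmap (>>= f) m)
    Done r      >>= f = f r

request :: req -> Client req resp m resp
request q = Request q Done

-- Replace every request in g with a call to f.
(\>\) :: Functor m
      => (b' -> Client a' a m b)
      -> (c' -> Client b' b m c)
      -> (c' -> Client a' a m c)
(f \>\ g) c' = go (g c')
  where
    go (Request b' k) = f b' >>= go . k
    go (Lift m)       = Lift (fmap go m)
    go (Done r)       = Done r

-- Run a closed client in whatever base monad it uses.
once :: Monad m => (a -> Client Void x m b) -> a -> m b
once client a = run (client a)
  where
    run (Request v _) = absurd v
    run (Lift m)      = m >>= run
    run (Done r)      = return r

-- Hypothetical pure source: answers every read with the same value.
pureSource :: Maybe String -> () -> Client Void x m (Maybe String)
pureSource ms () = Done ms

mapF :: Functor m => (a -> b) -> () -> Client () (Maybe a) m (Maybe b)
mapF f () = fmap (fmap f) (request ())

-- The whole pipeline, run in Identity: no IO anywhere.
pureResult :: Maybe Bool
pureResult = runIdentity (once (pureSource (Just "Test") \>\ mapF length \>\ mapF even) ())
```

Here pureResult evaluates to Just True, the same answer the IO-based stdin pipeline gave for the input Test.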
Extra interfaces
You can generalize the type signatures even further. Let's revisit the types of (\>\) and (/>/):
(\>\)
    :: (Monad m, ListT p)
    => (b' -> p a' a x' x m b)
    -> (c' -> p b' b x' x m c)
    -> (c' -> p a' a x' x m c)
(/>/)
    :: (Monad m, ListT p)
    => (a -> p x' x b' b m a')
    -> (b -> p x' x c' c m b')
    -> (a -> p x' x c' c m a')
So far we've been ignoring the x'/x interfaces entirely, treating them as closed interfaces. However, there's no reason we have to! If we open up this interface, then every processing stage that we connect can read and write to this streaming interface.
In a future post, I will show you how you can really elegantly use this trick to generalize Hutton-Meijer parsers to permit effects.
ListT
People who read the pipes-3.2 announcement post know that the (\>\) and (/>/) composition operators correspond to (>=>) for the two pipes ListT monads.
This means that you can take any output stream transformation and assemble more sophisticated behaviors using do notation:
pairs
    :: (ListT p)
    => () -> Producer p (Maybe (String, String)) IO ()
pairs () = runRespondT $ do
    mstr1 <- RespondT $ hiBye (Just "world")
    mstr2 <- RespondT $ hiBye (Just "Haskell")
    return $ (,) <$> mstr1 <*> mstr2
These behave just like the list monad, non-deterministically selecting all possible paths:
>>> once (pairs />/ contramap show />/ stdout) ()
("Hello, world","Hello, Haskell")
("Hello, world","Goodbye, Haskell")
("Goodbye, world","Hello, Haskell")
("Goodbye, world","Goodbye, Haskell")
The exact same trick works for input stream transformations, too, but it's less useful because we don't send information upstream as frequently as downstream.
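As a rough, effect-free analogy only (it drops Maybe and the base monad entirely), you can model an unfold as a function returning the list of values it would respond with. Composition then behaves like (>=>) in the ordinary list monad, and the pairs example becomes do notation over lists. hiBye, hiByeTwice, and pairs below are hypothetical pure counterparts, not pipes code.

```haskell
import Control.Monad ((>=>))

-- Hypothetical pure counterpart of hiBye: the list of values it would
-- 'respond' with, in order (Maybe wrappers and effects dropped).
hiBye :: String -> [String]
hiBye str = ["Hello, " ++ str, "Goodbye, " ++ str]

-- Composing unfolds: feed every output of the first into the second.
hiByeTwice :: String -> [String]
hiByeTwice = hiBye >=> hiBye

-- Counterpart of 'pairs': do notation explores every combination.
pairs :: [(String, String)]
pairs = do
    str1 <- hiBye "world"
    str2 <- hiBye "Haskell"
    return (str1, str2)
```

Both hiByeTwice "world" and pairs produce the same four results, in the same order, as the corresponding pipes sessions above.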
Conclusions
One of the most common questions I get is "How do I escape from the pipe monad?", and I wrote this post to demonstrate how to do so. You can use pipes to program in exactly the same style as io-streams. The only thing missing is a standard library of io-streams-like utilities.
So far, I have not touched on the issue of push-back, a feature which io-streams provides. pipes-parse will build on the ideas I've introduced in this post to provide an io-streams-style interface to parsing that includes push-back functionality. This will make it easy to dynamically add or remove processing stages throughout the parsing process, which is necessary for parsing HTTP. In fact, everything I've presented here fell out very naturally from my work on pipes-parse, and I only later discovered that I had accidentally reinvented io-streams entirely within pipes.
So I will cautiously state that I believe pipes is "io-streams done right". Everything that Greg has been doing with io-streams matches quite nicely with the pre-existing "request" and "respond" pipes categories that I independently discovered within pipes. This indicates that Greg was really onto something and pipes provides the elegant theoretical foundation for his work.