Monday, March 3, 2014

How to model handles with pipes

I receive repeated questions about how to implement a Handle-like API similar to io-streams using pipes, even after I outlined how to do this about a year ago. However, I don't blame these people, because that tutorial uses an out-of-date (and uglier) pipes-3.2 API and takes an approach that I felt was a bit inconsistent and missed the mark. This post is a rewrite of the original: it upgrades the code to the latest pipes-4.1 API and makes some key changes to the original idioms to improve conceptual consistency.

I usually find that the best way to introduce this Handle-like intuition for pipes is to make an analogy to io-streams, so this post assumes that you are familiar with both pipes and io-streams. If not, then you can read the pipes tutorial or the io-streams tutorial, both of which I wrote! I want to emphasize that I discuss io-streams mainly as a stepping stone for understanding the equivalent pipes abstractions; the purpose is not to reimplement io-streams in pipes. Rather, I want to introduce a more general IO-free abstraction that just happens to overlap a bit with io-streams.

Types

I'll begin with the correspondence between io-streams and pipes types:
import Pipes

type InputStream a = Effect IO (Maybe a)

type OutputStream a = Maybe a -> Effect IO ()

makeInputStream :: IO (Maybe a) -> InputStream a
makeInputStream = lift

makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a
makeOutputStream f = lift . f
For example, the pipes analogs of stdin and stdout from io-streams would be:
import qualified Data.ByteString as B
import qualified System.IO as IO
import qualified Data.Foldable as F

stdin :: InputStream B.ByteString
stdin = makeInputStream $ do
    bs <- B.hGetSome IO.stdin 4096
    return (if B.null bs then Nothing else Just bs)

stdout :: OutputStream B.ByteString
stdout = makeOutputStream (F.mapM_ B.putStr)
Notice how this does not convert the read and write actions to repetitive streams. Instead, you just trivially wrap them using lift, promoting them to Effect. However, for the rest of this post I will use Effect instead of InputStream and OutputStream to avoid confusion between the two libraries.
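To try the same wrapping without a terminal, here is a minimal sketch that backs an input stream and an output stream with an IORef instead of stdin and stdout. The names listInput and listOutput are made up for this illustration and are not part of pipes or io-streams:

```haskell
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)
import qualified Data.Foldable as F
import Pipes

-- Hypothetical input stream: pops the head of a mutable list and
-- returns Nothing once the list is exhausted.
listInput :: IORef [a] -> Effect IO (Maybe a)
listInput ref = lift (atomicModifyIORef' ref pop)
  where
    pop []       = ([], Nothing)
    pop (x : xs) = (xs, Just x)

-- Hypothetical output stream: appends every `Just` value to a mutable
-- list and ignores the `Nothing` end-of-stream marker.
listOutput :: IORef [a] -> Maybe a -> Effect IO ()
listOutput ref = lift . F.mapM_ (\x -> modifyIORef' ref (++ [x]))

main :: IO ()
main = do
    src <- newIORef [1, 2 :: Int]
    dst <- newIORef []
    a <- runEffect (listInput src)
    b <- runEffect (listInput src)
    c <- runEffect (listInput src)
    print [a, b, c]                       -- [Just 1,Just 2,Nothing]
    mapM_ (runEffect . listOutput dst) [a, b, c]
    readIORef dst >>= print               -- [1,2]
```

As with stdin and stdout above, each stream is just one read or write action promoted with lift; nothing here loops.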

Also, the pipes implementations of makeInputStream and makeOutputStream are pure, whereas their io-streams counterparts require IO. In fact, io-streams fudges a little bit and implements stdin and stdout in terms of unsafePerformIO; otherwise they would have these types:
stdin  :: IO (InputStream  ByteString)
stdout :: IO (OutputStream ByteString)
io-streams is tightly integrated with IO (thus the name), but I believe we can do better.

Reads and writes

The pipes analogs of the io-streams read and write commands are await and yield:
import Prelude hiding (read)

read :: Monad m => Consumer (Maybe a) m (Maybe a)
read = await

write :: Monad m => Maybe a -> Producer (Maybe a) m ()
write = yield
Here are examples of how you might use them:
import Control.Applicative
import Data.Monoid

-- `Consumer (Maybe a) m (Maybe b)` is the analog of
-- `InputStream a -> IO (InputStream b)`
combine
    :: Monad m
    => Consumer (Maybe B.ByteString) m (Maybe B.ByteString)
combine = do
    mstr1 <- read
    mstr2 <- read
    return $ liftA2 (<>) mstr1 mstr2

-- `Maybe a -> Producer (Maybe b) m ()` is the analog of
-- `OutputStream b -> IO (OutputStream a)`
duplicate :: Monad m => Maybe a -> Producer (Maybe a) m ()
duplicate ma = do
    write ma
    write ma
Compare to io-streams:
combine :: InputStream B.ByteString -> IO (Maybe B.ByteString)
combine is = do
    mbs1 <- read is
    mbs2 <- read is
    return $ liftA2 (<>) mbs1 mbs2

duplicate :: OutputStream a -> Maybe a -> IO ()
duplicate os ma = do
    write ma os
    write ma os
Unlike io-streams, pipes does not need to specify the upstream source for read commands. This is because we can supply read with an input stream using (>~):
>>> runEffect $ stdin >~ read
Just "Test\n"
>>> runEffect $ stdin >~ combine
Just "Hello\nworld!\n"
In fact, the read command in the first example is totally optional, because read is just a synonym for await, and one of the pipes laws states that:
f >~ await = f
Therefore, we could instead just write:
>>> runEffect stdin
Just "Test\n"
Dually, we don't need to specify the downstream destination for write. This is because you connect write with an output stream using (~>):
>>> :set -XOverloadedStrings
>>> runEffect $ (write ~> stdout) (Just "Test\n")
>>> runEffect $ (duplicate ~> stdout) (Just "Test\n")
Again, the write in the first example is optional, because write is just a synonym for yield and one of the pipes laws states that:
yield ~> f = f
Therefore, we can simplify it to:
>>> runEffect $ stdout (Just "Test\n")
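The reading examples above need a terminal, so here is a rough, terminal-free sketch of the same idea. It assumes a hypothetical input stream, popInput, that pops strings off a list held in a State monad (from transformers); combine is the same function as before, specialized to String:

```haskell
import Control.Applicative (liftA2)
import Control.Monad.Trans.State.Strict (State, evalState, state)
import Pipes

-- Hypothetical pure input stream: pops the head of the list in the state
popInput :: Effect (State [a]) (Maybe a)
popInput = lift (state pop)
  where
    pop []       = (Nothing, [])
    pop (x : xs) = (Just x, xs)

-- Reads two values from whatever stream is supplied later with (>~)
combine :: Monad m => Consumer (Maybe String) m (Maybe String)
combine = do
    mstr1 <- await
    mstr2 <- await
    return (liftA2 (<>) mstr1 mstr2)

main :: IO ()
main = do
    -- Every run starts from the same three-element input
    let run eff = evalState (runEffect eff) ["Hello, ", "world!", "unused"]
    print (run (popInput >~ await))    -- Just "Hello, "
    print (run popInput)               -- Just "Hello, "
    print (run (popInput >~ combine))  -- Just "Hello, world!"
```

The first two lines print the same thing, which is the `f >~ await = f` law observed on one concrete stream.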

Mixed reads and writes

As with io-streams, we do not need to specify the entire streaming computation up front. We can step these input and output streams one value at a time using runEffect instead of consuming them all in one go. However, you lose no power and gain a lot of elegance by writing everything entirely within pipes.

For example, just like io-streams, you can freely mix reads and writes from multiple diverse sources:
inputStream1 :: Effect IO (Maybe A)
inputStream2 :: Effect IO (Maybe B)

mixRead :: Effect IO (Maybe A, Maybe B)
mixRead = do
    ma <- inputStream1 >~ read
    mb <- inputStream2 >~ read
    return (ma, mb)

outputStream1 :: Maybe a -> Effect IO ()
outputStream2 :: Maybe a -> Effect IO ()

mixWrite :: Maybe a -> Effect IO ()
mixWrite ma = do
    (write ~> outputStream1) ma
    (write ~> outputStream2) ma
Oh, wait! Didn't we just get through saying that you can simplify these?
-- Reminder:
-- f >~  read = f
-- write ~> f = f

mixRead :: Effect IO (Maybe A, Maybe B)
mixRead = do
    ma <- inputStream1
    mb <- inputStream2
    return (ma, mb)

mixWrite :: Maybe a -> Effect IO ()
mixWrite ma = do
    outputStream1 ma
    outputStream2 ma
In other words, if we stay within pipes we can read and write from input and output streams just by directly calling them. Unlike io-streams, we don't need to use read and write when we select a specific stream. We only require read and write if we wish to inject the input or output stream later on.
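As a sketch of what calling streams directly looks like in a complete program, the following copies an input stream to an output stream until end of input. The streams are hypothetical stand-ins: input pops from a list in StateT, and output appends to a Writer log (both from transformers), so the whole thing runs without IO:

```haskell
import Control.Monad.Trans.State.Strict (StateT, evalStateT, state)
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import qualified Data.Foldable as F
import Pipes

-- Assumed base monad for this example: a list to read from, a log to write to
type M = StateT [Int] (Writer [Int])

-- Hypothetical input stream
input :: Effect M (Maybe Int)
input = lift (state pop)
  where
    pop []       = (Nothing, [])
    pop (x : xs) = (Just x, xs)

-- Hypothetical output stream: logs `Just` values, ignores `Nothing`
output :: Maybe Int -> Effect M ()
output = lift . lift . F.mapM_ (\x -> tell [x])

-- Interleave reads and writes by calling the two streams directly
copyAll :: Monad m => Effect m (Maybe a) -> (Maybe a -> Effect m ()) -> Effect m ()
copyAll readNext writeNext = loop
  where
    loop = do
        ma <- readNext
        writeNext ma          -- forward the value, including the final Nothing
        case ma of
            Nothing -> return ()
            Just _  -> loop

main :: IO ()
main = print (execWriter (evalStateT (runEffect (copyAll input output)) [1, 2, 3]))
-- [1,2,3]
```

copyAll never mentions read or write; it only needs them if the streams are to be injected later with (>~) or (~>).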

Transformations

What's neat about the pipes idioms is that you use the exact same API to implement and connect transformations on streams. For example, here's how you implement the map function from io-streams using pipes:
map :: Monad m => (a -> b) -> Consumer (Maybe a) m (Maybe b)
map f = do
    ma <- read
    return (fmap f ma)
Compare that to the io-streams implementation of map:
map :: (a -> b) -> InputStream a -> IO (InputStream b)
map f is = makeInputStream $ do
    ma <- read is
    return (fmap f ma)
Some differences stand out:
  • The pipes version does not need to explicitly thread the input stream
  • The pipes version does not require makeInputStream
  • The pipes transformation is pure (meaning no IO)
Now watch how we apply this transformation:
>>> runEffect $ stdin >~ map (<> "!")
Just "Test!\n"
The syntax for implementing and connecting map is completely indistinguishable from the syntax we used in the previous section to implement and connect combine. And why should there be a difference? Both of them read from a source and return a derived value. Yet, io-streams distinguishes between things that transform input streams and things that read from input streams, both in the types and implementation:
iReadFromAnInputStream :: InputStream a -> IO b
iReadFromAnInputStream is = do
    ma <- read is
    ...

iTransformAnInputStream :: InputStream a -> IO (InputStream b)
iTransformAnInputStream is = makeInputStream $ do
    ma <- read is
    ...
This means that io-streams code cannot reuse readers as input stream transformations or, vice versa, reuse input stream transformations as readers.

As you might have guessed, the dual property holds for writers and output stream transformations. In pipes, we model both of these using a Producer Kleisli arrow:
contramap
    :: Monad m => (a -> b) -> Maybe a -> Producer (Maybe b) m ()
contramap f ma = yield (fmap f ma)
... and we connect them the same way:
>>> runEffect $ (contramap (<> "!\n") ~> stdout) (Just "Test")
... whereas io-streams distinguishes these two concepts:
iWriteToAnOutputStream :: OutputStream b -> Maybe a -> IO ()
iWriteToAnOutputStream os ma = do
    write ma os
    ...

iTransformAnOutputStream :: OutputStream b -> IO (OutputStream a)
iTransformAnOutputStream os = makeOutputStream $ \ma -> do
    write ma os
    ...
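To illustrate the reuse that pipes allows, here is a small sketch in which one definition serves both as a stream transformation and as an ordinary read (or write) inside a larger block. All names (source, sink, shout, shoutOut, readTwo, writeTwo) are invented for this example, and the sink is assumed to log to a Writer from transformers:

```haskell
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import qualified Data.Foldable as F
import Data.Functor.Identity (runIdentity)
import Pipes

-- Hypothetical input stream that always returns the same value
source :: Monad m => Effect m (Maybe String)
source = return (Just "hi")

-- Hypothetical output stream that logs `Just` values
sink :: Maybe String -> Effect (Writer [String]) ()
sink = lift . F.mapM_ (\s -> tell [s])

-- An input stream transformation ...
shout :: Monad m => Consumer (Maybe String) m (Maybe String)
shout = fmap (fmap (<> "!")) await

-- ... reused as a plain read inside a larger reader
readTwo :: Monad m => Consumer (Maybe String) m (Maybe String, Maybe String)
readTwo = do
    a <- shout
    b <- await
    return (a, b)

-- An output stream transformation ...
shoutOut :: Monad m => Maybe String -> Producer (Maybe String) m ()
shoutOut = yield . fmap (<> "!")

-- ... reused as a plain write inside a larger writer
writeTwo :: Monad m => Maybe String -> Producer (Maybe String) m ()
writeTwo ma = do
    shoutOut ma
    yield ma

main :: IO ()
main = do
    print (runIdentity (runEffect (source >~ shout)))                -- Just "hi!"
    print (runIdentity (runEffect (source >~ readTwo)))              -- (Just "hi!",Just "hi")
    print (execWriter (runEffect ((shoutOut ~> sink) (Just "hi"))))  -- ["hi!"]
    print (execWriter (runEffect ((writeTwo ~> sink) (Just "hi"))))  -- ["hi!","hi"]
```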

End of input

None of this machinery is specific to the Maybe type. I only used Maybe for analogy with io-streams, which uses Maybe to signal end of input. For the rest of this post I will just drop the Maybes entirely to simplify the type signatures, meaning that I will model the relevant types as:
inputStream :: Effect m a

outputStream :: a -> Effect m ()

inputStreamTransformation :: Consumer a m b

outputStreamTransformation :: a -> Producer b m ()
This is also the approach that my upcoming pipes-streams library will take, because pipes does not use Maybe to signal end of input. Instead, if you have a finite input you represent that input as a Producer:
finiteInput :: Producer a m ()
What's great is that you can still connect this to an output stream, using for:
for finiteInput outputStream :: Effect m ()
Dually, you model a finite output as a Consumer:
finiteOutput :: Consumer a m ()
... and you can connect that to an input stream using (>~):
inputStream >~ finiteOutput :: Effect m ()
These provide a convenient bridge between classic pipes abstractions and handle-like streams.
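Both bridges can be sketched in a few lines. This example assumes a made-up base monad M (a counter in StateT plus a Writer log, from transformers) so that it runs without IO; counter, logger, and run are illustrative names only:

```haskell
import Control.Monad (replicateM_)
import Control.Monad.Trans.State.Strict (StateT, evalStateT, state)
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Pipes

-- Assumed base monad: a counter to read from and a log to write to
type M = StateT Int (Writer [Int])

-- Input stream: returns the current counter value and increments it
counter :: Effect M Int
counter = lift (state (\n -> (n, n + 1)))

-- Output stream: appends each value to the log
logger :: Int -> Effect M ()
logger n = lift (lift (tell [n]))

-- A finite input is an ordinary Producer
finiteInput :: Monad m => Producer Int m ()
finiteInput = each [10, 20, 30]

-- A finite output is an ordinary Consumer; this one accepts two values
finiteOutput :: Consumer Int M ()
finiteOutput = replicateM_ 2 (await >>= \n -> lift (lift (tell [n * 100])))

-- Run an effect with the counter starting at 1 and return the log
run :: Effect M () -> [Int]
run eff = execWriter (evalStateT (runEffect eff) 1)

main :: IO ()
main = do
    print (run (for finiteInput logger))   -- [10,20,30]
    print (run (counter >~ finiteOutput))  -- [100,200]
```

In the second case the Consumer decides when the stream ends: the counter could go on forever, but only two values are ever requested.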

Compositions

Interestingly, we can use the same (>~) operator to:
  • connect input streams to input stream transformations, and:
  • connect input stream transformations to each other.
This is because (>~) has a highly polymorphic type which you can specialize down to two separate types:
-- Connect an input stream to an input stream transformation,
-- returning a new input stream
(>~) :: Monad m
     => Effect     m a
     -> Consumer a m b
     -> Effect     m b

-- Connect two input stream transformations together,
-- returning a new input stream transformation
(>~) :: Monad m
    => Consumer a m b
    -> Consumer b m c
    -> Consumer a m c
Dually, we can use the same (~>) operator to:
  • connect output stream transformations to output streams, and:
  • connect output stream transformations to each other.
Again, this is because (~>) has a highly polymorphic type which you can specialize down to two separate types:
-- Connect an output stream transformation to an output stream,
-- returning a new output stream
(~>) :: Monad m
     => (a -> Producer b m ())
     -> (b -> Effect     m ())
     -> (a -> Effect     m ())
-- Connect two output stream transformations together,
-- returning a new output stream transformation
(~>) :: Monad m
     => (a -> Producer b m ())
     -> (b -> Producer c m ())
     -> (a -> Producer c m ())
In fact, you will see these specialized type signatures in the documentation for (>~) and (~>). Hopefully, the analogy to input streams and output streams will help you see those types in a new light.
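The two "transformation to transformation" specializations are easy to try out. In this sketch every name (pairSum, render, twice, bump, sink) is hypothetical, and the sink is assumed to log to a Writer from transformers:

```haskell
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Data.Functor.Identity (runIdentity)
import Pipes

-- Input stream transformation: reads two values and adds them
pairSum :: Monad m => Consumer Int m Int
pairSum = (+) <$> await <*> await

-- Input stream transformation: reads one value and renders it
render :: Monad m => Consumer Int m String
render = fmap show await

-- (>~) composes them into a new input stream transformation
renderPairSum :: Monad m => Consumer Int m String
renderPairSum = pairSum >~ render

-- Output stream transformation: writes its argument twice
twice :: Monad m => Int -> Producer Int m ()
twice n = yield n >> yield n

-- Output stream transformation: writes its argument plus one
bump :: Monad m => Int -> Producer Int m ()
bump n = yield (n + 1)

-- (~>) composes them into a new output stream transformation
twiceBump :: Monad m => Int -> Producer Int m ()
twiceBump = twice ~> bump

-- Hypothetical output stream that logs every value
sink :: Int -> Effect (Writer [Int]) ()
sink n = lift (tell [n])

main :: IO ()
main = do
    -- `return 21` plays the role of an input stream that always yields 21
    print (runIdentity (runEffect (return 21 >~ renderPairSum)))  -- "42"
    print (execWriter (runEffect ((twiceBump ~> sink) 1)))        -- [2,2]
```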

Categories

People familiar with pipes know that await/read and (>~) form a category, so we get three equations for free from the three category laws:
-- Reading something = calling it
f >~ read = f

-- `read` is the identity input stream transformation
read >~ f = f

-- It doesn't matter how you group sources/transformations
(f >~ g) >~ h = f >~ (g >~ h)
Dually, yield/write and (~>) form a category, so we get another three equations for free:
-- Writing something = calling it
write ~> f = f

-- `write` is the identity output stream transformation
f ~> write = f

-- It doesn't matter how you group sinks/transformations
(f ~> g) ~> h = f ~> (g ~> h)
Those are really nice properties to use when equationally reasoning about handles and as we've already seen they allow you to simplify your code in unexpected ways.
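These equations can be spot-checked on concrete streams. The sketch below is an observation on one example, not a proof: it assumes a Writer log (from transformers) as the base monad so that both results and effects can be compared, and every stream name in it is hypothetical:

```haskell
import Control.Monad.Trans.Writer.Strict (Writer, runWriter, tell)
import Pipes

type M = Writer [String]

-- Hypothetical input stream that logs each read
src :: Effect M Int
src = lift (tell ["read"]) >> return 1

double :: Consumer Int M Int
double = fmap (* 2) await

render :: Consumer Int M String
render = fmap show await

-- Hypothetical output stream that logs each write
snk :: String -> Effect M ()
snk s = lift (tell ["write " ++ s])

dup :: Int -> Producer Int M ()
dup n = yield n >> yield n

label :: Int -> Producer String M ()
label n = yield ("#" ++ show n)

-- Observe both the result and the log of an input-side effect
runIn :: Effect M r -> (r, [String])
runIn = runWriter . runEffect

-- Observe the log produced by writing 7 to an output stream
runOut :: (Int -> Effect M ()) -> [String]
runOut k = snd (runWriter (runEffect (k 7)))

lawsHold :: Bool
lawsHold = and
    [ runIn (src >~ await)              == runIn src
    , runIn (src >~ (await >~ double))  == runIn (src >~ double)
    , runIn ((src >~ double) >~ render) == runIn (src >~ (double >~ render))
    , runOut (yield ~> (dup ~> (label ~> snk))) == runOut (dup ~> (label ~> snk))
    , runOut ((dup ~> yield) ~> (label ~> snk)) == runOut (dup ~> (label ~> snk))
    , runOut ((dup ~> label) ~> snk)            == runOut (dup ~> (label ~> snk))
    ]

main :: IO ()
main = print lawsHold  -- True
```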

Bidirectionality

But wait, there's more! pipes actually has a fully bidirectional core in the Pipes.Core module. You can generalize everything above to use this bidirectional interface with surprising results. For example, in the bidirectional version of pipes-based streams, you can have streams that both accept input and produce output upon each step:
inputOutputStream :: a -> Effect m b
You also get the following generalized versions of all commands and operators:
  • request generalizes read, accepting an argument for upstream
  • respond generalizes write, returning a value from downstream
  • (\>\) generalizes (>~)
  • (/>/) generalizes (~>)
I will leave that as a teaser for now and I will discuss this in greater detail once I finish writing a new tutorial for the underlying bidirectional implementation.
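In the meantime, here is a small taste of the generalized operators, written as a sketch against Pipes.Core. The function names are invented for this example; Client and Server are the Pipes.Core type synonyms for proxies that only talk upstream or only talk downstream, respectively:

```haskell
import Data.Functor.Identity (runIdentity)
import Pipes (Effect, runEffect)
import Pipes.Core (Client, Server, request, respond, (/>/), (\>\))

-- A stream that accepts an input and returns an output on each step
square :: Monad m => Int -> Effect m Int
square n = return (n * n)

-- `request` is like `read`, but sends an argument upstream each time
sumOfSquares :: Monad m => [Int] -> Client Int Int m Int
sumOfSquares ns = fmap sum (mapM request ns)

-- Another input/output stream: accepts a number, returns a verdict
isEven :: Monad m => Int -> Effect m Bool
isEven = return . even

-- `respond` is like `write`, but returns the reply from downstream
collectReplies :: Monad m => [Int] -> Server Bool Int m [Bool]
collectReplies = mapM respond

main :: IO ()
main = do
    print (runIdentity (runEffect ((square \>\ sumOfSquares) [1, 2, 3])))    -- 14
    print (runIdentity (runEffect ((collectReplies />/ isEven) [1, 2, 3])))  -- [False,True,False]
```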

Conclusion

pipes provides an elegant way to model Handle-like abstractions in a very compact way that requires no new extensions to the API. I will soon provide a pipes-streams library that implements utilities along these lines to further solidify these concepts.

One thing I hope people take away from this is that a Handle-based API need not be deeply intertwined with IO. All the commands and connectives from pipes-based handles are completely independent of the base monad and you can freely generalize the pipes notion of handles to pure monads.
