I usually find that the best way to introduce this Handle-like intuition for pipes is to make an analogy to io-streams, so this post assumes that you are familiar with both pipes and io-streams. If not, then you can read the pipes tutorial or the io-streams tutorial, both of which I wrote! I want to really emphasize that I mainly discuss io-streams as a stepping stone for understanding the equivalent pipes abstractions, but the purpose is not to reimplement io-streams in pipes. Rather I want to introduce a more general IO-free abstraction that just happens to overlap a bit with io-streams.
Types
I'll begin with the correspondence between io-streams and pipes types:
import Pipes type InputStream a = Effect IO (Maybe a) type OutputStream a = Maybe a -> Effect IO () makeInputStream :: IO (Maybe a) -> InputStream a makeInputStream = lift makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a makeOutputStream f = lift . fFor example, the pipes analogs of stdin and stdout from io-streams would be:
import qualified Data.ByteString as B import qualified System.IO as IO import qualified Data.Foldable as F stdin :: InputStream B.ByteString stdin = makeInputStream $ do bs <- B.hGetSome IO.stdin 4096 return (if B.null bs then Nothing else Just bs) stdout :: OutputStream B.ByteString stdout = makeOutputStream (F.mapM_ B.putStr)Notice how this does not convert the read and write actions to repetitive streams. Instead, you just trivially wrap them using lift, promoting them to Effect. However, for the rest of this post I will use Effect instead of InputStream and OutputStream to avoid confusion between the two libraries.
Also, the pipes implementations of makeInputStream and makeOutputStream are pure, whereas their io-streams counterparts require IO. In fact, io-streams fudges a little bit and implements stdin and stdout in terms of unsafePerformIO, otherwise they would have these types:
stdin :: IO (InputStream ByteString) stdout :: IO (OutputStream ByteString)io-streams is tightly integrated with IO (thus the name), but I believe we can do better.
Reading/Writing
The pipes analogs of the io-streams read and write commands are await and yield:
import Prelude hiding (read) read :: Monad m => Consumer (Maybe a) m (Maybe a) read = await write :: Monad m => Maybe a -> Producer (Maybe a) m () write = yieldHere are examples of how you might use them:
import Control.Applicative import Data.Monoid -- `Consumer (Maybe a) m (Maybe b)` is the analog of -- `InputStream a -> IO (InputStream b)` combine :: Monad m => Consumer (Maybe B.ByteString) m (Maybe B.ByteString) combine = do mstr1 <- read mstr2 <- read return $ liftA2 (<>) mstr1 mstr2 -- `Maybe a -> Producer (Maybe b) m ()` is the analog of -- `OutputStream b -> IO (OutputStream a)` duplicate :: Monad m => Maybe a -> Producer (Maybe a) m () duplicate ma = do write ma write maCompare to io-streams:
combine :: InputStream B.ByteString -> IO (Maybe B.ByteString) combine is = do mbs1 <- read is mbs2 <- read is return $ liftA2 (<>) mbs1 mbs2 duplicate :: OutputStream a -> a -> IO () duplicate os a = do write a os write a osUnlike io-streams, pipes does not need to specify the upstream source for read commands. This is because we can supply read with an input stream using (>~):
>>> runEffect $ stdin >~ read Test<Enter> Just "Test\n" >>> runEffect $ stdin >~ combine Hello<Enter> world!<Enter> Just "Hello\nworld!\n"In fact, the read command in the first example is totally optional, because read is just a synonym for await, and one of the pipes laws states that:
f >~ await = fTherefore, we could instead just write:
>>> runEffect stdin Test<Enter> Just "Test\n"Dually, we don't need to specify the downstream source for write. This is because you connect write with an output stream using (~>)
>>> :set -XOverloadedStrings >>> runEffect $ (write ~> stdout) (Just "Test\n") Test >>> runEffect $ (duplicate ~> stdout) (Just "Test\n") Test TestAgain, the write in the first example is optional, because write is just a synonym for yield and one of the pipes laws states that:
yield ~> f = fTherefore, we can simplify it to:
>>> runEffect $ stdout (Just "Test") Test
Mixed reads and writes
Like io-streams, we do not need to specify the entire streaming computation up front. We can step these input and output streams one value at a time using runEffect instead of consuming them all in one go. However, you lose no power and gain a lot of elegance by writing everything entirely within pipes.
For example, just like io-streams, you can freely mix reads and writes from multiple diverse sources:
inputStream1 :: Effect IO (Maybe A) inputStream2 :: Effect IO (Maybe B) mixRead :: Effect IO (Maybe A, Maybe B) mixRead = do ma <- inputStream1 >~ read mb <- inputStream2 >~ read return (ma, mb) outputStream1 :: Maybe a -> Effect IO () outputStream2 :: Maybe a -> Effect IO () mixWrite :: Maybe a -> Effect IO () mixWrite ma = do (write ~> outputStream1) ma (write ~> outputStream2) maOh, wait! Didn't we just get through saying that you can simplify these?
-- Reminder: -- f >~ read = f -- write ~> f = f mixRead :: Effect IO (Maybe A, Maybe B) mixRead = do ma <- inputStream1 mb <- inputStream2 return (ma, mb) mixWrite :: Maybe a -> Effect IO () mixWrite ma = do outputStream1 ma outputStream2 maIn other words, if we stay within pipes we can read and write from input and output streams just by directly calling them. Unlike io-streams, we don't need to use read and write when we select a specific stream. We only require read and write if we wish to inject the input or output stream later on.
Transformations
What's neat about the pipes idioms is that you use the exact same API to implement and connect transformations on streams. For example, here's how you implement the map function from io-streams using pipes:
map :: Monad m => (a -> b) -> Consumer (Maybe a) m (Maybe b) map f = do ma <- read return (fmap f ma)Compare that to the io-streams implementation of map:
map :: (a -> b) -> InputStream a -> IO (InputStream b) map f is = makeInputStream $ do ma <- read is return (fmap f ma)Some differences stand out:
- The pipes version does not need to explicitly thread the input stream
- The pipes version does not require makeInputStream
- The pipes transformation is pure (meaning no IO)
>>> runEffect $ stdin >~ map (<> "!") Test<Enter> Just "Test!\n"The syntax for implementing and connecting map is completely indistinguishable from the syntax we used in the previous section to implement and connect combine. And why should there be a difference? Both of them read from a source and return a derived value. Yet, io-streams distinguishes between things that transform input streams and things that read from input streams, both in the types and implementation:
iReadFromAnInputStream :: InputStream a -> IO b iReadFromAnInputStream is = do ma <- read is ... iTransformAnInputStream :: InputStream a -> IO (InputStream b) iTransformAnInputStream is = makeInputStream $ do ma <- read is ...This means that io-streams code cannot reuse readers as input stream transformations or, vice versa, reuse input stream transformations as readers.
As you might have guessed, the dual property holds for writers and output stream transformations. In pipes, we model both of these using a Producer Kleisli arrow:
contramap :: Monad m => (a -> b) -> Maybe a -> Producer (Maybe b) m () contramap f ma = yield (fmap f ma)... and we connect them the same way:
>>> runEffect $ (contramap (<> "!\n") ~> stdout) (Just "Test") Test!... whereas io-streams distinguishes these two concepts:
iWriteToAnOutputStream :: OutputStream b -> Maybe a -> IO () iWriteToAnOutputStream os ma = do write os ma ... iTransformAnOutputStream :: OutputStream b -> IO (OutputStream a) iTransformAnOutputStream os = makeOutputStream $ \ma -> do write os ma ...
End of input
None of this machinery is specific to the Maybe type. I only used Maybe for analogy with io-streams, which uses Maybe to signal end of input. For the rest of this post I will just drop the Maybes entirely to simplify the type signatures, meaning that I will model the relevant types as:
inputStream :: Effect m a outputStream :: a -> Effect m () inputStreamTransformation :: Consumer a m b outputStreamTransformation :: a -> Producer b m ()This is also the approach that my upcoming pipes-streams library will take, because pipes does not use Maybe to signal end of input. Instead, if you have a finite input you represent that input as a Producer:
finiteInput :: Producer a m ()What's great is that you can still connect this to an output stream, using for:
for finiteInput outputStream :: Effect m ()Dually, you model a finite output as a Consumer:
finiteOutput :: Consumer a m ()... and you can connect that to an input stream using (>~):
inputStream >~ finiteOutput :: Effect m ()These provide a convenient bridge between classic pipes abstractions and handle-like streams.
Polymorphism
Interestingly, we can use the same (>~) operator to:
- connect input streams to input stream transformations, and:
- connect input stream transformations to each other.
-- Connect an input stream to an input stream transformation, -- returning a new input stream (>~) :: Monad m => Effect m a -> Consumer a m b -> Effect m b -- Connect two input stream transformations together, -- returning a new input stream transformation (>~) :: Monad m => Consumer a m b -> Consumer b m c -> Consumer a m cDually, we can use the same (~>) operator to:
- connect output stream transformations to output streams, and:
- connect output stream transformations to each other.
-- Connect an output stream transformation to an output stream, -- returning a new output stream (~>) :: Monad m => (a -> Producer b m ()) -> (b -> Effect m ()) -> (a -> Effect m ()) -- Connect two output stream transformations together, -- returning a new output stream transformation (~>) :: Monad m => (a -> Producer b m ()) -> (b -> Producer c m ()) -> (a -> Producer c m ())In fact, you will see these specialized type signatures in the documentation for (>~) for (~>). Hopefully, the analogy to input streams and output streams will help you see those types in a new light.
Categories
People familiar with pipes know that await/read and (>~) form a category, so we get three equations for free from the three category laws:
-- Reading something = calling it read >~ f = f -- `read` is the identity input stream transformation f >~ read -- It doesn't matter how you group sinks/transformations (f >~ g) >~ h = f >~ (g >~ h)Dually, yield/write and (~>) form a category, so we get another three equations for free:
-- Writing something = calling it f >~ write = f -- `write` is the identity output stream transformation write >~ f -- It doesn't matter how you group sources/transformations (f ~> g) ~> h = f ~> (g ~> h)Those are really nice properties to use when equationally reasoning about handles and as we've already seen they allow you to simplify your code in unexpected ways.
Bidirectionality
But wait, there's more! pipes actually has a fully bidirectional core in the Pipes.Core module. You can generalize everything above to use this bidirectional interface with surprising results. For example, in the bidirectional version of pipes-based streams, you can have streams that both accept input and produce output upon each step:
inputOutputStream :: a -> Effect m bYou also get the following generalized versions of all commands and operators:
- request generalizes read, accepting an argument for upstream
- respond generalizes write, returning a value from downstream
- (\>\) generalizes (>~)
- (/>/) generalizes (~>)
Conclusion
pipes provides an elegant way to model Handle-like abstractions in a very compact way that requires no new extensions to the API. I will soon provide a pipes-streams library that implements utilities along these lines to further solidify these concepts.
One thing I hope people take away from this is that a Handle-based API need not be deeply intertwined with IO. All the commands and connectives from pipes-based handles are completely independent of the base monad and you can freely generalize the pipes notion of handles to pure monads.
No comments:
Post a Comment