Michael's recent blog posts highlighted several deficiences of pipes
-based parsing. Particularly, he emphasized that it was incompatible with idioms from the core pipes
library, and I agree with that assessment. Programming with pipes-parse
is a different beast from programming with vanilla pipes
and pipes-parse
idioms more closely resemble conduit
idioms.
Several comments in response to Michael's post asked if one could internally implement conduit
on top of pipes
, in order to simplify conduit
internals. This post answers half that question by showing how to implement conduit sans finalization on top pipes
using the tools from pipes-parse
.
This code is short, but very dense, so I will walk through the implementation step-by-step, explaining the underlying pipes-parse
idioms that I'm using to reconstruct conduit functionality. If you just want to skip to the complete code then go straight to the Appendix at the end of this post.
The Conduit Type
The way you internally represent a conduit-like parser using pipes
is the following data type:
import Pipes
import Control.Monad.Trans.State.Strict
newtype ConduitM i o m r = ConduitM
{ unConduitM ::
forall x . Producer o (StateT (Producer i m x) m) r }
To recap, a ConduitM i o m r
has an input of type i
and an output of type o
, and the output is distinct from the return value, just like pipes
.
I model this as a Producer
of o
s that reads from and writes to a Producer
of i
s. The Producer
of i
s is our conduit's upstream input end. await
ing a value will pop an elements off of this Producer
and adding back a leftover
pushes an element back onto this Producer
.
This representation differs from conduit's implementation in one important way: it makes no distinction between leftovers and future input. Both are stored together within the inner Producer
. This is one neat trick of reifying future input as a Producer
: you now have an obvious place to store leftovers.
Primitives
The next step is to implement await
, which is just a thin wrapper around draw
from pipes-parse
:
type Consumer i m r = forall o . ConduitM i o m r
await :: (Monad m) => Consumer i m (Maybe i)
await = ConduitM $ lift $ liftM f draw
where
f (Left _) = Nothing
f (Right r) = Just r
However, this doesn't explain what draw
is actually doing, so let's refer to its implementation:
draw = do
p <- get -- Retrieve our source of input
x <- lift (next p) -- Attempt to pop one element off
case x of
Right (a, p') -> do -- Success: bind the element
put p' -- Save the remainder
return (Right a)
Left r -> do -- Failure: No more input
put (return r)
return (Left r)
If you are more comfortable with StateT
you might prefer the following less verbose form which inlines all the state passing:
draw = StateT $ \p -> do
x <- next p
return $ case x of
Right (a, p') -> (Right a, p' )
Left r -> (Left r, return r)
If you think of a Producer a m ()
as isomorphic to ListT m a
, then next
is the equivalent of uncons
for ListT
.
Similarly, we can add elements back to the producer, using leftover
, which is just a thin wrapper around unDraw
from pipes-parse
:
leftover :: (Monad m) => i -> ConduitM i o m ()
leftover i = ConduitM $ lift $ unDraw i
unDraw
has a simple implementation:
unDraw a = modify (Pipes.yield a >>)
It just prepends a yield
statement onto the Producer
. This is the equivalent of cons
for ListT
.
What about yield
? Well, that's exactly the same:
yield :: (Monad m) => o -> ConduitM i o m ()
yield o = ConduitM (Pipes.yield o)
Composition
Now for the interesting part: conduit composition, which has the following type:
(=$=) :: (Monad m)
=> ConduitM a b m ()
-> ConduitM b c m r
-> ConduitM a c m r
If we were to replace these ConduitM
s with the underlying pipe type, we would get:
(=$=) :: (Monad m)
=> forall x . Producer b (StateT (Producer a m x) m) ()
-> forall y . Producer c (StateT (Producer b m y) m) r
-> forall z . Producer c (StateT (Producer a m z) m) r
How do we even begin to approach that?
The key is the runStateP
function from Pipes.Lift
, which has the following (simplified) type:
runStateP
:: s -> Producer a (StateT s m) r -> Producer a m (r, s)
Compare this with the type for runStateT
:
runStateT :: StateT s m r -> s -> m (r, s)
runStateP
differs from runStateT
in two ways:
runStateP
unwraps aStateT
buried inside of a piperunStateP
takes arguments in the opposite order fromrunStateT
runStateP
takes care to thread state as it wraps the internal StateT
so it behaves just like runStateT
. Once you familiarize yourself with how runStateP
works, the solution is a matter of type-chasing. In fact, what you will discover is that if you restrict yourself to runStateP
, there is only one solution that type-checks.
We begin with two arguments two our operator:
ConduitM pL =$= ConduitM pR = ConduitM $ ...
pL
has type:
pL :: forall x . Producer b (StateT (Producer a m x) m) ()
Let's look at what type we get when we unwrap pL
using runStateP
:
parseL
:: (Monad m)
=> Producer a m x -> Producer b m ((), Producer a m x)
parseL as = runStateP as pL
This now looks just like a parser combinator. It takes an input stream of values of type a
and generates an output stream of values of type b
, returning unused input alongside the ()
return value. We're not interested in this ()
return value, so we'll use execStateP
instead:
parseL
:: (Monad m)
=> Producer a m x -> Producer b m (Producer a m x)
parseL as = execStateP as pL
Similarly, we'll convert pR
to a parser:
parseR
:: (Monad m)
=> Producer b m y -> Producer c m (r, Producer b m y)
parseR bs = runStateP bs pR
Now what's our goal? We're trying to build a ConduitM a c m r
, which is equivalent to the following parser:
parse
:: (Monad m)
=> Producer a m z -> Producer c m (r, Producer a m z)
This means that we need to introduce a stream of as
:
parse as = do
-- as :: Producer a m x
We can now feed that stream to parseL
parse as = do
-- as :: Producer a m x
-- parseL as :: Producer b m (Producer a m x)
We can then feed that to parseR
. This works because parseR
is universally quantified in y
, which type-checks as Producer a m x
:
parse as = do
-- as :: Producer a m x
-- parseL as
-- :: Producer b m (Producer a m x)
-- parseR (parseL as)
-- :: Producer c m (r, Producer b m (Producer a m x))
This is almost what we want. We just need to discard the unused stream of b
s:
parse as = do
(r, pp) <- parseR (parseL as)
p' <- lift $ drain pp
return (r, p')
where
drain p = runEffect (for p discard)
If we inline all of that logic, we get the following 5-line implementation of conduit
composition:
ConduitM pL =$= ConduitM pR = ConduitM $
stateP $ \as -> do
(r, pp) <- runStateP (execStateP as pL) pR
p' <- lift $ drain pp
return (r, p')
This gives a birds-eye view of how conduit
composition works. When we compose two conduits, we:
- Feed the input stream of
as
to the upstream conduit - Feed that to the downstream conduit
- Discard all leftovers from their intermediate interface
Once we have this composition operator, the right and left fuse are just type-specializations (the same as in conduit
):
type Conduit i m o = ConduitM i o m ()
type Source m o = ConduitM () o m ()
type Sink i m r = ConduitM i Void m r
($=) :: (Monad m) => Source m a -> Conduit a m b -> Source m b
($=) = (=$=)
(=$) :: (Monad m) => Conduit a m b -> Sink b m c -> Sink a m c
(=$) = (=$=)
What about ($$)
? That is even simpler:
empty :: (Monad m) => Producer () m r
empty = return () >~ cat -- equivalent to "forever $ yield ()"
($$) :: (Monad m) => Source m a -> Sink a m b -> m b
ConduitM pL $$ ConduitM pR =
evalStateT (runEffect pR) (evalStateP empty pL)
This implementation says at a high-level:
- Feed an unused leftover stream to
pL
(unused because it's aSource
) - Feed that to
pR
- There is no step 3
Identity
If that is composition, what is the identity? Why, it's just input
from pipes-parse
:
idP :: (Monad m) => ConduitM a a m ()
idP = ConduitM (void input)
Neat how that works out. This is equivalent in behavior to:
idP = do
ma <- await
case ma of
Nothing -> return ()
Just a -> do
yield a
idP
Connect and Resume
Last but not least we need connect and resume. Like I said before, this will ignore finalization concerns, so I will only implement a variation on ($$+)
that returns a new Source
, rather than a ResumableSource
(which is just a Source
tagged with a finalizer).
($$+)
:: (Monad m)
=> Source m a -> Sink a m b -> m (b, Source m a)
ConduitM pL $$+ ConduitM pR = do
(b, as) <- runEffect $ runStateP (execStateP empty pL) pR
let as' = ConduitM $ stateP $ \p -> ((), p) <$ as
return (b, as')
This says:
- Feed an unused input stream to
pL
(it's aSource
) - Feed that to
pR
- Discard
pR
's inexistent output (it's aSink
) - Create a new
Source
that also ignores its input stream
Conclusion
The purpose of this post is not to suggest that Michael necessarily should implement conduit
in terms of pipes
, especially since this does not contain finalization code, yet. Rather, I wanted to exhibit that pipes
is a powerful tool that you can use to build other abstractions concisely and with less room for error.
Appendix
The minimal test implementation is 50 lines of code, which I've included here:
{-# LANGUAGE RankNTypes #-}
import Control.Applicative ((<$))
import Control.Monad (liftM, void)
import Pipes hiding (await, yield, Consumer)
import qualified Pipes
import Pipes.Lift
import Pipes.Parse
newtype ConduitM i o m r = ConduitM
{ unConduitM :: forall x .
Producer o (StateT (Producer i m x) m) r }
instance (Monad m) => Monad (ConduitM i o m) where
return r = ConduitM (return r)
m >>= f = ConduitM $ unConduitM m >>= \r -> unConduitM (f r)
instance MonadTrans (ConduitM i o) where
lift m = ConduitM (lift (lift m))
type Consumer i m r = forall o . ConduitM i o m r
type Source m o = ConduitM () o m ()
type Sink i m r = ConduitM i Void m r
type Conduit i m o = ConduitM i o m ()
await :: (Monad m) => Consumer i m (Maybe i)
await = ConduitM $ lift $ liftM f draw
where
f (Left _) = Nothing
f (Right r) = Just r
yield :: (Monad m) => o -> ConduitM i o m ()
yield o = ConduitM (Pipes.yield o)
leftover :: (Monad m) => i -> ConduitM i o m ()
leftover i = ConduitM $ lift $ unDraw i
(=$=)
:: (Monad m)
=> Conduit a m b
-> ConduitM b c m r
-> ConduitM a c m r
ConduitM pL =$= ConduitM pR = ConduitM $
stateP $ \p -> do
(r, pp) <- runStateP (execStateP p pL) pR
p' <- lift $ drain pp
return (r, p')
drain :: (Monad m) => Producer a m r -> m r
drain p = runEffect (for p discard)
($=) :: (Monad m) => Source m a -> Conduit a m b -> Source m b
($=) = (=$=)
(=$) :: (Monad m) => Conduit a m b -> Sink b m c -> Sink a m c
(=$) = (=$=)
empty :: (Monad m) => Producer () m r
empty = return () >~ cat
($$) :: (Monad m) => Source m a -> Sink a m b -> m b
ConduitM pL $$ ConduitM pR =
evalStateT (runEffect pR) (evalStateP empty pL)
idP :: (Monad m) => ConduitM a a m ()
idP = ConduitM (void input)
($$+)
:: (Monad m)
=> Source m a -> Sink a m b -> m (b, Source m a)
ConduitM pL $$+ ConduitM pR = do
(b, pa) <- runEffect $ runStateP (execStateP empty pL) pR
let p' = ConduitM $ stateP $ \p -> ((), p) <$ pa
return (b, p')
Shouldn't
ReplyDeleterunStateP
:: s -> Producer a m (StateT s m) r -> Producer a m (r, s)
be
runStateP
:: s -> Producer a (StateT s m) r -> Producer a m (r, s)
You're right. I fixed it. Thanks!
Delete