Michael's recent blog posts highlighted several deficiencies of pipes-based parsing. In particular, he emphasized that it was incompatible with idioms from the core pipes library, and I agree with that assessment. Programming with pipes-parse is a different beast from programming with vanilla pipes, and pipes-parse idioms more closely resemble conduit idioms.
Several comments in response to Michael's post asked if one could internally implement conduit on top of pipes, in order to simplify conduit internals. This post answers half that question by showing how to implement conduit sans finalization on top of pipes using the tools from pipes-parse.
This code is short, but very dense, so I will walk through the implementation step-by-step, explaining the underlying pipes-parse idioms that I'm using to reconstruct conduit functionality. If you just want to skip to the complete code then go straight to the Appendix at the end of this post.
The Conduit Type
The way you internally represent a conduit-like parser using pipes is the following data type:
import Pipes
import Control.Monad.Trans.State.Strict
newtype ConduitM i o m r = ConduitM
    { unConduitM ::
        forall x . Producer o (StateT (Producer i m x) m) r }
To recap, a ConduitM i o m r has an input of type i and an output of type o, and the output is distinct from the return value, just like pipes.
I model this as a Producer of os that reads from and writes to a Producer of is. The Producer of is is our conduit's upstream input end. Awaiting a value pops an element off of this Producer, and adding back a leftover pushes an element back onto this Producer.
This representation differs from conduit's implementation in one important way: it makes no distinction between leftovers and future input. Both are stored together within the inner Producer. This is one neat trick of reifying future input as a Producer: you now have an obvious place to store leftovers.
Primitives
The next step is to implement await, which is just a thin wrapper around draw from pipes-parse:
type Consumer i m r = forall o . ConduitM i o m r
await :: (Monad m) => Consumer i m (Maybe i)
await = ConduitM $ lift $ liftM f draw
  where
    f (Left  _) = Nothing
    f (Right r) = Just r
However, this doesn't explain what draw is actually doing, so let's refer to its implementation:
draw = do
    p <- get                    -- Retrieve our source of input
    x <- lift (next p)          -- Attempt to pop one element off
    case x of
        Right (a, p') -> do     -- Success: bind the element
            put p'              -- Save the remainder
            return (Right a)
        Left r -> do            -- Failure: No more input
            put (return r)
            return (Left r)
If you are more comfortable with StateT you might prefer the following less verbose form which inlines all the state passing:
draw = StateT $ \p -> do
    x <- next p
    return $ case x of
        Right (a, p') -> (Right a, p'      )
        Left  r       -> (Left  r, return r)
If you think of a Producer a m () as isomorphic to ListT m a, then next is the equivalent of uncons for ListT.
Similarly, we can add elements back to the producer, using leftover, which is just a thin wrapper around unDraw from pipes-parse:
leftover :: (Monad m) => i -> ConduitM i o m ()
leftover i = ConduitM $ lift $ unDraw i
unDraw has a simple implementation:
unDraw a = modify (Pipes.yield a >>)
It just prepends a yield statement onto the Producer. This is the equivalent of cons for ListT.
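To make the uncons/cons analogy concrete, here is a minimal sketch that does not use pipes at all. It assumes a simplified, effect-free stand-in for Producer (the two-constructor type below is my own illustration, not the library's Proxy type) and writes draw and unDraw as explicit state-passing functions. The peek helper is hypothetical and only there to show the two primitives working together.

```haskell
-- Simplified stand-in for pipes' Producer (assumption: no monadic effects).
data Producer a r
    = Done r                   -- stream finished with return value r
    | Yield a (Producer a r)   -- one element followed by the rest

-- Analogue of `next`: uncons for the stream.
next :: Producer a r -> Either r (a, Producer a r)
next (Done r)    = Left r
next (Yield a p) = Right (a, p)

-- Analogue of `draw`, with the state passed explicitly.
draw :: Producer a r -> (Either r a, Producer a r)
draw p = case next p of
    Right (a, p') -> (Right a, p')
    Left r        -> (Left r, Done r)

-- Analogue of `unDraw`: cons an element back onto the stream.
unDraw :: a -> Producer a r -> ((), Producer a r)
unDraw a p = ((), Yield a p)

-- Hypothetical helper: inspect the next element without consuming it.
peek :: Producer a r -> (Maybe a, Producer a r)
peek p = case draw p of
    (Right a, p') -> (Just a, snd (unDraw a p'))
    (Left _,  p') -> (Nothing, p')

-- Conversions used only to build and inspect examples.
fromList :: [a] -> Producer a ()
fromList = foldr Yield (Done ())

toList :: Producer a r -> [a]
toList (Done _)    = []
toList (Yield a p) = a : toList p
```

Because the leftover goes back onto the same stream that future input comes from, peek leaves the stream exactly as it found it.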
What about yield? Well, that's exactly the same:
yield :: (Monad m) => o -> ConduitM i o m ()
yield o = ConduitM (Pipes.yield o)
Composition
Now for the interesting part: conduit composition, which has the following type:
(=$=) :: (Monad m)
      => ConduitM a b m ()
      -> ConduitM b c m r
      -> ConduitM a c m r
If we were to replace these ConduitMs with the underlying pipe type, we would get:
(=$=) :: (Monad m)
      => forall x . Producer b (StateT (Producer a m x) m) ()
      -> forall y . Producer c (StateT (Producer b m y) m) r
      -> forall z . Producer c (StateT (Producer a m z) m) r
How do we even begin to approach that?
The key is the runStateP function from Pipes.Lift, which has the following (simplified) type:
runStateP
    :: s -> Producer a (StateT s m) r -> Producer a m (r, s)
Compare this with the type for runStateT:
runStateT :: StateT s m r -> s -> m (r, s)
runStateP differs from runStateT in two ways:
- runStateP unwraps a StateT buried inside of a pipe
- runStateP takes arguments in the opposite order from runStateT
runStateP takes care to thread state as it unwraps the internal StateT so it behaves just like runStateT. Once you familiarize yourself with how runStateP works, the solution is a matter of type-chasing. In fact, what you will discover is that if you restrict yourself to runStateP, there is only one solution that type-checks.
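As a rough illustration of what "threading state through a pipe" means, the sketch below models the idea without pipes. It assumes a hand-rolled three-constructor Producer and a hand-rolled State newtype (both are stand-ins defined here, not the library types), and because the base monad in this model is pure, the model's runStateP simply runs each state step as it walks the stream. The counter producer is a hypothetical example.

```haskell
import Data.Functor.Identity (Identity (..))

-- Hand-rolled state monad carrier (assumption: stands in for StateT s m).
newtype State s x = State { runState :: s -> (x, s) }

-- Simplified producer with an explicit constructor for base-monad effects.
data Producer a m r
    = Done r
    | Yield a (Producer a m r)
    | Effect (m (Producer a m r))

-- Model of runStateP: walk the stream, threading the state through every
-- effect, and return the final state alongside the result.
runStateP :: s -> Producer a (State s) r -> Producer a Identity (r, s)
runStateP s (Done r)    = Done (r, s)
runStateP s (Yield a p) = Yield a (runStateP s p)
runStateP s (Effect m)  = let (p, s') = runState m s in runStateP s' p

-- Hypothetical example: yield the current state k times, incrementing it.
counter :: Int -> Producer Int (State Int) ()
counter 0 = Done ()
counter k = Effect (State (\n -> (Yield n (counter (k - 1)), n + 1)))

-- Collect all outputs and the return value of a state-free producer.
collect :: Producer a Identity r -> ([a], r)
collect (Done r)    = ([], r)
collect (Yield a p) = let (xs, r) = collect p in (a : xs, r)
collect (Effect m)  = collect (runIdentity m)
```

Note the argument order: the initial state comes first, mirroring the second difference listed above.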
We begin with two arguments to our operator:
ConduitM pL =$= ConduitM pR = ConduitM $ ...
pL has type:
pL :: forall x . Producer b (StateT (Producer a m x) m) ()
Let's look at what type we get when we unwrap pL using runStateP:
parseL
    :: (Monad m)
    => Producer a m x -> Producer b m ((), Producer a m x)
parseL as = runStateP as pL
This now looks just like a parser combinator. It takes an input stream of values of type a and generates an output stream of values of type b, returning unused input alongside the () return value. We're not interested in this () return value, so we'll use execStateP instead:
parseL
    :: (Monad m)
    => Producer a m x -> Producer b m (Producer a m x)
parseL as = execStateP as pL
Similarly, we'll convert pR to a parser:
parseR
    :: (Monad m)
    => Producer b m y -> Producer c m (r, Producer b m y)
parseR bs = runStateP bs pR
Now what's our goal? We're trying to build a ConduitM a c m r, which is equivalent to the following parser:
parse
    :: (Monad m)
    => Producer a m z -> Producer c m (r, Producer a m z)
This means that we need to introduce a stream of as:
parse as = do
    -- as :: Producer a m x
We can now feed that stream to parseL:
parse as = do
    -- as        :: Producer a m x
    -- parseL as :: Producer b m (Producer a m x)
We can then feed that to parseR. This works because parseR is universally quantified in y, so y can be instantiated to Producer a m x:
parse as = do
    -- as :: Producer a m x
    -- parseL as
    --     :: Producer b m (Producer a m x)
    -- parseR (parseL as)
    --     :: Producer c m (r, Producer b m (Producer a m x))
This is almost what we want. We just need to discard the unused stream of bs:
parse as = do
    (r, pp) <- parseR (parseL as)
    p'      <- lift $ drain pp
    return (r, p')
  where
    drain p = runEffect (for p discard)
If we inline all of that logic, we get the following 5-line implementation of conduit composition:
ConduitM pL =$= ConduitM pR = ConduitM $
    stateP $ \as -> do
        (r, pp) <- runStateP (execStateP as pL) pR
        p'      <- lift $ drain pp
        return (r, p')
This gives a bird's-eye view of how conduit composition works. When we compose two conduits, we:
- Feed the input stream of as to the upstream conduit
- Feed that to the downstream conduit
- Discard all leftovers from their intermediate interface
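The three steps above can be sketched in a pure, list-based model. This is only an illustration under simplifying assumptions: a conduit is modelled as a function from an input list to its outputs, its result, and its unused input (the Parser synonym below is my own stand-in, and effects are ignored entirely). The names fuse, takeThree, and sumTwo are hypothetical.

```haskell
-- Pure model (assumption): a conduit maps an input list to
-- (outputs, (result, unused input)).
type Parser i o r = [i] -> ([o], (r, [i]))

-- Model of (=$=).
fuse :: Parser a b () -> Parser b c r -> Parser a c r
fuse parseL parseR input =
    let (bs, ((), rest))     = parseL input   -- 1. feed the as upstream
        (cs, (r, _unusedBs)) = parseR bs      -- 2. feed the bs downstream
    in  (cs, (r, rest))                       -- 3. discard leftover bs

-- Upstream: forward the first three inputs, leave the rest unconsumed.
takeThree :: Parser Int Int ()
takeThree xs = (take 3 xs, ((), drop 3 xs))

-- Downstream: echo and sum the first two inputs it sees.
sumTwo :: Parser Int Int Int
sumTwo xs = (take 2 xs, (sum (take 2 xs), drop 2 xs))
```

With the input [1, 2, 3, 4, 5], the upstream forwards 1, 2, and 3, the downstream consumes only 1 and 2, and the intermediate 3 is dropped, while 4 and 5 remain available as unused input of the fused conduit.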
Once we have this composition operator, the right and left fuse are just type-specializations (the same as in conduit):
type Conduit i m o = ConduitM i o m ()
type Source m o = ConduitM () o m ()
type Sink i m r = ConduitM i Void m r
($=) :: (Monad m) => Source m a -> Conduit a m b -> Source m b
($=) = (=$=)
(=$) :: (Monad m) => Conduit a m b -> Sink b m c -> Sink a m c
(=$) = (=$=)
What about ($$)? That is even simpler:
empty :: (Monad m) => Producer () m r
empty = return () >~ cat -- equivalent to "forever $ yield ()"
($$) :: (Monad m) => Source m a -> Sink a m b -> m b
ConduitM pL $$ ConduitM pR =
    evalStateT (runEffect pR) (evalStateP empty pL)
This implementation says at a high level:
- Feed an unused leftover stream to pL (unused because it's a Source)
- Feed that to pR
- There is no step 3
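In a pure list-based model the same two steps look like this. This is a sketch under assumptions, not the pipes implementation: conduits are modelled as functions over lists, the infinite list of units plays the role of empty, and connect, numbers, and sumAll are hypothetical names.

```haskell
import Data.Void (Void)

-- Pure model (assumption): a conduit maps an input list to
-- (outputs, (result, unused input)).
type Parser i o r = [i] -> ([o], (r, [i]))
type Source o = Parser () o ()
type Sink i r = Parser i Void r

-- Model of ($$).
connect :: Source a -> Sink a r -> r
connect source sink =
    let (xs, _)     = source (repeat ())  -- feed an unused input stream
        (_, (r, _)) = sink xs             -- feed the outputs to the sink
    in  r

-- A source ignores its input and hands it back untouched.
numbers :: Source Int
numbers unused = ([1, 2, 3, 4], ((), unused))

-- A sink produces no output and consumes everything it is given.
sumAll :: Sink Int Int
sumAll xs = ([], (sum xs, []))
```

Laziness matters here in the same way it does for empty: the infinite input is never forced because the source never looks at it.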
Identity
If that is composition, what is the identity? Why, it's just input from pipes-parse:
idP :: (Monad m) => ConduitM a a m ()
idP = ConduitM (void input)
Neat how that works out. This is equivalent in behavior to:
idP = do
    ma <- await
    case ma of
        Nothing -> return ()
        Just a  -> do
            yield a
            idP
Connect and Resume
Last but not least we need connect and resume. Like I said before, this will ignore finalization concerns, so I will only implement a variation on ($$+) that returns a new Source, rather than a ResumableSource (which is just a Source tagged with a finalizer).
($$+)
    :: (Monad m)
    => Source m a -> Sink a m b -> m (b, Source m a)
ConduitM pL $$+ ConduitM pR = do
    (b, as) <- runEffect $ runStateP (execStateP empty pL) pR
    let as' = ConduitM $ stateP $ \p -> ((), p) <$ as
    return (b, as')
This says:
- Feed an unused input stream to pL (it's a Source)
- Feed that to pR
- Discard pR's nonexistent output (it's a Sink)
- Create a new Source that also ignores its input stream
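The four steps above can be sketched in a pure list-based model. As before, this is an illustration under assumptions rather than the pipes code: conduits are functions over lists, effects and finalizers are ignored, and connectResume, numbers, and sumTwo are hypothetical names.

```haskell
import Data.Void (Void)

-- Pure model (assumption): a conduit maps an input list to
-- (outputs, (result, unused input)).
type Parser i o r = [i] -> ([o], (r, [i]))
type Source o = Parser () o ()
type Sink i r = Parser i Void r

-- Model of ($$+): run the sink, then repackage whatever it did not
-- consume as a fresh source.
connectResume :: Source a -> Sink a r -> (r, Source a)
connectResume source sink =
    let (xs, _)        = source (repeat ())     -- unused input stream
        (_, (r, rest)) = sink xs                -- sink output is discarded
        resumed unused = (rest, ((), unused))   -- new source ignores its input
    in  (r, resumed)

numbers :: Source Int
numbers unused = ([1, 2, 3, 4, 5], ((), unused))

-- A sink that consumes only the first two elements.
sumTwo :: Sink Int Int
sumTwo xs = ([], (sum (take 2 xs), drop 2 xs))
```

The resumed source picks up exactly where the sink stopped, so running sumTwo against numbers leaves a source that still produces 3, 4, and 5.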
Conclusion
The purpose of this post is not to suggest that Michael necessarily should implement conduit in terms of pipes, especially since this does not contain finalization code, yet. Rather, I wanted to exhibit that pipes is a powerful tool that you can use to build other abstractions concisely and with less room for error.
Appendix
The minimal test implementation is 50 lines of code, which I've included here:
{-# LANGUAGE RankNTypes #-}
import Control.Applicative ((<$))
import Control.Monad (liftM, void)
import Pipes hiding (await, yield, Consumer)
import qualified Pipes
import Pipes.Lift
import Pipes.Parse
newtype ConduitM i o m r = ConduitM
    { unConduitM :: forall x .
        Producer o (StateT (Producer i m x) m) r }
instance (Monad m) => Monad (ConduitM i o m) where
    return r = ConduitM (return r)
    m >>= f  = ConduitM $ unConduitM m >>= \r -> unConduitM (f r)
instance MonadTrans (ConduitM i o) where
    lift m = ConduitM (lift (lift m))
type Consumer i m r = forall o . ConduitM i o m r
type Source m o = ConduitM () o m ()
type Sink i m r = ConduitM i Void m r
type Conduit i m o = ConduitM i o m ()
await :: (Monad m) => Consumer i m (Maybe i)
await = ConduitM $ lift $ liftM f draw
  where
    f (Left  _) = Nothing
    f (Right r) = Just r
yield :: (Monad m) => o -> ConduitM i o m ()
yield o = ConduitM (Pipes.yield o)
leftover :: (Monad m) => i -> ConduitM i o m ()
leftover i = ConduitM $ lift $ unDraw i
(=$=)
    :: (Monad m)
    => Conduit a m b
    -> ConduitM b c m r
    -> ConduitM a c m r
ConduitM pL =$= ConduitM pR = ConduitM $
    stateP $ \p -> do
        (r, pp) <- runStateP (execStateP p pL) pR
        p'      <- lift $ drain pp
        return (r, p')
drain :: (Monad m) => Producer a m r -> m r
drain p = runEffect (for p discard)
($=) :: (Monad m) => Source m a -> Conduit a m b -> Source m b
($=) = (=$=)
(=$) :: (Monad m) => Conduit a m b -> Sink b m c -> Sink a m c
(=$) = (=$=)
empty :: (Monad m) => Producer () m r
empty = return () >~ cat
($$) :: (Monad m) => Source m a -> Sink a m b -> m b
ConduitM pL $$ ConduitM pR =
    evalStateT (runEffect pR) (evalStateP empty pL)
idP :: (Monad m) => ConduitM a a m ()
idP = ConduitM (void input)
($$+)
    :: (Monad m)
    => Source m a -> Sink a m b -> m (b, Source m a)
ConduitM pL $$+ ConduitM pR = do
    (b, pa) <- runEffect $ runStateP (execStateP empty pL) pR
    let p' = ConduitM $ stateP $ \p -> ((), p) <$ pa
    return (b, p')
Shouldn't
runStateP
:: s -> Producer a m (StateT s m) r -> Producer a m (r, s)
be
runStateP
:: s -> Producer a (StateT s m) r -> Producer a m (r, s)
You're right. I fixed it. Thanks!