Wednesday, October 9, 2013

How to reimplement the conduit parsing API in 50 lines of pipes code

Michael's recent blog posts highlighted several deficiencies of pipes-based parsing. Particularly, he emphasized that it was incompatible with idioms from the core pipes library, and I agree with that assessment. Programming with pipes-parse is a different beast from programming with vanilla pipes, and pipes-parse idioms more closely resemble conduit idioms.

Several comments in response to Michael's post asked if one could internally implement conduit on top of pipes, in order to simplify conduit internals. This post answers half that question by showing how to implement conduit sans finalization on top of pipes using the tools from pipes-parse.

This code is short, but very dense, so I will walk through the implementation step-by-step, explaining the underlying pipes-parse idioms that I'm using to reconstruct conduit functionality. If you just want to skip to the complete code then go straight to the Appendix at the end of this post.

The Conduit Type

The way you internally represent a conduit-like parser using pipes is the following data type:

import Pipes
import Control.Monad.Trans.State.Strict

newtype ConduitM i o m r = ConduitM
    { unConduitM ::
         forall x . Producer o (StateT (Producer i m x) m) r }

To recap, a ConduitM i o m r has an input of type i and an output of type o, and the output is distinct from the return value, just like pipes.

I model this as a Producer of os that reads from and writes to a Producer of is. The Producer of is is our conduit's upstream input end. Awaiting a value pops an element off of this Producer, and adding back a leftover pushes an element back onto it.

This representation differs from conduit's implementation in one important way: it makes no distinction between leftovers and future input. Both are stored together within the inner Producer. This is one neat trick of reifying future input as a Producer: you now have an obvious place to store leftovers.


The next step is to implement await, which is just a thin wrapper around draw from pipes-parse:

type Consumer i m r = forall o . ConduitM i o m r

await :: (Monad m) => Consumer i m (Maybe i)
await = ConduitM $ lift $ liftM f draw
  where
    f (Left  _) = Nothing
    f (Right r) = Just r

However, this doesn't explain what draw is actually doing, so let's refer to its implementation:

draw = do
    p <- get                 -- Retrieve our source of input
    x <- lift (next p)       -- Attempt to pop one element off
    case x of
        Right (a, p') -> do  -- Success: bind the element
            put p'           -- Save the remainder
            return (Right a)
        Left   r      -> do  -- Failure: No more input
            put (return r)
            return (Left  r)

If you are more comfortable with StateT you might prefer the following less verbose form which inlines all the state passing:

draw = StateT $ \p -> do
    x <- next p
    return $ case x of
        Right (a, p') -> (Right a, p'      )
        Left      r   -> (Left  r, return r)

If you think of a Producer a m () as isomorphic to ListT m a, then next is the equivalent of uncons for ListT.
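
To make the ListT analogy concrete, here is a minimal sketch that uses only base. The ListT type below is a hand-rolled illustration and not the ListT that pipes exports; cons, fromList and toList are likewise helper names introduced only for this example.

```haskell
-- A hand-rolled "ListT done right" (hypothetical, for illustration only).
-- Each step runs an effect in m and then either ends the list or
-- produces a head together with the rest of the list.
newtype ListT m a = ListT { uncons :: m (Maybe (a, ListT m a)) }

-- Prepend an element without running any effects.
cons :: Monad m => a -> ListT m a -> ListT m a
cons a l = ListT (return (Just (a, l)))

-- Build an effect-free ListT from an ordinary list.
fromList :: Monad m => [a] -> ListT m a
fromList = foldr cons (ListT (return Nothing))

-- Run every step and collect the elements.
toList :: Monad m => ListT m a -> m [a]
toList l = do
    step <- uncons l
    case step of
        Nothing      -> return []
        Just (a, l') -> fmap (a :) (toList l')
```

Here uncons plays the role that next plays for a Producer: it runs just enough effects to learn whether there is another element, and hands back the remainder of the stream for later use.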

Similarly, we can add elements back to the producer, using leftover, which is just a thin wrapper around unDraw from pipes-parse:

leftover :: (Monad m) => i -> ConduitM i o m ()
leftover i = ConduitM $ lift $ unDraw i

unDraw has a simple implementation:

unDraw a = modify (Pipes.yield a >>)

It just prepends a yield statement onto the Producer. This is the equivalent of cons for ListT.
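
As a rough sketch of why a single store for input and leftovers is convenient, the following base-only model replaces the Producer with a plain list and StateT with a hand-written state function. Parser, drawL, unDrawL and peekL are hypothetical names for this illustration and are not part of pipes-parse.

```haskell
-- A pure model: the unread input is a list, and a parser is a function
-- from the unread input to a result plus the new unread input.
newtype Parser i r = Parser { runParser :: [i] -> (r, [i]) }

instance Functor (Parser i) where
    fmap f (Parser g) = Parser $ \s -> let (r, s') = g s in (f r, s')

instance Applicative (Parser i) where
    pure r = Parser $ \s -> (r, s)
    Parser f <*> Parser g = Parser $ \s ->
        let (h, s')  = f s
            (r, s'') = g s'
        in  (h r, s'')

instance Monad (Parser i) where
    Parser g >>= f = Parser $ \s ->
        let (r, s') = g s in runParser (f r) s'

-- Pop one element off the input (the list analogue of draw).
drawL :: Parser i (Maybe i)
drawL = Parser $ \s -> case s of
    []     -> (Nothing, [])
    x : xs -> (Just x, xs)

-- Push one element back onto the input (the list analogue of unDraw).
unDrawL :: i -> Parser i ()
unDrawL x = Parser $ \s -> ((), x : s)

-- Look at the next element without consuming it.
peekL :: Parser i (Maybe i)
peekL = do
    mx <- drawL
    case mx of
        Nothing -> return ()
        Just x  -> unDrawL x
    return mx
```

In this model runParser peekL [1, 2, 3] returns (Just 1, [1, 2, 3]): the pushed-back element lands in the same list that holds the future input, so no separate leftover buffer is needed.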

What about yield? Well, that's exactly the same:

yield :: (Monad m) => o -> ConduitM i o m ()
yield o = ConduitM (Pipes.yield o)


Now for the interesting part: conduit composition, which has the following type:

(=$=) :: (Monad m)
      => ConduitM a b m ()
      -> ConduitM b c m r
      -> ConduitM a c m r

If we were to replace these ConduitMs with the underlying pipe type, we would get:

(=$=) :: (Monad m)
      => forall x . Producer b (StateT (Producer a m x) m) ()
      -> forall y . Producer c (StateT (Producer b m y) m) r
      -> forall z . Producer c (StateT (Producer a m z) m) r

How do we even begin to approach that?

The key is the runStateP function from Pipes.Lift, which has the following (simplified) type:

runStateP
    :: s -> Producer a (StateT s m) r -> Producer a m (r, s)

Compare this with the type for runStateT:

runStateT :: StateT s m r -> s -> m (r, s)

runStateP differs from runStateT in two ways:

  • runStateP unwraps a StateT buried inside of a pipe

  • runStateP takes arguments in the opposite order from runStateT

runStateP takes care to thread state as it unwraps the internal StateT, so it behaves just like runStateT. Once you familiarize yourself with how runStateP works, the solution is a matter of type-chasing. In fact, what you will discover is that if you restrict yourself to runStateP, there is only one solution that type-checks.

We begin with two arguments to our operator:

ConduitM pL =$= ConduitM pR = ConduitM $ ...

pL has type:

pL :: forall x . Producer b (StateT (Producer a m x) m) ()

Let's look at what type we get when we unwrap pL using runStateP:

parseL
    :: (Monad m)
    => Producer a m x -> Producer b m ((), Producer a m x)
parseL as = runStateP as pL

This now looks just like a parser combinator. It takes an input stream of values of type a and generates an output stream of values of type b, returning unused input alongside the () return value. We're not interested in this () return value, so we'll use execStateP instead:

parseL
    :: (Monad m)
    => Producer a m x -> Producer b m (Producer a m x)
parseL as = execStateP as pL

Similarly, we'll convert pR to a parser:

parseR
    :: (Monad m)
    => Producer b m y -> Producer c m (r, Producer b m y)
parseR bs = runStateP bs pR

Now what's our goal? We're trying to build a ConduitM a c m r, which is equivalent to the following parser:

parse
    :: (Monad m)
    => Producer a m z -> Producer c m (r, Producer a m z)

This means that we need to introduce a stream of as:

parse as = do
    -- as :: Producer a m x

We can now feed that stream to parseL:

parse as = do
    -- as        :: Producer a m x
    -- parseL as :: Producer b m (Producer a m x)

We can then feed that to parseR. This type-checks because parseR is universally quantified in y, so y can be instantiated to Producer a m x:

parse as = do
    -- as  :: Producer a m x
    -- parseL as
    --     :: Producer b m (Producer a m x)
    -- parseR (parseL as)
    --     :: Producer c m (r, Producer b m (Producer a m x))

This is almost what we want. We just need to discard the unused stream of bs:

parse as = do
    (r, pp) <- parseR (parseL as)
    p'      <- lift $ drain pp
    return (r, p')
  where
    drain p = runEffect (for p discard)

If we inline all of that logic, we get the following 5-line implementation of conduit composition:

ConduitM pL =$= ConduitM pR = ConduitM $
    stateP $ \as -> do
          (r, pp) <- runStateP (execStateP as pL) pR
          p'      <- lift $ drain pp
          return (r, p')

This gives a bird's-eye view of how conduit composition works. When we compose two conduits, we:

  • Feed the input stream of as to the upstream conduit
  • Feed that to the downstream conduit
  • Discard all leftovers from their intermediate interface
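
The three steps above can be sketched with a pure, list-based model that needs nothing beyond base. This is a deliberate simplification: a conduit becomes a function from an input list to its outputs, result and unused input, so there are no effects and the two stages do not interleave. Parse, fuse, doubleTwo and firstOne are hypothetical names for this illustration only.

```haskell
-- A conduit modelled as a pure function on lists: given the input
-- stream, produce the outputs, a result, and the unused input.
type Parse i o r = [i] -> ([o], (r, [i]))

-- The list analogue of (=$=).
fuse :: Parse a b () -> Parse b c r -> Parse a c r
fuse pL pR as =
    let (bs, ((), as')) = pL as   -- 1. feed the as to the upstream conduit
        (cs, (r, _))    = pR bs   -- 2. feed its output to the downstream one
    in  (cs, (r, as'))            -- 3. drop the leftover bs, keep unused as

-- Upstream: double the first two inputs and leave the rest unconsumed.
doubleTwo :: Parse Int Int ()
doubleTwo as = (map (* 2) (take 2 as), ((), drop 2 as))

-- Downstream: emit nothing and return the first input, if there is one.
firstOne :: Parse b c (Maybe b)
firstOne []       = ([], (Nothing, []))
firstOne (b : bs) = ([], (Just b, bs))
```

In this model fuse doubleTwo firstOne [1, 2, 3, 4] evaluates to ([], (Just 2, [3, 4])). The second doubled value, 4, is a leftover on the intermediate interface and is thrown away, while the unconsumed inputs 3 and 4 are handed back to the caller.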

Once we have this composition operator, the right and left fuse are just type-specializations (the same as in conduit):

type Conduit  i m o = ConduitM i  o    m ()
type Source     m o = ConduitM () o    m ()
type Sink     i m r = ConduitM i  Void m r

($=) :: (Monad m) => Source m a -> Conduit a m b -> Source m b
($=) = (=$=)

(=$) :: (Monad m) => Conduit a m b -> Sink b m c -> Sink a m c
(=$) = (=$=)

What about ($$)? That is even simpler:

empty :: (Monad m) => Producer () m r
empty = return () >~ cat  -- equivalent to "forever $ yield ()"

($$) :: (Monad m) => Source m a -> Sink a m b -> m b
ConduitM pL $$ ConduitM pR =
    evalStateT (runEffect pR) (evalStateP empty pL)

This implementation says at a high-level:

  • Feed an unused leftover stream to pL (unused because it's a Source)
  • Feed that to pR
  • There is no step 3
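
The same kind of pure list model gives a feel for ($$). It is only a sketch under the assumption that effects can be ignored; Parse, sourceList, sumSink and connect are hypothetical names, and the infinite list repeat () stands in for the empty Producer of units.

```haskell
import Data.Void (Void)

-- A conduit modelled as a pure function on lists: given the input
-- stream, produce the outputs, a result, and the unused input.
type Parse i o r = [i] -> ([o], (r, [i]))

-- A source never inspects its input; it only emits.
sourceList :: [o] -> Parse () o ()
sourceList os us = (os, ((), us))

-- A sink emits nothing; this one sums everything it receives.
sumSink :: Parse Int Void Int
sumSink as = ([], (sum as, []))

-- The list analogue of ($$).
connect :: Parse () a () -> Parse a Void r -> r
connect src snk =
    let (as, _)     = src (repeat ())  -- dummy input the source never reads
        (_, (r, _)) = snk as           -- run the sink over the source output
    in  r
```

For example, connect (sourceList [1, 2, 3]) sumSink evaluates to 6. Laziness matters here: the infinite list of units is passed along but never forced.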


If that is composition, what is the identity? Why, it's just input from pipes-parse:

idP :: (Monad m) => ConduitM a a m ()
idP = ConduitM (void input)

Neat how that works out. This is equivalent in behavior to:

idP = do
    ma <- await
    case ma of
        Nothing -> return ()
        Just a  -> do
            yield a
            idP

Connect and Resume

Last but not least we need connect and resume. Like I said before, this will ignore finalization concerns, so I will only implement a variation on ($$+) that returns a new Source, rather than a ResumableSource (which is just a Source tagged with a finalizer).

($$+)
    :: (Monad m)
    => Source m a -> Sink a m b -> m (b, Source m a)
ConduitM pL $$+ ConduitM pR = do
    (b, as) <- runEffect $ runStateP (execStateP empty pL) pR
    let as' = ConduitM $ stateP $ \p -> ((), p) <$ as
    return (b, as')

This says:

  • Feed an unused input stream to pL (it's a Source)
  • Feed that to pR
  • Discard pR's nonexistent output (it's a Sink)
  • Create a new Source that also ignores its input stream
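
The steps above can be sketched in a pure list model as well, assuming effects and finalization are ignored. Parse, sourceList, sumFirst and connectResume are hypothetical names for this illustration; the point is only that the input the sink did not consume is repackaged as a fresh source.

```haskell
import Data.Void (Void)

-- A conduit modelled as a pure function on lists: given the input
-- stream, produce the outputs, a result, and the unused input.
type Parse i o r = [i] -> ([o], (r, [i]))

-- A source never inspects its input; it only emits.
sourceList :: [o] -> Parse () o ()
sourceList os us = (os, ((), us))

-- A sink that sums the first n inputs and leaves the rest unconsumed.
sumFirst :: Int -> Parse Int Void Int
sumFirst n as = ([], (sum (take n as), drop n as))

-- The list analogue of ($$+): return the sink's result together with a
-- new source that replays whatever the sink did not consume.
connectResume :: Parse () a () -> Parse a Void r -> (r, Parse () a ())
connectResume src snk =
    let (as, _)        = src (repeat ())  -- the source ignores its input
        (_, (r, rest)) = snk as           -- the sink has no output to keep
    in  (r, sourceList rest)              -- resume from the unused input
```

With this model, connectResume (sourceList [1 .. 5]) (sumFirst 2) returns 3 along with a source holding 3, 4 and 5; connecting that resumed source to sumFirst 10 then returns 12.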


The purpose of this post is not to suggest that Michael necessarily should implement conduit in terms of pipes, especially since this does not contain finalization code, yet. Rather, I wanted to exhibit that pipes is a powerful tool that you can use to build other abstractions concisely and with less room for error.


Appendix

The minimal test implementation is 50 lines of code, which I've included here:

{-# LANGUAGE RankNTypes #-}

import Control.Applicative ((<$))
import Control.Monad (liftM, void)
import Pipes hiding (await, yield, Consumer)
import qualified Pipes
import Pipes.Lift
import Pipes.Parse

newtype ConduitM i o m r = ConduitM
    { unConduitM :: forall x .
        Producer o (StateT (Producer i m x) m) r }

instance (Monad m) => Monad (ConduitM i o m) where
    return r = ConduitM (return r)
    m >>= f  = ConduitM $ unConduitM m >>= \r -> unConduitM (f r)

instance MonadTrans (ConduitM i o) where
    lift m = ConduitM (lift (lift m))

type Consumer i m r = forall o . ConduitM i  o    m r
type Source     m o =            ConduitM () o    m ()
type Sink     i m r =            ConduitM i  Void m r
type Conduit  i m o =            ConduitM i  o    m ()

await :: (Monad m) => Consumer i m (Maybe i)
await = ConduitM $ lift $ liftM f draw
  where
    f (Left  _) = Nothing
    f (Right r) = Just r

yield :: (Monad m) => o -> ConduitM i o m ()
yield o = ConduitM (Pipes.yield o)

leftover :: (Monad m) => i -> ConduitM i o m ()
leftover i = ConduitM $ lift $ unDraw i

(=$=)
    :: (Monad m)
    => Conduit a m b
    -> ConduitM b c m r
    -> ConduitM a c m r
ConduitM pL =$= ConduitM pR = ConduitM $
    stateP $ \p -> do
          (r, pp) <- runStateP (execStateP p pL) pR
          p'      <- lift $ drain pp
          return (r, p')

drain :: (Monad m) => Producer a m r -> m r
drain p = runEffect (for p discard)

($=) :: (Monad m) => Source m a -> Conduit a m b -> Source m b
($=) = (=$=)

(=$) :: (Monad m) => Conduit a m b -> Sink b m c -> Sink a m c
(=$) = (=$=)

empty :: (Monad m) => Producer () m r
empty = return () >~ cat

($$) :: (Monad m) => Source m a -> Sink a m b -> m b
ConduitM pL $$ ConduitM pR =
    evalStateT (runEffect pR) (evalStateP empty pL)

idP :: (Monad m) => ConduitM a a m ()
idP = ConduitM (void input)

($$+)
    :: (Monad m)
    => Source m a -> Sink a m b -> m (b, Source m a)
ConduitM pL $$+ ConduitM pR = do
    (b, pa) <- runEffect $ runStateP (execStateP empty pL) pR
    let p' = ConduitM $ stateP $ \p -> ((), p) <$ pa
    return (b, p')


