Michael's recent blog posts highlighted several deficiencies of `pipes`-based parsing. In particular, he emphasized that it was incompatible with idioms from the core `pipes` library, and I agree with that assessment. Programming with `pipes-parse` is a different beast from programming with vanilla `pipes`, and `pipes-parse` idioms more closely resemble `conduit` idioms.

Several comments in response to Michael's post asked if one could internally implement `conduit` on top of `pipes`, in order to simplify `conduit` internals. This post answers half of that question by showing how to implement `conduit` sans finalization on top of `pipes` using the tools from `pipes-parse`.

This code is short, but very dense, so I will walk through the implementation step-by-step, explaining the underlying `pipes-parse` idioms that I'm using to reconstruct `conduit` functionality. If you just want to skip to the complete code then go straight to the Appendix at the end of this post.

#### The Conduit Type

The way you internally represent a conduit-like parser using `pipes` is the following data type:

```
import Pipes
import Control.Monad.Trans.State.Strict

newtype ConduitM i o m r = ConduitM
    { unConduitM ::
        forall x . Producer o (StateT (Producer i m x) m) r }
```

To recap, a `ConduitM i o m r` has an input of type `i` and an output of type `o`, and the output is distinct from the return value, just like `pipes`.

I model this as a `Producer` of `o`s that reads from and writes to a `Producer` of `i`s. The `Producer` of `i`s is our conduit's upstream input end. `await`ing a value pops an element off of this `Producer`, and adding back a `leftover` pushes an element back onto this `Producer`.

This representation differs from conduit's implementation in one important way: it makes no distinction between leftovers and future input. Both are stored together within the inner `Producer`. This is one neat trick of reifying future input as a `Producer`: you now have an obvious place to store leftovers.

#### Primitives

The next step is to implement `await`, which is just a thin wrapper around `draw` from `pipes-parse`:

```
type Consumer i m r = forall o . ConduitM i o m r

await :: (Monad m) => Consumer i m (Maybe i)
await = ConduitM $ lift $ liftM f draw
  where
    f (Left  _) = Nothing
    f (Right r) = Just r
```

However, this doesn't explain what `draw` is actually doing, so let's refer to its implementation:

```
draw = do
    p <- get                -- Retrieve our source of input
    x <- lift (next p)      -- Attempt to pop one element off
    case x of
        Right (a, p') -> do -- Success: bind the element
            put p'          -- Save the remainder
            return (Right a)
        Left   r      -> do -- Failure: No more input
            put (return r)
            return (Left r)
```

If you are more comfortable with `StateT`, you might prefer the following less verbose form, which inlines all the state passing:

```
draw = StateT $ \p -> do
    x <- next p
    return $ case x of
        Right (a, p') -> (Right a, p'      )
        Left   r      -> (Left  r, return r)
```

If you think of a `Producer a m ()` as isomorphic to `ListT m a`, then `next` is the equivalent of `uncons` for `ListT`.

Similarly, we can add elements back to the producer using `leftover`, which is just a thin wrapper around `unDraw` from `pipes-parse`:

```
leftover :: (Monad m) => i -> ConduitM i o m ()
leftover i = ConduitM $ lift $ unDraw i
```

`unDraw` has a simple implementation:

`unDraw a = modify (Pipes.yield a >>)`

It just prepends a `yield` statement onto the `Producer`. This is the equivalent of `cons` for `ListT`.

What about `yield`? Well, that's exactly the same:

```
yield :: (Monad m) => o -> ConduitM i o m ()
yield o = ConduitM (Pipes.yield o)
```
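To make the `cons`/`uncons` analogy behind `draw` and `unDraw` concrete, here is a minimal sketch that models the input stream as a plain Haskell list instead of a `Producer`. The names `ParserL`, `drawL`, `unDrawL`, and `peekL` are hypothetical and are not part of `pipes-parse`. The model also drops the base monad entirely, so it only illustrates how leftovers and future input share a single store:

```haskell
-- A pure, list-based model (hypothetical names, not the pipes-parse API).
-- A "parser" takes the remaining input and returns a result plus the
-- input it left behind.
type ParserL i a = [i] -> (a, [i])

-- Model of draw: pop one element, or report end of input.
drawL :: ParserL i (Maybe i)
drawL []       = (Nothing, [])
drawL (x : xs) = (Just x, xs)

-- Model of unDraw: push an element back onto the front of the input.
unDrawL :: i -> ParserL i ()
unDrawL x xs = ((), x : xs)

-- Peek at the next element without consuming it: draw, then undo the draw.
peekL :: ParserL i (Maybe i)
peekL xs =
    case drawL xs of
        (Nothing, rest) -> (Nothing, rest)
        (Just x , rest) -> let ((), rest') = unDrawL x rest
                           in  (Just x, rest')
```

In this model `peekL [1, 2]` evaluates to `(Just 1, [1, 2])`: the pushed-back element sits in front of the remaining input, and nothing distinguishes it from input that was never consumed.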

#### Composition

Now for the interesting part: conduit composition, which has the following type:

```
(=$=) :: (Monad m)
      => ConduitM a b m ()
      -> ConduitM b c m r
      -> ConduitM a c m r
```

If we were to replace these `ConduitM`s with the underlying pipe type, we would get:

```
(=$=) :: (Monad m)
      => forall x . Producer b (StateT (Producer a m x) m) ()
      -> forall y . Producer c (StateT (Producer b m y) m) r
      -> forall z . Producer c (StateT (Producer a m z) m) r
```

How do we even begin to approach that?

The key is the `runStateP` function from `Pipes.Lift`, which has the following (simplified) type:

```
runStateP
    :: s -> Producer a (StateT s m) r -> Producer a m (r, s)
```

Compare this with the type for `runStateT`:

`runStateT :: StateT s m r -> s -> m (r, s)`

`runStateP` differs from `runStateT` in two ways:

- `runStateP` unwraps a `StateT` buried inside of a pipe
- `runStateP` takes arguments in the opposite order from `runStateT`

`runStateP` takes care to thread state as it wraps the internal `StateT` so it behaves just like `runStateT`. Once you familiarize yourself with how `runStateP` works, the solution is a matter of type-chasing. In fact, what you will discover is that if you restrict yourself to `runStateP`, there is only one solution that type-checks.

We begin with the two arguments to our operator:

`ConduitM pL =$= ConduitM pR = ConduitM $ ...`

`pL` has type:

`pL :: forall x . Producer b (StateT (Producer a m x) m) ()`

Let's look at what type we get when we unwrap `pL` using `runStateP`:

```
parseL
    :: (Monad m)
    => Producer a m x -> Producer b m ((), Producer a m x)
parseL as = runStateP as pL
```

This now looks just like a parser combinator. It takes an input stream of values of type `a` and generates an output stream of values of type `b`, returning unused input alongside the `()` return value. We're not interested in this `()` return value, so we'll use `execStateP` instead:

```
parseL
    :: (Monad m)
    => Producer a m x -> Producer b m (Producer a m x)
parseL as = execStateP as pL
```

Similarly, we'll convert `pR` to a parser:

```
parseR
    :: (Monad m)
    => Producer b m y -> Producer c m (r, Producer b m y)
parseR bs = runStateP bs pR
```

Now what's our goal? We're trying to build a `ConduitM a c m r`, which is equivalent to the following parser:

```
parse
    :: (Monad m)
    => Producer a m z -> Producer c m (r, Producer a m z)
```

This means that we need to introduce a stream of `as`:

```
parse as = do
    -- as :: Producer a m x
```

We can now feed that stream to `parseL`:

```
parse as = do
    -- as        :: Producer a m x
    -- parseL as :: Producer b m (Producer a m x)
```

We can then feed *that* to `parseR`. This works because `parseR` is universally quantified in `y`, so `y` unifies with the return type of `parseL as`, which is `Producer a m x`:

```
parse as = do
    -- as :: Producer a m x
    -- parseL as
    --    :: Producer b m (Producer a m x)
    -- parseR (parseL as)
    --    :: Producer c m (r, Producer b m (Producer a m x))
```

This is almost what we want. We just need to discard the unused stream of `b`s:

```
parse as = do
    (r, pp) <- parseR (parseL as)
    p'      <- lift $ drain pp
    return (r, p')
  where
    drain p = runEffect (for p discard)
```

If we inline all of that logic, we get the following 5-line implementation of `conduit` composition:

```
ConduitM pL =$= ConduitM pR = ConduitM $
    stateP $ \as -> do
        (r, pp) <- runStateP (execStateP as pL) pR
        p'      <- lift $ drain pp
        return (r, p')
```

This gives a bird's-eye view of how `conduit` composition works. When we compose two conduits, we:

- Feed the input stream of `as` to the upstream conduit
- Feed that to the downstream conduit
- Discard all leftovers from their intermediate interface
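The three steps above can be sketched in a pure, list-based model. This is only an illustration under simplifying assumptions: streams are plain lists, there is no base monad, and the names `ConduitL`, `fuseL`, `doubleL`, and `takeTwoL` are hypothetical rather than part of `pipes` or `conduit`:

```haskell
-- A pure, list-based model of a conduit without effects (hypothetical names).
-- Given the input stream, it produces an output stream, a return value, and
-- whatever input it did not consume.
type ConduitL a b r = [a] -> ([b], r, [a])

-- Model of (=$=): feed `as` to the upstream conduit, feed its output to the
-- downstream conduit, then discard the `b`s the downstream did not consume.
fuseL :: ConduitL a b () -> ConduitL b c r -> ConduitL a c r
fuseL up down as =
    let (bs, (), as')    = up as
        (cs, r, _unused) = down bs
    in  (cs, r, as')

-- Upstream example: double every element until the input runs out.
doubleL :: ConduitL Int Int ()
doubleL as = (map (* 2) as, (), [])

-- Downstream example: pass along the first two elements, return their sum,
-- and leave the rest unconsumed.
takeTwoL :: ConduitL Int Int Int
takeTwoL bs = (take 2 bs, sum (take 2 bs), drop 2 bs)
```

Here `fuseL doubleL takeTwoL [1, 2, 3, 4]` evaluates to `([2, 4], 6, [])`. The downstream leaves `[6, 8]` unconsumed on the intermediate interface, and `fuseL` throws those away, just as the `drain` step does in the real implementation.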

Once we have this composition operator, the right and left fuse are just type-specializations (the same as in `conduit`):

```
type Conduit i m o = ConduitM i o m ()
type Source m o = ConduitM () o m ()
type Sink i m r = ConduitM i Void m r
($=) :: (Monad m) => Source m a -> Conduit a m b -> Source m b
($=) = (=$=)
(=$) :: (Monad m) => Conduit a m b -> Sink b m c -> Sink a m c
(=$) = (=$=)
```

What about `($$)`? That is even simpler:

```
empty :: (Monad m) => Producer () m r
empty = return () >~ cat  -- equivalent to "forever $ yield ()"

($$) :: (Monad m) => Source m a -> Sink a m b -> m b
ConduitM pL $$ ConduitM pR =
    evalStateT (runEffect pR) (evalStateP empty pL)
```

At a high level, this implementation says:

- Feed an unused leftover stream to `pL` (unused because it's a `Source`)
- Feed that to `pR`
- There is no step 3
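The same two steps can be sketched in a pure, list-based model. As before, this is an illustration under simplifying assumptions (plain lists, no base monad), and the names `ConduitL`, `SourceL`, `SinkL`, `emptyL`, `connectL`, `countL`, and `sumL` are hypothetical:

```haskell
import Data.Void (Void)

-- A pure, list-based model of a conduit without effects (hypothetical names).
type ConduitL a b r = [a] -> ([b], r, [a])

type SourceL a   = ConduitL () a ()
type SinkL   a r = ConduitL a Void r

-- Stand-in for `empty`: an endless stream of units.
emptyL :: [()]
emptyL = repeat ()

-- Model of ($$): feed the dummy stream to the source, feed the source's
-- output to the sink, and keep only the sink's return value.
connectL :: SourceL a -> SinkL a r -> r
connectL source sink =
    let (as, (), _ignored) = source emptyL
        (_none, r, _rest)  = sink as
    in  r

-- A source that ignores its input and emits 1, 2, 3.
countL :: SourceL Int
countL units = ([1, 2, 3], (), units)

-- A sink that sums everything it receives and emits nothing.
sumL :: SinkL Int Int
sumL as = ([], sum as, [])
```

In this model `connectL countL sumL` evaluates to `6`. Laziness keeps the endless `emptyL` stream harmless, because the source never inspects it.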

#### Identity

If that is composition, what is the identity? Why, it's just `input` from `pipes-parse`:

```
idP :: (Monad m) => ConduitM a a m ()
idP = ConduitM (void input)
```

Neat how that works out. This is equivalent in behavior to:

```
idP = do
    ma <- await
    case ma of
        Nothing -> return ()
        Just a  -> do
            yield a
            idP
```

#### Connect and Resume

Last but not least, we need connect and resume. Like I said before, this will ignore finalization concerns, so I will only implement a variation on `($$+)` that returns a new `Source`, rather than a `ResumableSource` (which is just a `Source` tagged with a finalizer).

```
($$+)
    :: (Monad m)
    => Source m a -> Sink a m b -> m (b, Source m a)
ConduitM pL $$+ ConduitM pR = do
    (b, as) <- runEffect $ runStateP (execStateP empty pL) pR
    let as' = ConduitM $ stateP $ \p -> ((), p) <$ as
    return (b, as')
```

This says:

- Feed an unused input stream to `pL` (it's a `Source`)
- Feed that to `pR`
- Discard `pR`'s nonexistent output (it's a `Sink`)
- Create a new `Source` that also ignores its input stream

#### Conclusion

The purpose of this post is not to suggest that Michael necessarily should implement `conduit` in terms of `pipes`, especially since this does not contain finalization code, yet. Rather, I wanted to exhibit that `pipes` is a powerful tool that you can use to build other abstractions concisely and with less room for error.

#### Appendix

The minimal test implementation is 50 lines of code, which I've included here:

```
{-# LANGUAGE RankNTypes #-}

import Control.Applicative ((<$))
import Control.Monad (liftM, void)
import Pipes hiding (await, yield, Consumer)
import qualified Pipes
import Pipes.Lift
import Pipes.Parse

newtype ConduitM i o m r = ConduitM
    { unConduitM :: forall x .
        Producer o (StateT (Producer i m x) m) r }

instance (Monad m) => Monad (ConduitM i o m) where
    return r = ConduitM (return r)
    m >>= f  = ConduitM $ unConduitM m >>= \r -> unConduitM (f r)

instance MonadTrans (ConduitM i o) where
    lift m = ConduitM (lift (lift m))

type Consumer i m r = forall o . ConduitM i o m r
type Source     m o = ConduitM () o    m ()
type Sink     i m r = ConduitM i  Void m r
type Conduit  i m o = ConduitM i  o    m ()

await :: (Monad m) => Consumer i m (Maybe i)
await = ConduitM $ lift $ liftM f draw
  where
    f (Left  _) = Nothing
    f (Right r) = Just r

yield :: (Monad m) => o -> ConduitM i o m ()
yield o = ConduitM (Pipes.yield o)

leftover :: (Monad m) => i -> ConduitM i o m ()
leftover i = ConduitM $ lift $ unDraw i

(=$=)
    :: (Monad m)
    => Conduit a m b
    -> ConduitM b c m r
    -> ConduitM a c m r
ConduitM pL =$= ConduitM pR = ConduitM $
    stateP $ \p -> do
        (r, pp) <- runStateP (execStateP p pL) pR
        p'      <- lift $ drain pp
        return (r, p')

drain :: (Monad m) => Producer a m r -> m r
drain p = runEffect (for p discard)

($=) :: (Monad m) => Source m a -> Conduit a m b -> Source m b
($=) = (=$=)

(=$) :: (Monad m) => Conduit a m b -> Sink b m c -> Sink a m c
(=$) = (=$=)

empty :: (Monad m) => Producer () m r
empty = return () >~ cat

($$) :: (Monad m) => Source m a -> Sink a m b -> m b
ConduitM pL $$ ConduitM pR =
    evalStateT (runEffect pR) (evalStateP empty pL)

idP :: (Monad m) => ConduitM a a m ()
idP = ConduitM (void input)

($$+)
    :: (Monad m)
    => Source m a -> Sink a m b -> m (b, Source m a)
ConduitM pL $$+ ConduitM pR = do
    (b, pa) <- runEffect $ runStateP (execStateP empty pL) pR
    let p' = ConduitM $ stateP $ \p -> ((), p) <$ pa
    return (b, p')
```
