#### Call Stacks

I'll begin with the call-stack metaphor for pipes, which goes a very long way towards building an intuition for what the pipes category represents. First, imagine that every pipe in a pipeline represents a frame in a call stack:

```
   p1    <+<     p2    <+<     p3    <+<     p4
Frame 0       Frame 1       Frame 2       Frame 3
```

The most downstream pipe behaves like the top-level frame. Every time a pipe calls `await`, it pushes a frame onto the call stack:

```haskell
p1 = do
    ...
    x <- await  -- push p2 onto stack
    ...
```

In this case, if `p1` awaits on `p2`, it transfers control to `p2` until `p2` yields a value back.

`yield` is like the adjoint of `await`, since `yield` pops a frame off the stack:

```haskell
p2 = do
    ...
    yield x  -- yield result and pop p2 off
    ...
```

Now, pipes differ from the traditional stack metaphor in an important way: each frame does not get to choose which functions to call. Instead, it simply calls an anonymous upstream pipe, and that upstream pipe gets to choose how to satisfy the request. So if `p1` makes three `await` requests:

```haskell
p1 = do
    x <- await
    y <- await
    z <- await
    yield (x, y, z)
```

... then it is completely up to upstream (i.e. `p2`) to decide what functions to substitute in place of those `await`s. For example, we could use:

```haskell
function1 = do
    yield 1

function2 = do
    lift $ putStrLn "Hello"
    yield 2

function3 = do
    x <- await
    yield 3

p2 = do
    function1
    function2
    function3
```

Note that these functions might also call their own functions from the next frame upstream, such as how `function3` adds yet another frame to the stack by awaiting input from `p3`.

`p3` would then get to decide what to substitute in place of `function3`'s `await` call.
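To make the call-stack reading concrete, here is a minimal, runnable sketch. It assumes a four-constructor `Pipe` type reconstructed from the constructors this post pattern-matches on (`Pure`, `M`, `Await`, `Yield`); it is not the pipes library's actual definition, and `lift`, `runCollect`, and `p3` are illustrative helpers of our own. Each of `p1`'s three `await`s is answered by one of `p2`'s functions, and `function3`'s own `await` is answered by `p3`, one frame further up.

```haskell
import Control.Monad (ap, liftM)

-- Reconstructed pipe type: the four constructors this post matches on
data Pipe a b m r
  = Pure r                     -- finished with result r
  | M (m (Pipe a b m r))       -- run an action in the base monad
  | Await (a -> Pipe a b m r)  -- ask upstream for a value
  | Yield (b, Pipe a b m r)    -- hand a value downstream

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap
  m *> k = m >>= \_ -> k

instance Monad m => Monad (Pipe a b m) where
  Pure r       >>= f = f r
  M m          >>= f = M (liftM (>>= f) m)
  Await k      >>= f = Await ((>>= f) . k)
  Yield (b, p) >>= f = Yield (b, p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield (b, Pure ())

lift :: Monad m => m r -> Pipe a b m r
lift m = M (liftM Pure m)

-- Lazy composition, following the post's definition
infixr 9 <+<
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
Await f       <+< Yield (x, p) = f x <+< p
Yield (y, p1) <+< p2           = Yield (y, p1 <+< p2)
M m           <+< p            = M (liftM (<+< p) m)
Pure r        <+< _            = Pure r
p             <+< M m          = M (liftM (p <+<) m)
p             <+< Await k      = Await ((p <+<) . k)
_             <+< Pure r       = Pure r

-- Run a pipeline, collecting everything the top frame yields
runCollect :: Monad m => Pipe () b m r -> m ([b], r)
runCollect (Pure r)       = return ([], r)
runCollect (M m)          = m >>= runCollect
runCollect (Await k)      = runCollect (k ())
runCollect (Yield (b, p)) = do
  (bs, r) <- runCollect p
  return (b : bs, r)

-- Frame 0: three awaits; upstream decides what answers each one
p1 :: Monad m => Pipe Int (Int, Int, Int) m ()
p1 = do
  x <- await
  y <- await
  z <- await
  yield (x, y, z)

-- Frame 1: a succession of "functions", each ending in a yield
function1, function2, function3 :: Pipe Int Int IO ()
function1 = yield 1
function2 = do
  lift (putStrLn "Hello")
  yield 2
function3 = do
  _ <- await        -- pushes Frame 2: p3 decides what this returns
  yield 3

p2 :: Pipe Int Int IO ()
p2 = function1 >> function2 >> function3

-- Frame 2: a hypothetical upstream that answers function3's await
p3 :: Pipe () Int IO ()
p3 = yield 100
```

Loading this into GHCi and evaluating `runCollect (p1 <+< p2 <+< p3)` should print `Hello` and return `([(1,2,3)],())`.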

Notice that `function1`, `function2`, and `function3` have one thing in common: they all end in a `yield`. In fact, any pipe can be thought of as a succession of functions terminated by `yield` statements:

```haskell
example = do
    x <- await      -- Function #1
    yield (x + 1)   -- Function #1
    yield 2         -- Function #2
    lift $ print 4  -- Function #3
    yield 3         -- Function #3
    yield 4         -- Function #4
```

Unlike a traditional call stack, every function at a specific frame must have the same return type. This is not strictly necessary for the category instance, and it is conceivably possible to implement a variation on pipes where the frame boundaries communicate heterogeneous inputs instead of homogeneous streams. For an example of this idea, see the Coroutine package. For now, though, we'll just leave frame boundaries as homogeneous streams.

So we can immediately infer some invariants based on this call stack intuition. For example, every pipe downstream of our currently active pipe must be blocked on an await:

```
                                        Active pipe
(await >>= ...) <+< (await >>= ...) <+< p* <+< ...
```

This is analogous to every frame above us in the call stack waiting for the result of the frame below it. There is also a second, adjoint invariant: every function upstream of us can be thought of as having "just yielded" a value before it was popped off the stack:

```
        Active pipe   "yielded x"   "yielded y"
... <+< p*        <+< p'        <+< p''
```

We can now use this intuition to interpret the pipe category as simply defining the category of call stacks. Composition just layers two call stacks together, and the identity pipe is the "dummy" stack frame that just passes all function requests to the frame below and passes every function result to the frame above it:

```haskell
idP = forever $ await >>= yield
```

However, the stack intuition also implies another very important invariant: pipes cannot communicate upstream. The flow of information is unidirectional and always goes towards the top-level function (i.e. the most downstream pipe). I've seen several attempts to implement pipes that allow bidirectional communication (including many of my own attempts), but they inevitably fail the category laws. Even the tiniest, most limited form of information transfer upstream seems to break the laws. I cannot rigorously prove my intuition, but it always seems to be true.
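The identity claim for `idP` can be spot-checked with a small sketch. It reconstructs a four-constructor `Pipe` type from the constructors this post pattern-matches on (not the library's definition) and uses hypothetical test pipes `numbers` and `double` plus a `runCollect` runner of our own. Comparing collected outputs on one example is only an observational check of the identity laws, not a proof.

```haskell
import Control.Monad (ap, forever, liftM)

-- Reconstructed four-constructor pipe type (not the library's definition)
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | Await (a -> Pipe a b m r)
  | Yield (b, Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap
  m *> k = m >>= \_ -> k

instance Monad m => Monad (Pipe a b m) where
  Pure r       >>= f = f r
  M m          >>= f = M (liftM (>>= f) m)
  Await k      >>= f = Await ((>>= f) . k)
  Yield (b, p) >>= f = Yield (b, p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield (b, Pure ())

-- Lazy composition, following the post's definition
infixr 9 <+<
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
Await f       <+< Yield (x, p) = f x <+< p
Yield (y, p1) <+< p2           = Yield (y, p1 <+< p2)
M m           <+< p            = M (liftM (<+< p) m)
Pure r        <+< _            = Pure r
p             <+< M m          = M (liftM (p <+<) m)
p             <+< Await k      = Await ((p <+<) . k)
_             <+< Pure r       = Pure r

-- Run a pipeline, collecting everything the most downstream pipe yields
runCollect :: Monad m => Pipe () b m r -> m ([b], r)
runCollect (Pure r)       = return ([], r)
runCollect (M m)          = m >>= runCollect
runCollect (Await k)      = runCollect (k ())
runCollect (Yield (b, p)) = do
  (bs, r) <- runCollect p
  return (b : bs, r)

-- The "dummy" frame: forwards every request up and every result down
idP :: Monad m => Pipe a a m r
idP = forever (await >>= yield)

-- Hypothetical test pipes: a finite source and a doubling stage
numbers :: Monad m => Pipe () Int m ()
numbers = mapM_ yield [1, 2, 3]

double :: Monad m => Pipe Int Int m r
double = forever (await >>= yield . (* 2))
```

`runCollect (double <+< numbers)`, `runCollect (idP <+< double <+< numbers)`, and `runCollect (double <+< idP <+< numbers)` should all produce `([2,4,6],())`: inserting `idP` on either side of `double` changes nothing observable.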

#### Unidirectional Termination Handling

This unidirectionality seems problematic at first, because if a pipe terminates it needs to finalize every pipe upstream of it, but there is no way to communicate upstream. Even attempts to artificially restore control upstream by using `await` gratuitously still fail the category laws. This includes the `Strict` "category" released in v1.0, which, due to an oversight of mine, does not pass one of the identity laws:

```haskell
p <-< idP /= p
```

Therefore, it is not a true category (and it will be removed from v1.1).

However, instead of fighting the unidirectionality, we can "go with the flow" and pass finalizers downstream to be called by any pipe that terminates downstream. This requires no modification to the pipe type; instead we simply include the finalizer in the output type:

```haskell
(Monad m) => Pipe a (m (), b) m r
```

This approach also naturally follows from the call stack analogy. Every time a pipe yields a value, it is effectively relinquishing control by being popped off of the call stack, and it might never be called again. Therefore, it makes the most sense to register the finalizer at the point of the `yield` where it loses control, potentially irreversibly. Then, all we have to do is ensure that each yield contains the most up-to-date finalizer for the current pipe.

However, there's an obvious problem with this approach: every pipe would need to keep track of the latest finalizer delivered from upstream and tack that onto its own finalizer that it delivers downstream. This is both tedious and error-prone, and if you used another library's pipe that mismanaged finalizer passing, you'd be in trouble. So let's create a convenient combinator to handle this forwarding for us:

```haskell
heap :: (Monad m) => Pipe a (m (), b) m r -> Pipe (m (), a) (m (), b) m r
heap p = go (return ()) p where
    go h p = case p of
        Pure r              -> lift h >> Pure r
        M m                 -> M $ liftM (go h) m
        Await f             -> Await $ \(h', a) -> go h' (f a)
        Yield ((h', b), p') -> Yield ((h' >> h, b), go h p')

-- note: I actually use "forall r . r -> m r" instead of "m ()"
-- because I'm not completely convinced "m ()" is a
-- monoid, but I'll keep "m ()" right now for simplicity.
```

In short, the `heap` combinator wraps our pipe and takes care of forwarding upstream finalizers for us. It takes the latest finalizer received from upstream and `mappend`s it to any outgoing finalizers that we yield, defaulting to `mempty` if it hasn't received any yet. Also, it automatically calls every upstream finalizer for us when our pipe terminates. Convenient!

I call this the `heap` combinator for two reasons. First, if everything downstream of the current pipe is the call stack, then everything upstream can be thought of as the heap (by analogy to memory layout). Second, it represents a tally of unfinished allocations, just like a heap normally would.
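Here is a small runnable sketch of `heap` forwarding finalizers. It assumes a four-constructor `Pipe` type reconstructed from the constructors this post pattern-matches on (not the library's definition) and a local `lift`. Instead of real resources, each finalizer just appends a name to an `IORef` log; `runCollect` and `demo` are hypothetical helpers.

```haskell
import Control.Monad (ap, liftM)
import Data.IORef

-- Reconstructed four-constructor pipe type (not the library's definition)
data Pipe a b m r
  = Pure r | M (m (Pipe a b m r)) | Await (a -> Pipe a b m r) | Yield (b, Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM
instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap
  m *> k = m >>= \_ -> k
instance Monad m => Monad (Pipe a b m) where
  Pure r       >>= f = f r
  M m          >>= f = M (liftM (>>= f) m)
  Await k      >>= f = Await ((>>= f) . k)
  Yield (b, p) >>= f = Yield (b, p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield (b, Pure ())

lift :: Monad m => m r -> Pipe a b m r
lift m = M (liftM Pure m)

-- Lazy composition
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
Await f       <+< Yield (x, p) = f x <+< p
Yield (y, p1) <+< p2           = Yield (y, p1 <+< p2)
M m           <+< p            = M (liftM (<+< p) m)
Pure r        <+< _            = Pure r
p             <+< M m          = M (liftM (p <+<) m)
p             <+< Await k      = Await ((p <+<) . k)
_             <+< Pure r       = Pure r

-- heap as in the post: track the latest upstream finalizer h,
-- append it to everything we yield, and run it when we terminate
heap :: Monad m => Pipe a (m (), b) m r -> Pipe (m (), a) (m (), b) m r
heap = go (return ())
  where
    go h p = case p of
      Pure r              -> lift h >> Pure r
      M m                 -> M (liftM (go h) m)
      Await f             -> Await (\(h', a) -> go h' (f a))
      Yield ((h', b), p') -> Yield ((h' >> h, b), go h p')

runCollect :: Monad m => Pipe () b m r -> m ([b], r)
runCollect (Pure r)       = return ([], r)
runCollect (M m)          = m >>= runCollect
runCollect (Await k)      = runCollect (k ())
runCollect (Yield (b, p)) = do
  (bs, r) <- runCollect p
  return (b : bs, r)

-- Hypothetical demo: finalizers only record their name in a log
demo :: IO ([Int], [String], [String])
demo = do
  logRef <- newIORef []
  let note s = modifyIORef logRef (++ [s])
      source = yield (note "close source", 1 :: Int)
      stage = heap $ do
        x <- await
        yield (note "close stage", x * 10)
  (outs, _) <- runCollect (stage <+< source)
  duringRun <- readIORef logRef        -- run by heap when stage terminates
  writeIORef logRef []
  sequence_ [fin | (fin, _) <- outs]   -- run the finalizer stage yielded
  delivered <- readIORef logRef
  return (map snd outs, duringRun, delivered)
```

Evaluating `demo` should give `([10],["close source"],["close stage","close source"])`: the stage's yield carries its own finalizer followed by the source's, and when the stage terminates, `heap` runs the latest upstream finalizer by itself.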

Now here's where things start getting interesting: `heap` defines a category, which I like to call the `Heap` category:

```haskell
p1 <|< p2 = heap p1 <+< p2

idH = forever $ await >>= \a -> yield (return (), a)

-- Category laws:
(p1 <|< p2) <|< p3 = p1 <|< (p2 <|< p3)
p <|< idH = p
idH <|< p = p
```

I've checked all the laws for this, so you can rest assured that you won't get a repeat of the `Strict` non-category. Also, this formulation nicely permits convenient syntax for yielding no finalizer:

```haskell
yieldH x = yield (return (), x)
```

... and a convenience combinator for block-level finalizer registration that will add an additional finalizer to any yields within the block it wraps:

```haskell
catchH h p = forever (await >>= \x -> yield (h, x)) <|< p

enumFile = do
    h <- lift $ openFile "twoLines.txt" ReadMode
    catchH (hClose h) $ replicateM_ 2 $ do
        str <- lift $ hGetLine h
        yieldH str
    lift $ hClose h
```

In the above example, `catchH` would take the empty finalizer included in `yieldH str` and add `hClose h` to it. If there were `yieldH` commands outside the `catchH` block, they would not get wrapped. Note that `catchH` is merely a convenience: if you desired more fine-grained or overlapping finalizer blocks, you could yield the finalizers by hand instead of using `yieldH`/`catchH`.
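The `yieldH`/`catchH` machinery can be sketched the same way. This assumes a four-constructor `Pipe` type reconstructed from the constructors the post pattern-matches on (not the library's definition), and replaces `enumFile` with a hypothetical `enumLines` that logs to an `IORef` instead of opening a file, so the example runs without any file on disk. `runCollect` and `demo` are our own helpers.

```haskell
import Control.Monad (ap, forever, liftM)
import Data.IORef

-- Reconstructed four-constructor pipe type (not the library's definition)
data Pipe a b m r
  = Pure r | M (m (Pipe a b m r)) | Await (a -> Pipe a b m r) | Yield (b, Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM
instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap
  m *> k = m >>= \_ -> k
instance Monad m => Monad (Pipe a b m) where
  Pure r       >>= f = f r
  M m          >>= f = M (liftM (>>= f) m)
  Await k      >>= f = Await ((>>= f) . k)
  Yield (b, p) >>= f = Yield (b, p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield (b, Pure ())

lift :: Monad m => m r -> Pipe a b m r
lift m = M (liftM Pure m)

-- Lazy composition
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
Await f       <+< Yield (x, p) = f x <+< p
Yield (y, p1) <+< p2           = Yield (y, p1 <+< p2)
M m           <+< p            = M (liftM (<+< p) m)
Pure r        <+< _            = Pure r
p             <+< M m          = M (liftM (p <+<) m)
p             <+< Await k      = Await ((p <+<) . k)
_             <+< Pure r       = Pure r

heap :: Monad m => Pipe a (m (), b) m r -> Pipe (m (), a) (m (), b) m r
heap = go (return ())
  where
    go h p = case p of
      Pure r              -> lift h >> Pure r
      M m                 -> M (liftM (go h) m)
      Await f             -> Await (\(h', a) -> go h' (f a))
      Yield ((h', b), p') -> Yield ((h' >> h, b), go h p')

(<|<) :: Monad m => Pipe b (m (), c) m r -> Pipe a (m (), b) m r -> Pipe a (m (), c) m r
p1 <|< p2 = heap p1 <+< p2

yieldH :: Monad m => b -> Pipe a (m (), b) m ()
yieldH x = yield (return (), x)

catchH :: Monad m => m () -> Pipe a (m (), b) m r -> Pipe a (m (), b) m r
catchH h p = forever (await >>= \x -> yield (h, x)) <|< p

runCollect :: Monad m => Pipe () b m r -> m ([b], r)
runCollect (Pure r)       = return ([], r)
runCollect (M m)          = m >>= runCollect
runCollect (Await k)      = runCollect (k ())
runCollect (Yield (b, p)) = do
  (bs, r) <- runCollect p
  return (b : bs, r)

-- Hypothetical stand-in for enumFile: logs instead of opening a file
enumLines :: IORef [String] -> Pipe () (IO (), String) IO ()
enumLines logRef = do
  lift (record "open")
  catchH (record "close") (mapM_ yieldH ["line 1", "line 2"])
  yieldH "line 3"   -- outside catchH: keeps the empty finalizer
  where
    record s = modifyIORef logRef (++ [s])

-- Run each yielded finalizer on its own and report what it logged
demo :: IO [(String, [String])]
demo = do
  logRef <- newIORef []
  (outs, _) <- runCollect (enumLines logRef)
  let runOne (fin, line) = do
        writeIORef logRef []
        fin
        logged <- readIORef logRef
        return (line, logged)
  mapM runOne outs
```

`demo` should report `["close"]` for the two lines yielded inside `catchH` and `[]` for `"line 3"`, which only carries the empty finalizer from `yieldH`.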

However, this is not the category I'm going to propose as the solution for error handling. After all, the above category does absolutely nothing to finalize downstream correctly in the event of termination, so we'll need to invoke the call stack metaphor again to guide us in the right direction.

#### Type-checked downgrades

In a traditional call stack, there is no such thing as a frame terminating. This is one instance where pipes depart radically from the call stack metaphor, since a pipe termination brings down the entire pipeline and effectively short-circuits the entire stack. In fact, return values of pipes resemble exceptions. While I have some ideas for treating pipe return values similar to exceptions, I will elide that discussion for now until my thoughts on this are clearer, so this post will only discuss the case of frames never terminating.

If we slavishly follow the stack metaphor, we'd have to prevent pipe termination. A naive approach might be to simply specialize the type of `(<+<)` to only accept non-terminating pipes:

```haskell
(<+<) :: (Monad m) => Pipe b c m Void -> Pipe a b m Void -> Pipe a c m Void
```

However, there's a huge problem with this: a non-terminating pipe will never finalize upstream promptly. For example, if we were to define the following pipe:

```haskell
oops = do
    x <- await
    forever $ yield x
```

... we'd have an immediate problem if we hooked it up to our file enumerator:

```haskell
runPipe $ ... <+< oops <+< enumFile
```

The file would never get finalized, even though we know that `oops` will never request a second line. This means we need some way to prove to the compiler that after some point `oops` doesn't need upstream any longer, so we can go ahead and finalize upstream promptly. I like to call this "downgrading" `oops` to a `Producer`.

This leads to another problem, though. Let's say that we were to write the following pseudo-code:

```haskell
oops2 = do
    x <- await
    safeToFinalizeUpstreamBecauseThisPipeWon'tAwaitAnyLonger
    yield x
```

That leads to an immediate problem if we were to then write:

```haskell
oops2 >> oops2
```

In short, we would have just violated our assertion that our pipe wouldn't await any longer. This means that the "pre-downgrade" `Pipe` cannot be part of the same monad as the "post-downgrade" `Producer` block, otherwise we'd have no way of enforcing our invariant at the type-level.

What we can do, however, is have the "pre-downgrade" `Pipe` return the "post-downgrade" `Producer`:

```haskell
Pipe a b m (Producer b m r)
```

Then, all we have to do is enhance lazy composition so that it automatically installs downgraded pipes in the place of the original pipe. Interestingly, in order to unify the types, this trick requires using the `Fix` point of the `Producer b m` functor:

```haskell
newtype Fix f = Fix (f (Fix f))

-- Different from the definition found in the pipes package
newtype Producer b m r = Producer (forall a . Pipe a b m r)

type Frame' m a b = Pipe a b m (Fix (Producer b m))
```

And with that type in place, we can define a new kind of composition that installs downgrades in place:

```haskell
(<~<) :: (Monad m) => Frame' m b c -> Frame' m a b -> Frame' m a c
Await f                 <~< Yield (x, p) = f x <~< p
Yield y                 <~< p            = Yield $ fmap (<~< p) y
M m                     <~< p            = M $ liftM (<~< p) m
Pure (Fix (Producer r)) <~< p            = Pure $ Fix $ Producer r
p <~< M m                     = M $ liftM (p <~<) m
p <~< Await a                 = Await $ fmap (p <~<) a
p <~< Pure (Fix (Producer r)) = Pure $ Fix $ Producer $ p <~< r
```

This composition only differs from `(<+<)` in the two `Pure` case statements. For comparison, here is lazy composition:

```haskell
Await f <+< Yield (x, p) = f x <+< p
Yield y <+< p            = Yield $ fmap (<+< p) y
M m     <+< p            = M $ liftM (<+< p) m
Pure r  <+< p            = Pure r
p <+< M m                = M $ liftM (p <+<) m
p <+< Await a            = Await $ fmap (p <+<) a
p <+< Pure r             = Pure r
```

This lets us rewrite `oops` to signal its downgrade:

```haskell
-- Written with an explicit argument (rather than "return . Fix . Producer")
-- so that the rank-2 argument type-checks without impredicative types
produce :: (Monad m) => (forall a . Pipe a b m (Fix (Producer b m))) -> Frame' m a' b
produce p = return (Fix (Producer p))

correct = do
    x <- await
    produce $ forever $ yield x
```

... however, nothing will happen yet when it downgrades, because we haven't installed any machinery to call upstream finalizers.
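The following sketch exercises `(<~<)` and `correct` on their own. It assumes a four-constructor `Pipe` type reconstructed from the constructors the post pattern-matches on (not the library's definition) and needs `RankNTypes` for `Producer`. The upstream frame `counter` is a hypothetical stand-in that records how often it is asked for a value, so we can observe that once `correct` downgrades, its upstream is dropped and never pulled again. `takeYields` and `demo` are our own helpers.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad (ap, forever, liftM)
import Data.IORef

-- Reconstructed four-constructor pipe type (not the library's definition)
data Pipe a b m r
  = Pure r | M (m (Pipe a b m r)) | Await (a -> Pipe a b m r) | Yield (b, Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM
instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap
  m *> k = m >>= \_ -> k
instance Monad m => Monad (Pipe a b m) where
  Pure r       >>= f = f r
  M m          >>= f = M (liftM (>>= f) m)
  Await k      >>= f = Await ((>>= f) . k)
  Yield (b, p) >>= f = Yield (b, p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield (b, Pure ())

lift :: Monad m => m r -> Pipe a b m r
lift m = M (liftM Pure m)

newtype Fix f = Fix (f (Fix f))
newtype Producer b m r = Producer (forall a. Pipe a b m r)
type Frame' m a b = Pipe a b m (Fix (Producer b m))

-- Stack composition: a downgraded downstream pipe drops its upstream
infixr 9 <~<
(<~<) :: Monad m => Frame' m b c -> Frame' m a b -> Frame' m a c
Await f       <~< Yield (x, p) = f x <~< p
Yield (y, p1) <~< p2           = Yield (y, p1 <~< p2)
M m           <~< p            = M (liftM (<~< p) m)
Pure r        <~< _            = Pure r
p <~< M m                     = M (liftM (p <~<) m)
p <~< Await k                 = Await ((p <~<) . k)
p <~< Pure (Fix (Producer r)) = Pure (Fix (Producer (p <~< r)))

produce :: Monad m => (forall x. Pipe x b m (Fix (Producer b m))) -> Frame' m a b
produce p = return (Fix (Producer p))

-- Await one value, then downgrade to a Producer that repeats it forever
correct :: Monad m => Frame' m a a
correct = do
  x <- await
  produce (forever (yield x))

-- Identity frame
idP :: Monad m => Frame' m a a
idP = forever (await >>= yield)

-- Take the first n values yielded by a never-ending frame
takeYields :: Monad m => Int -> Frame' m () b -> m [b]
takeYields n p
  | n <= 0 = return []
  | otherwise = case p of
      Yield (b, p')           -> liftM (b :) (takeYields (n - 1) p')
      Await k                 -> takeYields n (k ())
      M m                     -> m >>= takeYields n
      Pure (Fix (Producer r)) -> takeYields n r

-- Hypothetical upstream frame that counts how often it is pulled
counter :: IORef Int -> Int -> Frame' IO () Int
counter pulls n = do
  lift (modifyIORef pulls (+ 1))
  yield n
  counter pulls (n + 1)

demo :: IO ([Int], Int)
demo = do
  pulls <- newIORef 0
  outs <- takeYields 3 (correct <~< counter pulls 10)
  n <- readIORef pulls
  return (outs, n)
```

`demo` should return `([10,10,10],1)`: three outputs, but only a single pull from upstream. The same helpers can spot-check the identity laws, e.g. `correct <~< idP <~< counter pulls 10` and `idP <~< correct <~< counter pulls 10` should behave the same way.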

Interestingly, we naturally get non-terminating pipes "for free" by simply unifying the types with `Fix`, instead of having to artificially specialize composition to only accept never-ending pipes. You could also write a terminating version of the above code by using `Free` instead of `Fix`, but that will have to wait until I discuss exceptions.

As you might suspect, `(<~<)` forms a category, which I like to call the `Stack` category:

```haskell
(p1 <~< p2) <~< p3 = p1 <~< (p2 <~< p3)
p <~< idP = p
idP <~< p = p
```

Now you might wonder why I used the return value of the pipe to store its `Producer` continuation instead of defining an additional `Pipe` constructor for that purpose. The reason, again, is not a stubborn refusal to extend the `Pipe` type, but rather that the above formulation cleanly combines with the `Heap` category to generate a new category, which will replace the `Strict` category:

```haskell
type Frame m a b = Pipe a (m (), b) m (Fix (Producer (m (), b) m))

(<-<) :: (Monad m) => Frame m b c -> Frame m a b -> Frame m a c
p1 <-< p2 = heap p1 <~< p2

idT :: (Monad m) => Frame m a a
idT = idH
```

In other words, the two categories can be merged modularly into a new category without any special considerations! We can then combine `correct` and `enumFile` into part of a pipeline:

```haskell
... <-< correct <-< enumFile
```

And when `correct` downgrades to a `Producer` after its first await, it will finalize every pipe upstream of it, which is just `enumFile` in this case.

There is still one last problem: our pipeline is now never-ending. How do we decide when to terminate the entire pipeline? We don't want to terminate it when the most downstream pipe downgrades, because that doesn't necessarily mean that it is done.

Once again the stack metaphor provides the answer: in a call stack, the program completes when the top-level function returns, so the analogy to pipes would be that the pipeline terminates when the most downstream pipe yields:

```haskell
runPipe' :: (Monad m) => Frame m () a -> m a
runPipe' p = case p of
    Yield ((h, x), _)       -> h >> return x
    Await f                 -> runPipe' $ f ()
    M m                     -> m >>= runPipe'
    Pure (Fix (Producer r)) -> runPipe' r
```

```
> runPipe' $ correct <-< enumFile
"returns first line of file"
```
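Putting the two halves together, here is a runnable sketch of `(<-<)` and `runPipe'`. It assumes a four-constructor `Pipe` type reconstructed from the constructors the post pattern-matches on (not the library's definition). Because `Frame m a a` yields `(m (), a)` pairs, the downgrading pipe is written as `correctH` using `yieldH` (our adaptation of `correct`), and `source` is a hypothetical endless stand-in for `enumFile` whose finalizer logs `"closed"` instead of calling `hClose`.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad (ap, forever, liftM)
import Data.IORef

-- Reconstructed four-constructor pipe type (not the library's definition)
data Pipe a b m r
  = Pure r | M (m (Pipe a b m r)) | Await (a -> Pipe a b m r) | Yield (b, Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM
instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap
  m *> k = m >>= \_ -> k
instance Monad m => Monad (Pipe a b m) where
  Pure r       >>= f = f r
  M m          >>= f = M (liftM (>>= f) m)
  Await k      >>= f = Await ((>>= f) . k)
  Yield (b, p) >>= f = Yield (b, p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield (b, Pure ())

lift :: Monad m => m r -> Pipe a b m r
lift m = M (liftM Pure m)

newtype Fix f = Fix (f (Fix f))
newtype Producer b m r = Producer (forall a. Pipe a b m r)
type Frame' m a b = Pipe a b m (Fix (Producer b m))
type Frame m a b = Pipe a (m (), b) m (Fix (Producer (m (), b) m))

-- Stack category: installs a downgraded pipe in place of the original
(<~<) :: Monad m => Frame' m b c -> Frame' m a b -> Frame' m a c
Await f       <~< Yield (x, p) = f x <~< p
Yield (y, p1) <~< p2           = Yield (y, p1 <~< p2)
M m           <~< p            = M (liftM (<~< p) m)
Pure r        <~< _            = Pure r
p <~< M m                     = M (liftM (p <~<) m)
p <~< Await k                 = Await ((p <~<) . k)
p <~< Pure (Fix (Producer r)) = Pure (Fix (Producer (p <~< r)))

-- Heap category: forwards the latest upstream finalizer downstream
heap :: Monad m => Pipe a (m (), b) m r -> Pipe (m (), a) (m (), b) m r
heap = go (return ())
  where
    go h p = case p of
      Pure r              -> lift h >> Pure r
      M m                 -> M (liftM (go h) m)
      Await f             -> Await (\(h', a) -> go h' (f a))
      Yield ((h', b), p') -> Yield ((h' >> h, b), go h p')

(<-<) :: Monad m => Frame m b c -> Frame m a b -> Frame m a c
p1 <-< p2 = heap p1 <~< p2

yieldH :: Monad m => b -> Pipe a (m (), b) m ()
yieldH x = yield (return (), x)

produce :: Monad m => (forall x. Pipe x b m (Fix (Producer b m))) -> Frame' m a b
produce p = return (Fix (Producer p))

-- Adaptation of `correct` for Frame: await once, then downgrade
correctH :: Monad m => Frame m a a
correctH = do
  x <- await
  produce (forever (yieldH x))

runPipe' :: Monad m => Frame m () a -> m a
runPipe' p = case p of
  Yield ((h, x), _)       -> h >> return x
  Await f                 -> runPipe' (f ())
  M m                     -> m >>= runPipe'
  Pure (Fix (Producer r)) -> runPipe' r

-- Hypothetical endless stand-in for enumFile; its finalizer only logs
source :: IORef [String] -> Int -> Frame IO () Int
source logRef n = do
  lift (modifyIORef logRef (++ ["read " ++ show n]))
  yield (modifyIORef logRef (++ ["closed"]), n)
  source logRef (n + 1)

demo :: IO (Int, [String])
demo = do
  logRef <- newIORef []
  x <- runPipe' (correctH <-< source logRef 1)
  logged <- readIORef logRef
  return (x, logged)
```

`demo` should return `(1,["read 1","closed"])`: the source is finalized as soon as `correctH` downgrades, before the pipeline delivers its first result.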

#### Discussion

So the current frame-based solution to termination can be summed up in a few simple semantic rules:

- Frames don't terminate, but the call stack does.
- Frames prove at the type-level that it is safe to finalize upstream by downgrading to a producer.
- Frames yield finalizers which will be called if downstream downgrades to a producer.

This design also has a few nice properties:

- Handling termination does not require extending the `Pipe` type.
- We can decompose termination handling into two sub-problems: forwarding finalizers downstream (the `Heap` category) and signaling downgrades (the `Stack` category). These two pieces can be tested independently (by proving they are categories in isolation) and then composed into a total category.
- By defining a termination-handling category, we now have clearly encoded our call stack intuition and proven that our intuition leads to composable semantics.
- Pipes can handle prompt and automatic finalization, and they can do so elegantly and intuitively.

If you look at `pipes-core` or `conduits`, you will see elements of the above design, but their implementations scatter this functionality across multiple functions and the `Pipe`/`Conduit` type, and leave a lot of the invariants to the user to handle. They also tightly couple finalization to the rest of the library and tightly integrate both halves of the termination problem, making it difficult to reason about or prove category instances. In contrast, the pipes approach simply requires defining `heap` and `(<~<)`, and uses the original `Pipe` data type. This completely decouples finalization concerns from the rest of the library.

Other implementations suffer from a form of `IO` obsession that contaminates the entire rest of their library, whereas in pipes the finalization aspect is clearly delimited and separated from the rest of the library. So if you were working with a monad where you didn't need finalization of any form (or it didn't make sense), you could still fall back to using `(<+<)` and ordinary pipes and not even concern yourself with learning about frames, `(<-<)`, or finalizers, which are really `IO`-specific considerations.

There are some final caveats before I finish. First off, pipes currently has an incorrect `MonadTrans` instance, which will be fixed in v1.1 by changing `Pipe` to a free monad transformer (i.e. using `FreeT` from `control-monad-free` or something similar). The corrected `MonadTrans` instance is required for the `Heap` category to pass all the category laws. This changes the composition definitions, but I've already verified that they still form categories, and I didn't want to complicate the discussion.

Second, there may be a better way to formulate the `Stack` category, and the one I discussed only represents the first elegant solution I found. While I'm reasonably certain the `Heap` category is spot-on, I'm still open to suggestions for changing the `Stack` category, as long as they are also elegant and compose just as seamlessly with the `Heap` category.
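For readers unfamiliar with the free monad transformer mentioned in the first caveat, here is a minimal sketch of the idea, written from scratch. The names `FreeT`, `FreeF`, `PipeF`, `liftT`, and `collect` are our own, and this is not necessarily how pipes v1.1 or `control-monad-free` define things. Because every step of a `FreeT` goes through the base monad, `liftT` satisfies `liftT (m >>= f) = liftT m >>= (liftT . f)` by the monad laws of `m` alone. One way to see what goes wrong in the four-constructor encoding: with `lift` defined as `M . liftM Pure`, the left side of that law builds one `M` layer while the right side builds two.

```haskell
import Control.Monad (ap, liftM)

-- One step of a pipe: request input, or hand a value downstream
data PipeF a b x = Await (a -> x) | Yield b x

instance Functor (PipeF a b) where
  fmap f (Await k)   = Await (f . k)
  fmap f (Yield b x) = Yield b (f x)

-- Free monad transformer: each layer is an action in the base monad m
data FreeF f r x = Return r | Wrap (f x)
newtype FreeT f m r = FreeT { runFreeT :: m (FreeF f r (FreeT f m r)) }

instance (Functor f, Monad m) => Functor (FreeT f m) where
  fmap = liftM

instance (Functor f, Monad m) => Applicative (FreeT f m) where
  pure r = FreeT (return (Return r))
  (<*>) = ap

instance (Functor f, Monad m) => Monad (FreeT f m) where
  FreeT m >>= k = FreeT $ do
    step <- m
    case step of
      Return r -> runFreeT (k r)
      Wrap fx  -> return (Wrap (fmap (>>= k) fx))

-- A lawful lift: one base-monad action, no extra layer
liftT :: Monad m => m r -> FreeT f m r
liftT m = FreeT (liftM Return m)

type Pipe a b m r = FreeT (PipeF a b) m r

await :: Monad m => Pipe a b m a
await = FreeT (return (Wrap (Await return)))

yield :: Monad m => b -> Pipe a b m ()
yield b = FreeT (return (Wrap (Yield b (return ()))))

-- Run a pipe that needs no real input, collecting its output
collect :: Monad m => Pipe () b m r -> m ([b], r)
collect p = do
  step <- runFreeT p
  case step of
    Return r          -> return ([], r)
    Wrap (Await k)    -> collect (k ())
    Wrap (Yield b p') -> do
      (bs, r) <- collect p'
      return (b : bs, r)

demo :: IO ([Int], String)
demo = collect $ do
  yield 1
  x <- liftT (return 41)
  yield (x + 1)
  return "done"
```

`demo` should return `([1,42],"done")`.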

Hi Gabriel,

Thanks for this great stuff!

A couple of questions. The first one is practical.

Suppose I have a pipeline, say a producer that reads a file and yields bytestrings, an intermediate stage that awaits bytestrings and yields Chars, and a stage that awaits Chars, accumulating them until a token separator, and yields tokens (e.g. identifiers) downstream. There is a little problem, though: if the upstream producer hits an end-of-file and terminates, there may be an identifier in progress (properly ended by the EOF, according to the allowed syntax) in the token stage that doesn't get yielded downstream because the whole pipeline has terminated.

Is there a nice way to deal with this sort of case?

I can think of some ad-hoc methods, passing Maybe's or Either's through the whole pipe instead of Bytestrings and Chars, with the exceptional case encoding EOF, and using an additional 'yield Nothing' after the producer to indicate the EOF, but it doesn't immediately strike me as very attractive.

A second question, not much related. I was trying to work through some proofs of the Monad / MonadTrans / Category laws in the pipes 1.0.2 library and ran into trouble with MonadTrans. I read in your article that you plan to change the MonadTrans instance. I'm fairly new to Haskell and don't know 'free monad transformers' yet. However, I thought I would try on my own; I came up with the following, but I'm worried that it seems to involve the base monad more than the original Pipes implementation and may not be as efficient. (Also, I haven't quite been able to prove transitivity yet, although some signs look promising to me; and I don't really know how to deal with bottoms and non-well-founded values like idP in inductive proofs, so I may well have proved even less than I think.)

http://hpaste.org/67732

To answer your first question: the whole finalizer solution is actually much more general than just finalizers. It can be applied to anything that is a monoid. A finalizer is just a specific kind of monoid (where mempty is return () and mappend just sequences two finalizers). If you think of each pipe as having an associated monoid element (registered by catchH or by manually yielding the monoid element), then the pipeline is like a list of monoids, and all the Heap category does is mconcat over all the upstream pipes to combine them into a single monoid element.

So one thing I've been considering is extending heap to just include the monoid in the return value instead of running it (which only makes sense for finalizers). This means you can then truly generalize heap to work with any monoid and not just finalizers. Then what you would do is make your monoid the pair of monoids:

```haskell
(Finalizer m, UnconsumedInput)
-- Note: a pair of monoids is a monoid itself
```

where mempty for UnconsumedInput is no dangling input and mappend combines multiple unconsumed inputs. Because this is a monoid fold over the entire pipeline, you can actually register unconsumed inputs from EVERY pipe in the pipeline, not just that one single pipe, and ensure they all get delivered in the final return value (assuming you come up with a sensible way to mappend them all together).

Then your composite pipe's return value includes both the finalizer monoid and the unresolved input monoid and you do whatever you want with them.
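A minimal sketch of the monoid view described above, assuming `Finalizer` is a newtype over `m ()` and using a plain list of strings as a hypothetical stand-in for `UnconsumedInput`. Folding one element per pipe with `mconcat` yields both the combined finalizer and the concatenated leftovers, because a pair of monoids is itself a monoid. This only illustrates the monoid structure; it does not show how a generalized `heap` would thread these values through composition, and `pipelineSummary` and `demo` are hypothetical names.

```haskell
import Data.IORef

-- A finalizer viewed as a monoid: mempty does nothing, (<>) runs both in order
newtype Finalizer m = Finalizer { runFinalizer :: m () }

instance Monad m => Semigroup (Finalizer m) where
  Finalizer a <> Finalizer b = Finalizer (a >> b)

instance Monad m => Monoid (Finalizer m) where
  mempty = Finalizer (return ())

-- Hypothetical stand-in for UnconsumedInput: leftovers, combined by (++)
type Leftovers = [String]

-- One element per pipe, listed from most downstream to most upstream
-- (matching heap's "own finalizer, then upstream's" ordering)
pipelineSummary :: IORef [String] -> (Finalizer IO, Leftovers)
pipelineSummary logRef = mconcat
  [ (Finalizer (record "close parser"), ["partial token"])
  , mempty                               -- a pipe with nothing to report
  , (Finalizer (record "close file"), [])
  ]
  where
    record s = modifyIORef logRef (++ [s])

demo :: IO ([String], Leftovers)
demo = do
  logRef <- newIORef []
  let (fin, leftovers) = pipelineSummary logRef
  runFinalizer fin
  ran <- readIORef logRef
  return (ran, leftovers)
```

`demo` should return `(["close parser","close file"],["partial token"])`.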

To answer your second question, there are some implementations of free monad transformers on Hackage, but they are not recognized as such. You can find my implementation here:

https://github.com/Gabriel439/Haskell-Pipes-Library/blob/master/Control/Monad/Trans/Free.hs

The latest patch of the library actually formulates Pipes as free monad transformers, which you can see here:

https://github.com/Gabriel439/Haskell-Pipes-Library/blob/master/Control/Pipe/Common.hs

If you want a good idea of what free monad transformers are good for, I highly recommend you read Issue 19 of the Monad Reader, which you can find here:

http://themonadreader.files.wordpress.com/2011/10/issue19.pdf

The article in that issue on concurrency by Mario Blazevic basically walks you through various uses of a free monad transformer, but in his article he calls it the "Coroutine" type.

As you mentioned, a correct MonadTrans instance requires at least one pass through the base monad, and until we have a perfect compiler, binds are not free. However, I've actually done some quick and dirty benchmarks of pipes code, and I find that it causes a negligible difference in performance, with only about a 1% decrease in speed for the simple cases I tried out.

Also, your refactorization of pipes to correct MonadTrans is correct and it's isomorphic to the implementation I have in my latest patch. I highly recommend you learn about free monads first, and then "free monad transformers" are just the monad transformer version of the "free monad". A free monad is a very useful concept to learn and it simplifies a lot of programs.

Thanks very much! I'll read issue 19 & think about the other stuff!
