Thursday, March 29, 2012

Haskell for Purists - Pipe Finalization

/r/haskell has been talking about conduits and pipes lately and in those discussions I vaguely mentioned I was working on termination/exception-handling. I wanted to follow up and discuss what I've done so far to move the discussion forward. This post will only discuss termination handling for now because exception handling is still in progress.

Call Stacks


I'll begin with the call-stack metaphor for pipes, which goes a very long way towards building an intuition for what the pipes category represents. First, imagine that every pipe in a pipeline represents a frame in a call-stack:
   p1   <+<    p2   <+<    p3   <+<    p4
Frame 0     Frame 1     Frame 2     Frame 3
The most downstream pipe behaves like the top-level frame. Every time a pipe calls await, it pushes a frame onto the call stack:
p1 = do
    ...
    x <- await -- push p2 onto stack
    ...
In this case, if p1 awaits on p2, it transfers control to p2 until p2 yields a value back. yield is like the adjoint of await, since yield pops a frame off the stack:
p2 = do
    ...
    yield x -- yield result and pop p2 off
    ...
Now, pipes differ from the traditional stack metaphor in an important way: each frame does not get to choose which functions to call. Instead they just simply call an anonymous upstream pipe and that upstream pipe gets to choose how to satisfy the request. So if p1 makes three await requests:
p1 = do
    x <- await
    y <- await
    z <- await
    yield (x, y, z)
... then it is completely up to upstream (i.e. p2), to decide what functions to substitute in place of those awaits. For example, we could use:
function1 = do
    yield 1

function2 = do
    lift $ putStrLn "Hello"
    yield 2

function3 = do
    x <- await
    yield 3

p2 = do
    function1
    function2
    function3
Note that these functions might also call their own functions from the next frame upstream, such as how function3 adds yet another frame to the stack by awaiting input from p3. p3 would then get to decide what to substitute in place of function3's await call.

Notice that function1, function2, and function3 have one thing in common: they all end in a yield. In fact, any pipe can be thought of as a succession of functions terminated by yield statements:
example = do
    x <- await     -- Function #1
    yield (x + 1)  -- Function #1

    yield 2        -- Function #2

    lift $ print 4 -- Function #3
    yield 3        -- Function #3

    yield 4        -- Function #4
Unlike a traditional call stack, every function at a specific frame must have the same return value. This is not strictly necessary for the category instance and it is conceivably possible to implement a variation on pipes where the frame boundaries communicate heterogeneous inputs instead of homogeneous streams. For an example of this idea, see the Coroutine package. For now, though, we'll just leave frame boundaries as homogeneous streams.

So we can immediately infer some invariants based on this call stack intuition. For example, every pipe downstream of our currently active pipe must be blocked on an await:
                                        Active pipe
(await >>= ...) <+< (await >>= ...) <+<      p*     <+< ...
This is analogous to every frame above us in the call stack waiting for the result of the frame below it. There is also a second adjoint invariant: Every function downstream of us can be thought of as having "just yielded" a value before it was popped off the stack:
        Active pipe     "yielded x"     "yielded y"
... <+<      p*     <+<      p'      <+<      p''
We can now use this intuition to interpret the pipe category as simply defining the category of call stacks. Composition just layers two call stacks together and the identity pipe is the "dummy" stack frame that just passes all function requests to the frame below and passes every function result to the frame above it:
idP = forever $ await >>= yield
However, the stack intuition also implies another very important invariant: pipes cannot communicate upstream. The flow of information is unidirectional and always goes towards the top-level function (i.e. the most downstream pipe). I've seen several attempts to implement pipes that allow bidirectional communication (including many of my own attempts), but they always inevitably fail the category laws. Even the tiniest, most limited, form of information transfer upstream seems to break the laws. I cannot rigorously prove my intuition, but it always seems to be true.

Unidirectional Termination Handling


This unidirectionality seems problematic at first, because if a pipe terminates it needs to finalize every pipe upstream of it, but there is no way to communicate upstream. Even attempts to artificially restore control upstream by using await gratuitously still fail the category laws. This includes the Strict "category" released in v1.0, which, due to an oversight of mine, does not pass one of the identity laws:
p <-< idP /= p
Therefore, it is not a true category (and it will be removed from v1.1).

However, instead of fighting the unidirectionality, we can "go with the flow", and instead pass finalizers downstream to be called by any pipe that terminates downstream. This requires no modification to the pipe type; instead we simply include the finalizer in the output type:
(Monad m) => Pipe a (m (), b) m r
This approach also naturally follows from the call stack analogy. Every time a pipe yields a value, it is effectively relinquishing control by being popped off of the call stack, and might never be called again. Therefore, it makes the most sense to register the finalizer at the point of the yield where it loses control, potentially irreversibly. Then, all we have to do is that ensure that each yield contains the most up-to-date finalizer for the current pipe.

However, there's an obvious problem with this approach: Every pipe would need to keep track of the latest finalizer delivered from upstream and tack that onto to its own finalizer that it delivered downstream. This is both tedious and error-prone, though, and if you use another library's pipe that mismanaged finalizer passing you'd be in trouble. So let's create a convenient combinator to handle this forwarding for us:
heap :: (Monad m) =>
    Pipe a (m (), b) m r -> Pipe (m (), a) (m (), b) m r
heap p = go (return ()) p where
    go h p = case p of
        Pure r              -> lift h >> Pure r
        M m                 -> M $ liftM (go h) m
        Await f             -> Await $ \(h', a) -> go h' (f a)
        Yield ((h', b), p') -> Yield ((h' >> h, b), go h p')

-- note: I actually use  "forall r . r -> m r" instead of "m ()"
--       because I'm not completely convinced "m ()" is a
--       monoid, but I'll keep "m ()" right now for simplicity.
In short, the heap combinator wraps our pipe and takes care of forwarding upstream finalizers for us. It takes the latest finalizer received from upstream and mappends it to any outgoing finalizers that we yield, defaulting to mempty if it hasn't received any yet. Also, it automatically calls every upstream finalizer for us when our pipe terminates. Convenient!

I call this the heap combinator for two reasons. First, if everything downstream of the current pipe is the call stack, then everything upstream can be thought of as the heap (by analogy to memory layout). Second, it represents a tally of unfinished allocations just like a heap normally would.

Now here's where things start getting interesting: heap defines a category, which I like to call the Heap category:
p1 <|< p2 = heap p1 <+< p2

idH = forever $ await >>= \a -> yield (return (), a)

(p1 <|< p2) <|< p3 = p1 <|< (p2 <|< p3)
p <|< idH = p
idH <|< p = p
I've checked all the laws for this, so you can rest assured that you won't get a repeat of the Strict non-category. Also, this formulation nicely permits convenient syntax for yielding no finalizer:
yieldH x = yield (return (), x)
... and a convenience combinator for block-level finalizer registration that will add an additional finalizer to any yields within the block it wraps:
catchH h p = forever (await >>= \x -> yield (h, x)) <|< p

enumFile = do
    h <- lift $ openFile ReadMode "twoLines.txt"
    catchH (hClose h) $ replicateM_ 2 do
        str <- lift $ hGetLine h
        yieldH str
    hClose h
In the above example, catchH would take the empty finalizer included in yieldH str and add hClose h to it. If there were yieldH commands outside the catchH block, they would not get wrapped. Note that catchH is merely a convenience and if you desired more fine-grained or overlapping finalizer blocks, you could yield the finalizers by hand instead of using yieldH/catchH.

However, this is not the category I'm going to propose as the solution for error-handling. After all, the above category does absolutely nothing to finalize downstream correctly in the event of termination, so we'll need invoke the call stack metaphor again to guide us in the right direction.

Type-checked downgrades


In a traditional call stack, there is no such thing as a frame terminating. This is one instance where pipes depart radically from the call stack metaphor, since a pipe termination brings down the entire pipeline and effectively short-circuits the entire stack. In fact, return values of pipes resemble exceptions. While I have some ideas for treating pipe return values similar to exceptions, I will elide that discussion for now until my thoughts on this are clearer, so this post will only discuss the case of frames never terminating.

If we slavishly follow the stack metaphor, we'd have to prevent pipe termination. A naive approach might be to simply specialize the type of (<+<) to only accept non-terminating pipes:
(<+<) :: (Monad m) =>
    Pipe b c m Void
 -> Pipe a b m Void
 -> Pipe a c m Void
However, there's a huge problem with this: a non-terminating pipe will never finalize upstream promptly. For example, if we were to define the following pipe:
oops = do
    x <- await
    forever $ yield x
... we'd have an immediate problem if we hooked it up to our file enumerator:
runPipe $ ... <+< oops <+< enumFile
The file would never get finalized, even though we know that oops will never request a second line. This means we need some way to prove to the compiler that after some point oops doesn't need upstream any longer so we can go ahead and finalize upstream promptly. I like call to call this "downgrading" oops to a Producer.

This leads to another problem, though. Let's say that we were to write the following pseudo-code:
oops2 = do
    x <- await
    safeToFinalizeUpstreamBecauseThisPipeWon'tAwaitAnyLonger
    yield x
That leads to an immediate problem if we were to then write:
oops2 >> oops2
In short, we would have just violated our assertion that our pipe wouldn't await any longer. This means that the "pre-downgrade" Pipe cannot be part of the same monad as the "post-downgrade" Producer block, otherwise we'd have no way of enforcing our invariant at the type-level.

What we can do, however, is have the "pre-downgrade" Pipe return the "post-downgrade" Producer:
Pipe a b m (Producer b m r)
Then, all we have to do is enhance lazy composition so that it automatically installs downgraded pipes in the place of the original pipe. Interestingly, in order to unify the types, this trick requires using the Fix point of the Producer b m functor:
newtype Fix f = Fix (f (Fix f))

-- Different from the definition found in the pipes package
newtype Producer b m r = Producer (forall a . Pipe a b m r)

type Frame' m a b = Pipe a b m (Fix (Producer b m))
And with that type in place, we can define a new kind of composition that installs downgrades in place:
(<~<) :: (Monad m) =>
    Frame' m b c -> Frame' m a b -> Frame' m a c
Await f <~< Yield (x, p) = f x <~< p
Yield y <~< p = Yield $ fmap  (<~< p) y
M     m <~< p = M     $ liftM (<~< p) m
Pure (Fix (Producer r)) <~< p = Pure $ Fix $ Producer r
p <~< M     m = M     $ liftM (p <~<) m
p <~< Await a = Await $ fmap  (p <~<) a
p <~< Pure (Fix (Producer r)) = Pure $ Fix $ Producer $ p <~< r
This composition only differs from (<+<) in the two Pure case statements. For comparison, here is lazy composition:
Await f <+< Yield (x, p) = f x <+< p
Yield y <+< p = Yield $ fmap  (<+< p) y
M     m <+< p = M     $ liftM (<+< p) m
Pure  r <+< p = Pure  $ r
p <+< M     m = M     $ liftM (p <+<) m
p <+< Await a = Await $ fmap  (p <+<) a
p <+< Pure  r = Pure  $ r
This lets us rewrite oops to signal its downgrade:
produce = return . Fix . Producer

correct = do
    x <- await
    produce $ forever $ yield x
... however nothing will happen yet when it downgrades because we haven't installed any machinery to call upstream finalizers.

Interestingly, we naturally get non-terminating pipes "for free" by simply unifying the types with Fix, instead of having to artificially specialize composition to only accept never-ending pipes. However, you can write a terminating version of the above code by using Free instead of Fix, but that will have to wait until I discuss exceptions.

As you might suspect, (<~<) forms a category, which I like to call the Stack category:
(p1 <~< p2) <~< p3 = p1 <~< (p2 <~< p3)
p <~< idP = p
idP <~< p = p
Now you might wonder why I used the return value of the pipe to store its Producer continuation instead of defining an additional Pipe constructor for that purpose. The answer again is not a stubborn refusal to extend the Pipe type, but rather because the above formulation cleanly combines with the Heap category to generate a new category which will replace the Strict category:
type Frame m a b =
    Pipe a (m (), b) m (Fix (Producer (m (), b) m))

(<-<) :: Frame m b c -> Frame m a b -> Frame m a c
p1 <-< p2 = heap p1 <~< p2

idT :: Frame m a a
idT = idH
In other words, the two categories can be merged modularly into a new category without any special considerations! We can then combine correct and enumFile into part of a pipeline:
... <-< correct <-< enumFile 
And when correct downgrades to a Producer after its first await, it will finalize every pipe upstream of it, which is just enumFile in this case.

There is still one last problem: our pipeline is now never-ending. How do we decide when to terminate the entire pipeline? We don't want to terminate it when the most downstream pipe downgrades, because that doesn't necessarily mean that it is done.

Once again the stack metaphor provides the answer: in a call stack, the program completes when the top-level function returns, so the analogy to pipes would be that the pipeline terminates when the most downstream pipe yields:
runPipe' :: Frame m () a -> m a
runPipe' p = case p of
    Yield ((h, x), _) -> h >> return x
    Await f -> runPipe' $ f ()
    M m -> m >>= runPipe'
    Pure (Fix (Producer' r)) -> runPipe' r

> runPipe' $ correct <-< enumFile
"returns first line of file"

Discussion


So the current frame-based solution to termination can be summed up in a few simple semantic rules:
  • Frames don't terminate, but the call stack does.
  • Frames prove at the type-level that it is safe to finalize upstream by downgrading to a producer.
  • Frames yield finalizers which will be called if downstream downgrades to a producer.
I also wanted to use this post to emphasize several points:
  • Handling termination does not require extending the Pipe type.
  • We can decompose termination handling into two sub-problems: forwarding finalizers downstream (the Heap category) and signaling downgrades (the Stack category). These two pieces can be tested independently (by proving they are categories in isolation) and then composed into a total category.
  • By defining a termination-handling category, we now have clearly encoded our call stack intuition and proven that our intuition leads to composable semantics.
  • Pipes can handle prompt and automatic finalization, and they can do so elegantly and intuitively.
If you look at other implementations, like pipes-core or conduits, you will see elements of the above design, but their implementations scatter this functionality across multiple functions, the Pipe/Conduit type, and leave a lot of the invariants to the user to handle. They also tightly couple the finalization to the entire rest of the library and tightly integrate both halves of the termination problem, making it difficult to reason about or prove category instances. In contrast, the pipes approach requires simply defining heap and (<~<) and uses the original Pipe data type. This completely decouples finalization concerns from the rest of the library.

Other implementations suffer from a form of IO obsession that contaminates the entire rest of their library, whereas in pipes the finalization aspect is clearly delimited and separated from the rest of the library. So if you were working with a monad where you didn't need finalization of any form (or it didn't make sense), you could still fall back to using (<+<) and ordinary pipes and not even concern yourself with even learning about frames, (<-<) or finalizers, which are really IO-specific considerations.

There are some final caveats before I finish. First off, pipes currently has a wrong MonadTrans instance, which will be fixed in v1.1 by changing Pipe to a free monad transformer (i.e. using FreeT from control-monad-free or something similar). This correct MonadTrans instance is required for the Heap category to pass all the category laws. This changes the composition definitions, but I've already verified that they still form categories and I didn't want to complicate the discussion.

Second, there may be a better way to formulate the Stack category and the one I discussed only represents the first elegant solution I found. While I'm reasonably certain the Heap category is spot-on, I'm still open to suggestions to changing the Stack category as long as they are also elegant and compose just as seamlessly with the Heap category.

3 comments:

  1. Hi Gabriel,

    Thanks for this great stuff!

    A couple of questions. The first one is practical.
    Suppose I have a pipeline, say a producer that reads a file and yields bytestrings, an intermediate stage that awaits bytestrings and yields Chars, and a stage that awaits Chars, accumulating them until a token separator, and yields tokens (e.g. identifiers) downstream. There is a little problem though, if the upstream producer hits an end-of-file and terminates; there may be an identifier in progress (properly ended by the EOF, according to allowed syntax) in the token stage that doesn't get yielded downstream because the whole pipeline has terminated.

    Is there a nice way to deal with this sort of case?
    I can think of some ad-hoc methods, passing Maybe's or
    Either's through the whole pipe instead of Bytestrings and Chars, with the exceptional case encoding EOF, and using an additional 'yield Nothing' after the producer to indicate the EOF, but it doesn't immediately strike me as very attractive.

    A second question, not much related. I was trying to work through some proofs of the Monad / MonadTrans / Category laws in the pipes 1.0.2 library and ran into trouble with MonadTrans. I read in your article that you plan to change the MonadTrans instance. I'm fairly new to Haskell and don't know 'free monad transformers' yet. However, I thought I would try on my own; I came up with the following, but I'm worried that it seems to involve the base monad more than the original Pipes implementation and may not be as efficient. (Also, I haven't quite been able to prove transitivity yet, although some signs look promising to me; and I don't really know how to deal with bottoms and non-well-founded values like idP in inductive proofs, so I may well have proved even less than I think.)

    http://hpaste.org/67732

    ReplyDelete
    Replies
    1. So for your first question, the answer to your question is that the whole finalizer solution is actually much much more general than just finalizers. It can be applied to anything that is a monoid. A finalizer is just a specific kind of monoid (where mempty is return () and mappend just sequences two finalizers). If you think of each pipe as having an associated monoid element (registered by catchH or by manually yielding the monoid element), then the pipeline is like a list of monoids and all that the Heap category is doing is doing mconcat over all the upstream pipes to combine them into a single monoid element.

      So one thing I've been considering is extending heap to just include the monoid in the return value instead of running it (which only makes sense for finalizers). This means you can then truly generalize heap to work with any monoid and not just finalizers. Then what you would do is make your monoid the pair of monoids:

      (Finalizer m, UnconsumedInput)
      -- Note, a pair of monoids is a monoid itself

      where mempty for UnconsumedInput is no dangling input and mappend combines multiple unconsumed inputs. Because this is a monoid fold over the entire pipeline, you can actually register unconsumed inputs from EVERY pipe in the pipeline, not just that one single pipe, and ensure they all get delivered in the final return value (assuming you come up with a sensible way to mappend them all together).

      Then your composite pipe's return value includes both the finalizer monoid and the unresolved input monoid and you do whatever you want with them.

      To answer your second question, there are some implementations of free monad transformers on Hackage, but they are not recognized as such. You can find my implementation here:

      https://github.com/Gabriel439/Haskell-Pipes-Library/blob/master/Control/Monad/Trans/Free.hs

      The latest patch of the library actually formulates Pipes as free monad transformers, which you can see here:

      https://github.com/Gabriel439/Haskell-Pipes-Library/blob/master/Control/Pipe/Common.hs

      If you want a good idea of what free monad transformers are good for, I highly recommend you read Issue 19 of the Monad Reader, which you can find here:

      http://themonadreader.files.wordpress.com/2011/10/issue19.pdf

      The article in that issue on concurrency by Mario Blazevic basically walks you through various uses of a free monad transformer, but in his article he calls it the "Coroutine" type.

      Like you mentioned, a correct MonadTrans instance requires at least one pass through the base monad at a minimum and until we have a perfect compiler then binds are not free. However, I've actually done some quick and dirty benchmarks of pipes code and I find that it causes a negligible difference in performance, with only about a 1% decrease in speed for the simple cases I tried out.

      Also, your refactorization of pipes to correct MonadTrans is correct and it's isomorphic to the implementation I have in my latest patch. I highly recommend you learn about free monads first, and then "free monad transformers" are just the monad transformer version of the "free monad". A free monad is a very useful concept to learn and it simplifies a lot of programs.

      Delete
    2. Thanks very much! I'll read issue 19 & think about the other stuff!

      Delete