Wednesday, September 5, 2012

pipes-2.3 - Bidirectional pipes

One thing I love about Blogger is the detailed traffic information it provides out of the box. I enjoy seeing what keywords direct people to my blog, and one particular search result came up a lot recently, namely bidirectional pipes. Every time I saw somebody searching for bidirectional pipes I would think to myself "You and me both!" since I've been wanting bidirectional pipes for quite some time now to implement features that users have been requesting.

Well, anonymous googlers, today is your day! I'm releasing pipes-2.3 which introduces a new bidirectional pipe type, which I call a Proxy and I've proven the category laws for Proxy composition.

This blog post is not a proper tutorial but rather a meta-discussion of this release. This post discusses context surrounding this release for people who follow iteratee development, so if you just want to see cool examples, then read the Proxy tutorial over at Control.Proxy.Tutorial.

Also, this post is not technically part of my category theory series that I'm writing, but it does fortuitously tie in to it. The Proxy type provides an elegant framework for composing reusable client/proxy/server primitives into powerful applications, so if you started following my blog because of my discussion about compositionality, then I recommend you read the Proxy tutorial.

Generalizing Pipes

The Proxy terminology is built on the client-server metaphor, and if you already understand Pipes the following translations will help you map your Pipe intuition onto Proxy terms:
-- Types
Pipe     -> Proxy

Producer -> Server
Consumer -> Client
Pipeline -> Session

-- commands
await    -> request
yield    -> respond
Clients resemble Consumers, except you replace await with request, which provides an argument to upstream:
myClient () = do
    answer <- request argument
Servers resemble Producers, except you replace yield with respond. Composition requires a parameter to pass in the first request:
--       +-- 1st request
--       |
--       v
myServer argument = do
... and every subsequent request is bound to the return value of respond:
myServer argument = do
    x <- computeSomething argument
    -- "respond" binds the next argument
    nextArgument <- respond x
    myServer nextArgument

-- or: myServer = computeSomething >=> respond >=> myServer
I provide the foreverK function which abstracts away this common recursion pattern:
-- i.e. forever 'K'leisli arrow
foreverK f = f >=> foreverK f

myServer = foreverK $ \argument -> do
    result <- computeSomething argument
    respond result

-- or: myServer = foreverK (computeSomething >=> respond)
That looks just like the way you'd write a server's loop: get some argument, compute some result, respond with the result. However, you can do significantly more sophisticated things than just loop.

A Proxy sits between servers and clients. It can query servers on its upstream interface, and respond to clients on its downstream interface:
      | Upstream  | Downstream |
      | interface | interface  |
Proxy   arg1 ret1    arg2 ret2   m r
As with Pipes, the intermediate Proxy type is the unifying compositional type which generalizes the endpoint types. Server and Client are just type synonyms around the Proxy type with one of its two ends closed.

You can then compose as many components as you please into a single Session using composition and then use runSession to convert the results back to the base monad:
runSession $ client <-< proxy_1 <-<  ... <-< proxy_n <-< server
In the following sections, I will motivate this upgrade to bidirectional pipes by providing some examples of trivial problems that have embarrassed the entire iteratee community (myself included) up until now.

Dumb sources

The simplest example is a file reader. Using any iteratee implementation out there, it is very awkward to specify how many bytes you wish to pull from the upstream source on a request-to-request basis. Most implementations either:
  • Hard-code the number of bytes delivered on each request (i.e. conduit/iterIO)
  • Initialize the source with a given buffer size and then fix it from that point onward (i.e. enumerator/iteratee)
Now, there's nothing wrong with hard-coding the size for the read from the file since typically there is an optimum buffer size for disk I/O, but you'd still like to be able to layer another component downstream that can then parcel that out into chunk sizes that the user actually wants.

Unfortunately, the gold standard solution (pushback) is unsatisfactory because it:
  • only solves this narrow use case and does not generalize,
  • cannot push back portions of input without imposing some sort of Monoid restriction on the iteratee type itself, and
  • requires that the user maintain certain invariants to prevent breaking the Category laws.
Wouldn't it be nice if we could just directly tell upstream what we wanted instead of playing all these games? Proxys let you do that through the argument you supply to request.

Remote-procedure call

The next example is interfacing with some server. This is a real-world example from my own work. I've written a protein structural search engine and I've set it up as an RPC service: protein structure goes in, a bunch of search results come out. I'd like to write a Pipes interface to this so I can stream the results coming out of the server, but unfortunately I can't. If I tried, I might do something like this:
searchEngine? :: Pipe Structure [Structure] IO r
I can't really accomplish this because Pipes only permit a unidirectional flow of information. I can't both provide the query and receive the results within the same component without resorting to brittle non-compositional tricks like IORefs that defeat the entire point of the iteratee abstraction. However, with Proxys, the solution is incredibly easy:
The input ---------+-------------------+          +- The results
                   |                   |          |
                   v                   v          v
searchEngine :: Structure -> Server Structure [Structure] IO r
searchEngine = foreverK $ \structure -> do
    -- "search" might send a network query to the actual server
    results <- lift $ search structure
    respond results

-- searchEngine = foreverK ((lift . search) >=> respond)
Note that this time the query and response occupy the same interface, rather than two opposing interfaces, so I can now hook up a Client to it that send in requests and receive responses within the same block of code.

No other iteratee implementation out there can accomplish this. Instead, they restrict us to using blind sources that don't know what downstream actually wants.


You can also implement imperative-style closures using Proxys. Simply define:
type Closure = Server
... and you are good to go! Consider the Python example from the Wikipedia article on closures:
def counter():
    x = 0
    def increment(y):
        nonlocal x
        x += y
    return increment
We can translate this directly into Proxys:
counter :: Int -> Closure Int () IO r
counter = counter' 0

counter' x y = do
    let x' = x + y
    lift $ print x'
    y' <- respond ()
    counter' x' y'
We can then consume the closure in a structured way using composition:
type Opening = Client -- The opposite of a closure?

useClosure :: () -> Opening Int () IO ()
useClosure () = mapM_ request [1, 7, 1, 1]

main = runSession $ useClosure <-< counter
... or we can manually peel off individual elements from the closure using runFreeT:
pop :: (Monad m)
 => a
 -> Closure a b m r
 -> m (Maybe (b, Closure a b m r))
pop y = do
    f <- runFreeT (counter y)
    case f of
        Pure          _  -> return   Nothing
        Free (Yield x c) -> return $ Just (x, c)
Proxy internals are all exposed without compromising any safety, so if you choose not to buy in to the whole composition framework you can always manually deconstruct Proxys by hand and go along your way.

Compositional message passing

As far as I can tell, this is the only bidirectional message passing framework that satisfies the category laws. This guarantees several nice properties:
  • The identity laws enforce that composition of components must be completely transparent.
  • The associativity law guarantees that each component can be written completely context-free.
Unlike most message passing frameworks, Proxys promote component decoupling by structuring message passing through typed interfaces and composing those interfaces to mix and match components. This promotes code reuse and makes it easy to encapsulate complete functionality into single black-box objects instead of exposing a bunch of initialization/push/pull/finalization routines that your user must worry about threading together correctly with every other component.

When you have compositional components, combining them together is as easy as snapping a bunch of legos together.


Another motivation for this upgrade is finalization. With the ability to send information back upstream, I can now implement bidirectional finalization using ordinary monads and not indexed monads. This will replace Frames, which I will deprecate and either remove or migrate to a separate library.

Pipe compatibility

Pipes are a strict subset of Proxys so if you have existing Pipe code you can replace Control.Pipe with Control.Proxy which provides backwards-compatible definitions for all Pipe primitives and your previous code will still work.

You can understand the relationship between Pipes and Proxys by checking out the type synonym for Pipes provided by Control.Proxy:
type Pipe a b = Proxy () a () b
In other words, a Pipe is a Proxy that never sends any information upstream when it requests input.

There is another advantage of Proxys over Pipes, which is that now it is possible to forbid awaits. The Proxy implementation is highly symmetric and fills a lot of elegance holes that Pipes had.

However, if you love Pipes, never fear, because Control.Pipe will never be deprecated, ever. It provides the simplest iteratee API on Hackage, and I plan to continue to upgrade it with all features compatible with the Pipe type.

Kleisli arrow

One of the surprising results of the bidirectional implementation was that it unifies Kleisli composition and Proxy composition, whose arguments overlap. One thing you will discover the more you program with Proxys is that most useful Proxy components end up being Kleisli arrows and you'll often find that a lot of your code simplifies to the following point-free style:
-- Not that I necessarily recommend writing it this way
((p1 <=< p2 <=< p3) <-< (p4 <=< p5)) <=< (p6 <-< p7)
This isn't a coincidence. A very abstract way to understand Proxy composition is that it is just merging lists of Kleisli arrows in a structured way.


I know in the past I've stated that bidirectional information flow does not form a category, so now I'm publicly eating my own words.

There will be two more release in the next two months. The first release will provide the first general mechanism for extending Pipes with your own custom extensions and will include error handling and parsing extensions implemented using this approach.

The second release will provide a second way to customize pipes and will include finalization/reinitialization and stack traces implemented using that approach.


  1. Excellent work. Bidirectional will make the package usable in many more domains, and the client-server metaphor will help people understand the explanation (so long as they aren't thinking of multi-client concurrency).

    Regarding your comment about bidirectional with category laws: a paradigm I am developing, Reactive Demand Programming, achieves that (and generalized arrow laws). RDP is not message passing; it involves bidirectional data synchronization ('reactive' paradigm) and I elide events (such as messages) because they encourage a great deal of non-essential state. However, it is easy to model events with short-lived data.

    1. Actually, as I explained in a comment to Paul below, you can interact with multiple clients by nesting the Proxy monad transformer within itself.

      I really like the reactive demand programming idea. I toyed around with using pipes/proxies for reactive programming, but I've never really developed the idea significantly. I'd really like to see what you do with it.

  2. One limitation of this API is the server must always respond with a value of the same type, which loses out on some really obvious use cases that come up when a client conceptually needs to pull from multiple servers. Ed Kmett's (very recently released) machines package has a solution to this problem. What do you think of it, and could you see pipes going in a similar direction in the future?

    1. Oops, I meant to reply directly to this but my response ended up as a separate top-level comment:

  3. Actually, you can interact with multiple servers (or multiple clients, or multiple proxies). You just nest a Proxy within a Proxy. The outer proxy interacts with one session while the inner proxy interacts with an entirely different session. This isn't just hypothetical as I do this in my own code all the time and it works really well for me. This is also the exact same approach that Oleg and John Millikin advocate for enumerator and I'll explain why I agree with them.

    First off, this approach automatically works for interacting with multiple servers, clients, or proxies, whereas Edward's machines currently only support multiple servers. I assume that he'll write machines to handle the other two cases, but right off the bat you can see that even if he does then his approach leads to API complexity and overhead. Now you have three concepts instead of one, similar to the situation before pipes where people had three APIs for sources/sinks/transducers and three sets of semantics that the users had to keep track of. It would seem strange to liberate ourselves from that distinction only to reinvent it again.

    Ok, but say that I ignore that and let's concern ourselves with just servers for now. I will draw an analogy between the two alternative approaches and simple functions. You can imagine that the proxy approach of layering two proxies to interact with multiple inputs is analogous to curried functions:

    a -> b -> c

    Why curried functions? Because I can partially apply a layered proxy by composing and running just the outermost session, leaving behind a proxy that interacts with just a single session. Edward's machines are analogous to uncurrying functions:

    (a, b) -> c

    ... except that he only really sweetens the special case of two inputs. For more inputs you end up with something analogous to:

    (((a, b), c), d) -> e

    ... in which case you aren't sweetening that much and just trading one type of layering for another one.

    However, while I wouldn't use Edward's approach in my code, I still think it has a lot of potential for other purposes (such as possibly efficiency) and I can see how other people might like it for reasons of personal preference.

    In principle, anybody can write machines that compile to proxies (just like Edward's machines currently compile to pipes). However, I wouldn't write it myself, only because I can't actively maintain and advocate for something I don't strongly believe in. Somebody who believes in machines strongly should be the one to implement that.

    1. Very interesting, thanks for the response. I'm a little hazy on how nesting proxies lets you combine values from multiple servers. Could you give a concrete example -- let's say I have two external sources of integers, and I'd like to merge these two into a single stream by always emitting the smaller of the two values from each source.

      One thing I like about Ed's library is that it lets you write pure (non-monadic) stream transducers that operate on multiple streams. Ed has a cap combinator that lets you partially apply a two-input machine, but that results in a transducer that is no longer pure. It sounds like nesting of proxies would have to do something similar. Are there any difficulties with ensuring resource-safety with this approach?

    2. I've hpasted the code here:

      The code has inline commentary in the comments.

      Resource safety is not a problem. Although I haven't released it yet, the general gist of it is that I write an extended composition that lets pipe register monoids associated with code segments and when the pipeline terminates it collects all the currently registered monoids and includes them in the return value.

      For the special case of the monoid being a finalizer (i.e. the "(Monad m) => m ()" monoid), then you register finalizers and the result contains all the remaining finalizers to run. Then you just extend runPipe to auto-run this last collected finalizer.

      This plays nice with nesting since the finalizer that is returned only belongs to the outermost pipe layer, so you don't accidentally get multiple frees.

    3. Thanks for your reply. The code you posted makes total sense, and I can see how it could be extended to send requests (containing actual values) to multiple servers and merging their results somehow.

      Have you considered making Pipe/Proxy into a MonadPlus, so the Await constructor takes a Pipe/Proxy to run if the awaiting fails? You then add a Stop constructor to PipeF, and the usual behavior is to Stop if an Await fails, but operations like merge can do something more intelligent. You can even make the fallback case a Producer, rather than a Pipe, to prevent it from awaiting on a nontrivial value (you'd probably have to use regular recursion rather than going through FreeT, though I'm not sure FreeT is buying you much anyway).

    4. Yes, this is possible to write as an extension to the Pipe type. I found the only way to correctly intercept upstream termination and preserve the Category is the following semantics:

      *) Wrap ordinary yielded values in a Just. When a pipe terminates, yield a final Nothing (which functions like your "Stop")
      *) If a pipe receives a Nothing, it can still await again, but the next time it awaits it yields a Nothing first (so that downstream still has a chance to handle termination).

      The default "await" unwraps all Justs and just ignores Nothings and reawaits again. The more sophisticated await returns the raw Maybe value so you can handle upstream termination appropriately.

      This solution works with ordinary monads and the FreeT type and doesn't require indexed monads. Also, you can define a functor from the ordinary Pipe type to the extended Pipe type so all classic Pipe code is automatically compatible with this extended version.

      The problem is that it does not mix well with finalization, at least not for the Pipe type, and so the only way I was able to mix the two in the Frame type was to use indexed monads. I still haven't revisited it for the Proxy type to see if I can get it to mesh correctly now, so it's an open question I still have to address. However, finalization is the higher of the two priorities at the moment, after which I'm going to try my hand again at guarding against upstream termination.

    5. I see, you are just using the fact that (a -> b, b) is isomorphic to Maybe a -> b. Though then you need a new type to handle the special rules for composing these pipes. Is there a good reason not to just add a Stop constructor and the extra fallback argument to the Await constructor?

    6. A Stop constructor version is fine so long as it is isomorphic to the Maybe version I just described. However, a Stop constructor with no continuation doesn't work (I think... unless I misunderstand what you are saying).

      The corner case you have to consider is how to write a proper downstream identity pipe. If you use a Stop with no continuation to handle awaiting a terminated pipe, then there is no correct identity for:

      idP <+< return r = return r

      Practically speaking this means that the only pipe that can return is the most downstream one.

      However, the notion of most downstream is context-dependent, since it depends on where a pipe is in the final assembled pipeline, namely whether or not it is in the most downstream position. The category laws guarantee that you can reason about each pipe's behavior independent of its context so usually law violations like these indicate to me that there is something context-dependent about an implementation. A downstream identity law violation indicates that the most downstream position is being given special treatment, as in the above example.

    7. I was actually thinking of Stop as being nullary, but I think I see now why you settled on the solution that you did. This might be a longer discussion, but my feeling is that it isn't necessary for Pipes to have a Return constructor and it somewhat confuses the model. You can have a separate type to give you monad syntax sugar for building up Pipes, but the Return isn't actually needed and any Pipe defined using monad sugar could in principle have been written directly - the monad for Pipe adds no expressive power. Ed calls this monad used for building up transducers a 'Plan', and it exists purely for convenience.

      With that removed, your transducers become _only_ a category rather than a category and a monad at the same time, which I think is cleaner and avoids tricky questions.

      By the way, this has been a great discussion, I'm really enjoying it! :)

    8. Check Control.Proxy.Tutorial for a concrete example. (the mixedClient function) of how you would want the result of composition to still be embeddable within a larger monad.

      The other reason I keep the monad is that I consider it inseparable from the core of why composition works in the first place. As I describe in the post immediately following this one, composition just weaves lists of Kleisli arrows together into a new list of Kleisli arrows and demonstrates a lot of elegant and non-trivial interactions between Kleisli composition and Proxy composition. This leads me to believe that the Kleisli category is not playing a supporting role to composition but rather is significantly intertwined with the very meaning of composition.

      I'm going to release several standard library functions in this next release and you will see a LOT of really elegant interactions between Kleisli composition ane Proxy composition when you see the source code. Proxies lend themselves to very elegant code, much more so than even Pipes.

    9. I'm not sure I share your viewpoint, but I will keep an open mind and I look forward to seeing the future release.

  4. This comment has been removed by the author.