Friday, June 28, 2013

The Resource Monad

Edit: tomejaguar points out on /r/haskell that there is a Monad instance for this type. The original version of this post said that Resource was only an Applicative. See the discussion here.

I'm writing this post to briefly share a neat trick to manage acquisition and release of multiple resources using Monads. I haven't seen this trick in the wild, so I thought it was worth mentioning.


Resources


A Resource is like a handle with built-in allocation and deallocation logic. The type of a Resource is simple:
newtype Resource a = Resource { acquire :: IO (a, IO ()) }
A Resource is an IO action which acquires some resource of type a and also returns a finalizer of type IO () that releases the resource. You can think of the a as a Handle, but it can really be anything which can be acquired or released, like a Socket or AMQP Connection.
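For example, here is a small sketch (not from the original post) of a non-Handle resource: an MVar used as a lock, where acquisition takes the lock and the finalizer puts it back:
import Control.Concurrent.MVar

-- A lock acquired by emptying an MVar and released by refilling it
-- (illustrative sketch; assumes the Resource type defined above)
lock :: MVar () -> Resource ()
lock mvar = Resource $ do
    takeMVar mvar
    return ((), putMVar mvar ())
The runResource function defined next would then hold the lock for the duration of its callback and release it afterwards, even if the callback throws an exception.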

We can also provide an exception-safe way to access a Resource using bracket:
runResource :: Resource a -> (a -> IO ()) -> IO ()
runResource resource k = bracket (acquire resource)
                                 (\(_, release) -> release)
                                 (\(a, _) -> k a)
This ensures every acquisition is paired with a release.


Theory


Resource is a Functor, an Applicative, and a Monad, using the following three instances:
instance Functor Resource where
    fmap f resource = Resource $ do
        (a, release) <- acquire resource
        return (f a, release)

instance Applicative Resource where
    pure a = Resource (pure (a, pure ()))
    resource1 <*> resource2 = Resource $ do
        (f, release1) <- acquire resource1
        (x, release2) <- acquire resource2 `onException` release1
        return (f x, release2 >> release1)

instance Monad Resource where
    return a = Resource (return (a, return ()))
    m >>= f = Resource $ do
        (m', release1) <- acquire m
        (x , release2) <- acquire (f m') `onException` release1
        return (x, release2 >> release1)
These three instances satisfy the Functor, Applicative, and Monad laws, assuming only that IO satisfies the Monad laws.

Examples


The classic example of a managed resource is a file:
import Resource  -- The above code
import System.IO

file :: IOMode -> FilePath -> Resource Handle
file mode path = Resource $ do
    handle <- openFile path mode
    return (handle, hClose handle)
Using the Applicative instance we can easily combine an input and output file into a single resource:
import Control.Applicative

inAndOut :: Resource (Handle, Handle)
inAndOut = (,) <$> file ReadMode "file1.txt"
               <*> file WriteMode "out.txt"
... and acquire both handles in one step using runResource:
main = runResource inAndOut $ \(hIn, hOut) -> do
    str <- hGetContents hIn
    hPutStr hOut str
The above program will copy the contents of file1.txt to out.txt:
$ cat file1.txt
Line 1
Line 2
Line 3
$ ./example
$ cat out.txt
Line 1
Line 2
Line 3
$
Even cooler, we can allocate an entire list of Handles in one fell swoop, using traverse from Data.Traversable:
import qualified Data.Traversable as T
import Control.Monad
import System.Environment

main = do
    filePaths <- getArgs
    let files :: Resource [Handle]
        files = T.traverse (file ReadMode) filePaths
    runResource files $ \hs -> do
        forM_ hs $ \h -> do
            str <- hGetContents h
            putStr str

The above program behaves like cat, concatenating the contents of all the files passed on the command line:
$ cat file1.txt
Line 1
Line 2
Line 3
$ cat file2.txt
Line 4
Line 5
Line 6
$ ./example file1.txt file2.txt file1.txt
Line 1
Line 2
Line 3
Line 4
Line 5
Line 6
Line 1
Line 2
Line 3
$
The above example is gratuitous because we could have acquired just one handle at a time. However, you will appreciate how useful this is if you have ever tried to acquire multiple managed resources in an exception-safe way without Resource.
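The Monad instance (the one tomejaguar pointed out) adds something the Applicative alone cannot express: a later resource can depend on the value produced by an earlier one. Here is a small sketch of that (not from the original post; tempDir and scratchFile are hypothetical names, and the code assumes the file function defined above):
import System.Directory (createDirectory, removeDirectoryRecursive)

-- A directory created on acquisition and deleted on release
tempDir :: FilePath -> Resource FilePath
tempDir path = Resource $ do
    createDirectory path
    return (path, removeDirectoryRecursive path)

-- The file's path depends on the directory acquired before it, which only
-- the Monad instance can express
scratchFile :: Resource Handle
scratchFile = do
    dir <- tempDir "scratch"
    file WriteMode (dir ++ "/notes.txt")
runResource scratchFile closes the Handle before removing the directory, because the Monad instance runs the finalizers in reverse order of acquisition (release2 >> release1).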


Conclusion


I haven't seen this in any library on Hackage, so if there is any interest in this abstraction I can package it up into a small library. I can see this being used when you can't predict in advance how many resources you will need to acquire or as a convenient way to bundle multiple managed resources into a single data type.


Appendix


I've included code listings for the above examples so people can experiment with them:
-- Resource.hs

module Resource where

import Control.Applicative (Applicative(pure, (<*>)))
import Control.Exception (bracket, onException)

newtype Resource a = Resource { acquire :: IO (a, IO ()) }

instance Functor Resource where
    fmap f resource = Resource $ do
        (a, release) <- acquire resource
        return (f a, release)

instance Applicative Resource where
    pure a = Resource (pure (a, pure ()))
    resource1 <*> resource2 = Resource $ do
        (f, release1) <- acquire resource1
        (x, release2) <- acquire resource2 `onException` release1
        return (f x, release2 >> release1)

instance Monad Resource where
    return a = Resource (return (a, return ()))
    m >>= f = Resource $ do
        (m', release1) <- acquire m
        (x , release2) <- acquire (f m') `onException` release1
        return (x, release2 >> release1)

runResource :: Resource a -> (a -> IO ()) -> IO ()
runResource resource k = bracket (acquire resource)
                                 (\(_, release) -> release)
                                 (\(a, _) -> k a)
-- example.hs

import Control.Applicative
import Control.Monad
import qualified Data.Traversable as T
import Resource
import System.Environment
import System.IO

file :: IOMode -> FilePath -> Resource Handle
file mode path = Resource $ do
    handle <- openFile path mode
    return (handle, hClose handle)

inAndOut :: Resource (Handle, Handle)
inAndOut =
    (,) <$> file ReadMode "file1.txt"
        <*> file WriteMode "out.txt"

main = runResource inAndOut $ \(hIn, hOut) -> do
    str <- hGetContents hIn
    hPutStr hOut str

{-
main = do
    filePaths <- getArgs
    let files :: Resource [Handle]
        files = T.traverse (file ReadMode) filePaths
    runResource files $ \hs -> do
        forM_ hs $ \h -> do
            str <- hGetContents h
            putStr str
-}

Sunday, June 23, 2013

From zero to cooperative threads in 33 lines of Haskell code

Haskell differentiates itself from most functional languages by having deep cultural roots in mathematics and computer science, which gives the misleading impression that Haskell is poorly suited to solving practical problems. However, the more you learn Haskell, the more you appreciate that theory is often the most practical solution to many common programming problems. This post will underscore this point by mixing off-the-shelf theoretical building blocks to create a pure user-land threading system.


The Type


Haskell is a types-first language, so we will begin by choosing an appropriate type to represent threads. First we must state in plain English what we want threads to do:
  • Threads must extend existing sequences of instructions
  • Threads must permit a set of operations: forking, yielding control, and terminating.
  • Threads should permit multiple types of schedulers
Now we translate those concepts into Haskell:
  • When you hear "multiple interpreters/schedulers/backends" you should think "free" (as in "free object")
  • When you hear "sequence of instructions" you should think: "monad".
  • When you qualify that with "extend" you should think: "monad transformer".
Combine those words together and you get the correct mathematical solution: a "free monad transformer".


Syntax trees


"Free monad transformer" is a fancy mathematical name for an abstract syntax tree where sequencing plays an important role. We provide it with an instruction set and it builds us a syntax tree from those instructions.

We said we want our thread to be able to fork, yield, or terminate, so let's make a data type that forks, yields, or terminates:
{-# LANGUAGE DeriveFunctor #-}

data ThreadF next = Fork  next next
                  | Yield next
                  | Done
                  deriving (Functor)
ThreadF represents our instruction set. We want to add three new instructions, so ThreadF has three constructors, one for each instruction: Fork, Yield, and Done.

Our ThreadF type represents one node in our syntax tree. The next fields of the constructors represent where the children nodes should go. Fork creates two execution paths, so it has two children. Done terminates the current execution path, so it has zero children. Yield neither branches nor terminates, so it has one child. The deriving (Functor) part just tells the free monad transformer that the next fields are where the children should go.
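To see what deriving (Functor) buys us, here is (approximately) the instance GHC generates for ThreadF, written out by hand; it just applies the function to each child field. This is only an illustrative sketch; you would use either it or the deriving clause, not both:
instance Functor ThreadF where
    fmap f (Fork  l r) = Fork (f l) (f r)  -- two children
    fmap f (Yield n)   = Yield (f n)       -- one child
    fmap _  Done       = Done              -- no children
The free monad transformer only needs fmap in order to know where to graft the rest of the syntax tree, which is why Functor is the only constraint placed on the instruction set.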

Now the free monad transformer, FreeT, can build a syntax tree from our instruction set. We will call this tree a Thread:
import Control.Monad.Trans.Free  -- from the `free` package

type Thread = FreeT ThreadF
An experienced Haskell programmer would read the above code as saying "A Thread is a syntax tree built from ThreadF instructions".


Instructions


Now we need primitive instructions. The free package provides the liftF operation which converts a single instruction into a syntax tree one node deep:
yield :: (Monad m) => Thread m ()
yield = liftF (Yield ())

done :: (Monad m) => Thread m r
done = liftF Done

cFork :: (Monad m) => Thread m Bool
cFork = liftF (Fork False True)
You don't need to completely understand how that works, except to notice that the return value of each command corresponds to what we store in the child fields of the node:
  • The yield command stores () as its child, so its return value is ()
  • The done command has no children, so the compiler deduces that it has a polymorphic return value (i.e. r), meaning that it never finishes
  • The cFork command stores boolean values as children, so it returns a Bool
cFork gets its name because it behaves like the fork function from C, meaning that the Bool return value tells us which branch we are on after the fork. If we receive False then we are on the left branch and if we receive True then we are on the right branch.

We can combine cFork and done to reimplement a more traditional Haskell-style fork, using the convention that the left branch is the "parent" and the right branch is the "child":
import Control.Monad

fork :: (Monad m) => Thread m a -> Thread m ()
fork thread = do
    child <- cFork
    when child $ do
        thread
        done
The above code calls cFork and then says "if I am the child, run the forked action and then stop, otherwise proceed as normal".


Free monads


Notice that something unusual happened in the last code snippet. We assembled primitive Thread instructions like cFork and done using do notation and we got a new Thread back. This is because Haskell lets us use do notation to assemble any type that implements the Monad interface and our free monad transformer type automatically deduces the correct Monad instance for Thread. Convenient!

Actually, our free monad transformer is not being super smart at all. When we assemble free monad transformers using do notation, all it does is connect these primitive one-node-deep syntax trees (i.e. the instructions) into a larger syntax tree. When we sequence two commands like:
do yield
   done
... this desugars to just storing the second command (i.e. done) as a child of the first command (i.e. yield).
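Concretely, here is a small sketch (not from the original post) of the tree that two-command block builds, assuming the Thread and ThreadF definitions above, with the base monad specialized to Identity and using the same representation of FreeT as in the Appendix:
import Control.Monad.Trans.Free  -- from the `free` package
import Data.Functor.Identity

-- What `do { yield; done }` reduces to: a Yield node whose only child is
-- a Done node
tree :: Thread Identity r
tree = FreeT (Identity (Free (Yield (FreeT (Identity (Free Done))))))
Running a tree like this, one constructor at a time, is exactly what the scheduler in the next section does.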


The scheduler


Now we're going to write our own thread scheduler. This will be a naive round-robin scheduler:
import Data.Sequence -- Queue with O(1) head and tail operations

roundRobin :: (Monad m) => Thread m a -> m ()
roundRobin t = go (singleton t)  -- Begin with a single thread
  where
    go ts = case (viewl ts) of
        -- The queue is empty: we're done!
        EmptyL   -> return ()

        -- The queue is non-empty: Process the first thread
        t :< ts' -> do
            x <- runFreeT t  -- Run this thread's effects
            case x of
                -- New threads go to the back of the queue
                Free (Fork t1 t2) -> go (t1 <| (ts' |> t2))

                -- Yielding threads go to the back of the queue
                Free (Yield   t') -> go (ts' |> t')

                -- Thread done: Remove the thread from the queue
                Free  Done        -> go ts'
                Pure  _           -> go ts'
... and we're done! No really, that's it! That's the whole threading implementation.


User-land threads


Let's try out our brave new threading system. We'll start off simple:
mainThread :: Thread IO ()
mainThread = do
    lift $ putStrLn "Forking thread #1"
    fork thread1
    lift $ putStrLn "Forking thread #2"
    fork thread2

thread1 :: Thread IO ()
thread1 = forM_ [1..10] $ \i -> do
    lift $ print i
    yield

thread2 :: Thread IO ()
thread2 = replicateM_ 3 $ do
    lift $ putStrLn "Hello"
    yield
Each of these threads has type Thread IO (). Thread is a "monad transformer", meaning that it extends an existing monad with additional functionality. In this case, we are extending the IO monad with our user-land threads, which means that any time we need to call IO actions we must use lift to distinguish IO actions from Thread actions.

When we call roundRobin we unwrap the Thread monad transformer and our threaded program collapses to a linear sequence of instructions in IO:
>>> roundRobin mainThread :: IO ()
Forking thread #1
Forking thread #2
1
Hello
2
Hello
3
Hello
4
5
6
7
8
9
10
Moreover, this threading system is pure! We can extend monads other than IO, yet still thread effects. For example, we can build a threaded Writer computation, where Writer is one of Haskell's many pure monads:
import Control.Monad.Trans.Writer

logger :: Thread (Writer [String]) ()
logger = do
    fork helper
    lift $ tell ["Abort"]
    yield
    lift $ tell ["Fail"]

helper :: Thread (Writer [String]) ()
helper = do
    lift $ tell ["Retry"]
    yield
    lift $ tell ["!"]
This time roundRobin produces a pure Writer action when we run logger:
roundRobin logger :: Writer [String] ()
... and we can extract the results of that logging action purely, too:
execWriter (roundRobin logger) :: [String]
Notice how the type evaluates to a pure value, a list of Strings in this case. Yet, we still get real threading of logged values:
>>> execWriter (roundRobin logger)
["Abort","Retry","Fail","!"]

Conclusion


You might think I'm cheating by off-loading the real work onto the free library, but all the functionality I used from that library boils down to 12 lines of very generic and reusable code (see the Appendix). This is a recurring theme in Haskell: when we stick to the theory we get reusable, elegant, and powerful solutions in a shockingly small amount of code.

The inspiration for this post was a computer science paper written by Peng Li and Steve Zdancewic titled A Language-based Approach to Unifying Events and Threads. The main difference is that I converted their continuation-based approach to a simpler free monad approach.

Edit: aseipp on /r/haskell just pointed out that my post is scarily similar to a pre-existing functional pearl: A Poor Man's Concurrency Monad by Koen Claessen.

Appendix: Free monad transformer code


The essence of a syntax tree distilled into 12 lines of code (plus the two imports it needs):
import Control.Monad (liftM)
import Control.Monad.Trans.Class (MonadTrans(lift))

data FreeF f a x = Pure a | Free (f x)

newtype FreeT f m a =
    FreeT { runFreeT :: m (FreeF f a (FreeT f m a)) }

instance (Functor f, Monad m) => Monad (FreeT f m) where
    return a = FreeT (return (Pure a))
    FreeT m >>= f = FreeT $ m >>= \v -> case v of
        Pure a -> runFreeT (f a)
        Free w -> return (Free (fmap (>>= f) w))

instance MonadTrans (FreeT f) where
    lift = FreeT . liftM Pure

liftF :: (Functor f, Monad m) => f r -> FreeT f m r
liftF x = FreeT (return (Free (fmap return x)))

Thursday, June 6, 2013

pipes-concurrency-1.2.0: Behaviors and broadcasts

pipes-concurrency-1.2.0 adds two cool new features:
  • "continuous" events (what reactive-banana calls Behaviors)
  • broadcasts
Both features are interesting, practically and theoretically.

Behaviors


One of the major deficiencies of the pipes-concurrency-1.0 API was the requirement that every event be handled by downstream listeners. This is the sort of interaction that reactive-banana would call an Event, where there is a one-to-one correspondence between the production and consumption of events.

However, this model breaks down when you have two things updating at a different rate. For example, if you have a mouse and a monitor you don't want either one tied to the other's update cycle. You can't have the monitor respond to every mouse event because it cannot refresh fast enough. Similarly, you cannot have the mouse's event loop track the monitor's refresh rate because then you will develop an ever-increasing backlog of events.

pipes-concurrency solves this by providing a new option for the spawn command: the Latest constructor. This replaces the internal buffer with a single value that simply tracks the "latest" value received from upstream, discarding old values immediately. This completely decouples the input and output ends which no longer need to match each others' pace:
main = do
    (input, output) <- spawn (Latest 0)
    -- Update at any rate
    a1 <- async $ do runProxy $ events >-> sendD input
                     performGC
    -- Poll the latest value
    a2 <- async $ do runProxy $ recvS output >-> handler
                     performGC
    mapM_ wait [a1, a2]
The most fascinating part of this addition is how unintrusive it is. This only required adding 7 lines of code (1 for the new constructor, and 6 for the additional case branch for spawn), yet all of the advanced features of the library (like termination detection and deadlock safety) still work correctly with this new Latest constructor.

This will most likely interest people interested in functional reactive programming (FRP), because it flies in the face of conventional thinking where Events and Behaviors are typically treated very differently. pipes-concurrency suggests that these two concepts may actually be more similar than people suspect when viewed through the appropriate lens.

What differentiates pipes-concurrency from typical reactive programming abstractions is that it does not reify the streams as the central component for non-deterministic concurrency. In fact, the core machinery of pipes-concurrency is entirely pipes-independent, with the pipes utilities just forming a convenient interface. Rather, the central abstraction is the mailbox and how it coordinates inbound messages with outbound messages. When you view reactive programming through the lens of mailboxes instead of streaming processes then Behaviors and Events differ only very slightly in their implementations and they both support the same interface.


Broadcasts


Broadcasts are a neat little feature that arose out of a discussion with Florian Hofmann. Inputs are now Monoids, so if you want to broadcast to multiple mailboxes you just mconcat their Inputs and send messages to the combined Input:
import Control.Monad
import Control.Concurrent.Async
import Control.Proxy
import Control.Proxy.Concurrent
import Data.Monoid

main = do
    -- Subscribers
    (inputs, outputs) <-
        fmap unzip $ replicateM 10 $ spawn Unbounded

    -- Broadcast to all subscribers
    a  <- async $ do runProxy $ stdinS >-> sendD (mconcat inputs)
                     performGC

    -- Receive broadcasted messages
    as <- forM outputs $ \o ->
          async $ do runProxy $ recvS o >-> stdoutD
                     performGC

    mapM_ wait (a:as)
This shows how handy and simple-to-use the Monoid class is.

There are also new type class instances for the Output type, such as Monad and Alternative! This is a great example of how mailboxes make a more useful and theoretically interesting interface for concurrency than streams.


Conclusions


This update adds some cool new features, but is very unlikely to break code. The main backwards-incompatible change was renaming the Size type to Buffer (since Size does not make sense for Latest), but other than that most existing code should work without any modifications.

Sunday, June 2, 2013

pipes-parse-1.0.0: Pushback, delimited parsers, resumable parsing, and lenses

pipes-parse is finally out! pipes users know that pipes has lagged behind conduit and io-streams in the parsing arena and this library provides the utilities necessary to close the gap. You can find the pipes-parse library here, and I recommend reading the tutorial. This post will mainly discuss the development of pipes-parse and compare it to parsing solutions from other streaming libraries.


End of Input


pipes-parse copies both io-streams and conduit for the end of input protocol: wrap values in Just and end with a stream of Nothings. There are two ways you can modify an input stream to obey this protocol.

The first approach is to use the wrap function, which enforces this protocol:
wrap :: (Monad m, Proxy p)
     => p a' a b' b m r -> p a' a b' (Maybe b) m s
Then you can just write:
wrap . producer >-> consumer
wrap proves its termination safety by having a polymorphic return value (because it ends with a never-ending stream of Nothings).

The second approach is to rewrite your producer as an input stream in the style of io-streams (see this post for details):
source' :: (Monad m, Proxy p) => () -> Session p m (Maybe a)
... and use request composition to connect the producer:
source' \>\ consumer
This approach proves its termination safety by virtue of using request composition. The composition operator specializes to:
(\>\) :: (Monad m, Proxy p)
      => (() -> Session  p           m (Maybe a))
      -> (() -> Consumer p (Maybe a) m        b )
      -> (() -> Session  p           m        b )
The composite pipe's return value only derives from the downstream pipe (i.e. consumer in this case). This is because request composition is automatically safe against termination from the upstream pipe. In the above example, source' just replaces every request within consumer, and if source' terminates, all that means is that the request completes.

What's nice is that both approaches are 100% compatible with each other. You, the pipe writer, do not need to anticipate which way users will supply input. You just write a pipe that consumes values of type Maybe a and both of the above approaches will work with your pipe. Also, both of these approaches guarantee that you can return values directly from the downstream pipe without guarding the return value with a Maybe.


Pushback and Leftovers


pipes implements pushback using the StateP proxy transformer:
-- Like @request ()@, except try to use the leftovers buffer first
draw :: (Monad m, Proxy p)
     => StateP [a] p () (Maybe a) y' y m (Maybe a)

-- Push an element back onto the leftovers buffer 
unDraw :: (Monad m, Proxy p)
       => a -> StateP [a] p x' x y' y m ()
This is a great example of how the proxy transformer system makes it easy to extend pipes with new features without baking them into the core implementation. I can use the (newly-fixed) StateP proxy transformer to add a leftovers buffer that draw and unDraw both use.
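As a small illustration of how draw and unDraw fit together, here is a sketch of a peek-style parser that looks at the next element without consuming it (pipes-parse may provide something similar; this version is purely illustrative):
-- Assumes the pipes 3.x Proxy class, StateP, and the draw/unDraw functions
-- shown above are in scope
peek :: (Monad m, Proxy p) => StateP [a] p () (Maybe a) y' y m (Maybe a)
peek = do
    ma <- draw
    case ma of
        Nothing -> return ()
        Just a  -> unDraw a  -- push the element back onto the leftovers buffer
    return ma
Because the pushed-back element lands in the StateP leftovers buffer, the next draw (whether in this parser or another parser sharing the same buffer) will see it again.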

Pushback is where pipes-parse significantly improves on the competition. To motivate the pipes-parse solution, consider the type for conduit's most general composition operator:
(>+>) :: Monad m
      => Pipe l    a b r0 m r1
      -> Pipe Void b c r1 m r2
      -> Pipe l    a c r0 m r2
--            ^
--            |
--            +-- Leftovers
The downstream conduit cannot provide leftovers because they will be lost after composition. With pipes-parse you can save leftovers from both composed pipes very easily. To see how, imagine we have the following two pipe types:
p1  :: (Monad m, Proxy p)
    => () -> Pipe (StateP [a] p) (Maybe a) (Maybe b) m r
p2  :: (Monad m, Proxy p)
    => () -> Pipe (StateP [b] p) (Maybe b) (Maybe c) m r
--                         ^
--                         |
--                         +-- Leftovers
Each of these pipes stores a leftovers buffer equal to its input type, but we can't yet compose these pipes because their leftovers buffers don't match. However, pipes-parse provides lens support in the form of the zoom function so that you can easily unify two leftovers buffers in order to compose them:
zoom _fst . p1
    :: (Monad m, Proxy p)
    => () -> Pipe (StateP ([a], [b]) p) (Maybe a) (Maybe b) m r
zoom _snd . p2
    :: (Monad m, Proxy p)
    => () -> Pipe (StateP ([a], [b]) p) (Maybe b) (Maybe c) m r

zoom _fst . p1 >-> zoom _snd . p2
    :: (Monad m, Proxy p)
    => () -> Pipe (StateP ([a], [b]) p) (Maybe a) (Maybe c) m r
But you can do more than that! You can still access the leftovers buffers afterwards, too, again using zoom:
example = do
    (zoom _fst . p1 >-> zoom _snd . p2) ()
    -- Draw, reusing the leftovers from @p1@
    ma <- zoom _fst draw
    -- Retrieve the leftovers from @p2@
    mb <- zoom _snd get
    ...
This kind of multiple-buffer management isn't possible using conduit.

zoom is a perfect example of the functor design pattern. We lift two existing proxies to agree on a common global state for compatibility purposes. Therefore, we expect that there should be functor laws at play:
zoom id = id

zoom (f . g) = zoom f . zoom g

pipes-parse also improves upon io-streams pushback. With io-streams, all pushback is done using IORefs, meaning that:
  • It isn't pure
  • You can't easily control which streams share leftovers and which ones do not
  • None of the state is reflected in the types
With pipes-parse you get pure and precise control over leftovers. Moreover, you do not need to instrument streams to correctly forward values that you push back upstream, because StateP abstracts over that for you.


Nesting and delimiting parsers


Like other streaming libraries, pipes-parse makes it very easy to run a parser on a subset of the stream. This was probably the #1 feature requested, followed shortly by...


Resumable parsing


pipes-parse uses StateP, so if you want to interrupt parsing you can just use runStateK to return the current state of the leftovers for use in a later computation. Simple!


Perfect streaming


One of the more advanced features to come out of the last wave of development was what I like to call "perfect streaming". This has a very specific meaning: grouping the input and interacting with each group as a stream without bringing more than one chunk into memory.

For example, consider the following conduit:
lines :: Monad m => Conduit ByteString m ByteString
This will load each line into memory, which means that if your file is one long line then you will load the entire file into memory, defeating the purpose of streaming! io-streams has the same problem, but, unlike conduit, io-streams can easily fix its lines utility to stream perfectly and I plan to show Greg how to do this so that io-streams users can benefit from the same trick.

pipes-parse does not teach how to use this trick, but it does lay the groundwork for it and the upcoming pipes-bytestring library will provide examples of this idiom. If you want to see a concrete example of this trick in action, check out Oliver Charles's upcoming pipes-tar library on Github to see a preview of this idiom, where he streams individual files from a TAR archive without ever loading more than one chunk in memory. His very interesting use case was the inspiration for this trick, and I also preview this idiom in this Stack Overflow answer.

More generally, perfect streaming uses the respond category's composition operator, which has the following general type:
(/>/) :: (Monad m, Proxy p)
      => (a -> p x' x b' b m a')
      -> (b -> p x' x c' c m b')
      -> (a -> p x' x c' c m a')
When you use respond composition, both pipes share the same upstream interface meaning that you can group the input into subsections but still allow each subsection to access the original upstream interface. With appropriate information hiding you can set up pipes which behave like lenses to specific subsections of the stream and allow the user to stream from each subsection independently.


Compatibility


pipes-parse takes great care to ensure that non-parsing pipes are completely interoperable with parsing pipes, thanks to the following compatibility functions:
fmapPull
    :: (Monad m, Proxy p)
    => (x -> p x        a  x        b  m r)
    -> (x -> p x (Maybe a) x (Maybe b) m r)

returnPull
    :: (Monad m, Proxy p)
    => x -> p x a x (Maybe a) m r

bindPull
    :: (Monad m, Proxy p)
    => (x -> p x        a  x (Maybe b) m r)
    -> (x -> p x (Maybe a) x (Maybe b) m r)
These three functions define functors and monads in the category where the objects are the downstream components of each proxy interface and the morphisms are pull-based pipes.

As you might guess, fmapPull satisfies the functor laws:
fmapPull f >-> fmapPull g = fmapPull (f >-> g)

fmapPull pull = pull
Similarly, returnPull and bindPull satisfy the monad laws:
-- Using: f >>> g = f >-> bindPull g

returnPull >>> f = f

f >>> returnPull = f

(f >>> g) >>> h = f >>> (g >>> h)
... which are equivalent to:
returnPull >-> bindPull f = f

bindPull returnPull = pull

bindPull (f >-> bindPull g) = bindPull f >-> bindPull g
... and we can derive the functor from the monad:
fmapPull f = bindPull (f >-> returnPull)

-- i.e. fmap f = (>>= return . f)
These functions could be generalized by replacing Maybe with a type class parameter, like:
class FunctorPull f where
    fmapPull
        :: (Monad m, Proxy p)
        => (x -> p x    a  x    b  m r)
        -> (x -> p x (f a) x (f b) m r)
... and there is a sensible instance for Either, too (in fact, that's how rightD from the pipes prelude works). However, I decided to keep them monomorphic for now for simplicity.


Conclusion


pipes-parse, like most pipes libraries, keeps the best spirit of Haskell programming by:
  • composing features from smaller, simpler, and correct building blocks,
  • using higher-order functions to lift existing functions for compatibility,
  • isolating features from each other to statically prevent accidental complexity
pipes-parse is the last of the three core libraries, the other two being pipes-safe and pipes-concurrency. These libraries define the central idioms for the pipes ecosystem and they were all designed to be instructive and convention-setting in areas where there isn't a perfectly elegant solution and some pragmatic trade-offs had to be made.

The completion of these libraries marks the point where I feel the core pipes API has proven itself to be sufficiently versatile and future-proof. The proxy transformer system makes the central API unusually stable because I don't need to bake in any new features that I want to add.

This means I will be upgrading pipes to version 4.0 soon to mark the transition to a stabler API in preparation for eventual inclusion in the Haskell platform. Also, most development work will shift to derived libraries now.

That does not mean that the derived libraries are complete yet. For example, I am currently writing up a pipes-safe-2.0.0 which will feature improved promptness guarantees and eliminate the need for unsafe finalization primitives. Similarly, I am about to release pipes-concurrency-1.2.0 at the end of this week, which will add broadcasts and continuous behaviors. More generally, I will only consider the derived core libraries to be mature when more code is built on top of them on the scale of what conduit has right now.

The next library on the development docket is pipes-bytestring. Now that pipes-parse is complete I feel much more comfortable about the stability of pipes-bytestring API. Also, pipes now has an official mailing list where you can ask questions, follow pipes development, or offer feedback and contribute to upcoming pipes libraries.