Thursday, June 6, 2013

pipes-concurrency-1.2.0: Behaviors and broadcasts

pipes-concurrency-1.2.0 adds two cool new features:
  • "continuous" events (what reactive-banana calls Behaviors)
  • broadcasts
Both are interesting, practically and theoretically.

Behaviors


One of the major deficiencies of the pipes-concurrency-1.0 API was the requirement that every event was handled by downstream listeners. This is the sort of interaction that reactive-banana would call an Event where there is a one-to-one correspondence between production of events and consumption of events.

However, this model breaks down when you have two things updating at a different rate. For example, if you have a mouse and a monitor you don't want either one tied to the other's update cycle. You can't have the monitor respond to every mouse event because it cannot refresh fast enough. Similarly, you cannot have the mouse's event loop track the monitor's refresh rate because then you will develop an ever-increasing backlog of events.

pipes-concurrency solves this by providing a new option for the spawn command: the Latest constructor. This replaces the internal buffer with a single value that simply tracks the "latest" value received from upstream, discarding old values immediately. This completely decouples the input and output ends, which no longer need to match each other's pace:
main = do
    (input, output) <- spawn (Latest 0)
    -- Update at any rate
    a1 <- async $ do runProxy $ events >-> sendD input
                     performGC
    -- Poll the latest value
    a2 <- async $ do runProxy $ recvS output >-> handler
                     performGC
    mapM_ wait [a1, a2]
The most fascinating part of this addition is how unintrusive it is. This only required adding 7 lines of code (1 for the new constructor, and 6 for the additional case branch for spawn), yet all of the advanced features of the library (like termination detection and deadlock safety) still work correctly with this new Latest constructor.
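
To make the Latest semantics concrete, here is a minimal base-only sketch. It is not how pipes-concurrency implements Latest: the LatestBox type and the newLatestBox, sendLatest and recvLatest names are hypothetical, and a plain IORef stands in for the library's internal machinery. It only illustrates the "single slot, overwrite on send, read without removing" behavior described above:

```haskell
import Data.IORef (IORef, atomicWriteIORef, newIORef, readIORef)

-- Hypothetical stand-in for a mailbox created with @spawn (Latest a)@:
-- a single slot instead of a queue.
newtype LatestBox a = LatestBox (IORef a)

-- Create the slot with an initial value (like the 0 in @Latest 0@).
newLatestBox :: a -> IO (LatestBox a)
newLatestBox = fmap LatestBox . newIORef

-- Overwrite the stored value; whatever was there before is discarded.
sendLatest :: LatestBox a -> a -> IO ()
sendLatest (LatestBox ref) = atomicWriteIORef ref

-- Read the current value without removing it.
recvLatest :: LatestBox a -> IO a
recvLatest (LatestBox ref) = readIORef ref

main :: IO ()
main = do
    box <- newLatestBox (0 :: Int)
    -- The writer can run as far ahead as it likes...
    mapM_ (sendLatest box) [1 .. 1000]
    -- ...and the reader only ever sees the most recent value.
    recvLatest box >>= print
    recvLatest box >>= print
```

Both reads print 1000: the reader never builds up a backlog, and reading does not empty the slot.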

This will most likely appeal to people who follow functional reactive programming (FRP), because it flies in the face of conventional thinking where Events and Behaviors are typically treated very differently. pipes-concurrency suggests that these two concepts may actually be more similar than people suspect when viewed through the appropriate lens.

What differentiates pipes-concurrency from typical reactive programming abstractions is that it does not reify the streams as the central component for non-deterministic concurrency. In fact, the core machinery of pipes-concurrency is entirely pipes-independent, with the pipes utilities just forming a convenient interface. Rather, the central abstraction is the mailbox and how it coordinates inbound messages with outbound messages. When you view reactive programming through the lens of mailboxes instead of streaming processes then Behaviors and Events differ only very slightly in their implementations and they both support the same interface.


Broadcasts


Broadcasts are a neat little feature that arose out of a discussion with Florian Hofmann. Now Inputs are Monoids and if you want to broadcast to multiple mailboxes, just mconcat their Inputs and send messages to the combined Input:
import Control.Monad
import Control.Concurrent.Async
import Control.Proxy
import Control.Proxy.Concurrent
import Data.Monoid
import System.Mem (performGC)

main = do
    -- Subscribers
    (inputs, outputs) <-
        fmap unzip $ replicateM 10 $ spawn Unbounded

    -- Broadcast to all subscribers
    a  <- async $ do runProxy $ stdinS >-> sendD (mconcat inputs)
                     performGC

    -- Receive broadcasted messages
    as <- forM outputs $ \o ->
          async $ do runProxy $ recvS o >-> stdoutD
                     performGC

    mapM_ wait (a:as)
This shows how handy and simple-to-use the Monoid class is.
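
The following base-only sketch shows why a Monoid instance is all that broadcasting needs. It is a simplification under stated assumptions, not the library's code: this Input wraps an IO callback rather than the library's own representation, and the instance below is one plausible definition (send to both, succeed if either accepts):

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Hypothetical simplification of an Input: a callback that reports
-- whether the message was accepted.
newtype Input a = Input { send :: a -> IO Bool }

-- Combining two Inputs sends to both of them.
instance Semigroup (Input a) where
    Input f <> Input g = Input $ \a -> do
        okF <- f a
        okG <- g a
        return (okF || okG)

-- The empty Input accepts nothing.
instance Monoid (Input a) where
    mempty = Input (\_ -> return False)

main :: IO ()
main = do
    -- Three subscribers, each collecting messages in its own list
    refs <- mapM (const (newIORef [])) [1 .. 3 :: Int]
    let inputs =
            [ Input (\a -> modifyIORef' ref (a :) >> return True)
            | ref <- refs
            ]
    ok <- send (mconcat inputs) "hello"
    print ok
    mapM readIORef refs >>= print
```

This prints True followed by [["hello"],["hello"],["hello"]]: one send on the combined Input reaches every subscriber.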

There are also new type class instances for the Output type, such as Monad and Alternative! This is a great example of how the mailboxes make a more useful and theoretically interesting interface for concurrency than the streams.


Conclusions


This update adds some cool new features, but is very unlikely to break code. The main backwards-incompatible change was renaming the Size type to Buffer (since Size does not make sense for Latest), but other than that most existing code should work without any modifications.

Sunday, June 2, 2013

pipes-parse-1.0.0: Pushback, delimited parsers, resumable parsing, and lenses

pipes-parse is finally out! pipes users know that pipes has lagged behind conduit and io-streams in the parsing arena and this library provides the utilities necessary to close the gap. You can find the pipes-parse library here, and I recommend reading the tutorial. This post will mainly discuss the development of pipes-parse and compare it to parsing solutions from other streaming libraries.


End of Input


pipes-parse copies both io-streams and conduit for the end of input protocol: wrap values in Just and end with a stream of Nothings. There are two ways you can modify an input stream to obey this protocol.

The first approach is to use the wrap function, which enforces this protocol:
wrap :: (Monad m, Proxy p)
     => p a' a b' b m r -> p a' a b' (Maybe b) m s
Then you can just write:
wrap . producer >-> consumer
wrap proves its termination safety by having a polymorphic return value (because it ends with a never-ending stream of Nothings).
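
The protocol itself is easy to picture with plain lists. The sketch below is only an analogy, assuming a lazy list in place of a proxy; wrapList and sumUntilEnd are hypothetical names that do not come from pipes-parse:

```haskell
-- Wrap every element in Just, then follow with an endless run of Nothings.
wrapList :: [a] -> [Maybe a]
wrapList xs = map Just xs ++ repeat Nothing

-- A consumer that stops at the first Nothing and returns its own result,
-- without wrapping that result in a Maybe.
sumUntilEnd :: [Maybe Int] -> Int
sumUntilEnd (Just n : rest) = n + sumUntilEnd rest
sumUntilEnd _               = 0

main :: IO ()
main = do
    print (take 5 (wrapList "ab"))
    print (sumUntilEnd (wrapList [1, 2, 3]))
```

The first line prints [Just 'a',Just 'b',Nothing,Nothing,Nothing] and the second prints 6. Because the wrapped stream never ends, the consumer alone decides when the computation finishes.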

The second approach is to rewrite your producer as an input stream in the style of io-streams (see this post for details):
source' :: (Monad m, Proxy p) => () -> Session p m (Maybe a)
... and use request composition to connect the producer:
source' \>\ consumer
This approach proves its termination safety by virtue of using request composition. The composition operator specializes to:
(\>\) :: (Monad m, Proxy p)
      => (() -> Session  p           m (Maybe a))
      -> (() -> Consumer p (Maybe a) m        b )
      -> (() -> Session  p           m        b )
The composite pipe's return value only derives from the downstream pipe (i.e. consumer in this case). This is because request composition is automatically safe against termination from the upstream pipe. In the above example, source' just replaces every request within consumer and if source' terminates all that means is that the request completes.

What's nice is that both approaches are 100% compatible with each other. You, the pipe writer, do not need to anticipate which way users will supply input. You just write a pipe that consumes values of type Maybe a and both of the above approaches will work with your pipe. Also, both of these approaches guarantee that you can return values directly from the downstream pipe without guarding the return value with a Maybe.


Pushback and Leftovers


pipes implements pushback using the StateP proxy transformer:
-- Like @request ()@, except try to use the leftovers buffer first
draw :: (Monad m, Proxy p)
     => StateP [a] p () (Maybe a) y' y m (Maybe a)

-- Push an element back onto the leftovers buffer 
unDraw :: (Monad m, Proxy p)
       => a -> StateP [a] p x' x y' y m ()
This is a great example of how the proxy transformer system makes it easy to extend pipes with new features without baking them into the core implementation. I can use the (newly-fixed) StateP proxy transformer to add a leftovers buffer that draw and unDraw both use.
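
As a rough mental model, and not the StateP-based implementation, the same two operations can be written over an explicit state value. The Source record and the peek helper below are hypothetical, and a finite list stands in for the upstream pipe:

```haskell
-- Hypothetical model: a leftovers buffer sitting in front of an upstream list.
data Source a = Source
    { leftovers :: [a]
    , upstream  :: [a]
    } deriving (Show, Eq)

-- Take the next element, trying the leftovers buffer first.
draw :: Source a -> (Maybe a, Source a)
draw (Source (l:ls) us) = (Just l, Source ls us)
draw (Source [] (u:us)) = (Just u, Source [] us)
draw src                = (Nothing, src)

-- Push an element back onto the leftovers buffer.
unDraw :: a -> Source a -> Source a
unDraw a (Source ls us) = Source (a : ls) us

-- Look at the next element without consuming it: draw, then push it back.
peek :: Source a -> (Maybe a, Source a)
peek src = case draw src of
    (Just a,  src') -> (Just a, unDraw a src')
    (Nothing, src') -> (Nothing, src')

main :: IO ()
main = do
    let src0       = Source [] "abc"
        (m1, src1) = peek src0
        (m2, src2) = draw src1
    print (m1, m2)
    print (upstream src2)
```

This prints (Just 'a',Just 'a') and then "bc": the peeked element goes back into the leftovers buffer, so the next draw sees it again before touching upstream.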

Pushback is where pipes-parse significantly improves on the competition. To motivate the pipes-parse solution, consider the type for conduit's most general composition operator:
(>+>) :: Monad m
      => Pipe l    a b r0 m r1
      -> Pipe Void b c r1 m r2
      -> Pipe l    a c r0 m r2
--            ^
--            |
--            +-- Leftovers
The downstream conduit cannot provide leftovers because they will be lost after composition. With pipes-parse you can save leftovers from both composed pipes very easily. To see how, imagine we have the following two pipe types:
p1  :: (Monad m, Proxy p)
    => () -> Pipe (StateP [a] p) (Maybe a) (Maybe b) m r
p2  :: (Monad m, Proxy p)
    => () -> Pipe (StateP [b] p) (Maybe b) (Maybe c) m r
--                         ^
--                         |
--                         +-- Leftovers
Each of these pipes stores a leftovers buffer equal to its input type, but we can't yet compose these pipes because their leftovers buffers don't match. However, pipes-parse provides lens support in the form of the zoom function so that you can easily unify two leftovers buffers in order to compose them:
zoom _fst . p1
    :: (Monad m, Proxy p)
    => () -> Pipe (StateP ([a], [b]) p) (Maybe a) (Maybe b) m r
zoom _snd . p2
    :: (Monad m, Proxy p)
    => () -> Pipe (StateP ([a], [b]) p) (Maybe b) (Maybe c) m r

zoom _fst . p1 >-> zoom _snd . p2
    :: (Monad m, Proxy p)
    => () -> Pipe (StateP ([a], [b]) p) (Maybe a) (Maybe c) m r
But you can do more than that! You can still access the leftovers buffers afterwards, too, again using zoom:
example = do
    (zoom _fst . p1 >-> zoom _snd . p2) ()
    -- Draw, reusing the leftovers from @p1@
    ma <- zoom _fst draw
    -- Retrieve the leftovers from @p2@
    mb <- zoom _snd get
    ...
This kind of multiple-buffer management isn't possible using conduit.

zoom is a perfect example of the functor design pattern. We lift two existing proxies to agree on a common global state for compatibility purposes. Therefore, we expect that there should be functor laws at play:
zoom id = id

zoom (f . g) = zoom f . zoom g
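
The idea behind zoom can be sketched without any libraries. This is an illustration under simplifying assumptions rather than the pipes-parse definition: Step is a bare state-passing function, Lens is a hypothetical getter/setter pair, and zoomStep, fstL, sndL and pop are made-up names:

```haskell
-- A computation that reads and updates a state of type s.
type Step s a = s -> (a, s)

-- Hypothetical minimal lens: a getter paired with a setter.
data Lens s t = Lens
    { getL :: s -> t
    , setL :: t -> s -> s
    }

fstL :: Lens (a, b) a
fstL = Lens fst (\a (_, b) -> (a, b))

sndL :: Lens (a, b) b
sndL = Lens snd (\b (a, _) -> (a, b))

-- Run a step written for the small state t inside the larger state s.
zoomStep :: Lens s t -> Step t a -> Step s a
zoomStep l step s =
    let (a, t') = step (getL l s)
    in  (a, setL l t' s)

-- Pop one element off a buffer.
pop :: Step [x] (Maybe x)
pop (x:xs) = (Just x, xs)
pop []     = (Nothing, [])

main :: IO ()
main = do
    let buffers = ([1, 2, 3 :: Int], "xy")
    print (zoomStep fstL pop buffers)
    print (zoomStep sndL pop buffers)
```

The two lines print (Just 1,([2,3],"xy")) and (Just 'x',([1,2,3],"y")): each zoomed step touches only its own half of the shared state, which is what lets two pipes with different leftovers buffers agree on one combined buffer.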

pipes also improves upon the pushback in io-streams. With io-streams all the push-back is done using IORefs, meaning that:
  • It isn't pure
  • You can't easily control which streams share leftovers and which ones do not
  • None of the state is reflected in the types
With pipes-parse you get pure and precise control over leftovers. Moreover, you do not need to instrument streams to correctly forward values that you push back upstream, because StateP abstracts over that for you.


Nesting and delimiting parsers


Like other streaming libraries, pipes-parse makes it very easy to run a parser on a subset of the stream. This was probably the #1 feature requested, followed shortly by...


Resumable parsing


pipes-parse uses StateP, so if you want to interrupt parsing you can just use runStateK to return the current state of the leftovers for use in a later computation. Simple!


Perfect streaming


One of the more advanced features to come out of the last wave of development was what I like to call "perfect streaming". This has a very specific meaning: grouping the input and interacting with each group as a stream without bringing more than one chunk into memory.

For example, consider the following conduit:
lines :: Monad m => Conduit ByteString m ByteString
This will load each line into memory, which means that if your file is one long line then you will load the entire file into memory, defeating the purpose of streaming! io-streams has the same problem, but, unlike conduit, io-streams can easily fix its lines utility to stream perfectly and I plan to show Greg how to do this so that io-streams users can benefit from the same trick.

pipes-parse does not teach how to use this trick, but it does lay the groundwork for it and the upcoming pipes-bytestring library will provide examples of this idiom. If you want to see a concrete example of this trick in action, check out Oliver Charles's upcoming pipes-tar library on Github to see a preview of this idiom, where he streams individual files from a TAR archive without ever loading more than one chunk in memory. His very interesting use case was the inspiration for this trick, and I also preview this idiom in this Stack Overflow answer.

More generally, perfect streaming uses the respond category's composition operator, which has the following general type:
(/>/) :: (Monad m, Proxy p)
      => (a -> p x' x b' b m a')
      -> (b -> p x' x c' c m b')
      -> (a -> p x' x c' c m a')
When you use respond composition, both pipes share the same upstream interface meaning that you can group the input into subsections but still allow each subsection to access the original upstream interface. With appropriate information hiding you can set up pipes which behave like lenses to specific subsections of the stream and allow the user to stream from each subsection independently.


Compatibility


pipes-parse takes great care to ensure that non-parsing pipes are completely interoperable with parsing pipes, thanks to the following compatibility functions:
fmapPull
    :: (Monad m, Proxy p)
    => (x -> p x        a  x        b  m r)
    -> (x -> p x (Maybe a) x (Maybe b) m r)

returnPull
    :: (Monad m, Proxy p)
    => x -> p x a x (Maybe a) m r

bindPull
    :: (Monad m, Proxy p)
    => (x -> p x        a  x (Maybe b) m r)
    -> (x -> p x (Maybe a) x (Maybe b) m r)
These three functions define functors and monads in the category where the objects are the downstream components of each proxy interface and the morphisms are pull-based pipes.

As you might guess, fmapPull satisfies the functor laws:
fmapPull f >-> fmapPull g = fmapPull (f >-> g)

fmapPull pull = pull
Similarly, returnPull and bindPull satisfy the monad laws:
-- Using: f >>> g = f >-> bindPull g

returnPull >>> f = f

f >>> returnPull = f

(f >>> g) >>> h = f >>> (g >>> h)
... which are equivalent to:
returnPull >-> bindPull f = f

bindPull returnPull = pull

bindPull (f >-> bindPull g) = bindPull f >-> bindPull g
... and we can derive the functor from the monad:
fmapPull f = bindPull (f >-> returnPull)

-- i.e. fmap f = (>>= return . f)
These functions could be generalized beyond Maybe by abstracting over the wrapper with a type class like:
class FunctorPull f where
    fmapPull
        :: (Monad m, Proxy p)
        => (x -> p x    a  x    b  m r)
        -> (x -> p x (f a) x (f b) m r)
... and there is a sensible instance for Either, too (in fact, that's how rightD from the pipes prelude works). However, I decided to keep them monomorphic for now for simplicity.


Conclusion


pipes-parse, like most pipes libraries, keeps the best spirit of Haskell programming by:
  • composing features from smaller, simpler, and correct building blocks,
  • using higher-order functions to lift existing functions for compatibility,
  • isolating features from each other to statically prevent accidental complexity
pipes-parse is the last of the three core libraries, the other two being pipes-safe and pipes-concurrency. These libraries define the central idioms for the pipes ecosystem and they were all designed to be instructive and convention-setting in areas where there isn't a perfectly elegant solution and some pragmatic trade-offs had to be made.

The completion of these libraries marks the point where I feel the core pipes API has proven itself to be sufficiently versatile and future-proof. The proxy transformer system makes the central API unusually stable because I don't need to bake in any new features that I want to add.

This means I will be upgrading pipes to version 4.0 soon to mark the transition to a stabler API in preparation for eventual inclusion in the Haskell platform. Also, most development work will shift to derived libraries now.

That does not mean that the derived libraries are complete, yet. For example, I am currently writing up a pipes-safe-2.0.0 which will feature improved promptness guarantees and eliminate the need for unsafe finalization primitives. Similarly, I am about to release a pipes-concurrency-1.2.0 at the end of this week which will add broadcasts and continuous behaviors. More generally, I will only consider the derived core libraries to be mature when more code is built on top of them on the scale of what conduit has right now.

The next library on the development docket is pipes-bytestring. Now that pipes-parse is complete I feel much more comfortable about the stability of the pipes-bytestring API. Also, pipes now has an official mailing list where you can ask questions, follow pipes development, or offer feedback and contribute to upcoming pipes libraries.

Monday, May 6, 2013

pipes-3.3.0: Folds and uniting ListT with Proxy

pipes-3.3.0 simultaneously resolves two long-standing problems in the library:
  • Not all proxy transformers implemented ListT
  • Folds required using the base monad
It turns out that fixing ListT for the remaining hold-outs proved to solve the fold problem as well, and this post will detail that a bit more.


ListT


pipes-2.4 first identified the existence of three extra categories, two of which I call the "request" and "respond" categories. These categories are enormously useful, especially since you can implement ListT with both of them, but then I discovered that you couldn't implement the (/>/) and (\>\) composition operators for certain proxy transformers, specifically:
  • MaybeP
  • EitherP
  • StateP
  • WriterP
This was really disconcerting, and it seemed really odd that every proxy transformer could lift the identities for those two categories (i.e. request and respond), but not always lift the corresponding composition operators. I settled for a temporary solution, which was to split out (/>/) and (\>\) into a separate ListT type class.

However, several recent events made me suspect that something was amiss and caused me to revisit this solution. I received my first clue while working on the pipes-directory library, where I wanted to model getDirectoryContents using the following type:
getDirectoryContents
    :: (Proxy p)
    => FilePath -> ProduceT (ExceptionP p) SafeIO FilePath
This would let users bind directories non-deterministically in ProduceT so that they could describe effectful directory traversals at a high level. This required pipes-safe so that the directory stream would be properly finalized in the event of exceptions or termination, which is why it uses ExceptionP and SafeIO.

However, ExceptionP is just a type synonym for EitherP, and EitherP did not implement the ListT type class, which meant that I could not use the ProduceT monad. So I revisited EitherP and discovered that there was a law-abiding ListT instance for EitherP that I had missed the first time around. Moreover, I could use the exact same trick to implement ListT for MaybeP, too.

This meant that only two proxy transformers remained which did not implement ListT:
  • StateP
  • WriterP
Moreover, WriterP was internally implemented using StateP under the hood, meaning that if I could solve StateP then I could finally merge the ListT class back into the Proxy class.

Simultaneously, while working on pipes-parse I encountered several buggy corner cases with StateP, all of which gave the wrong behavior. Similarly, WriterP also gave the wrong behavior in a wide variety of cases and this Stack Overflow question gives a great example of how useless WriterP was. This suggested that I had implemented both of those two proxy transformers incorrectly, since both of them gave the wrong behavior in many corner cases and both of them resisted a correct ListT implementation.

This observation led me to discover the correct solution: make StateP and WriterP share their effects globally across the pipeline, instead of locally. This fix solved both problems:
  • Both of them now implement ListT and obey the ListT laws
  • Both of them now give correct behavior in all corner cases
Consequently, I can now merge the ListT class into the Proxy class and reunite request and respond with their respective composition operators. Also, now all proxy transformers lift all four categories correctly.


Folds


The WriterP fix leads to a big improvement in the pipes folding API. Now you can do folds using WriterP within the pipeline and without using the base monad.

For example, if you want to fold all positive elements from upstream, you can now write:
somePipe = do
    -- The unitU discards values that 'toListD' reforwards
    xs <- execWriterK (takeWhileD (> 0) >-> toListD >-> unitU) ()
    respond xs
    ...
You can now access the result of folds within pipes! You no longer have to wait until the Session is complete to retrieve the folded data.

Also, since folds don't use the base monad you no longer need to hoist stages that you compose with a fold. For example, if you want to sum the first ten lines of user input, you can just write:
runProxy $ execWriterK $ readLnS >-> takeB_ 10 >-> sumD

Deprecation


I've also started to deprecate several parts of the API in preparation for an eventual pipes-4.0.0 release. These are the main things I deprecated:
  • The classic pipes API (i.e. await, yield, and (>+>))
  • raise functions (i.e. raise and raiseP)
  • K functions, like hoistK and liftK (exception: I keep the run...K functions)
  • Many bidirectional utilities from the pipes prelude and some upstream utilities
  • idT and coidT are renamed to pull and push
If you disagree with any of these deprecations, please let me know since I'm always open to suggestions to keep them or migrate them to a pipes-extras library.

I renamed idT and coidT because this allows for a nice convention where every category is named after its identity operator. Also, the new names are more suggestive of their behavior: idT begins by pulling information, while coidT first pushes information.

This rename becomes even more advantageous when you use the io-streams style I discussed in a previous post, but I will save the full explanation of why for later.


Modules


I've also tightened up the module hierarchy, which has gone down from 23 modules to 18, and will go down further to 16 when I remove the deprecated Control.Pipe and Control.Proxy.Pipe modules in pipes-4.0.0. Hopefully this makes the library a bit less intimidating to newcomers and easier to navigate.


Future Work


As always, I'm still working on pipes-parse. The big holdup is that I have been experimenting with more elegant solutions to pushback, mainly because I would like to implement many non-trivial features like nested sub-parsers. If worst comes to worst, I will just drop those advanced features and push the simpler version out the door in the next few weeks.

Saturday, May 4, 2013

Program imperatively using Haskell lenses

Haskell gets a lot of flak because it has no built-in support for state and mutation. Consequently, if we want to bake a stateful apple pie in Haskell we must first create a whole universe of stateful operations. However, this principled approach has paid off and now Haskell programmers enjoy more elegant, concise, and powerful imperative code than you can find even in self-described imperative languages.


Lenses


Your ticket to elegant code is the lens library. You define your data types as usual, but you prefix each field with an underscore. For example, I can define a Game:
data Game = Game
    { _score :: Int
    , _units :: [Unit]
    , _boss  :: Unit
    } deriving (Show)
... full of Units:
data Unit = Unit
    { _health   :: Int
    , _position :: Point
    } deriving (Show)
... whose locations are represented by Points:
data Point = Point
    { _x :: Double
    , _y :: Double
    } deriving (Show)
We prefix these fields with an underscore because we will not be using them directly. Instead, we will use them to build lenses, which are much more pleasant to work with.

We can build these lenses in two ways. Our first option is to define lenses manually using the lens convenience function from Control.Lens. For example, we can define a score lens to replace the _score field accessor:
import Control.Lens

score :: Lens' Game Int
score = lens _score (\game v -> game { _score = v })
A Lens is like a map which you use to navigate complex data types. We use the above score lens to navigate from our Game type to its _score field.

The type reflects where we begin and end: Lens' Game Int means we must begin on a value of type Game and end on a value of type Int (the score, in this case). Similarly, our other lenses will clearly indicate their starting and ending points in their types:
units :: Lens' Game [Unit]
units = lens _units (\game v -> game { _units = v })

boss :: Lens' Game Unit
boss = lens _boss (\game v -> game { _boss = v })

health :: Lens' Unit Int
health = lens _health (\unit v -> unit { _health = v })

position :: Lens' Unit Point
position = lens _position (\unit v -> unit { _position = v })

x :: Lens' Point Double
x = lens _x (\point v -> point { _x = v })

y :: Lens' Point Double
y = lens _y (\point v -> point { _y = v })
However, we don't have to write out all this boilerplate if we're lazy. Our second option is to use Template Haskell to define all these lenses for us:
{-# LANGUAGE TemplateHaskell #-}

import Control.Lens

data Game = Game
    { _score :: Int
    , _units :: [Unit]
    , _boss  :: Unit
    } deriving (Show)

data Unit = Unit
    { _health   :: Int
    , _position :: Point
    } deriving (Show)

data Point = Point
    { _x :: Double
    , _y :: Double
    } deriving (Show)

makeLenses ''Game
makeLenses ''Unit
makeLenses ''Point
Just remember that Template Haskell requires these makeLenses declarations to go after your data types.


Initial State


The next thing we need is a test initial game state:
initialState :: Game
initialState = Game
    { _score = 0
    , _units =
        [ Unit
            { _health = 10
            , _position = Point { _x = 3.5, _y = 7.0 }
            }
        , Unit
            { _health = 15
            , _position = Point { _x = 1.0, _y = 1.0 }
            }
        , Unit
            { _health = 8
            , _position = Point { _x = 0.0, _y = 2.1 }
            }
        ]
    , _boss = Unit
        { _health = 100
        , _position = Point { _x = 0.0, _y = 0.0 }
        }
    }
We've enlisted three valiant heroes to slay the dungeon boss. Let the battle begin!


First Steps


Now we can use our lenses! Let's create a routine for our warriors to strike at the boss:
import Control.Monad.Trans.Class
import Control.Monad.Trans.State

strike :: StateT Game IO ()
strike = do
    lift $ putStrLn "*shink*"
    boss.health -= 10
strike prints an evocative sound to the console, then decrements the boss's health by 10 hit points.

strike's type indicates that it operates within the StateT Game IO monad. You can think of this as a DSL where we layer our pure game state (i.e. StateT Game) on top of side effects (i.e. IO) so that we can both mutate our game and also print cute battle effects to the console. All you have to remember is that any time we need side effects, we will use lift to invoke them.

We'll test out strike in ghci. In order to run strike, we must supply it with an initialState:
>>> execStateT strike initialState 
*shink*
Game {_score = 0, _units = [Unit {_health = 10, _position = Poin
t {_x = 3.5, _y = 7.0}},Unit {_health = 15, _position = Point {_
x = 1.0, _y = 1.0}},Unit {_health = 8, _position = Point {_x = 0
.0, _y = 2.1}}], _boss = Unit {_health = 90, _position = Point {
_x = 0.0, _y = 0.0}}}
execStateT takes our stateful code and an initial state, and then runs that code to produce a new state. ghci automatically shows the return value as a convenience so we can inspect the newly returned state. The output is a bit of a mess, but if you strain your eyes you can see that the boss now only has 90 health.

We can view this more easily by storing the new state in a variable:
>>> newState <- execStateT strike initialState 
*shink*
... and then we can query newState for the part we actually care about:
>>> newState^.boss.health
90

Composition


This syntax very strongly resembles imperative and object-oriented programming:
boss.health -= 10
What is going on here? Haskell is decidedly not a multi-paradigm language, yet we have what appears to be multi-paradigm code.

Amazingly, nothing on that line is a built-in language feature!
  • boss and health are just the lenses we defined above
  • (-=) is an infix function
  • (.) is function composition from the Haskell Prelude!
Wait, (.) is function composition? Really?

This is where the lens magic comes in. Lenses are actually ordinary functions, and our "multi-paradigm" code is actually functions all the way down!

In fact, Lens' a b is actually a type synonym for a certain type of higher-order function:
type Lens' a b =
    forall f . (Functor f) => (b -> f b) -> (a -> f a)
You don't need to understand the details of that. Just remember that Lens' a b is a higher-order function that accepts a function of type (b -> f b) as an argument, and returns a new function of type (a -> f a). The Functor part is the theoretically-inspired "magic".
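
For the curious, here is a base-only sketch of how the choice of Functor turns one function into both a getter and a setter. It does not use the lens library: viewL and overL are hypothetical helper names, the lenses are written by hand, and Game and Unit are cut down to one field each to keep the example short:

```haskell
{-# LANGUAGE RankNTypes #-}

import Data.Functor.Const (Const (..))
import Data.Functor.Identity (Identity (..))

type Lens' a b = forall f . Functor f => (b -> f b) -> (a -> f a)

-- Picking f = Const b throws away the rebuilt structure and keeps the focus.
viewL :: Lens' a b -> a -> b
viewL l = getConst . l Const

-- Picking f = Identity keeps the rebuilt structure with the focus modified.
overL :: Lens' a b -> (b -> b) -> a -> a
overL l f = runIdentity . l (Identity . f)

-- Cut-down versions of the game types, one field each.
data Unit = Unit { _health :: Int } deriving (Show, Eq)
data Game = Game { _boss :: Unit } deriving (Show, Eq)

health :: Lens' Unit Int
health f unit = fmap (\v -> unit { _health = v }) (f (_health unit))

boss :: Lens' Game Unit
boss f game = fmap (\v -> game { _boss = v }) (f (_boss game))

main :: IO ()
main = do
    let game = Game (Unit 100)
    print (viewL (boss . health) game)
    print (overL (boss . health) (subtract 10) game)
```

This prints 100 and then Game {_boss = Unit {_health = 90}}. The same composed function serves as both accessor and updater; only the functor chosen by the caller differs.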

Armed with that knowledge, let's make sure the types check out by expanding out the Lens' type synonyms for boss and health:
boss :: Lens' Game Unit
-- expands to:
boss :: (Functor f) => (Unit -> f Unit) -> (Game -> f Game)

health :: Lens' Unit Int
-- expands to:
health :: (Functor f) => (Int -> f Int) -> (Unit -> f Unit)
Now let's review the definition of function composition:
(.) :: (b -> c) -> (a -> b) -> (a -> c)
(f . g) x = f (g x)
Notice that if we specialize our type variables to:
a ~ (Int  -> f Int)
b ~ (Unit -> f Unit)
c ~ (Game -> f Game)
... then this has exactly the right type to compose our two lenses:
(.) :: ((Unit -> f Unit) -> (Game -> f Game))
    -> ((Int  -> f Int ) -> (Unit -> f Unit))
    -> ((Int  -> f Int ) -> (Game -> f Game))
If we put the Lens' type synonyms back in, we get:
(.) :: Lens' Game Unit -> Lens' Unit Int -> Lens' Game Int

boss . health :: Lens' Game Int
So function composition is also lens composition! In fact, lenses form a category where (.) is the category's composition operator and the identity function id is also the identity lens:
(.) :: Lens' x y -> Lens' y z -> Lens' x z

id  :: Lens' x x
What's so beautiful about this is that Haskell lets us remove the spaces around the function composition operator so that it looks exactly like object-oriented accessor notation!

Categories make it really easy to connect and group components on the fly. For example, if I anticipate that I will be modifying the boss's health frequently, I can just define a composite lens:
bossHP :: Lens' Game Int
bossHP = boss.health
... and now I can use it wherever I previously used boss.health:
strike :: StateT Game IO ()
strike = do
    lift $ putStrLn "*shink*"
    bossHP -= 10
... or similarly use it as an accessor:
>>> newState^.bossHP
90

Traversals


Lenses are grounded in some really elegant theory, and as a result they get a lot of things right that imperative languages normally don't!

For example, let's say that our boss is a dragon and breathes fire, which damages all heroes. Using lenses, I can decrement the entire party's health using a single instruction:
fireBreath :: StateT Game IO ()
fireBreath = do
    lift $ putStrLn "*rawr*"
    units.traversed.health -= 3
This makes use of a new lens!
traversed :: Traversal' [a] a
traversed lets us "dig in" to the values in a list so that we can manipulate them as a single unit instead of manually looping over the list. However, this time the type is a Traversal' instead of a Lens'.

A Traversal' is like a Lens' except weaker:
type Traversal' a b =
    forall f . (Applicative f) => (b -> f b) -> (a -> f a)
If you compose Lens' with a Traversal', you get the weaker of the two: a Traversal'. This works no matter which order you compose them in:
(.) :: Lens' a b -> Traversal' b c -> Traversal' a c

(.) :: Traversal' a b -> Lens' b c -> Traversal' a c
units                  :: Lens'      Game [Unit]
units.traversed        :: Traversal' Game  Unit
units.traversed.health :: Traversal' Game  Int
In fact, we don't need to figure this out. The compiler will infer the correct type all by itself:
>>> :t units.traversed.health
units.traversed.health
  :: Applicative f =>
     (Int -> f Int) -> Game -> f Game
That's exactly the right type to be a Traversal' Game Int!

Actually, why not just compose these into a single traversal:
partyHP :: Traversal' Game Int
partyHP = units.traversed.health

fireBreath :: StateT Game IO ()
fireBreath = do
    lift $ putStrLn "*rawr*"
    partyHP -= 3
Let's also use the partyHP traversal to retrieve the new party hitpoints:
>>> newState <- execStateT fireBreath initialState 
*rawr*
>>> newState^.partyHP

<interactive>:3:11:
    No instance for (Data.Monoid.Monoid Int)
      arising from a use of `partyHP'
    Possible fix:
      add an instance declaration for (Data.Monoid.Monoid Int)
    In the second argument of `(^.)', namely `partyHP'
    In the expression: newState ^. partyHP
    In an equation for `it': it = newState ^. partyHP
Oops! This is a type error because there is no single health to get! This is why a Traversal' is weaker than a Lens': traversals may point to multiple values, so they do not support a well-defined way to get just one value. The type system saved us from a potential bug!

Instead, we must specify that we actually want a list of values using the toListOf function:
toListOf :: Traversal' a b -> a -> [b]
This gives the desired result:
>>> toListOf partyHP newState 
[7,12,5]
... and there's an infix operator equivalent to toListOf, (^..):
>>> initialState^..partyHP
[10,15,8]
>>> newState^..partyHP
[7,12,5]
Now we can clearly see at a glance that fireBreath worked the way we intended.
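
The Monoid constraint in the earlier error message and the behavior of toListOf both come from the same place: reading through a traversal means running it in a constant functor. Here is a base-only sketch of that idea. The name viewAll is hypothetical, and the real library defines these functions more generally:

```haskell
{-# LANGUAGE RankNTypes #-}

import Data.Functor.Const (Const (..))

type Traversal' a b = forall f. Applicative f => (b -> f b) -> a -> f a

-- Collect every target by running the traversal in 'Const [b]'.
-- 'Const m' is only an Applicative when 'm' is a Monoid, and lists are.
toListOf :: Traversal' a b -> a -> [b]
toListOf t = getConst . t (\b -> Const [b])

-- A getter in the style of (^.): it needs 'Monoid b' to combine the
-- targets, which is the constraint GHC could not satisfy for Int
viewAll :: Monoid b => Traversal' a b -> a -> b
viewAll t = getConst . t Const
```

For example, toListOf traverse [7, 12, 5] returns the list unchanged, while viewAll traverse ["fire", "ball"] concatenates the targets into "fireball" because String is a Monoid.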

Now I want to get really fancy. I want to define a traversal over a geographic area. Can I do that?
around :: Point -> Double -> Traversal' Unit Unit
around center radius = filtered (\unit ->
    (unit^.position.x - center^.x)^2
  + (unit^.position.y - center^.y)^2
  < radius^2 )
Sure I can! Now I can limit the dragon's fire breath to a circular area!

Edit: filtered is apparently not a theoretically valid traversal because it does not preserve the number of elements. See this /r/haskell thread for details.
fireBreath :: Point -> StateT Game IO ()
fireBreath target = do
    lift $ putStrLn "*rawr*"
    units.traversed.(around target 1.0).health -= 3
Notice how expressive that code is: we want to decrement the health of all units around the target. That code conveys our intention much more clearly than the equivalent mainstream imperative code and it leaves much less room for error.

Anyway, back to breathing fire. First, let's see where the units are located:
>>> initialState^..units.traversed.position
[Point {_x = 3.5, _y = 7.0},Point {_x = 1.0, _y = 1.0},Point {_x
 = 0.0, _y = 2.1}]
Hmmm, the latter two units are close by, so I will aim the fireball in between them:
>>> newState <- execStateT (fireBreath (Point 0.5 1.5)) initialState 
*rawr*
>>> (initialState^..partyHP, newState^..partyHP)
([10,15,8],[10,12,5])
Nailed it!
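
One way to see the caveat about filtered from the edit note above is that an update can change which elements match the predicate, so two passes are not the same as one fused pass. The sketch below uses only base; filtered and over are simplified hand-rolled versions, not the library's definitions:

```haskell
{-# LANGUAGE RankNTypes #-}

import Data.Functor.Identity (Identity (..))

type Traversal' a b = forall f. Applicative f => (b -> f b) -> a -> f a

-- Visit the value only when the predicate holds, otherwise leave it alone
filtered :: (a -> Bool) -> Traversal' a a
filtered p f a
    | p a       = f a
    | otherwise = pure a

over :: Traversal' a b -> (b -> b) -> a -> a
over t g = runIdentity . t (Identity . g)

-- Two passes of +1 over the even numbers ...
twice :: [Int] -> [Int]
twice = over (traverse . filtered even) (+ 1)
      . over (traverse . filtered even) (+ 1)

-- ... versus one fused pass of +2
fused :: [Int] -> [Int]
fused = over (traverse . filtered even) (+ 2)
```

Here twice [1, 2, 3, 4] gives [1, 3, 3, 5], because after the first pass nothing is even any more, while fused [1, 2, 3, 4] gives [1, 4, 3, 6]. The fire breath example is safe in this respect because it changes health, not the position that the filter inspects.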


Zooming


We can do more unique things with lenses, like zoom in on subsets of our global state:
retreat :: StateT Game IO ()
retreat = do
    lift $ putStrLn "Retreat!"
    zoom (units.traversed.position) $ do
        x += 10
        y += 10
As before, we can combine these lenses into a single lens if we want to reuse it later on:
partyLoc :: Traversal' Game Point
partyLoc = units.traversed.position

retreat :: StateT Game IO ()
retreat = do
    lift $ putStrLn "Retreat!"
    zoom partyLoc $ do
        x += 10
        y += 10
Let's try it out:
>>> initialState^..partyLoc
[Point {_x = 3.5, _y = 7.0},Point {_x = 1.0, _y = 1.0},Point {_x
 = 0.0, _y = 2.1}]
>>> newState <- execStateT retreat initialState 
Retreat!
>>> newState^..partyLoc
[Point {_x = 13.5, _y = 17.0},Point {_x = 11.0, _y = 11.0},Point
 {_x = 10.0, _y = 12.1}]
Let's look at the type of zoom in the context of this particular example:
zoom :: Traversal' a b -> StateT b IO r -> StateT a IO r
zoom has some nice theoretical properties. For example, we'd expect that if we zoom using two successive lenses, it should behave the same as zooming using the composite lens:
zoom lens1 . zoom lens2 = zoom (lens1 . lens2)
... and if we zoom in on the empty lens, we end up back where we started:
zoom id = id
In other words, zoom defines a functor, and those equations are the functor laws!
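
To make those laws concrete, here is a small base-only model. It is a sketch under simplifying assumptions: state actions are plain functions instead of StateT over IO, zoomL only accepts lenses (not traversals), and zoomL, bump, _1 and _2 are defined locally for illustration:

```haskell
{-# LANGUAGE RankNTypes #-}

import Data.Functor.Const (Const (..))
import Data.Functor.Identity (Identity (..))

type Lens' a b = forall f. Functor f => (b -> f b) -> a -> f a

view :: Lens' a b -> a -> b
view l = getConst . l Const

set :: Lens' a b -> b -> a -> a
set l b = runIdentity . l (\_ -> Identity b)

-- A state action modelled as a plain function (no IO, unlike the post)
type State s r = s -> (r, s)

-- Run an action on the focused part, then write the new part back
zoomL :: Lens' a b -> State b r -> State a r
zoomL l act a =
    let (r, b') = act (view l a)
    in  (r, set l b' a)

_1 :: Lens' (x, y) x
_1 f (x, y) = fmap (\x' -> (x', y)) (f x)

_2 :: Lens' (x, y) y
_2 f (x, y) = fmap (\y' -> (x, y')) (f y)

-- Example action: return ten times the focus and increment it
bump :: State Int Int
bump n = (n * 10, n + 1)
```

Both (zoomL _1 . zoomL _2) bump and zoomL (_1 . _2) bump send ((1, 2), 3) to (20, ((1, 3), 3)), and zoomL id bump behaves exactly like bump.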


Combining commands


So far I've only shown a single command at a time, but now let's take all of these concepts and imperatively assemble a battle from them:
battle :: StateT Game IO ()
battle = do
    -- Charge!
    forM_ ["Take that!", "and that!", "and that!"] $ \taunt -> do
        lift $ putStrLn taunt
        strike

    -- The dragon awakes!
    fireBreath (Point 0.5 1.5)
    
    replicateM_ 3 $ do
        -- The better part of valor
        retreat

        -- Boss chases them
        zoom (boss.position) $ do
            x += 10
            y += 10
Let's try it out!
>>> execStateT battle initialState 
Take that!
*shink*
and that!
*shink*
and that!
*shink*
*rawr*
Retreat!
Retreat!
Retreat!
Game {_score = 0, _units = [Unit {_health = 10, _position = Poin
t {_x = 33.5, _y = 37.0}},Unit {_health = 12, _position = Point 
{_x = 31.0, _y = 31.0}},Unit {_health = 5, _position = Point {_x
 = 30.0, _y = 32.1}}], _boss = Unit {_health = 70, _position = P
oint {_x = 30.0, _y = 30.0}}}
I guess people really aren't joking when they say Haskell is the finest imperative language.


Conclusions


This really just scratches the surface of the lens library, which is one of the crown jewels of the Haskell ecosystem. You can use lenses for pure programming, too, and compress very powerful and complex computations into very readable and elegant code. When I have more time I will write even more about this amazing library.

Sunday, April 21, 2013

pipes and io-streams

I was originally planning to release an extension library for pipes that would add an io-streams-like interface to pipes. However, this week I discovered a surprisingly elegant way to implement io-streams behavior (minus pushback) using the existing pipes API. In fact, this translation has been possible ever since pipes-2.4!

This post assumes that you are familiar with both pipes and io-streams. If not, then you can read the pipes tutorial or the io-streams tutorial, both of which I wrote!

Edit: I also just discovered that when you re-implement io-streams using pipes, then pipes ties io-streams in performance. See this reddit comment for code and benchmarks.


Conversions


I'll begin with the following generic correspondence between io-streams and pipes:
{-# LANGUAGE RankNTypes #-}

import Control.Proxy

type InputStream a
    = forall p . (Proxy p) => () -> Session p IO (Maybe a)

type OutputStream a
    = forall p . (Proxy p) => Maybe a -> Session p IO ()

makeInputStream :: IO (Maybe a) -> InputStream a
makeInputStream m () = runIdentityP $ lift m

makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a
makeOutputStream f a = runIdentityP $ lift (f a)
For example:
stdin :: InputStream String
stdin = makeInputStream (fmap Just getLine)

stdout :: OutputStream String
stdout = makeOutputStream (maybe (return ()) putStrLn)
Notice how this does not convert the read and write actions to repetitive streams. Instead, you lift each action just once into the Proxy monad.

To read or write to these sources you just run the proxy monad:
import Prelude hiding (read)

read :: InputStream a -> IO (Maybe a)
read is = runProxy is

{- This requires `pipes` HEAD, which generalizes the type
   of `runProxyK`.

   Alternatively, you can hack around the current definition
   using:

   write ma os = runProxy (\() -> os ma)
-}
write :: Maybe a -> OutputStream a -> IO ()
write ma os = runProxyK os ma
For example:
>>> read stdin
Test<Enter>
Just "Test"
>>> write (Just "Test") stdout
Test
Now consider the following io-streams transformation:
map :: (a -> b) -> InputStream a -> IO (InputStream b)
map f is = makeInputStream $ do
    ma <- read is
    return (fmap f ma)
Here's the naive translation to pipes (later I will show the more elegant version):
import Prelude hiding (map)

map :: (a -> b) -> InputStream a -> InputStream b
map f is () = runIdentityP $ do
    ma <- is ()
    return (fmap f ma)
For example:
>>> read (map (++ "!") stdin)
Test<Enter>
Just "Test!"
The only real difference is that instead of using read is I just use is () directly.

So far I've used absolutely no pipes-specific features to implement any of this. I could have done this just as easily in any other library, including conduit.


Input streams


Now I will add a twist: we can reify transformations to be Consumers instead of functions of an InputStream.

For example, let's transform map to be a Consumer instead of a function:
map :: (Proxy p)
    => (a -> b)
    -> () -> Consumer p (Maybe a) IO (Maybe b)
map f () = runIdentityP $ do
    ma <- request ()
    return (fmap f ma)
I've only made two changes:
  • map is no longer a function of an InputStream
  • Instead of calling the input stream, I use request ()
Check out the final pipe type carefully (ignoring the Maybes):
() -> Consumer p a m b
An input stream transformation is a fold in disguise! These transformations fold 0 or more values from the old input stream to return a new value.

Notice how this differs from the conventional flow of information in pipes. We're not converting a stream of as into a new stream of bs. Instead, we're folding a stream of as into a single return value of type b.

How do we even connect map to stdin, though? They don't have the right types for ordinary pipe composition:
stdin 
    :: (Proxy  p)
    => () -> Session  p                IO (Maybe String)

map length
    :: (Proxy p)
    => () -> Consumer p (Maybe String) IO (Maybe Int   )
We need some way to feed the return value of stdin into the upstream input of map. This means we need to call stdin once each time that map requests a new value and feed that into the request. How can we do that using the existing pipes API?

Fortunately, this is exactly the problem that the (\>\) composition operator solves:
>>> read (stdin \>\ map length)
Test<Enter>
Just 4
Woah, slow down! What just happened?


The "request" category


The definition of (\>\) is actually really simple:
(f \>\ g) replaces all of g's requests with f.
So when we wrote (stdin \>\ map length), we replaced the single request in map with stdin as if we had written it in by hand:
-- Before
map length () = runIdentityP $ do
    ma <- request ()
    return (fmap length ma)

-- After
(stdin \>\ map length) () = runIdentityP $ do
    ma <- stdin ()
    return (fmap length ma)
In fact, this behavior means that (\>\) and request form a category. (\>\) is the composition operation, request is the identity, and they satisfy the category laws:
-- Replacing each 'request' in 'f' with 'request' gives 'f'
request \>\ f = f

-- Replacing each 'request' in 'request' with 'f' gives 'f'
f \>\ request = f

-- Substitution of 'request's is associative
(f \>\ g) \>\ h = f \>\ (g \>\ h)
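
The substitution reading of (\>\) can be modelled without pipes at all. The sketch below is a toy: Req, runWith, and pair are hypothetical names, there is no base monad and no downstream interface, and this is not how pipes implements the operator. It only shows that "replace every request in g with f" is an ordinary recursive substitution:

```haskell
-- A computation that can only 'request' values from upstream
data Req a' a r
    = Pure r
    | Request a' (a -> Req a' a r)

instance Functor (Req a' a) where
    fmap f (Pure r)      = Pure (f r)
    fmap f (Request x k) = Request x (fmap f . k)

instance Applicative (Req a' a) where
    pure = Pure
    Pure f      <*> m = fmap f m
    Request x k <*> m = Request x (\a -> k a <*> m)

instance Monad (Req a' a) where
    Pure r      >>= f = f r
    Request x k >>= f = Request x (\a -> k a >>= f)

request :: a' -> Req a' a a
request x = Request x Pure

-- (f \>\ g) replaces each 'request' in 'g' with a call to 'f'
(\>\) :: (b' -> Req a' a b) -> (c' -> Req b' b c) -> (c' -> Req a' a c)
(f \>\ g) c' = go (g c')
  where
    go (Pure r)      = Pure r
    go (Request x k) = f x >>= go . k

-- Answer every request from a fixed supply (Nothing if it runs dry)
runWith :: [a] -> Req () a r -> Maybe r
runWith _        (Pure r)       = Just r
runWith (a : as) (Request () k) = runWith as (k a)
runWith []       (Request _ _)  = Nothing

-- Request two values and return them as a list
pair :: () -> Req () a [a]
pair () = do
    x <- request ()
    y <- request ()
    return [x, y]
```

In this model, runWith [1, 2, 3, 4] ((pair \>\ pair) ()) returns Just [[1, 2], [3, 4]], and composing pair with request on either side gives back something that behaves like pair.
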
Let's take a closer look at the type of (\>\) to understand what is going on:
(\>\)
    :: (Monad m, ListT p)
    => (b' -> p a' a x' x m b)
    -> (c' -> p b' b x' x m c)
    -> (c' -> p a' a x' x m c)
We can illuminate this type by type-restricting the arguments. First, let's consider the case where the first argument is a Session and the second argument is a Consumer, and see what type the compiler infers for the result:
(\>\)
    :: (Monad m, ListT p)
    => (() -> Session  p   m b)
    -> (() -> Consumer p b m c)
    -> (() -> Session  p   m c)
The compiler infers that we must get back a readable Session, just like we wanted! This is precisely what happened when we composed stdin with map:
stdin
    :: () -> Session  p                IO (Maybe String)

map length
    :: () -> Consumer p (Maybe String) IO (Maybe Int   )

stdin \>\ map length
    :: () -> Session  p                IO (Maybe Int   )

read (stdin \>\ map length) :: IO (Maybe Int)
However, there's another way we can specialize the type of (\>\). We can instead restrict both arguments to be Consumers:
(\>\)
    :: (Monad m, ListT p)
    => (() -> Consumer p a m b)
    -> (() -> Consumer p b m c)
    -> (() -> Consumer p a m c)
The compiler infers that if we compose two folds, we must get a new fold!

This means that we can compose input stream transformations using the same (\>\) operator:
map length
    :: () -> Consumer p (Maybe String) IO (Maybe Int )

map even
    :: () -> Consumer p (Maybe Int   ) IO (Maybe Bool)

map length \>\ map even
    :: () -> Consumer p (Maybe String) IO (Maybe Bool)
Then we will use the exact same operator to connect our input stream:
stdin \>\ map length \>\ map even
    :: () -> Session p IO (Maybe Bool)

read (stdin \>\ map length \>\ map even)
    :: IO (Maybe Bool)
Let's try it!
>>> read (stdin \>\ map length \>\ map even)
Test<Enter>
Just True
Now let's make things more interesting to really drive the point home:
import Control.Monad

twoReqs
    :: (Proxy p) => () -> Consumer p (Maybe a) IO (Maybe [a])
twoReqs () = runIdentityP $ do
    mas <- replicateM 2 (request ())
    return (sequence mas)
>>> read (stdin \>\ twoReqs \>\ twoReqs) :: IO (Maybe [[String]])
1<Enter>
2<Enter>
3<Enter>
4<Enter>
Just [["1","2"],["3","4"]]
>>> -- We can still read from raw 'stdin'
>>> read stdin
5<Enter>
Just "5"
In other words, the "request" category is actually the category of InputStreams and pipes already has InputStream support today!


Output streams


Now let's do the exact same thing for OutputStreams! We'll begin with the filterOutput function from io-streams:
filterOutput
    :: (a -> Bool) -> OutputStream a -> IO (OutputStream a)
filterOutput pred os = makeOutputStream $ \ma -> case ma of
    Nothing            -> write ma os
    Just a | pred a    -> write ma os
           | otherwise -> return ()
The pipes equivalent is:
filterOutput
    :: (Proxy p)
    => (a -> Bool)
    -> Maybe a -> Producer p (Maybe a) IO ()
filterOutput pred ma = runIdentityP $ case ma of
    Nothing            -> respond ma
    Just a | pred a    -> respond ma
           | otherwise -> return ()
This time we get a new shape for our type signature:
a -> Producer p b m ()
An output stream transformation is an unfold in disguise! These transformations unfold each output destined for the old output stream into 0 or more outputs destined for the new output stream.

Let's hook this transformation up to stdout and start writing values to it ... oh wait. How do we hook it up? Well, let's look at the types of stdout and filterOutput:
filterOutput f
    :: (Proxy p) => Maybe a      -> Producer p (Maybe a) IO ()

stdout
    :: (Proxy p) => Maybe String -> Session  p           IO ()
We need some way to feed the stream output of filterOutput into the argument of stdout. This means that we need to call stdout on each output of filterOutput. How can we do that using the existing pipes API?

Why, we use (/>/), the dual of (\>\)!
>>> write (Just "") (filterOutput (not . null) />/ stdout)
>>> write (Just "Test") (filterOutput (not . null) />/ stdout)
Test

The "respond" category


(/>/) also has a simple behavior:
(f />/ g) replaces all of f's responds with g.
This behavior means that (/>/) and respond form a category, too! (/>/) is the composition operation, respond is the identity, and they satisfy the category laws:
-- Replacing each 'respond' in 'f' with 'respond' gives 'f'
f />/ respond = f

-- Replacing each 'respond' in 'respond' with 'f' gives 'f'
respond />/ f = f

-- Substitution of 'respond's is associative
(f />/ g) />/ h = f />/ (g />/ h)
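
The same kind of toy model works for the "respond" side. Again this is only a sketch: Resp, outputs, and upTo are hypothetical names, the value that respond would receive back from downstream is dropped, and nothing here reflects the actual pipes implementation:

```haskell
-- A computation that can only 'respond' with outputs
data Resp b r
    = Done r
    | Respond b (Resp b r)

instance Functor (Resp b) where
    fmap f (Done r)      = Done (f r)
    fmap f (Respond b k) = Respond b (fmap f k)

instance Applicative (Resp b) where
    pure = Done
    Done f      <*> m = fmap f m
    Respond b k <*> m = Respond b (k <*> m)

instance Monad (Resp b) where
    Done r      >>= f = f r
    Respond b k >>= f = Respond b (k >>= f)

respond :: b -> Resp b ()
respond b = Respond b (Done ())

-- (f />/ g) replaces each 'respond' in 'f' with a call to 'g'
(/>/) :: (a -> Resp b ()) -> (b -> Resp c ()) -> (a -> Resp c ())
(f />/ g) a = go (f a)
  where
    go (Done r)      = Done r
    go (Respond b k) = g b >> go k

-- Collect everything that was responded
outputs :: Resp b r -> [b]
outputs (Done _)      = []
outputs (Respond b k) = b : outputs k

-- Unfold a number n into the outputs 1, 2, ..., n
upTo :: Int -> Resp Int ()
upTo n = mapM_ respond [1 .. n]
```

Here outputs ((upTo />/ upTo) 3) is [1, 1, 2, 1, 2, 3]: each output of the first unfold is unfolded again. Composing upTo with respond on either side leaves its outputs unchanged.
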
Let's learn more by reviewing the type of (/>/):
(/>/)
    :: (Monad m, ListT p)
    => (a -> p x' x b' b m a')
    -> (b -> p x' x c' c m b')
    -> (a -> p x' x c' c m a')
We'll specialize this type signature in two ways. First, we'll consider the case where the first argument is a Producer and the second argument is a Session and then see what type the compiler infers for the result:
(/>/)
    :: (Monad m, ListT p)
    => (a -> Producer p b m ())
    -> (b -> Session  p   m ())
    -> (a -> Session  p   m ())
The compiler infers that we must get back a writeable session, just like we wanted! This is precisely what happened when we composed filterOutput and stdout:
filterOutput (not . null)
    :: Maybe String -> Producer p (Maybe String) IO ()

stdout
    :: Maybe String -> Session  p                IO ()

filterOutput (not . null) />/ stdout
    :: Maybe String -> Session  p                IO ()

flip write (filterOutput (not . null) />/ stdout)
    :: Maybe String -> IO ()
However, there's another way we can specialize the type of (/>/). We can instead restrict both arguments to be Producers:
(/>/)
    :: (Monad m, ListT p)
    => (a -> Producer p b m ())
    -> (b -> Producer p c m ())
    -> (a -> Producer p c m ())
The compiler infers that if we compose two unfolds, we must get a new unfold!

This means that we can compose output stream transformations using (/>/). To show this, let's define the equivalent of the io-streams contramap function:
contramap
    :: (Proxy p)
    => (a -> b)
    -> Maybe a -> Producer p (Maybe b) IO ()
contramap f ma = runIdentityP $ respond (fmap f ma)
We can connect contramaps using (/>/):
contramap even
    :: Maybe Int  -> Producer p (Maybe Bool  ) IO ()

contramap show
    :: Maybe Bool -> Producer p (Maybe String) IO ()

contramap even />/ contramap show
    :: Maybe Int  -> Producer p (Maybe String) IO ()
... then using the exact same operator we connect them to stdout:
contramap even />/ contramap show />/ stdout
    :: Maybe Int -> Session p IO ()

flip write (contramap even />/ contramap show />/ stdout)
    :: Maybe Int -> IO ()
Let's try it!
>>> write (Just 4) (contramap even />/ contramap show />/ stdout)
True
Now let's make things more interesting to really drive the point home:
hiBye
    :: (Proxy p)
    => Maybe String -> Producer p (Maybe String) IO ()
hiBye mstr = runIdentityP $ do
    respond $ fmap ("Hello, " ++) mstr
    respond $ fmap ("Goodbye, " ++) mstr
>>> write (Just "world") (hiBye />/ hiBye />/ stdout)
Hello, Hello, world
Goodbye, Hello, world
Hello, Goodbye, world
Goodbye, Goodbye, world
>>> -- We can still write to raw 'stdout'
>>> write (Just "Haskell") stdout
Haskell
In other words, the "respond" category is actually the category of OutputStreams and pipes already has OutputStream support today!


Unify input and output streams


We can do much more than just implement the io-streams API, though! We can generalize it in many more powerful ways than io-streams permits.

First off, I'm going to inline the InputStream and OutputStream types into the type signatures of read and write:
read
    :: (forall p . (Proxy p) => () -> Session p IO (Maybe a))
    -> IO (Maybe a)

write
    :: Maybe a
    -> (forall p . (Proxy p) => Maybe a -> Session p IO ())
    -> IO ()
Then I will generalize the types to not be Maybe-specific and flip the arguments for write:
read
    :: (forall p . (Proxy p) => () -> Session p IO a)
    -> IO a

write
    :: (forall p . (Proxy p) => a -> Session p IO ())
    -> a -> IO ()
Now, I will generalize both read and write by adding bidirectionality to them. read will now accept an argument parametrizing its request and write can now optionally receive a result.
read
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
read is = runProxyK is

write
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
write is = runProxyK is
Well, would you look at that! They both have the same type and implementation! In other words, we can unify read and write into a single function:
once
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
once is = runProxyK is
Also, once is really polymorphic over the base monad:
once
    :: (Monad m)
    => (forall p . (ListT p) => a -> Session p m b)
    -> a -> m b
once is = runProxyK is
Using this function, we can both read from InputStreams and write to OutputStreams:
>>> once (stdin \>\ map length) ()
Test<Enter>
Just 4
>>> once (contramap show />/ stdout) (Just 1)
1
Let's test the bidirectionality. I'll write a new stdin that accepts a count parameter telling how many lines to get. This time, there will be no Maybes:
stdinN :: (Proxy p) => Int -> Session p IO [String]
stdinN n = runIdentityP $ replicateM n $ lift getLine
First, briefly test it:
>>> once stdinN 3
Test
Apple
123
["Test","Apple","123"]
Similarly, let's define a transformation to automate supplying values to stdinN:
automate :: (Proxy p) => () -> Client p Int [String] IO ()
automate () = runIdentityP $ do
    lift $ putStrLn "First batch:"
    xs <- request 2
    lift $ print xs
    lift $ putStrLn "Second batch:"
    ys <- request 2
    lift $ print ys
Now compose!
>>> once (stdinN \>\ automate) ()
First batch:
1<Enter>
2<Enter>
["1","2"]
Second batch:
3<Enter>
4<Enter>
["3","4"]
>>> -- Go back to using 'stdinN' unfiltered
>>> once stdinN 1
5<Enter>
["5"]
We can similarly unify makeInputStream and makeOutputStream:
make :: (Monad m, Proxy p) => (q -> m r) -> q -> p a' a b' b m r
make f a = runIdentityP (lift (f a))
The only difference between an input stream and an output stream is whether it consumes or produces a value, and bidirectional streams blur the line even further.


Purity


None of this machinery is specific to the IO monad. Everything I've introduced is polymorphic over the base monad! We only incur IO if any of our stages use IO. This departs greatly from io-streams, where you are trapped in the IO monad to do everything. However, I want to qualify that statement with a caveat: I expect that pipes-parse will require either IO or an ST base monad (your choice) to properly manage pushback like io-streams does. Even still, you only pay this price if you actually need it.

pipes improves on purity in two other ways. First, building input and output streams is pure when you use pipes:
-- pipes
makeInputStream  :: IO (Maybe a)       -> InputStream a
makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a

-- io-streams
makeInputStream  :: IO (Maybe a)       -> IO (InputStream  a)
makeOutputStream :: (Maybe a -> IO ()) -> IO (OutputStream a)
Consequently, io-streams must use unsafePerformIO for its stdin:
stdin :: InputStream ByteString
stdin = unsafePerformIO $
    handleToInputStream IO.stdin >>= lockingInputStream
Second, composing and transforming input streams is pure when you use pipes:
>>> -- pipes
>>> read (stdin \>\ map length \>\ map even)
...
>>> -- io-streams
>>> is <- (map even <=< map length) stdin
>>> read is
...
... and if none of the stages use IO, then the entire operation is pure!


Extra interfaces


You can generalize the type signatures even further. Let's revisit the types of (\>\) and (/>/):
(\>\)
    :: (Monad m, ListT p)
    => (b' -> p a' a x' x m b)
    -> (c' -> p b' b x' x m c)
    -> (c' -> p a' a x' x m c)

(/>/)
    :: (Monad m, ListT p)
    => (a -> p x' x b' b m a')
    -> (b -> p x' x c' c m b')
    -> (a -> p x' x c' c m a')
So far we've been ignoring the x'/x interfaces entirely, treating them as closed interfaces. However, there's no reason we have to! If we open up this interface, then every processing stage that we connect can read and write to this streaming interface.

In a future post, I will show you how you can really elegantly use this trick to generalize Hutton-Meijer parsers to permit effects.


ListT


The people who read the pipes-3.2 announcement post know that the (\>\) and (/>/) composition operators correspond to (>=>) for the two pipes ListT monads.
This means that you can take any output stream transformation and assemble more sophisticated behaviors using do notation:
pairs
    :: (ListT p)
    => () -> Producer p (Maybe (String, String)) IO ()
pairs () = runRespondT $ do
    mstr1 <- RespondT $ hiBye (Just "world")
    mstr2 <- RespondT $ hiBye (Just "Haskell")
    return $ (,) <$> mstr1 <*> mstr2
These behave just like the list monad, non-deterministically selecting all possible paths:
>>> once (pairs />/ contramap show />/ stdout) ()
("Hello, world","Hello, Haskell")
("Hello, world","Goodbye, Haskell")
("Goodbye, world","Hello, Haskell")
("Goodbye, world","Goodbye, Haskell")
The exact same trick works for input stream transformations, too, but it's less useful because we don't send information upstream as frequently as downstream.
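
If the correspondence with the list monad feels abstract, the following base-only analogue may help. It swaps the pipes ListT machinery for ordinary lists, so hiBye here is a list-returning stand-in for the proxy of the same name, and (>=>) plays the role that (/>/) plays in the post:

```haskell
import Control.Monad ((>=>))

-- List-based stand-in for the post's 'hiBye': each input yields two outputs
hiBye :: String -> [String]
hiBye str = ["Hello, " ++ str, "Goodbye, " ++ str]

-- Plays the role of (hiBye />/ hiBye)
hiByeTwice :: String -> [String]
hiByeTwice = hiBye >=> hiBye

-- Plays the role of 'pairs', written with the ordinary list monad
pairs :: [(String, String)]
pairs = do
    str1 <- hiBye "world"
    str2 <- hiBye "Haskell"
    return (str1, str2)
```

Evaluating hiByeTwice "world" and pairs yields the same four results, in the same order, as the corresponding pipes examples in this post.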


Conclusions


One of the questions I get most often is "How do I escape from the pipe monad?" and I wrote this post to demonstrate how to do so. You can use pipes to program in exactly the same style as io-streams. The only thing missing is a standard library of io-streams-like utilities.

So far, I have not touched on the issue of push-back, a feature which io-streams provides. pipes-parse will build on the ideas I've introduced in this post to provide an io-streams-style interface to parsing that includes push-back functionality. This will make it easy to dynamically add or remove processing stages throughout the parsing process, which is necessary for parsing HTTP. In fact, everything I've presented here fell out very naturally from my work on pipes-parse and I only later discovered that I had accidentally reinvented io-streams entirely within pipes.

So I will cautiously state that I believe pipes is "io-streams done right". Everything that Greg has been doing with io-streams matches quite nicely with the pre-existing "request" and "respond" pipes categories that I independently discovered within pipes. This indicates that Greg was really onto something and pipes provides the elegant theoretical foundation for his work.

Sunday, April 14, 2013

pipes-concurrency-1.0.0: Reactive programming

This post introduces the pipes-concurrency library (which you can find here), which is the renamed pipes-stm library that I previously promised. I ended up completing this much sooner than I anticipated, which is why it precedes the upcoming pipes-parse package.

Begin with the tutorial if you want to learn how to use the library. This post mainly highlights some features and compares the pipes-concurrency approach to other libraries. Also, I'll provide some bonus examples that are not in the tutorial.

Before I continue, I want to credit Eric Jones, who first began the library as pipes-stm on Github. Unfortunately, I lost all contact with him and he didn't include a LICENSE in his repository, so I had to rebuild the library from scratch because I wasn't sure if a fork would constitute copyright infringement. If he reads this and gets in touch with me and approves of the BSD license then I will add him to the LICENSE and also add him as a library author.

Reactive programming


Several people are beginning to realize that streaming libraries overlap substantially with reactive programming frameworks. pipes-concurrency provides the basic building blocks necessary to build reactive programming abstractions.

For example, let's say that I want to simulate reactive-banana's Events using pipes-concurrency:
{-# LANGUAGE RankNTypes #-}

import Control.Proxy

-- `pipes` does not need the `t` parameter from reactive-banana
type Event a = forall p . (Proxy p) => () -> Producer p a IO ()
If you want to take the union of two asynchronous streams, you spawn a mailbox, merge two streams into it using sendD, and then read out the results using recvS:
import Control.Monad
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Proxy.Concurrent
import System.Mem (performGC)

union :: Event a -> Event a -> Event a
union e1 e2 () = runIdentityP $ do
    (input, output) <- lift $ spawn Unbounded
    as <- lift $ forM [e1, e2] $ \e ->
        async $ do runProxy $ e >-> sendD input
                   performGC
    recvS output ()
    lift $ mapM_ wait as
Now we can define two event sources:
clock :: Event String
clock = fromListS (cycle ["Tick", "Tock"])
    >-> execD (threadDelay 1000000)

user :: Event String
user = stdinS
... and cleanly merge them:
main = runProxy $ union clock user
              >-> takeWhileD (/= "quit")
              >-> stdoutD
$ ./union
Tick
Tock
test<Enter>
test
Tick
Tock
quit<Enter>
$
People often tout spreadsheets as the classic example of functional-reactive programming, so why not simulate that, too? Well, a spreadsheet cell is just a non-empty stream of values:
{-# LANGUAGE PolymorphicComponents #-}

import Control.Proxy

data Cell a = Cell
    { initial :: a
    , stream  :: forall p . (Proxy p) => () -> Producer p a IO ()
    }

runCell :: (Proxy p) => Cell a -> () -> Producer p a IO ()
runCell (Cell a ga) () = runIdentityP $ do
    respond a
    ga ()
Each value in the stream represents an update to the cell's contents, either by the user:
input :: Cell String
input = Cell "" stdinS
... or some data source:
time :: Cell Int
time = Cell 0 $ \() -> evalStateP 1 $ forever $ do
    n <- get
    respond n
    lift $ threadDelay 1000000
    put (n + 1)
Spreadsheet cells are both Functors and Applicatives:
instance Functor Cell where
    fmap f (Cell x gx) = Cell (f x) (gx >-> mapD f)

instance Applicative Cell where
    pure a = Cell a (runIdentityK return)

    (Cell f0 gf) <*> (Cell x0 gx)
        = Cell (f0 x0) $ \() -> runIdentityP $ do
            (input, output) <- lift $ spawn Unbounded
            lift $ do
                a1 <- async $ runProxy $
                    gf >-> mapD Left  >-> sendD input
                a2 <- async $ runProxy $
                    gx >-> mapD Right >-> sendD input
                link2 a1 a2
                link a1
            (recvS output >-> handler) ()
      where
        handler () = evalStateP (f0, x0) $ forever $ do
            e <- request ()
            (f, x) <- get
            case e of
                Left  f' -> do
                    put (f', x)
                    respond (f' x)
                Right x' -> do
                    put (f, x')
                    respond (f x')
... so we can use Applicative style to combine spreadsheet cells, which will only update when their dependencies update:
both :: Cell (Int, String)
both = (,) <$> time <*> input

main = runProxy $ runCell both >-> printD
$ ./cell
(0,"")
(1,"")
(2,"")
test<Enter>
(2,"test")
(3,"test")
apple<Enter>
(3,"apple")
(4,"apple")
(5,"apple")
...
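
The update rule inside handler can be studied in isolation from the concurrency. The following sketch is a pure model of it, assuming the interleaved updates have already been collected into a list. The name replay is hypothetical, and the threads, the mailbox, and pipes itself are all left out:

```haskell
-- Replay a log of updates: Left carries a new function, Right a new argument.
-- The first output is the initial cell value, then one output per update.
replay :: (a -> b) -> a -> [Either (a -> b) a] -> [b]
replay f0 x0 = map apply . scanl step (f0, x0)
  where
    step (_, x) (Left  f') = (f', x)
    step (f, _) (Right x') = (f, x')
    apply (f, x) = f x

-- The start of the session shown above: clock ticks arrive as new
-- functions, user input arrives as new arguments
example :: [(Int, String)]
example = replay ((,) 0) ""
    [ Left ((,) 1), Left ((,) 2), Right "test"
    , Left ((,) 3), Right "apple" ]
```

Evaluating example gives (0,""), (1,""), (2,""), (2,"test"), (3,"test"), (3,"apple"), which matches the first six lines printed by ./cell.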

Simple API


pipes-concurrency has a really, really, really simple API, and the three key functions are:
spawn :: Size -> IO (Input a, Output a)

send :: Input a -> a -> STM Bool

recv :: Output a -> STM (Maybe a)
The spawn function creates a FIFO channel, send adds messages to the channel, and recv takes messages off the channel. That's it! The rest of the library consists of the following two higher-level pipes, which build on send and recv to stream messages into and out of the channel:
sendD :: Proxy p => Input a -> x -> p x a x a IO ()

recvS :: Proxy p => Output a -> () -> Producer p a IO ()
The library only has five functions total, making it very easy to learn.


Deadlock safety


What distinguishes this abstraction from traditional STM channels is that send and recv hook into the garbage collection system to automatically detect and avoid deadlocks. If they detect a deadlock they just terminate cleanly instead.

Surprisingly, this works so well that it even correctly handles crazy scenarios like cyclic graphs. For example, the run-time system magically ties the knot in the following example and both pipelines successfully terminate and get garbage collected:
import Control.Concurrent.Async
import Control.Proxy
import Control.Proxy.Concurrent
import System.Mem (performGC)

main = do
    (in1, out1) <- spawn Unbounded
    (in2, out2) <- spawn Unbounded
    a1 <- async $ do runProxy $ recvS out1 >-> sendD in2
                     performGC
    a2 <- async $ do runProxy $ recvS out2 >-> sendD in1
                     performGC
    mapM_ wait [a1, a2]
I don't even know why the above example works, to be completely honest. I really only designed pipes-concurrency to avoid deadlocks for acyclic graphs and the above was just a really nice emergent behavior that fell out of the implementation. I think this is an excellent tribute to how amazing ghc is, and I want to give a big shout-out to all the people who contribute to it.

I call this "semi-automatic" reference management because you must still call the garbage collector manually after you stop using each reference, otherwise you cannot guarantee promptly releasing the reference. However, even if you forget to do this, all that happens is that it just delays stream termination until the next garbage collection cycle.

Severability


I designed the API so that if any other streaming library wants to use it I can cleanly separate out the pipes-agnostic part, consisting of spawn, send, and recv. If you want to build on this neat deadlock-free abstraction with, say, conduit or io-streams, then just let me know and I will factor those functions out into their own library.


Comparisons


People might wonder how pipes-concurrency compares to the stm-conduit and io-streams approaches for managing concurrency. Before I continue I just want to point out that I contributed some of the concurrency code to io-streams, so I am at fault for some of its current weaknesses. One of the reasons I made the pipes-concurrency functionality severable was so that io-streams could optionally merge in this same feature to fix some of the concurrency issues that I couldn't resolve the first time around.

pipes-concurrency does several things that are unique in the streaming concurrency space, including:
  • Dynamic communication graphs with semi-automatic reference management
  • Correctly handling multiple readers and writers on the same resource
  • Deadlock safety (as mentioned above)
  • Exception safety (by virtue of deadlock safety)

Conclusion


There are still more features that I haven't even mentioned, so I highly recommend you read the tutorial to learn other cool tricks you can do with the library.

For people following the pipes ecosystem, the next library coming up is pipes-parse, which is getting very close to completion, although the version currently on GitHub is stale and doesn't reflect the current state of the project. Expect to see some very cool and unique features when it comes out, which should be within the next two weeks.

Wednesday, April 10, 2013

Defaults

Many programs require default values of some sort and normally we would consider this aspect of programming "business logic" and not give it a second thought. However, mathematics provides some surprising insight into the banal task of choosing appropriate defaults, so let's temporarily put on our theory hats and try to over-think this problem.


Patterns


Let's begin by trying to identify a common pattern unifying all default values. If I were to name a few types, most people would agree on the following default values:
  • Int: 0
  • Bool: False
  • [a]: [], the empty list
  • Text: "", the empty Text string (if you use OverloadedStrings)
Why not choose 5 as the default Int or "ridiculous" as the default Text string? What makes us gravitate towards choosing those particular values?

Well, we can discern a common pattern: all these default values seem to correspond to something "empty". But what does it really mean to be "empty"?

Well, for numbers it is obvious why we consider 0 empty. If you add 0 to any number n, you get back n, signifying that 0 must be empty:
0 + n = n
n + 0 = n
We can extend this same reasoning to the other values to justify why we consider them empty. For example, if you append "" to any Text string str, you get back str, signifying that "" added nothing:
"" `append` str = str
str `append` "" = str
Similarly, the empty list adds nothing when you concatenate it:
[] ++ xs = xs
xs ++ [] = xs
... and False does nothing when you (||) it:
False || b = b
b || False = b
The pattern is obvious: in every case we have some empty default value, which we will call mempty, and some combining function, which we will call mappend, that satisfy the following two equations:
mempty `mappend` x = x
x `mappend` mempty = x
This is a Monoid (which additionally requires mappend to be associative), and Haskell's Data.Monoid module defines mempty and mappend in the Monoid type class:
class Monoid m where
    mempty  :: m
    mappend :: m -> m -> m
... and also provides a convenient infix operator for mappend:
(<>) :: (Monoid a) => a -> a -> a
m1 <> m2 = m1 `mappend` m2
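
As a quick sanity check, the two identity equations can be tested on sample values. This is only a minimal sketch: identityHolds is a hypothetical helper name (not part of Data.Monoid), and it checks individual values rather than proving the laws in general.

```haskell
import Data.Monoid (Any(..), Sum(..), (<>))

-- Hypothetical helper: checks both identity laws for one sample value.
identityHolds :: (Eq m, Monoid m) => m -> Bool
identityHolds x = (mempty <> x == x) && (x <> mempty == x)

main :: IO ()
main = do
    print (identityHolds [1, 2, 3 :: Int])   -- lists:    mempty = [],        (<>) = (++)
    print (identityHolds "text")             -- Strings are lists of Char
    print (identityHolds (Any True))         -- Any:      mempty = Any False, (<>) = (||)
    print (identityHolds (Sum (5 :: Int)))   -- Sum:      mempty = Sum 0,     (<>) = (+)
```

Each line should print True, since every one of these types pairs its "empty" value with the matching combining operation.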

Emptiness


Not all types have a unique Monoid instance. For example, Bool has two separate Monoid instances and we must use the Any or All newtypes from Data.Monoid to distinguish which one we prefer.

The Any monoid corresponds to the one we chose above, where False is the empty value and (||) is the combining operation:
newtype Any = Any { getAny :: Bool }

instance Monoid Any where
    mempty = Any False
    (Any b1) `mappend` (Any b2) = Any (b1 || b2)
However, there is a dual monoid where True is the "empty" value and (&&) is the combining operation:
newtype All = All { getAll :: Bool }

instance Monoid All where
    mempty = All True
    (All b1) `mappend` (All b2) = All (b1 && b2)
Similarly, numbers have two separate Monoid instances, and we use the Sum or Product monoids to distinguish which one we prefer.

newtype Sum a = Sum { getSum :: a }

instance (Num a) => Monoid (Sum a) where
    mempty = Sum 0
    (Sum n1) `mappend` (Sum n2) = Sum (n1 + n2)


newtype Product a = Product { getProduct :: a }

instance (Num a) => Monoid (Product a) where
    mempty = Product 1
    (Product n1) `mappend` (Product n2) = Product (n1 * n2)
Monoids teach a valuable lesson: there is no such thing as an intrinsically empty value. Values are only empty with respect to a specific combining operation. We can choose more exotic default values to be mempty, but if we must select an unusual mappend to justify their emptiness then that suggests that we chose the wrong defaults.
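
To see that "empty" depends on the combining operation, here is a small sketch that folds the same lists through different newtypes. It assumes a GHC recent enough that foldMap is exported from the Prelude (7.10 or later); the list names are just for illustration.

```haskell
import Data.Monoid (All(..), Any(..), Product(..), Sum(..))

main :: IO ()
main = do
    let bools = [False, True, False]
        nums  = [1, 2, 3, 4] :: [Int]
    -- Same data, different monoids, different answers
    print (getAny     (foldMap Any bools))     -- combines with (||)
    print (getAll     (foldMap All bools))     -- combines with (&&)
    print (getSum     (foldMap Sum nums))      -- combines with (+)
    print (getProduct (foldMap Product nums))  -- combines with (*)
    -- Folding an empty list returns each monoid's own mempty
    print (getAny     (foldMap Any []))
    print (getAll     (foldMap All []))
    print (getSum     (foldMap Sum ([] :: [Int])))
    print (getProduct (foldMap Product ([] :: [Int])))
```

The last four lines return False, True, 0, and 1 respectively: four different "defaults", each one justified only by the operation it is paired with.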

So the next time you find yourself needing a Default type class, chances are that you actually should use the Monoid type class instead. The Monoid class forces us to demonstrate why our default is truly empty by also providing the associated combining operator. This basic sanity check discourages us from defining "ridiculous" defaults.
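
As a rough illustration of that advice, here is a sketch of a settings record whose default is simply mempty. Everything in it (Settings, its fields, fromConfigFile, fromCommandLine) is hypothetical and invented for this example. It also assumes a newer GHC (8.4 or later), where Monoid has a Semigroup superclass and (<>) must be defined in a Semigroup instance rather than as mappend.

```haskell
import Data.Monoid (Any(..), Last(..))

-- Hypothetical settings record; every field is itself a Monoid.
data Settings = Settings
    { verbose     :: Any            -- off unless some source turns it on
    , includeDirs :: [FilePath]     -- directories accumulate
    , outputFile  :: Last FilePath  -- the last explicit choice wins
    } deriving (Eq, Show)

-- Combine settings field by field.
instance Semigroup Settings where
    Settings v1 d1 o1 <> Settings v2 d2 o2 =
        Settings (v1 <> v2) (d1 <> d2) (o1 <> o2)

-- The default is the value that changes nothing when combined.
instance Monoid Settings where
    mempty = Settings mempty mempty mempty

-- Two hypothetical sources of settings, each overriding part of the default.
fromConfigFile, fromCommandLine :: Settings
fromConfigFile  = mempty { includeDirs = ["/usr/include"]
                         , outputFile  = Last (Just "a.out") }
fromCommandLine = mempty { verbose    = Any True
                         , outputFile = Last (Just "prog") }

main :: IO ()
main = do
    let settings = mconcat [mempty, fromConfigFile, fromCommandLine]
    print (getAny (verbose settings))
    print (includeDirs settings)
    print (getLast (outputFile settings))
```

The combining operation is what justifies the default here: an unset output file is Last Nothing because it defers to any later choice, and an empty directory list is [] because it contributes no directories.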