Monday, May 6, 2013

pipes-3.3.0: Folds and uniting ListT with Proxy

pipes-3.3.0 simultaneously resolves two long-standing problems in the library:
  • Not all proxy transformers implemented ListT
  • Folds required using the base monad
It turns out that fixing ListT for the remaining hold-outs proved to solve the fold problem as well, and this post will detail that a bit more.


ListT


pipes-2.4 first identified the existence of three extra categories, two of which I call the "request" and "respond" categories. These categories are enormously useful, especially since you can implement ListT with both of them, but then I discovered that you couldn't implement the (/>/) and (\>\) composition operators for certain proxy transformers, specifically:
  • MaybeP
  • EitherP
  • StateP
  • WriterP
This was really disconcerting, and it seemed really odd that every proxy transformer could lift the identities for those two categories (i.e. request and respond), but not always lift the corresponding composition operators. I settled for a temporary solution, which was to split out (/>/) and (\>\) into a separate ListT type class.

However, several recent events made me suspect that something was amiss and caused me to revisit this solution. I received my first clue while working on the pipes-directory library, where I wanted to model getDirectoryContents using the following type:
getDirectoryContents
    :: (Proxy p)
    => FilePath -> ProduceT (ExceptionP p) SafeIO FilePath
This would let users bind directories non-deterministically in ProduceT so that they could describe effectful directory traversals at a high-level. This required pipes-safe so that the directory stream would be properly finalized in the event of exceptions of termination, which is why it uses ExceptionP and SafeIO.

However, ExceptionP is just a type synonym for EitherP, and EitherP did not implement the ListT type class, which meant that I could not use the ProduceT monad. So I revisited EitherP and discovered that there was a law-obiding ListT instance for EitherP that I had missed the first time around. Moreover, I could use the exact same trick to implement ListT for MaybeP, too.

This meant that only two proxy transformers remained which did not implement ListT:
  • StateP
  • WriterP
Moreover, WriterP was internally implemented using StateP under the hood, meaning that if I could solve StateP then I could finally merge the ListT class back into the Proxy class.

Simultaneously, while working on pipes-parse I encountered several buggy corner cases with StateP, all of which gave the wrong behavior. Similarly, WriterP also gave the wrong behavior in a wide variety of cases and this Stack Overflow question gives a great example of how useless WriterP was. This suggested that I had implemented both of those two proxy transformers incorrectly, since both of them gave the wrong behavior in many corner cases and both of them resisted a correct ListT implementation.

This observation led me to discover the correct solution: make StateP and WriterP share their effects globally across the pipeline, instead of locally. This fix solved both problems:
  • Both of them now implement List and obey the ListT laws
  • Both of them now give correct behavior in all corner cases
Consequently, I can now merge the ListT class into the Proxy class and reunite request and respond with their respective composition operators. Also, now all proxy transformers lift all four categories correctly.


Folds


The WriterP fix leads to a big improvement in the pipes folding API. Now you can do folds using WriterP within the pipeline and without using the base monad.

For example, if you want to fold all positive elements from upstream, you can now write:
somePipe = do
    -- The unitU discards values that 'toListD' reforwards
    xs <- execWriterK (takeWhileD (> 0) >-> toListD >-> unitU) ()
    respond xs
    ...
You can now access the result of folds within pipes! You no longer have to wait until the Session is complete to retrieve the folded data.

Also, since folds don't use the base monad you no longer need to hoist stages that you compose with a fold. For example, if you want to sum the first ten lines of user input, you can just write:
runProxy $ execWriterK $ readLnS >-> takeB_ 10 >-> sumD

Deprecation


I've also started to deprecate several parts of the API in preparation for an eventual pipes-4.0.0 release. These are the main things I deprecated:
  • The classic pipes API (i.e. await, yield, and (>+>))
  • raise functions (i.e. raise and raiseP)
  • K functions, like hoistK and liftK (exception: I keep the run...K functions)
  • Many bidirectional utilities from the pipes prelude and some upstream utilities
  • idT and coidT are renamed to pull and push
If you disagree with any of these deprecations, please let me know since I'm always open to suggestions to keep them or migrate them to a pipes-extras library.

I renamed idT and coidT because this allows for a nice convention where every category is named after its identity operator. Also, the new names are more suggestive of their behavior: idT begins by pulling information, while coidT first pushes information.

This rename becomes even more advantageous when you use the io-streams style I discussed in a previous post, but I will save the full explanation of why for later.


Modules


I've also tightened up the module hierarchy, which has gone down from 23 modules to 18, and will go down further to 16 when I remove the deprecated Control.Pipe and Control.Proxy.Pipe modules in pipes-4.0.0. Hopefully this makes the library a bit less intimidating to newcomers and easier to navigate.


Future Work


As always, I'm still working on pipes-parse. The big holdup is that I have been experimenting with more elegant solutions to pushback, mainly because I would like to implement many non-trivial features like nested sub-parsers. If worse comes to worse, I will just drop those advanced features and push the simpler version out the door in the next few weeks.

Saturday, May 4, 2013

Program imperatively using Haskell lenses

Haskell gets a lot of flack because it has no built-in support for state and mutation. Consequently, if we want to bake a stateful apple pie in Haskell we must first create a whole universe of stateful operations. However, this principled approach has paid off and now Haskell programmers enjoy more elegant, concise, and powerful imperative code than you can find even in self-described imperative languages.


Lenses


Your ticket to elegant code is the lens library. You define your data types as usual, but you prefix each field with an underscore. For example, I can define a Game:
data Game = Game
    { _score :: Int
    , _units :: [Unit]
    , _boss  :: Unit
    } deriving (Show)
... full of Units:
data Unit = Unit
    { _health   :: Int
    , _position :: Point
    } deriving (Show)
... whose locations are represented by Points:
data Point = Point
    { _x :: Double
    , _y :: Double
    } deriving (Show)
We prefix these fields with an underscore because we will not be using them directly. Instead, we will use them to build lenses, which are much more pleasant to work with.

We can build these lenses in two ways. Our first option is to define lenses manually using the lens convenience function from Control.Lens. For example, we can define a score lens to replace the _score field accessor:
import Control.Lens

score :: Lens' Game Int
score = lens _score (\game v -> game { _score = v })
A Lens is like a map which you use to navigate complex data types. We use the above score lens to navigate from our Game type to its _score field.

The type reflects where we begin and end: Lens' Game Int means we must begin on a value of type Game and end on a value of type Int (the score, in this case). Similarly, our other lenses will clearly indicate their starting and ending points in their types:
units :: Lens' Game [Unit]
units = lens _units (\game v -> game { _units = v })

boss :: Lens' Game Unit
boss = lens _boss (\game v -> game { _boss = v })

health :: Lens' Unit Int
health = lens _health (\unit v -> unit { _health = v })

position :: Lens' Unit Point
position = lens _position (\unit v -> unit { _position = v })

x :: Lens' Point Double
x = lens _x (\point v -> point { _x = v })

y :: Lens' Point Double
y = lens _y (\point v -> point { _y = v })
However, we don't have to write out all this boilerplate if we're lazy. Our second option is to use Template Haskell to define all these lenses for us:
{-# LANGUAGE TemplateHaskell #-}

import Control.Lens

data Game = Game
    { _score :: Int
    , _units :: [Unit]
    , _boss  :: Unit
    } deriving (Show)

data Unit = Unit
    { _health   :: Int
    , _position :: Point
    } deriving (Show)

data Point = Point
    { _x :: Double
    , _y :: Double
    } deriving (Show)

makeLenses ''Game
makeLenses ''Unit
makeLenses ''Point
Just remember that Template Haskell requires these makeLenses declarations to go after your data types.


Initial State


The next thing we need is a test initial game state:
initialState :: Game
initialState = Game
    { _score = 0
    , _units =
        [ Unit
            { _health = 10
            , _position = Point { _x = 3.5, _y = 7.0 }
            }
        , Unit
            { _health = 15
            , _position = Point { _x = 1.0, _y = 1.0 }
            }
        , Unit
            { _health = 8
            , _position = Point { _x = 0.0, _y = 2.1 }
            }
        ]
    , _boss = Unit
        { _health = 100
        , _position = Point { _x = 0.0, _y = 0.0 }
        }
    }
We've enlisted three valiant heroes to slay the dungeon boss. Let the battle begin!


First Steps


Now we can use our lenses! Let's create a routine for our warriors to strike at the boss:
import Control.Monad.Trans.Class
import Control.Monad.Trans.State

strike :: StateT Game IO ()
strike = do
    lift $ putStrLn "*shink*"
    boss.health -= 10
strike prints an evocative sound to the console, then decrements the boss's health by 10 hit points.

strike's type indicates that it operates within the StateT Game IO monad. You can think of this as a DSL where we layer our pure game state (i.e. StateT Game) on top of side effects (i.e. IO) so that we can both mutate our game and also print cute battle effects to the console. All you have to remember is that any time we need side effects, we will use lift to invoke them.

We'll test out strike in ghci. In order to run strike, we must supply it with an initialState:
>>> execStateT strike initialState 
*shink*
Game {_score = 0, _units = [Unit {_health = 10, _position = Poin
t {_x = 3.5, _y = 7.0}},Unit {_health = 15, _position = Point {_
x = 1.0, _y = 1.0}},Unit {_health = 8, _position = Point {_x = 0
.0, _y = 2.1}}], _boss = Unit {_health = 90, _position = Point {
_x = 0.0, _y = 0.0}}}
execStateT takes our stateful code and an initial state, and then runs that code to produce a new state. ghci automatically shows the return value as a convenience so we can inspect the newly returned state. The output is a bit of a mess, but if you strain your eyes you can see that the boss now only has 90 health.

We can view this more easily by storing the new state in a variable:
>>> newState <- execStateT strike initialState 
*shink*
... and then we can query newState for the part we actually care about:
>>> newState^.boss.health
90

Composition


This syntax very strongly resembles imperative and object-oriented programming:
boss.health -= 10
What is going on here? Haskell is decidely not a multi-paradigm language, yet we have what appears to be multi-paradigm code.

Amazingly, nothing on that line is a built-in language feature!
  • boss and health are just the lenses we defined above
  • (-=) is an infix function
  • (.) is function composition from the Haskell Prelude!
Wait, (.) is function composition? Really?

This is where the lens magic comes in. Lenses are actually ordinary functions, and our "multi-paradigm" code is actually functions all the way down!

In fact, Lens' a b is actually a type synonym for a certain type of higher-order function:
type Lens' a b =
    forall f . (Functor f) => (b -> f b) -> (a -> f a)
You don't need to understand the details of that. Just remember that Lens' a b is a higher-order function that accepts a function of type (b -> f b) as an argument, and returns a new function of type (a -> f a). The Functor part is the theoretically-inspired "magic".

Armed with that knowledge, let's make sure the types check out by expanding out the Lens' type synonyms for boss and health
boss :: Lens' Game Unit
-- expands to:
boss :: (Functor f) => (Unit -> f Unit) -> (Game -> f Game)

health :: Lens' Unit Int
-- expands to:
health :: (Functor f) => (Int -> f Int) -> (Unit -> f Unit)
Now let's review the definition of function composition:
(.) :: (b -> c) -> (a -> b) -> (a -> c)
(f . g) x = f (g x)
Notice that if we specialize our type variables to:
a ~ (Int  -> f Int)
b ~ (Unit -> f Unit)
c ~ (Game -> f Game)
... then this has exactly the right type to compose our two lenses:
(.) :: ((Unit -> f Unit) -> (Game -> f Game))
    -> ((Int  -> f Int ) -> (Unit -> f Unit))
    -> ((Int  -> f Int ) -> (Game -> f Game))
If we put the Lens' type synonyms back in, we get:
(.) :: Lens' Game Unit -> Lens' Unit Int -> Lens' Game Int

boss . health :: Lens' Game Int
So function composition is also lens composition! In fact, lenses form a category where (.) is the category's composition operator and the identity function id is also the identity lens:
(.) :: Lens' x y -> Lens' y z -> Lens' x z

id  :: Lens' x x
What's so beautiful about this is that Haskell lets us remove the spaces around the function composition operator so that it looks exactly like object-oriented accessor notation!

Categories make it really easy to connect and group components on the fly. For example, if I anticipate that I will be modifying the Boss's health frequently, I can just define a composite lens:
bossHP :: Lens' Game Int
bossHP = boss.health
... and now I can use it wherever I previously used boss.health:
strike :: StateT Game IO ()
strike = do
    lift $ putStrLn "*shink*"
    bossHP -= 10
... or similarly use it as an accessor:
>>> newState^.bossHP
90

Traversals


Lenses are grounded in some really elegant theory, and as a result they get a lot of things right that imperative languages normally don't!

For example, let's say that our boss is a dragon and breathes fire, which damages all heroes. Using lenses, I can decrement the entire party's health using a single instruction:
fireBreath :: StateT Game IO ()
fireBreath = do
    lift $ putStrLn "*rawr*"
    units.traversed.health -= 3
This makes use of a new lens!
traversed :: Traversal' [a] a
traversed lets us "dig in" to the values in a list so that we can manipulate them as a single unit instead of manually looping over the list. However, this time the type is a Traversal' instead of a Lens'.

A Traversal is a like a Lens' except weaker:
type Traversal' a b =
    forall f . (Applicative f) => (b -> f b) -> (a -> f a)
If you compose Lens' with a Traversal', you get the weaker of the two: a Traversal'. This works no matter which order you compose them in:
(.) :: Lens' a b -> Traversal' b c -> Traversal' a c

(.) :: Traversal' a b -> Lens' b c -> Traversal' a c
units                  :: Lens'      Game [Unit]
units.traversed        :: Traversal' Game  Unit
units.traversed.health :: Traversal' Game  Int
In fact, we don't need to figure this out. The compiler will infer the correct type all by itself:
>>> :t units.traversed.health
units.traversed.health
  :: Applicative f =>
     (Int -> f Int) -> Game -> f Game
That's exactly the right type to be a Traversal' Game Int!

Actually, why not just compose these lenses into a single lens:
partyHP :: Traversal' Game Int
partyHP = units.traversed.health

fireBreath :: StateT Game IO ()
fireBreath = do
    lift $ putStrLn "*rawr*"
    partyHP -= 3
Let's also use partyHP lens to retrieve the new party hitpoints:
>>> newState <- execStateT fireBreath initialState 
*rawr*
>>> newState^.partyHP

<interactive>:3:11:
    No instance for (Data.Monoid.Monoid Int)
      arising from a use of `partyHP'
    Possible fix:
      add an instance declaration for (Data.Monoid.Monoid Int)
    In the second argument of `(^.)', namely `partyHP'
    In the expression: newState ^. partyHP
    In an equation for `it': it = newState ^. partyHP
Oops! This is a type error because there is no single health to get! This is why a Traversal' is weaker than a Lens': traversals may point to multiple values, so they do not support a well-defined way to get just one value. The type system saved us from a potential bug!

Instead, we must specify that we actually want a list of values using the toListOf function:
toListOf :: Traversal' a b -> a -> [b]
This gives the desired result:
>>> toListOf partyHP newState 
[7,12,5]
... and there's an infix operator equivalent to toListOf: (^..):
>>> initialState^..partyHP
[10,15,8]
>>> newState^..partyHP
[7,12,5]
Now we can clearly see at a glance that fireBreath worked the way we intended.

Now I want to get really fancy. I want to define a traversal over a geographic area. Can I do that?
around :: Point -> Double -> Traversal' Unit Unit
around center radius = filtered (\unit ->
    (unit^.position.x - center^.x)^2
  + (unit^.position.y - center^.y)^2
  < radius^2 )
Sure I can! Now I can limit the dragon's fire breath to a circular area!

Edit: filtered is apparently not a theoretically valid traversal because it does not preserve the number of elements. See this /r/haskell thread for details.
fireBreath :: Point -> StateT Game IO ()
fireBreath target = do
    lift $ putStrLn "*rawr*"
    units.traversed.(around target 1.0).health -= 3
Notice how expressive that code is: we want to decrement the health of all units around the target. That code conveys our intention much more clearly than the equivalent mainstream imperative code and it leaves much less room for error.

Anyway, back to breathing fire. First, let's see where the units are located:
> initialState^..units.traversed.position
[Point {_x = 3.5, _y = 7.0},Point {_x = 1.0, _y = 1.0},Point {_x
 = 0.0, _y = 2.1}]
Hmmm, the latter two units are close by, so I will aim the fireball in between them:
>>> newState <- execStateT (fireBreath (Point 0.5 1.5)) initialState 
*rawr*
>>> (initialState^..partyHP, newState^..partyHP)
([10,15,8],[10,12,5])
Nailed it!


Zooming


We can do more unique things with lenses, like zoom in on subsets of our global state:
retreat :: StateT Game IO ()
retreat = do
    lift $ putStrLn "Retreat!"
    zoom (units.traversed.position) $ do
        x += 10
        y += 10
As before, we can combine these lenses into a single lens if we want to reuse it later on:
partyLoc :: Traversal' Game Point
partyLoc = units.traversed.position

retreat :: StateT Game IO ()
retreat = do
    lift $ putStrLn "Retreat!"
    zoom partyLoc $ do
        x += 10
        y += 10
Let's try it out:
>>> initialState^..partyLoc
[Point {_x = 3.5, _y = 7.0},Point {_x = 1.0, _y = 1.0},Point {_x
 = 0.0, _y = 2.1}]
>>> newState <- execStateT retreat initialState 
Retreat!
>>> newState^..partyLoc
[Point {_x = 13.5, _y = 17.0},Point {_x = 11.0, _y = 11.0},Point
 {_x = 10.0, _y = 12.1}]
Let's look at the type of zoom in the context of this particular example:
zoom :: Traversal a b -> StateT b IO r -> StateT a IO r
zoom has some nice theoretical properties. For example, we'd expect that if we zoom using two successive lenses, it should behave the same as zooming using the composite lens:
zoom lens1 . zoom lens2 = zoom (lens1 . lens2)
... and if we zoom in on the empty lens, we end up back where we started:
zoom id = id
In other words, zoom defines a functor, and those equations are the functor laws!


Combining commands


So far I've only shown a single command at a time, but now let's take all of these concepts and imperatively assemble a battle from them:
battle :: StateT Game IO ()
battle = do
    -- Charge!
    forM_ ["Take that!", "and that!", "and that!"] $ \taunt -> do
        lift $ putStrLn taunt
        strike

    -- The dragon awakes!
    fireBreath (Point 0.5 1.5)
    
    replicateM_ 3 $ do
        -- The better part of valor
        retreat

        -- Boss chases them
        zoom (boss.position) $ do
            x += 10
            y += 10
Let's try it out!
>>> execStateT battle initialState 
Take that!
*shink*
and that!
*shink*
and that!
*shink*
*rawr*
Retreat!
Retreat!
Retreat!
Game {_score = 0, _units = [Unit {_health = 10, _position = Poin
t {_x = 33.5, _y = 37.0}},Unit {_health = 12, _position = Point 
{_x = 31.0, _y = 31.0}},Unit {_health = 5, _position = Point {_x
 = 30.0, _y = 32.1}}], _boss = Unit {_health = 70, _position = P
oint {_x = 30.0, _y = 30.0}}}
I guess people really aren't joking when they say Haskell is the finest imperative language.


Conclusions


This really just scratches the surface of the lens library, which is one of the crown jewels of the Haskell ecosystem. You can use lenses for pure programming, too, and compress very powerful and complex computations into very readable and elegant code. When I have more time I will write even more about this amazing library.

Sunday, April 21, 2013

pipes and io-streams

I was originally planning to release an extension library for pipes that would add an io-streams-like interface to pipes. However, this week I discovered a surprisingly elegant way to implement io-streams behavior (minus pushback) using the existing pipes API. In fact, this translation has been possible ever since pipes-2.4!

This post assumes that you are familiar with both pipes and io-streams. If not, then you can read the pipes tutorial or the io-streams tutorial, both of which I wrote!

Edit: I also just discovered that when you re-implement io-streams using pipes, then pipes ties io-streams in performance. See this reddit comment for code and benchmarks.


Conversions


I'll begin with the following generic correspondence between io-streams and pipes:
{-# LANGUAGE RankNTypes #-}

import Control.Proxy

type InputStream a
    = forall p . (Proxy p) => () -> Session p IO (Maybe a)

type OutputStream a
    = forall p . (Proxy p) => Maybe a -> Session p IO ()

makeInputStream :: IO (Maybe a) -> InputStream a
makeInputStream m () = runIdentityP $ lift m

makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a
makeOutputStream f a = runIdentityP $ lift (f a)
For example:
stdin :: InputStream String
stdin = makeInputStream (fmap Just getLine)

stdout :: OutputStream String
stdout = makeOutputStream (maybe (return ()) putStrLn)
Notice how this does not convert the read and write actions to repetitive streams. Instead, you lift each action just once into the Proxy monad.

To read or write to these sources you just run the proxy monad:
import Prelude hiding (read)

read :: InputStream a -> IO (Maybe a)
read is = runProxy is

{- This requires `pipes` HEAD, which generalizes the type
   of `runProxyK.

   Alternatively, you can hack around the current definition
   using:

   write ma os = runProxy (\() -> os ma)
-}
write :: Maybe a -> OutputStream a -> IO ()
write ma os = runProxyK os ma
For example:
>>> read stdin
Test<Enter>
Just "Test"
>>> write (Just "Test") stdout
"Test"
Now consider the following io-streams transformation:
map :: (a -> b) -> InputStream a -> IO (InputStream b)
map f is = makeInputStream $ do
    ma <- read is
    return (fmap f ma)
Here's the naive translation to pipes (later I will show the more elegant version):
import Prelude hiding (map)

map :: (a -> b) -> InputStream a -> InputStream b
map f is () = runIdentityP $ do
    ma <- is ()
    return (fmap f ma) )
For example:
>>> read (map (++ "!") stdin)
Test<Enter>
Just "Test!"
The only real difference is that instead of using read is I just use is () directly.

So far I've used absolutely no pipes-specific features to implement any of this. I could have done this just as easily in any other library, including conduit.


Input streams


Now I will add a twist: we can reify transformations to be Consumers instead of functions of an InputStream.

For example, let's transform map to be a Consumer instead of a function:
map :: (Proxy p)
    => (a -> b)
    -> () -> Consumer p (Maybe a) IO (Maybe b)
map f () = runIdentityP $ do
    ma <- request ()
    return (fmap f ma)
I've only made two changes:
  • map is no longer a function of an InputStream
  • Instead of calling the input stream, I use request ()
Check out the final pipe type carefully (ignoring the Maybes):
() -> Consumer p a m b
An input stream transformation is a fold in disguise! These transformations fold 0 or more values from the old input stream to return a new value.

Notice how this differs from the conventional flow of information pipes. We're not converting a stream of as into a new stream of bs. Instead, we're folding a stream of as into a single return value of type b.

How do we even connect map to stdin, though? They don't have the right types for ordinary pipe composition:
stdin 
    :: (Proxy  p)
    => () -> Session  p                IO (Maybe String)

map length
    :: (Proxy p)
    => () -> Consumer p (Maybe String) IO (Maybe Int   )
We need some way to feed the return value of stdin into the upstream input of map. This means we need to call stdin once each time that map requests a new value and feed that into the request. How can we do that using the existing pipes API?

Fortunately, this is exactly the problem that the (\>\) composition operator solves:
>>> read (stdin \>\ map length)
Test<Enter>
Just 4
Woah, slow down! What just happened?


The "request" category


The definition of (\>\) is actually really simple:
(f \>\ g) replaces all of g's requests with f.
So when we wrote (stdin \>\ map length), we replaced the single request in map with stdin as if we had written it in by hand:
-- Before
map length () = runIdentityP $ do
    ma <- request ()
    return (fmap length ma)

-- After
(stdin \>\ map length) () = runIdentityP $ do
    ma <- stdin ()
    return (fmap length ma)
In fact, this behavior means that (\>\) and request form a category. (\>\) is the composition operation, request is the identity, and they satisfy the category laws:
-- Replacing each 'request' in 'f' with 'request' gives 'f'
request \>\ f = f

-- Replacing each 'request' in 'request' with 'f' gives 'f'
f \>\ request = f

-- Substitution of 'request's is associative
(f \>\ g) \>\ h = f \>\ (g \>\ h)
Let's take a closer look at the type of (\>\) to understand what is going on:
(\>\)
    :: (Monad m, ListT p)
    => (b' -> p a' a x' x m b)
    -> (c' -> p b' b x' x m c)
    -> (c' -> p a' a x' x m c)
We can illuminate this type by type-restricting the arguments. First, let's consider the case where the first argument is a Session and the second argument is a Consumer, and see what type the compiler infers for the result:
(\>\)
    :: (Monad m, ListT p)
    => (() -> Session  p   m b)
    -> (() -> Consumer p b m c)
    -> (() -> Session  p   m c)
The compiler infers that we must get back a readable Session, just like we wanted! This is precisely what happened when we composed stdin with map:
stdin
    :: () -> Session  p                IO (Maybe String)

map length
    :: () -> Consumer p (Maybe String) IO (Maybe Int   )

stdin \>\ map length
    :: () -> Session  p                IO (Maybe Int   )

read (stdin \>\ map) :: IO (Maybe String)
However, there's another way we can specialize the type of (\>\). We can instead restrict both arguments to be Consumers:
(\>\)
    :: (Monad m, ListT p)
    => (() -> Consumer p a m b)
    -> (() -> Consumer p b m c)
    -> (() -> Consumer p a m c)
The compiler infers that if we compose two folds, we must get a new fold!

This means that we can compose input stream transformations using the same (\>\) operator:
map length
    :: () -> Consumer p (Maybe String) IO (Maybe Int )

map even
    :: () -> Consumer p (Maybe Int   ) IO (Maybe Bool)

map length \>\ map even
    :: () -> Consumer p (Maybe String) IO (Maybe Bool)
Then we will use the exact same operator to connect our input stream:
stdin \>\ map length \>\ map even
    :: () -> Session p IO (Maybe Bool)

read (stdin \>\ map length \>\ map even)
    :: IO (Maybe Bool)
Let's try it!
>>> read (stdin \>\ map length \>\ map even)
Test<Enter>
Just True
Now let's make things more interesting to really drive the point home:
import Control.Monad

twoReqs
    :: (Proxy p) => () -> Consumer p (Maybe a) IO (Maybe [a])
twoReqs () = runIdentityP $ do
    mas <- replicateM 2 (request ())
    return (sequence mas)
>>> read (stdin \>\ twoReqs \>\ twoReqs) :: IO [[String]]
1<Enter>
2<Enter>
3<Enter>
4<Enter>
Just [["1","2"],["3","4"]]
>>> -- We can still read from raw 'stdin'
>>> read stdin
5<Enter>
Just 5
In other words, the "request" category is actually the category of InputStreams and pipes already has InputStream support today!


Output streams


Now let's do the exact same thing for OutputStreams! We'll begin with the filterOutput function from io-streams:
filterOutput
    :: (a -> Bool) -> OutputStream a -> IO (OutputStream a)
filterOutput pred os = makeOutputStream $ \ma -> case ma of
    Nothing            -> write ma os
    Just a | pred a    -> write ma os
           | otherwise -> return ()
The pipes equivalent is:
filterOutput
    :: (Proxy p)
    => (a -> Bool)
    -> Maybe a -> Producer p (Maybe a) IO ()
filterOutput pred ma = runIdentityP $ case ma of
    Nothing            -> respond ma
    Just a | pred a    -> respond ma
           | otherwise -> return ()
This time we get a new shape for our type signature:
a -> Producer p b m ()
An output stream transformation is an unfold in disguise! These transformations unfold each output destined for the old output stream into 0 or more outputs destined for the new output stream.

Let's hook this transformation up to stdout and start writing values to it ... oh wait. How do we hook it up? Well, let's look at the types of stdout and filterOutput:
filterOutput f
    :: (Proxy p) => Maybe a -> Producer p (Maybe a) IO ()

stdout
    :: (Proxy p) => Maybe a -> Session  p           IO ()
We need some way to feed the stream output of filterOutput into the argument of stdout. This means that we need to call stdout on each output of filterOutput. How can we do that using the existing pipes API?

Why, we use (/>/), the dual of (\>\)!
>>> write (Just "") (filterOutput (not . null) />/ stdout)
>>> write (Just "Test") (filterOutput (not . null) />/ stdout)
Test

The "respond" category


(/>/) also has a simple behavior:
(f />/ g) replaces all of f's responds with g.
This behavior means that (/>/) and respond form a category, too! (/>/) is the composition operation, respond is the identity, and they satisfy the category laws:
-- Replacing each 'respond' in 'f' with 'respond' gives 'f'
f />/ respond = f

-- Replacing each 'respond' in 'respond' with 'f' gives 'f'
respond />/ f = f

-- Substitution of 'respond's is associative
(f />/ g) />/ h = f />/ (g />/ h)
Let's learn more by reviewing the type of (/>/):
(/>/)
    :: (Monad m, ListT p)
    => (a -> p x' x b' b m a')
    -> (b -> p x' x c' c m b')
    -> (a -> p x' x c' c m a')
We'll specialize this type signature in two ways. First, we'll consider the case where the first argument is a Producer and the second argument is a Session and then see what type the compiler infers for the result:
(/>/)
    :: (Monad m, ListT p)
    => (a -> Producer p b m ())
    -> (b -> Session  p   m ())
    -> (a -> Session  p   m ())
The compiler infers that we must get back a writeable session, just like we wanted! This is precisely what happened when we composed filterOutput and stdout:
filterOutput (not . null)
    :: Maybe String -> Producer p (Maybe String) IO ()

stdout
    :: Maybe String -> Session  p                IO ()

filterOutput (not . null) />/ stdout
    :: Maybe String -> Session  p                IO ()

flip write (filterOutput (not . null) />/ stdout)
    :: Maybe String -> IO ()
However, there's another way we can specialize the type of (/>/). We can instead restrict both arguments to be Producers:
(/>/)
    :: (Monad m, ListT p)
    => (a -> Producer p b m ())
    -> (b -> Producer p c m ())
    -> (a -> Producer p c m ())
The compiler infers that if we compose two unfolds, we must get a new unfold!

This means that we can compose output stream transformations using (/>/). To show this, let's define the equivalent of io-stream's contramap:
contramap
    :: (Proxy p)
    => (a -> b)
    -> Maybe a -> Producer p (Maybe b) IO ()
contramap f ma = runIdentityP $ respond (fmap f ma)
We can connect contramap's using (/>/):
contramap even
    :: Maybe Int  -> Producer p (Maybe Bool  ) IO ()

contramap show
    :: Maybe Bool -> Producer p (Maybe String) IO ()

contramap even />/ contramap show
    :: Maybe Int  -> Producer p (Maybe String) IO ()
... then using the exact same operator we connect them to stdout:
contramap even />/ contramap show />/ stdout
    :: Maybe Int -> Session p IO ()

flip write (contramap even />/ contramap show />/ stdout)
    :: Maybe Int -> IO ()
Let's try it!
>>> write (Just 4) (contramap even />/ contramap show />/ stdout)
True
Now let's make things more interesting to really drive the point home:
hiBye
    :: (Proxy p)
    => Maybe String -> Producer p (Maybe String) IO ()
hiBye mstr = runIdentityP $ do
    respond $ fmap ("Hello, " ++) mstr
    respond $ fmap ("Goodbye, " ++) mstr
>>> write (Just "world") (hiBye />/ hiBye />/ stdout)
Hello, Hello, world
Goodbye, Hello, world
Hello, Goodbye, world
Goodbye, Goodbye, world
>>> -- We can still write to raw 'stdout'
>>> write (Just "Haskell") stdout
Haskell
In other words, the "respond" category is actually the category of OutputStreams and pipes already has OutputStream support today!


Unify input and output streams


We can do much more than just implement the io-streams API, though! We can generalize it in many more powerful ways than io-streams permits.

First off, I'm going to inline the InputStream and OutputStream types into the type signatures of read and write:
read
    :: (forall p . (Proxy p) => () -> Session p IO (Maybe a))
    -> IO (Maybe a)

write
    :: Maybe a
    -> (forall p . (Proxy p) => Maybe a -> Session p IO ())
    -> IO ()
Then I will generalize the types to not be Maybe-specific and flip the arguments for write
read
    :: (forall p . (Proxy p) => () -> Session p IO a)
    -> IO a

write
    :: (forall p . (Proxy p) => a -> Session p IO ())
    -> a -> IO ()
Now, I will generalize both read and write by adding bidirectionality to them. read will now accept an argument parametrizing its request and write can now optionally receive a result.
read
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
read is = runProxyK is

write
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
write is = runProxyK is
Well, would you look at that! They both have the same type and implementation! In other words, we can unify read and write into a single function:
once
    :: (forall p . (Proxy p) => a -> Session p IO b)
    -> a -> IO b
once is = runProxyK is
Also, once is really polymorphic over the base monad:
once
    :: (Monad m)
    => (forall p . (ListT p) => a -> Session p m b)
    -> a -> m b
once is = runProxyK is
Using this function, we can both read from InputStreams and write to OutputStreams:
>>> once (stdin \>\ map length) ()
Test<Enter>
Just 4
>>> once (contramap show />/ stdout) (Just 1)
1
Let's test the bidirectionality. I'll write a new stdin that accepts a count parameter telling how many lines to get. This time, there will be no Maybes:
stdinN :: (Proxy p) => Int -> Session p IO [String]
stdinN n = runIdentityP $ replicateM n $ lift getLine
First, briefly test it:
>>> once stdinN 3
Test
Apple
123
["Test","Apple","123"]
Similarly, let's define a transformation to automate supplying values to stdinN:
automate :: (Proxy p) => () -> Client p Int [String] IO ()
automate () = runIdentityP $ do
    lift $ putStrLn "First batch:"
    xs <- request 2
    lift $ print xs
    lift $ putStrLn "Second batch:"
    ys <- request 2
    lift $ print ys
Now compose!
>>> once (stdinN \>\ automate) ()
First batch:
1<Enter>
2<Enter>
["1","2"]
Second batch:
3<Enter>
4<Enter>
["3","4"]
>>> -- Go back to using 'stdinN' unfiltered
>>> once stdinN 1
5<Enter>
["5"]
We can similarly unify makeInputStream and makeOutputStream:
make :: (Monad m, Proxy p) => (q -> m r) -> q -> p a' a b' b m r
make f a = runIdentityP (lift (f a))
The only difference between an input stream and an output stream is whether it consumes or produces a value and bidirectional streams blur the line even further.


Purity


None of this machinery is specific to the IO monad. Everything I've introduced is polymorphic over the base monad! We only incur IO if any of our stages use IO. This departs greatly from io-streams, where you are trapped in the IO monad to do everything. However, I want to qualify that statement with a caveat: I expect that pipes-parse will require either IO or an ST base monad (your choice) to properly manage pushback like io-streams does. Even still, you only pay this price if you actually need it.

pipes improves on purity in two other ways. First, building input and output streams is pure when you use pipes:
-- pipes
makeInputStream  :: IO (Maybe a)       -> InputStream a
makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a

-- io-streams
makeInputStream  :: IO (Maybe a)       -> IO (InputStream  a)
makeOutputStream :: (Maybe a -> IO ()) -> IO (OutputStream a)
Consequently, io-streams must use unsafePerformIO for its stdin:
stdin :: InputStream ByteString
stdin = unsafePerformIO $
    handleToInputStream IO.stdin >>= lockingInputStream
Second, composing and transforming input streams is pure when you use pipes:
>>> -- pipes
>>> read (stdin \>\ map length \>\ map even)
...
>>> -- io-streams
>>> is <- (map even <=< map length) stdin
>>> read is
...
... and if none of the stages use IO, then the entire operation is pure!


Extra interfaces


You can generalize the type signatures even further. Let's revisit the types of (\>\) and (/>/):
(\>\)
    :: (Monad m, ListT p)
    => (b' -> p a' a x' x m b)
    -> (c' -> p b' b x' x m c)
    -> (c' -> p a' a x' x m c)

(/>/)
    :: (Monad m, ListT p)
    => (a -> p x' x b' b m a')
    -> (b -> p x' x c' c m b')
    -> (a -> p x' x c' c m a')
So far we've been ignoring the x'/x interfaces entirely, treating them as closed interfaces. However, there's no reason we have to! If we open up this interface, then every processing stage that we connect can read and write to this streaming interface.

In a future post, I will show you how you can really elegantly use this trick to generalize Hutton-Meijer parsers to permit effects.


ListT


The people who read the pipes-3.2 announcement post know that the (\>\) and (/>/) composition operators correspond to (>=>) for the two pipes ListT monads.
This means that you can take any output stream transformation and assemble more sophisticated behaviors using do notation:
pairs
    :: (ListT p)
    => () -> Producer p (Maybe (String, String)) IO ()
pairs () = runRespondT $ do
    mstr1 <- RespondT $ hiBye (Just "world")
    mstr2 <- RespondT $ hiBye (Just "Haskell")
    return $ (,) <$> mstr1 <*> mstr2
These behave just like the list monad, non-deterministically selecting all possible paths:
>>> once (pairs />/ contramap show />/ stdout) ()
("Hello, world","Hello, Haskell")
("Hello, world","Goodbye, Haskell")
("Goodbye, world","Hello, Haskell")
("Goodbye, world","Goodbye, Haskell")
The exact same trick works for input stream transformations, too, but it's less useful because we don't send information upstream as frequently as downstream.


Conclusions


One of the common questions I often get was "How do I escape from the pipe monad?" and I wrote this post to demonstrate how to do so. You can use pipes to program exactly in the same style as io-streams. The only thing missing is a standard library of io-streams-like utilities.

So far, I have not touched on the issue of push-back, a feature which io-streams provides. pipes-parse will build on the ideas I've introduced in this post to provide an io-streams-style interface to parsing that includes push-back functionality. This will make it easy to dynamically add or remove processing stages throughout the parsing process, which is necessary for parsing HTTP. In fact, everything I've presented here fell out very naturally from my work on pipes-parse and I only later discovered that I had accidentally reinvented io-streams entirely within pipes.

So I will cautiously state that I believe pipes is "io-streams done right". Everything that Greg has been doing with io-streams matches quite nicely with the pre-existing "request" and "respond" pipes categories that I independently discovered within pipes. This indicates that Greg was really onto something and pipes provides the elegant theoretical foundation for his work.

Sunday, April 14, 2013

pipes-concurrency-1.0.0: Reactive programming

This post introduces the pipes-concurrency library (which you can find here), which is the renamed pipes-stm library that I previously promised. I ended up completing this much sooner than I anticipated, which is why it precedes the upcoming pipes-parse package.

Begin with the tutorial if you want to learn how to use the library. This post mainly highlights some features and compares the pipes-concurrency approach to other libraries. Also, I'll provide some bonus examples that are not in the tutorial.

Before I continue, I want to credit Eric Jones, who first began the library as pipes-stm on Github. Unfortunately, I lost all contact with him and he didn't include a LICENSE in his repository, so I had to rebuild the library from scratch because I wasn't sure if a fork would constitute copyright infringement. If he reads this and gets in touch with me and approves of the BSD license then I will add him to the LICENSE and also add him as a library author.

Reactive programming


Several people are beginning to realize that streaming libraries overlap substantially with reactive programming frameworks. pipes-concurrency provides the basic building blocks necessary to build reactive programming abstractions.

For example, let's say that I want to simulate reactive-banana's Events using pipes-concurrency:
{-# LANGUAGE RankNTypes #-}

import Control.Proxy

-- `pipes` does not need the `t` parameter from reactive-banana
type Event a = forall p . (Proxy p) => () -> Producer p a IO ()
If you want to take the union of two asynchronous streams, you spawn a mailbox, merge two streams into it using sendD, and then read out the results using recvS:
import Control.Monad
import Control.Concurrent.Async
import Control.Proxy.Concurrent

union :: Event a -> Event a -> Event a
union e1 e2 () = runIdentityP $ do
    (input, output) <- lift $ spawn Unbounded
    as <- lift $ forM [e1, e2] $ \e ->
        async $ do runProxy $ e >-> sendD input
                   performGC
    recvS output ()
    lift $ mapM_ wait as
Now we can define two event sources:
clock :: Event String
clock = fromListS (cycle ["Tick", "Tock"])
    >-> execD (threadDelay 1000000)

user :: Event String
user = stdinS
... and cleanly merge them:
main = runProxy $ union clock user
              >-> takeWhileD (/= "quit")
              >-> stdoutD
$ ./union
Tick
Tock
test<Enter>
test
Tick
Tock
quit<Enter>
$
People often tout spreadsheets as the classic example of functional-reactive programming, so why not simulate that, too? Well, a spreadsheet cell is just a non-empty stream of values:
{-# LANGUAGE PolymorphicComponents #-}

import Control.Proxy

data Cell a = Cell
    { initial :: a
    , stream  :: forall p . (Proxy p) => () -> Producer p a IO ()
    }

runCell :: (Proxy p) => Cell a -> () -> Producer p a IO ()
runCell (Cell a ga) () = runIdentityP $ do
    respond a
    ga ()
Each value in the stream represents an update to the cell's contents, either by the user:
input :: Cell String
input = Cell "" stdinS
... or some data source:
time :: Cell Int
time = Cell 0 $ \() -> evalStateP 1 $ forever $ do
    n <- get
    respond n
    lift $ threadDelay 1000000
    put (n + 1)
Spreadsheet cells are both Functors and Applicatives:
instance Functor Cell where
    fmap f (Cell x gx) = Cell (f x) (gx >-> mapD f)

instance Applicative Cell where
    pure a = Cell a (runIdentityK return)

    (Cell f0 gf) <*> (Cell x0 gx)
        = Cell (f0 x0) $ \() -> runIdentityP $ do
            (input, output) <- lift $ spawn Unbounded
            lift $ do
                a1 <- async $ runProxy $
                    gf >-> mapD Left  >-> sendD input
                a2 <- async $ runProxy $
                    gx >-> mapD Right >-> sendD input
                link2 a1 a2
                link a1
            (recvS output >-> handler) ()
      where
        handler () = evalStateP (f0, x0) $ forever $ do
            e <- request ()
            (f, x) <- get
            case e of
                Left  f' -> do
                    put (f', x)
                    respond (f' x)
                Right x' -> do
                    put (f, x')
                    respond (f x')
... so we can use Applicative style to combine spreadsheet cells, which will only update when their dependencies update:
both :: Cell (Int, String)
both = (,) <$> time <*> input

main = runProxy $ runCell both >-> printD
$ ./cell
(0,"")
(1,"")
(2,"")
test<Enter>
(2,"test")
(3,"test")
apple<Enter>
(3,"apple")
(4,"apple")
(5,"apple")
...

Simple API


pipes-concurrency has a really, really, really simple API, and the three key functions are:
spawn :: Size -> IO (Input a, Output a)

send :: Input a -> a -> STM Bool

recv :: Output a -> STM (Maybe a)
The spawn function creates a FIFO channel, send adds messages to the channel, and recv takes messages off the channel. That's it! The rest of the library are the following two higher-level pipes that build on those two functions to stream messages into and out of the channel:
sendD :: Proxy p => Input a -> x -> p x a x a IO ()

recvS :: Proxy p => Output a -> () -> Producer p a IO ()
The library only has five functions total, making it very easy to learn.


Deadlock safety


What distinguishes this abstraction from traditional STM channels is that send and recv hook into the garbage collection system to automatically detect and avoid deadlocks. If they detect a deadlock they just terminate cleanly instead.

Surprisingly, this works so well that it even correctly handles crazy scenarios like cyclic graphs. For example, the run-time system magically ties the knot in the following example and both pipelines successfully terminate and get garbage collected:
import Control.Concurrent.Async
import Control.Proxy
import Control.Proxy.Concurrent

main = do
    (in1, out1) <- spawn Unbounded
    (in2, out2) <- spawn Unbounded
    a1 <- async $ do runProxy $ recvS out1 >-> sendD in2
                     performGC
    a2 <- async $ do runProxy $ recvS out2 >-> sendD in1
                     performGC
    mapM_ wait [a1, a2]
I don't even know why the above example works, to be completely honest. I really only designed pipes-concurrency to avoid deadlocks for acyclic graphs and the above was just a really nice emergent behavior that fell out of the implementation. I think this is an excellent tribute to how amazing ghc is, and I want to give a big shout-out to all the people who contribute to it.

I call this "semi-automatic" reference management because you must still call the garbage collector manually after you stop using each reference, otherwise you cannot guarantee promptly releasing the reference. However, even if you forget to do this, all that happens is that it just delays stream termination until the next garbage collection cycle.

Severability


I designed the API so that if any other streaming library wants to use it I can cleanly separate out the pipes-agnostic part, consisting of spawn, send, and recv. If you want to build on this neat deadlock-free abstraction with, say, conduit or io-streams, then just let me know and I will factor those functions out into their own library.


Comparisons


People might wonder how pipes-concurrency compares to the stm-conduit and io-streams approaches for managing concurrency. Before I continue I just want to point out that I contributed some of the concurrency code to io-streams, so I am at fault for some of its current weaknesses. One of the reasons I made the pipes-concurrency functionality severable was so that io-streams could optionally merge in this same feature to fix some of the concurrency issues that I couldn't resolve the first time around.

pipes-concurrency does several things that are very unique in the streaming concurrency space, including:
  • Dynamic communication graphs with semi-automatic reference management
  • Correctly handling multiple readers and writers on the same resource
  • Deadlock safety (as mentioned above)
  • Exception safety (by virtue of deadlock safety)

Conclusion


There are still more features that I haven't even mentioned, so I highly recommend you read the tutorial to learn other cool tricks you can do with the library.

For people following the pipes ecosystem, the next library coming up is pipes-parse which is getting very close to completion, although the version currently on Github is stale and doesn't reflect the current state of the project. Expect to see some very cool and unique features when it comes out, which should be within the next two weeks.

Wednesday, April 10, 2013

Defaults

Many programs require default values of some sort and normally we would consider this aspect of programming "business logic" and not give it a second thought. However, mathematics provides some some surprising insight into the banal task of choosing appropriate defaults, so let's temporarily put on our theory hats and try to over-think this problem.


Patterns


Let's begin by trying to identify a common pattern unifying all default values. If I were to name a few types, most people would agree on the following default values:
  • Int: 0
  • Bool: False
  • [a]: [], the empty list
  • Text: "", the empty Text string (if you use OverloadedStrings)
Why not choose 5 as the default Int or "ridiculous" as the default Text string? What makes us gravitate towards choosing those particular values?

Well, we can discern a common pattern: all these default values seem to correspond to something "empty". But what does it really mean to be "empty"?

Well, for numbers it is obvious why we consider 0 empty. If you add 0 to any number n, you get back n, signifying that 0 must be empty:
0 + n = n
n + 0 = n
We can extend this same reasoning to the other values to justify why we consider them empty. For example, if you append "" to any Text string str, you get back str, signifying that "" added nothing:
"" `append` str = str
str `append` "" = str
Similarly, the empty list adds nothing when you concatenate it:
[] ++ xs = xs
xs ++ [] = xs
... and False does nothing when you (||) it:
False || b = b
b || False = b
The pattern is obvious: in every case we have some empty default value, which we will call mempty, and some combining function, which we will call mappend, that satisfy the following two equations:
mempty `mappend` x = x
x `mappend` mempty = x
This is a Monoid, and Haskell's Data.Monoid module defines mempty and mappend in the Monoid type class:
class Monoid m where
    mempty  :: m
    mappend :: m -> m -> m
... and also provides a convenient infix operator for mappend:
(<>) :: (Monoid a) => a -> a -> a
m1 <> m2 = m1 `mappend` m2

Emptiness


Not all types have a unique Monoid instance. For example, Bool has two separate Monoid instances and we must use the Any or All newtypes from Data.Monoid to distinguish which one we prefer.

The Any monoid corresponds to the one we chose above, where False is the empty value and (||) is the combining operation:
newtype Any = Any { getAny :: Bool }

instance Monoid Any where
    mempty = Any False
    (Any b1) `mappend` (Any b2) = Any (b1 || b2)
However, there is a dual monoid where True is the "empty" value and (&&) is the combining operation:
newtype All = All { getAll :: Bool }

instance Monoid All where
    mempty = All True
    (All b1) `mappend` (All b2) = All (b1 && b2)
Similarly, numbers have two separate Monoid instances, and we use the Sum or Product monoids to distinguish which one we prefer.

newtype Sum a = Sum { getSum :: a }

instance (Num a) => Monoid (Sum a) where
    mempty = Sum 0
    (Sum n1) `mappend` (Sum n2) = Sum (n1 + n2)


newtype Product a = Product { getProduct :: a }

instance (Num a) => Monoid (Sum a) where
    mempty = Product 1
    (Product n1) `mappend` (Product n2) = Product (n1 * n2)
Monoids teach a valuable lesson: there is no such thing as an intrinsically empty value. Values are only empty with respect to a specific combining operation. We can choose more exotic default values to be mempty, but if we must select an unusual mappend to justify their emptiness then that suggests that we chose the wrong defaults.

So the next time you find yourself needing a Default type class, chances are that you actually should use the Monoid type class instead. The Monoid class forces us to demonstrate why our default is truly empty by also providing the associated combining operator. This basic sanity check discourages us from defining "ridiculous" defaults.

Thursday, March 21, 2013

pipes-3.2: ListT, Codensity, ArrowChoice, and performance

pipes-3.2 is out and it boasts several cool new features. The important highlights are:
  • A correct-by-construction ListT implementation that converts to and from proxies
  • The CodensityP proxy transformer, which improves the time complexity of left-associated binds
  • ArrowChoice operations for selectively applying pipes to subsets of an input stream
  • "Pointful" operators
  • Many performance improvements
This post is mainly a changelog, so if you are completely new to pipes, then I recommend you begin from the pipes tutorial, which is probably the longest tutorial on Hackage at this point.


ListT


Many people know that ListT in transformers is broken, thanks to the wonderful ListT done right article. Fewer people know that there is a correct-by-construction implementation of ListT up on Hackage in the List package. However, there are a couple of problems with the List package:
  • It's difficult to build custom ListT actions.
  • It's difficult to read out the result of List.
The generator package tries to solve the first problem, and remarkably resembles a Producer from pipes. It provides a monad equipped with a yield command and you use generate to compile it to a ListT action. However, neither List nor generator solve the second problem.

If you thought to yourself "Oh, no.... Gabriel added a List dependency to pipes", you'd be wrong! In fact, pipes has had ListT support since version 2.4 and I didn't even realize it until I was working on a perfect backtracking parser for the upcoming pipes-parse library.

The key lies in the Interact class that I introduced in pipes-2.4, which I've renamed to the ListT class in this release. People familiar with pipes know that this mysterious class provided two extra operators: (/>/) and (\>\), and that these operators happened to form two extra categories, with respond and request as their respective identities:
respond />/ f = f

f />/ respond = f

(f />/ g) />/ h = f />/ (g />/ h)


request \>\ f = f

f \>\ request = f

(f \>\ g) \>\ h = f \>\ (g \>\ h)
However, at the time I discovered these two additional categories I dismissed them as less interesting than the proxy composition categories, even mentioning in the tutorial that "they are more exotic and you probably never need to use them". Little did I know how wrong I was!

I later discovered that these two categories were both ListT Kleisli categories. The "respond" category (i.e. respond and (/>/))) is actually a monad over the downstream output of proxies, or in other words:
  • respond corresponds to return
  • (/>/) corresponds to (>=>)
I call this monad RespondT and Control.Proxy.ListT exports the following newtype which lets you convert between the Proxy monad and the RespondT monad:
newtype RespondT p a' a b' m b
    = RespondT { runRespondT :: p a' a b' b m b' }

instance (Monad m, ListT p) => Monad (RespondT p a' a b' m) where
    return a = RespondT (respond a)
    m >>= f  = RespondT (
        runRespondT m //> \a ->
        runRespondT (f a))
    -- (//>) is the "pointful" version of (/>/),
    -- just like (>>=) is the "pointful" version of (>=>)
Using RespondT, you can bind a proxy's output as if you were binding a list within the list monad:
import Control.Proxy

twoNumbers :: (Proxy p) => () -> Producer p Int IO ()
twoNumbers =
     readLnS
 >-> execU (putStrLn "Enter a number: ")
 >-> takeB_ 2

stringsTillQuit :: (Proxy p) => () -> Producer p String IO ()
stringsTillQuit =
     stdinS
 >-> execU (putStrLn "Enter a string: ")
 >-> takeWhileD (/= "quit")

-- 'ProduceT' is a convenient type synonym around 'RespondT'
exampleListT :: (ListT p) => () -> ProduceT p IO (Int, String)
exampleListT () = do
    n   <- RespondT $ twoNumbers ()
    str <- RespondT $ stringsTillQuit ()
    return (n, str)
You can then compile RespondT back to a proxy just by unwrapping the newtype, generating a proxy that produces one output per permutation of bound values:
exampleProxy :: (List p) => () -> Producer p (Int, String) IO ()
exampleProxy = runRespondK exampleListT
More often, you would just combine these two steps into one:
exampleProxy :: (List p) => () -> Producer p (Int, String) IO ()
exampleProxy () = runRespondT $ do
    n   <- RespondT $ twoNumbers ()
    str <- RespondT $ stringsTillQuit ()
    return (n, str)
>>> runProxy $ exampleProxy >-> printD
Enter a number: 
1<Enter>
Enter a string: 
Hello<Enter>
(1,"Hello")
Enter a string: 
world<Enter>
(1,"world")
Enter a string: 
quit<Enter>
Enter a number: 
2<Enter>
Enter a string: 
Testing<Enter>
(2,"Testing")
Enter a string: 
123<Enter>
(2,"123")
Enter a string: 
quit<Enter>
Notice how reading out the ListT is trivial. You just use the pipes machinery you know and love to read out the resulting lazy stream of values.

As you might have guessed, there is a symmetric ListT monad over upstream outputs, too, which I've named RequestT. RespondT and RequestT are correct by construction, meaning that they always satisfy the monad and monad transformer laws.

However, RespondT and RequestT are much more powerful than meets the eye. For example, you need not limit yourself to Producers when you use RespondT, as the following example demonstrates:
pipeT :: (ListT p) => () -> Pipe p Int (Int, Int) IO ()
pipeT () = runRespondT $ do
    x <- RespondT $ takeB_ 2 ()
    y <- RespondT $ takeB_ 3 ()
    return (x, y)
You can non-deterministically select outputs from any Proxy type and RespondT just magically does the right thing:
>>> runProxy $ enumFromS 1 >-> pipeT >-> printD
(1,2)
(1,3)
(1,4)
(5,6)
(5,7)
(5,8)
Similarly, you need not restrict yourself to unidirectional pipes. RespondT and RequestT won't bat an eyelash if you try to use them for bidirectional general-purpose proxies. pipes goes above and beyond a traditional ListT implementation.

The proxy prelude provides convenience functions for common operations, such as ranges or iterating over lists:
pairs :: (ListT p) => () -> Producer p (Int, Int) IO ()
pairs () = runRespondT $ do
    x <- rangeS 1 2
    lift $ putStrLn $ "x = " ++ show x
    y <- eachS [4, 6, 8]
    lift $ putStrLn $ "y = " ++ show y
    return (x, y)
eachS is named after Ruby's each function and rangeS is named after Python's range function. You can bind each one within RespondT to non-deterministically select from a list or range, respectively.
>>> runProxy $ pairs >-> printD
x = 1
y = 4
(1,4)
y = 6
(1,6)
y = 8
(1,8)
x = 2
y = 4
(2,4)
y = 6
(2,6)
y = 8
(2,8)
It also wouldn't be a "ListT done right" unless it got the examples from that article correct, too:
myTest :: (ListT p) => Int -> () -> ProduceT p IO (Int, Int)
myTest n () = do
    let squares = eachS $ takeWhile (<= n) $ map (^2) [0..]
    x <- squares
    y <- squares
    lift $ print (x, y)
    guard $ x + y == n
    lift $ putStrLn "Sum of squares."
    return (x, y)
However, that example had a much more difficult time reading out just the first result. We can do so quite easily just by using the headD_ fold, which only drives the RespondT block long enough to retrieve the first result:
>>> let p = raiseK (runRespondK (myTest 5)) >-> headD_
>>> execWriterT $ runProxy p
(0,0)
(0,1)
(0,4)
(1,0)
(1,1)
(1,4)
Sum of squares.
First {getFirst = Just (1,4)}
If you want to learn more, you can read the newly added ListT section of the tutorial, which provides even more code examples.

So now proxies now possess two symmetric ListT implementations you can add to your toolbox, and they improve on the state of the art by reusing the elegant pipes machinery for both building ListT actions and reading out their values.


Codensity Proxy Transformer


In addition to conduit, I must also contend with Edward's machines library (currently on Github). Until recently, machines possessed one notable advantage over pipes: it used a codensity transformation of its internal free monad to avoid a quadratic blowup from a large series of left-associated binds. Normally these do not arise commonly in practice, but it is still a nice feature to have.

Now pipes has assimilated this feature, too. You can improve the time complexity of any pipe just by wrapping the pipe in runCodensityP or runCodensityK, both of which behave like the improve function from the free package. These automatically fix any quadratic time complexity of left-associated binds.

replicateM is a great example of a function that generates lots of left-associated binds. If I try to use it within a pipe, I will get a quadratic blowup:
import Control.Monad
import Control.Proxy

leftAssociate () = replicateM 10000 (request ())

main = do
    xs <- runProxy $ enumFromS (1 :: Int) >-> leftAssociate
    print xs
$ time ./main >/dev/null

real    0m3.773s
user    0m3.716s
sys     0m0.052s
... but if you wrap the pipeline in runCodensityK, it switches to linear time complexity:
import Control.Monad
import Control.Proxy
import Control.Proxy.Trans.Codensity

leftAssociate () = replicateM 10000 (request ())

main = do
    xs <- runProxy $ runCodensityK $
        enumFromS (1 :: Int) >-> leftAssociate
    print xs
$ time ./main >/dev/null

real    0m0.031s
user    0m0.024s
sys     0m0.000s
Even better, you can wrap just the pathological pipe in runCodensityK:
main = do
    xs <- runProxy $
        enumFromS (1 :: Int) >-> runCodensityK leftAssociate
    print xs
... which gives a minor performance improvement (more easily detectable for larger numbers of requests):
time ./main >/dev/null

real    0m0.027s
user    0m0.024s
sys     0m0.000s
My own performance measurements show that while the codensity transformation does improve time complexities for left-associated binds, it yields worse constant factors (about 6-fold slower for entirely pure code, but much less for IO-bound code), which is why I do not enable it on by default. The main reason the naive free monad in the ProxyFast implementation outperforms the codensity version is that it can use rewrite RULES to rewrite your code into the optimal tuned form, but the codensity-transformed version cannot.


ArrowChoice


Ever since I first released pipes, I've received numerous questions about whether or not pipes can be made an instance of Arrow. While you can't make Arrow work, you CAN make proxies implement ArrowChoice, although I don't provide an actual instance because there are two such instances (one for downstream and one for upstream) and the requisite newtypes would be very cumbersome.

You can find these combinators in the ArrowChoice section of the proxy prelude, which provides left{D/U} and right{D/U}. Using these combinators, you can selectively apply pipes to a subset of a stream:
stream
    :: (Monad m, Proxy p)
    => () -> Producer p (Either Int Char) m ()
stream = fromListS
    [Left 3, Right 'C', Right 'D', Left 4, Right 'E', Left 5]
>>> let p = stream >-> leftD (takeB_ 2 >-> mapD show) >-> printD
>>> runProxy p
Left "3"
Right 'C'
Right 'D'
Left "4"
This lets you dynamically switch behavior in response to stream values. For example, one person recently asked me how you would switch content handling in the middle of a pipeline (for example, after negotiating encryption and compression). One option I proposed is that they could use Either to distinguish between values before and after negotiation and then filter them differently using the ArrowChoice combinators:
negotiation () = do
     (before >-> mapD Left ) ()
     (after  >-> mapD Right) ()

main = runProxy $
     source
 >-> negotiation
 >-> leftD idT >-> rightD (decompress >-> decrypt)
 >-> sink
However, that's purely a theoretical idea I threw out there. I haven't actually tried this solution in practice, yet.


Point-ful operators


All type classes now use the "point-ful" equivalents of their original point-free composition operators, and the point-free operators are now derived from the point-ful ones. There are two significant advantages of this:
  • Types. Some previous perfectly safe code required the following hack: ((\_ -> p) >-> k) undefined. These point-ful operators now naturally lead to the correct and more general types of their corresponding point-free composition operators.
  • Performance. I actually used these operators internally to get pipes performance so high. Exposing them directly in the type class removes a lot of code indirection and improves the performance of the base proxy implementations (typically by about 10-15%) and the proxy transformers because there is less indirection.
Also, some people have told me that they found these point-ful operators to be more intuitive to work with, although I still personally prefer the point-free operators.


Performance


Other performance improvements include better rewrite RULES. I found several cases where the original rules were not firing, which is why the proxy prelude still depended on the manual worker/wrapper code to ensure that they fired. I recently found a more general set of rewrite RULES that work even more reliably and the proxy prelude now seems to optimize correctly even when written using the most naive code. However, I haven't committed the newer naive versions yet, as a precaution.


Stability


Many people probably want to know when I still stabilize the pipes package so that it can eventually go in the Haskell platform. The answer is that I won't stabilize it officially until I complete three upcoming libraries:
  • pipes-parse, which provides a native parsing extension and a standard set of end-of-input machinery for pipes
  • pipes-free, which will expose the underlying free monad and also provide an iostreams-like interface to pipes
  • pipes-stm, which will be the basis of an FRP system built on top of pipes
These are the three libraries that I expect to considerably exercise the API of the main library and uncover any significant omissions in its design. For example, just working on pipes-parse alone gave rise to the ListT machinery and the ArrowChoice combinators. However, the core API has so far proven to be particularly solid and future-proof, thanks to the enormously useful proxy transformer system. Proxy transformers let me continue to release new features (like CodensityP and the upcoming ParseP) without impacting any existing code. The only major backwards-incompatible change in this last release was just renaming the Interact class to ListT for clarity, since I figured this would be my last chance to rename it now that people will probably use it much more heavily now.


Open Design Issues


There remains one major performance bottleneck in the fast proxy implementation: hoist. I can dramatically speed it up by removing an normalization step, but doing so means that you can only safely supply hoist with monad morphisms (such as lift), otherwise you will violate the monad transformer laws. In theory, you should really only use monad morphisms for hoist anyway, but in practice people violate this all the time and try to do "weird" things like hoist (runStateT 0), which is very insensible, and I can't easily use the types to forbid that kind of thing. So I would like feedback on whether people prefer speed or safety for the ProxyFast implementation. Alternatively, I could release a third base Proxy instance not selected by default that is identical to ProxyFast in all respects except for providing the faster hoist.

Another question I have is whether or not to merge the proxy prelude into a single module. I welcome any feedback on that. Some people comment that the the module hierarchy is a little-bit too fine-grained and that's one opportunity to condense four modules into one.


Upcoming libraries


The next library on the docket is pipes-parse, followed shortly by pipes-bytestring. I know that many people need the pipes-bytestring library to progress further with pipes, but it depends on pipes-parse, which establishes some important higher-level idioms for the pipes ecosystem in general. My rate of progress has slowed recently mainly because I'm wrapping up my PhD, but I expect that I can finish both within about two months even at my current rate.

Saturday, March 16, 2013

mmorph-1.0.0: Monad morphisms

Several people have asked me to split off MFunctor from pipes so that they could use it in their own libraries without a pipes dependency, so today I'm releasing the mmorph library, which is the new official home of MFunctor. The upcoming pipes-3.2 release will depend on mmorph to provide MFunctor.

The mmorph library specifically targets people who make heavy use of monad transformers. Many common problems that plague users of monad transformers have very elegant solutions inspired by category theory and mmorph provides a standard home for these kinds of operations.

This post won't include examples because mmorph already features an extended tutorial at the bottom of its sole module: Control.Monad.Morph. The tutorial highlights several common use cases where the mmorph library comes in handy and I highly recommend you read it if you want to understand the concrete problems that the mmorph library solves.



Moving on up


mmorph takes several common Haskell idioms you know and love and lifts them to work on monads instead. The simplest example is a monad morphism:
{-# LANGUAGE RankNTypes, TypeOperators #-}

type m :-> n = forall a . m a -> n a
A monad morphism is a function between monads and all monad morphisms must satisfy the following two "monad morphism laws":
morph :: m :-> n

morph $ do x <- m  =  do x <- morph m
           f x           morph (f x)

morph (return x) = return x
Using the above type synonym for monad morphisms, we can simplify the type signature of hoist from the MFunctor type class:
class MFunctor t where
    hoist :: (Monad m) => (m :-> n) -> (t m :-> t n)
MFunctor is the higher-order analog of the Functor class (thus the name), and the resemblance becomes even more striking if you change the type variable names:
class MFunctor f where
    hoist :: (a :-> b) -> (f a :-> f b)
This parallel lets us reuse our intuition for Functors. An MFunctor wraps a monad in the same way that a Functor wraps a type, and MFunctors provide an fmap-like function, hoist, which modifies the wrapped monad.

If you've ever used monad transformers then you've probably already used MFunctors. Just check out the instance list for MFunctor and you'll see many familiar names:
instance MMorph  IdentityT where ...
instance MMorph  MaybeT    where ...
instance MMorph (StateT s) where ...
In fact, transformers has been carrying around type-specialized versions of hoist for years:
  • mapIdentityT is hoist for IdentityT
  • mapStateT is hoist for StateT
  • mapMaybeT is hoist for MaybeT
hoist provides a standard interface to these functions so that you can program generically over any monad transformer that implements MFunctor.


I heard you like monads


We can define a higher-order functor that wraps monads, so why not also define a higher-order monad that wraps ... monads?

It turns out that actually works!
class MMonad t where
    embed :: (Monad n) => (m :-> t n) -> (t m :-> t n)
Again, judicious renaming of type variables reveals the parallel to the Monad class:
class MMonad m where
    embed :: (Monad b) => (a :-> m b) -> (m a :-> m b)
embed is just the higher-order cousin of (=<<)! Many monad transformers have sensible definitions for embed:
instance               MMonad IdentityT   where ...
instance               MMonad MaybeT      where ...
instance (Monoid w) => MMonad (WriterT w) where ...
But wait! Where is return? Well, what type would we expect the higher-order return to have?
??? :: m :-> t m
Well, if we expand out the definition of (:->), we get:
??? :: m a -> t m a
Why, that is just the signature for lift!

But it's not enough for it to just have the right shape of type. If it's really part of a higher-order monad, then lift and embed must obey the monad laws:
-- m >>= return = m
embed lift m = m

-- return x >>= f = f x
embed f (lift x) = f x

-- (m >>= f) >>= g = m >>= (\x -> f x >>= g)
embed g (embed f m) = embed (\x -> embed g (f x)) m
... and all the MMonad instances do satisfy these laws!


Functor design pattern


The mmorph library represents a concrete example of the functor design pattern in two separate ways.

First, the monad morphisms themselves define functors that transform Kleisli categories, and the monad morphism laws are actually functor laws:
morph :: forall a . m a -> n a

(morph .) (f >=> g) = (morph .) f >=> (morph .) g

(morph .) return = return
... so you can think of a monad morphism as just a principled way to transform one monad into another monad for compatibility purposes.

Second, the hoist function from MFunctor defines a functor that transforms monad morphisms:
hoist (f . g) = hoist f . hoist g

hoist id = id
... so you can think of hoist as just a principled way to transform one monad morphism into another monad morphism for compatibility purposes.

The mmorph library is a concrete example of how functors naturally arise as compatibility layers whenever we encounter impedance mismatch between our tools. In this case, we have an impedance mismatch between our monad transformers and we use functors to bridge between them so they can seamlessly work together.