Saturday, February 22, 2014

Reasoning about stream programming

This post answers a question people sometimes ask me about pipes, which I will paraphase here:

If resource management is not a core focus of pipes, why should I use pipes instead of lazy IO?

Many people who ask this question discovered stream programming through Oleg, who framed the lazy IO problem in terms of resource management. However, I never found this argument compelling in isolation; you can solve most resource management issues simply by separating resource acquisition from the lazy IO, like this:

import System.IO

main = withFile "example.txt" ReadMode $ \handle -> do
    str <- hGetContents handle
    doSomethingWith str

You can now easily reason about resource management: the handle lifetime matches the duration of the callback provided to withFile. This solves 95% of resource management scenarios, and libraries like resourcet/conduit/pipes-safe handle the more exotic cases.

If lazy IO were only a problem for 5% of all use cases, I wouldn't object to its use. Rather, resource management issues are symptoms of a larger problem: lazy IO conflicts with equational reasoning about ordering of effects.

The issue with lazy IO

Lazy IO is teaching an entire generation of Haskell programmers to "not think very hard" about ordering of side effects. Some consider this a feature, but I believe it is a disadvantage because it conflicts with the original spirit of Haskell IO: preserving equational reasoning. Lazy IO ties the effect order to evaluation order, and equational reasoning does not usually preserve evaluation order.

In my eyes, Haskell's treatment of IO was one of the major revolutions in programming language theory, because for the first time ever one could formally reason about side effects within the language rather than building an external formal model. This empowered lay programmers to reason about side effects using simple substitution and greatly lowered the entry barrier to formal reasoning in general.

Consequently, I was very disappointed to see Haskell squander this immense progress by teaching new programmers to abandon reasoning about ordering of side effects through the use of lazy IO. We should be democratizing computer science by helping the average programmer be precise, rather than encouraging imprecision and instructing newcomers to defer precision to the experts.


I built pipes to restore equational reasoning to stream programming. Besides eschewing lazy IO, pipes also promotes equational reasoning in other ways:

  • pipes obeys several laws inspired by category theory that simplify equational rearrangements
  • The core pipes implementation is minimal
  • pipes extensions are pipes-independent, so you can reason about them orthogonally

pipes does not come with every bell and whistle pre-installed because pipes is a minimal substrate upon which other streaming abstractions can be built. This reflects the Haskell ethos of only using the minimum power necessary for the task at hand, which improves correctness and minimizes surprise.

Therefore, you can think of pipes as a gateway drug to equationally reasoning about stream processing. pipes empowers you to be correct and rigorous so that you can spend less time chasing bugs and more time releasing features.

Those who are interested in equationally reasoning about stream programming can read the pipes tutorial, which illustrates how many common streaming abstractions that we take for granted possess beautiful equational properties that we can harness for real applications.

Saturday, February 8, 2014

pipes-http-1.0: Streaming HTTP/HTTPS clients

The pipes-http package now provides an HTTP client for pipes. This was made possible by Michael Snoyman, who released http-client and http-client-tls, which are a conduit-independent subset of http-conduit. I wanted to thank Michael for releasing those packages, which greatly simplified my job. I even get TLS support for free thanks to him, which is incredible.

I've chosen to write the thinnest layer possible, only providing functions to stream in request bodies and stream out response bodies. Everything else is re-exported from http-client and http-client-tls. Hopefully this will encourage shared contributions to those libraries from both pipes and conduit users.

I'll show the library in action with an example of how you would query and format the titles of the top 10 posts on /r/haskell:

-- haskell-reddit.hs

{-# LANGUAGE OverloadedStrings #-}

import Control.Lens (_Right, (^..))
import Control.Lens.Aeson (key, values, _String)
import Control.Monad.Trans.State.Strict (evalStateT)
import Data.Aeson.Parser (json')
import qualified Data.Text    as T
import qualified Data.Text.IO as T
import Pipes.Attoparsec (parse)
import Pipes.HTTP

main = do
    req <- parseUrl ""
    withManager defaultManagerSettings $ \m ->
        withHTTP req m $ \resp -> do
            json <- evalStateT (parse json') (responseBody resp)

            let titles :: [T.Text]
                titles = json  ^.. _Right
                                 . key "data"
                                 . key "children"
                                 . values
                                 . key "data"
                                 . key "title"
                                 . _String

            mapM_ (T.putStrLn . format) (take 10 titles)

format :: T.Text -> T.Text
format txt =
    if   T.length txt <= columns
    then T.concat [bullet,                txt          ]
    else T.concat [bullet, T.take columns txt, ellipsis]
    bullet   = "[*] "
    ellipsis = "..."
    columns = 60 - (T.length bullet + T.length ellipsis)

This example uses pipes-attoparsec to parse the response body as JSON and lens-aeson to query the JSON value for titles:

$ ./haskell-reddit
[*] My Haskell will
[*] What are some common "design patterns" when it comes ...
[*] The Resumption and Reactive Resumption Monads
[*] `cabal install` failing in Vagrant
[*] A Lazy Evaluation - Why I Find Learning Haskell Hard
[*] The Essence Of Reynolds
[*] A test driven haskell course
[*] Wheb -- A WAI framework.
[*] I wrote a simple &lt;200 line Forth interpreter. Does...
[*] GHC iOS 7.8 RC1

You can find pipes-http on Hackage or on Github.

Wednesday, February 5, 2014

pipes-parse-3.0: Lens-based parsing

pipes-parse-3.0.0 introduces a new lens-based parsing mechanism. These lenses improve the library in two ways:

  • They greatly simplify the API

  • They correctly propagate leftovers further upstream


The new parsing API consists of three main types, which roughly parallel the three pipes abstractions:

  • Producers, which are unchanged from pipes

    Producer a m x
  • Parsers, which are the parsing analog of Consumers:

    type Parser a m r = forall x . StateT (Producer a m x) m r
  • Lens'es between Producer's, which are the parsing analog of Pipes:

    Lens' (Producer a m x) (Producer b m y)

What's neat is that pipes-parse does not need to provide any operators to connect these three abstractions. All of the tools you need already exist in either transformers and lens (or lens-family-core, if you prefer a simpler lens alternative).

For example, you connect a Parser to a Producer using either runStateT, evalStateT, or execStateT:

                                            +- Result
runStateT                                   v
    :: Parser a m r -> Producer a m x -> m (r, Producer a m x)
    :: Parser a m r -> Producer a m x -> m  r
    :: Parser a m r -> Producer a m x -> m (   Producer a m x)
                       ^^^^^^^^^^^^^^          ^^^^^^^^^^^^^^
                        Parser Input              Leftovers

These correspond to the three possible ways you might want to run a parser:

  • runStateT: Return the result and leftovers
  • evalStateT: Return only the result
  • execStateT: Return only the leftovers

In fact, two of these functions closely parallel conduit operators:

  • runStateT parallels ($$+)
  • evalStateT parallels ($$)

You can also connect Producers to Lens'es using (^.) or view:

(^.) :: Producer a m x
     -> Lens' (Producer a m x) (Producer b m y)
     -> Producer b m y

(^.) parallels conduit's ($=) operator.

If you want to connect Lens'es to Parsers, you use zoom:

zoom :: Lens' (Producer a m x) (Producer b m y)
     -> Parser b m r
     -> Parser a m r

zoom parallels conduit's (=$) operator.

Finally, you connect Lens'es to each other using (.) (i.e. function composition):

(.) :: Lens' (Producer a m x) (Producer b m y)
    -> Lens' (Producer b m y) (Producer c m z)
    -> Lens' (Producer a m y) (Producer c m z)

(.) parallels conduit's (=$=) operator.

Here's a worked example showing off the new API, including newly-added foldl integration:

import qualified Control.Foldl as L
import Lens.Family ((^.))
import Lens.Family.State.Strict (zoom)
import Pipes
import Pipes.Parse
import Prelude hiding (span, splitAt)

parser :: Parser Int IO ()
parser = do
    -- Attempt to draw a single element
    a <- draw
    lift (print a)

    -- Sum the next 10 input elements
    b <- zoom (splitAt 10) (L.purely foldAll L.sum)
    lift (print b)

    -- Collect all elements less than 15 into a list
    c <- zoom (span (< 15)) drawAll
    lift (print c)

    -- You can nest `zoom`s
    zoom (span (< 20)) $ do
        d <- zoom (splitAt 3) (L.purely foldAll L.product)
        lift (print d)

        e <- peek
        lift (print e)

    -- ... or compose lenses:
    f <- zoom (span (< 20) . splitAt 3) drawAll
    lift (print f)

    -- Print the remaining elements
    g <- drawAll
    lift (print g)

-- Lenses can modify `Producer`s, too
producer :: Monad m => Producer Int m (Producer Int m ())
producer = each [1..] ^. span (< 25)

We get the following output if we connect our parser to our producer:

>>> evalStateT parser producer
Just 1
Just 18


There is a subtle difference between pipes-parse and conduit: pipes-parse correctly propagates leftovers further upstream whereas conduit does not. To illustrate this, let's begin from the following implementation of peek for conduit:

import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Data.Conduit.List (isolate, sourceList)

peek :: Monad m => Sink a m (Maybe a)
peek = do
    ma <- await
    case ma of
        Nothing -> return ()
        Just a  -> leftover a
    return ma

peek attempts to draw a value using await and then undraws the value using leftover before returning the result. peek will work correctly when used like this:

source :: Monad m => Source m Int
source = sourceList [1, 2]

sink1 :: Show a => Sink a IO ()
sink1 = do
    ma1 <- peek
    ma2 <- peek
    lift $ print (ma1, ma2)

If we feed source to sink1, both peeks will return Just 1, since the first peek undraws the 1 to make it available for the second peek:

>>> source $$ sink1
(Just 1,Just 1)

But what happens if we delimit the first peek using isolate, which only allows a fixed number of values to flow through? This is a common parsing request: run a parser within a subset of the input.

sink2 :: Show a => Sink a IO ()
sink2 = do
    ma1 <- isolate 10 =$ peek
    ma2 <- peek
    lift $ print (ma1, ma2)

However, when you compose two conduits, the downstream conduit discards all leftovers when done. Michael is up front about this in the documentation for (=$):

"Leftover data returned from the Sink will be discarded."

There are similar warnings for ($=) and (=$=), all of which discard leftovers from the right component. We can run sink2 to trigger this behavior:

>>> source $$ sink2
(Just 1,Just 2)

The undrawn 1 is irreversibly lost when (=$) completes, which is why the second peek reads a 2 instead of a 1.

The analogous pipes code gets this correct:

import Lens.Family.State.Strict (zoom)
import Pipes
import Pipes.Parse
import Prelude hiding (splitAt)

parser :: Show a => Parser a IO ()
parser = do
    ma1 <- zoom (splitAt 10) peek
    ma2 <- peek
    lift $ print (ma1, ma2)

producer :: Monad m => Producer Int m ()
producer = each [1, 2]

The pipes-parse version correctly restores the undrawn 1 so that the second peek also draws a 1:

>>> evalStateT parser producer
(Just 1,Just 1)

The magic is in the splitAt function, which is the pipes-parse analog of conduit's isolate. Compare the source of isolate (slightly rewritten):

isolate :: Monad m => Int -> Conduit a m a
isolate = loop
    loop 0 = return ()
    loop n = do
        ma <- await
        case ma of
            Nothing -> return ()
            Just a  -> do
                yield a
                loop (n - 1)

... to the source of splitAt (also slightly changed to resemble isolate):

    :: Monad m
    => Int
    -> Lens' (Producer a m x) (Producer a m (Producer a m x))
splitAt n0 k p0 = fmap join (k (loop n0 p0))
    loop 0 p = return p
    loop n p = do
        x <- lift (next p)
        case x of
            Left   r      -> return (return r)
            Right (a, p') -> do
                yield a
                loop (n - 1) p'

The internal loop function in splitAt corresponds to the internal loop from isolate. The extra magic lies within the first line:

splitAt n0 k p0 = fmap join (k (loop n0 p0))

Lens aficionados will recognize this as the dependency-free version of:

splitAt n0 = iso (loop n0) join

This not only instructs the Lens in how to isolate out the first ten elements (using loop), but also how to reverse the process and merge the remaining elements back in (using join). This latter information is what makes leftover propagation work.


Note that not all transformations are reversible and therefore cannot propagate leftovers upstream. Fortunately, the lens model handles this perfectly: just define a Getter instead of a Lens' for transformations that are not reversible:

getter :: Getter (Producer a m x) (Producer b m y)

A Getter will only type-check as an argument to (^.) and not zoom, so you cannot propagate unDraw through a Getter like you can with a Lens':

zoom getter unDraw -- Type error

However, you can still use the Getter to transform Producers:

producer ^. getter -- Type checks!

This provides a type-safe way to distinguish transformations that can propagate leftovers from those that cannot.

In practice, pipes-based parsing libraries just provide functions between Producers instead of Getters for simplicity:

getter :: Producer a m x -> Producer b m y

... but you can keep using lens-like syntax if you promote them to Getters using the to function from lens:

to :: (a -> b) -> Getter a b

producer ^. to getter

Lens Support

Some people may worry about the cost of using lens in conjunction with pipes-parse because lens is not beginner-friendly and has a large dependency graph, so I'd like to take a moment to advertise Russell O'Connor's lens-family-core library. lens-family-core is a beginner-friendly lens-alternative that is (mostly) lens-compatible. It provides much simpler, beginner-friendly types and has a really tiny dependency profile.

Note that pipes-parse does not depend on either lens library. This is one of the beautiful aspects about lenses: you can write a lens-compatible library using nothing more than stock components from the Prelude and the transformers library. You can therefore combine pipes-parse with either lens-family-core or lens without any conflicts. This provides a smooth transition path from the beginner-friendly lens-family-core library to the expert-friendly lens library.


The lens approach is nice because you get many laws for free. For example, you get several associativity laws, like the fact that (^.) associates with (.):

(producer ^. lens1) ^. lens2 = producer ^. (lens1 . lens2)

Similarly, (.) associates with zoom:

zoom lens1 (zoom lens2 parser) = zoom (lens1 . lens2) parser

... and the trio of evalStateT/(^.)/zoom all associate:

evalStateT parser (producer ^. lens)
    = evalStateT (zoom lens parser) producer

Also, lens composition associates, because it's just function composition:

(lens1 . lens2) . lens3 = lens1 . (lens2 . lens3)

You even get the following identity laws for free:

producer ^. id = producer

zoom id producer = producer

f . id = f

id . f = f

However, there is one caveat: many of the lenses in pipes-parse do not satisfy certain lens laws. Specifically, they do not satisfy these laws:

-- Law violation #1
view lens (set lens x a) /= x

-- Law violation #2
zoom lens $ do x <- m  /=  do x <- zoom lens m
               f x            zoom lens (f x)

Law violation #1 arises because I don't know of a lens-like abstraction that type-checks as a Getter and a Focusing, but not a Setter.

However, law #2 directly conflicts with a core pipes-parse feature, specifically lenses like splitAt that delimit parsers. Here's why:

zoom (splitAt n) $ do x <- m  /=  do x <- zoom (splitAt n) m
                      f x            zoom (splitAt n) (f x)

Limiting one parser to n elements is not the same as limiting its two sub-parsers to n elements each. So if you use pipes-parse lenses you cannot rely on zoom being a monad morphism when doing equational reasoning. This was a tough call for me to make, but I felt that delimiting parsers were more important than the monad morphism laws for zoom. Perhaps there is a more elegant solution that I missed that resolves this conflict, but I'm still pleased with the current solution.


Notice how all of these functions and laws are completely pipes-independent. Any streaming abstraction that has some Producer-like type can implement lens-based parsing, too, and also get all of these laws for free, including pure streams (such as strict or lazy Text) or even lazy IO. Other streaming libraries can therefore benefit from this exact same trick.

Batteries included

Downstream libraries have been upgraded to use the pipes-parse API, including pipes-bytestring, pipes-binary, and pipes-attoparsec.This means that right now you can do cool things like:

import Lens.Family.State.Strict (zoom)
import Pipes
import Pipes.Parse
import Pipes.Binary
import qualified Pipes.ByteString as ByteString

parser :: Parser ByteString IO ()
parser = zoom (ByteString.splitAt 100 . decoded) $ do
    x <- draw  -- Draw a decoded Int
    lift $ print (x :: Maybe Int)
    unDraw 99  -- This undraws the encoded bytes

producer :: Monad m => Producer ByteString m ()
producer = for (each [(1::Int)..]) encode

The above parser composes two lenses so that it zooms in on a stream of decoded ints that consume no more than 100 bytes. This will transmute draw to now receive decoded elements and unDraw will magically re-encode elements when pushing back leftovers:

>>> producer' <- execStateT parser producer
Just 1
>>> evalStateT parser producer'
Just 99

Also, Michael Thompson has released a draft of pipes-text on Hackage. This means you can parse a byte stream through a UTF-8 lens and any undrawn input will be encoded back into the original byte stream as bytes. Here is an example program show-casing this neat feature:

-- decode.hs

import Data.ByteString (ByteString)
import Data.Text       (Text)
import Lens.Family.State.Strict (zoom)
import Pipes
import Pipes.Parse
import qualified Pipes.ByteString as ByteString
import qualified Pipes.Text       as Text

-- Retrieve all `Text` chunks up to 10 characters
parser :: Monad m => Parser ByteString m [Text]
parser = zoom (Text.decodeUtf8 . Text.splitAt 10) drawAll

main = do
    (textChunks, leftovers) <- runStateT parser ByteString.stdin
    print textChunks

    -- Now print the remaining `ByteString` chunks
    byteChunks <- evalStateT drawAll leftovers
    print byteChunks

The unused bytes from the decoded stream get correctly undrawn to the original byte stream!

$ ./decode
Hello, 世界!!!<Enter>
["Hello, \19990\30028!"]

The remainder of the first line is undrawn by the decoder and restored back as the original encoding bytes.

Note that the above example is line-buffered, which is why the program does not output the Text chunks immediately after the 10th input character. However, if you disable line buffering then all chunks have just a single character and the example wouldn't illustrate how leftovers worked.

The above example could have also been written as a single Parser:

parser :: Parser ByteString IO ()
parser = do
    texts <- zoom (Text.decodeUtf8 . Text.splitAt 10) drawAll
    lift (print texts)
    bytes <- drawAll
    lift (print bytes)

main = evalStateT parser ByteString.stdin

... but I wanted to make the leftover passing explicit to emphasize that the leftover behavior holds correctly whether or not you exit and re-enter pipes.


The pipes-parse API lets you propagate leftovers upstream, encode leftover support in the type system, and equationally reason about code with several theoretical laws. Additionally, pipes-parse reuses existing functions and concepts from lens and StateT rather than introducing a new set of abstractions to learn.

Note that pipes-parse used to have a some FreeT-based operations as well. These have been moved to a separate pipes-group library (and upgraded to use lenses) since they are conceptually orthogonal to parsing and I will blog about this library in a separate post.

You can find pipes-parse on Hackage or Github, and it comes with an extended tutorial.

Sunday, February 2, 2014

Streaming logging

I see many beginners use WriterT [a] in their programs to log outputs like this:

import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Writer
import Prelude hiding (log)

log :: Monad m => String -> WriterT [String] m ()
log string = tell [string]

example :: WriterT [String] IO ()
example = do
    log "Printing 1 ..."
    lift $ print 1
    log "Printing 2 ..."
    lift $ print 2

main = do
    strings <- execWriterT example
    mapM_ putStrLn strings

This is not the best approach, because you cannot retrieve any logged values until the computation is complete:

>>> main
Printing 1 ...
Printing 2 ...

We cannot appropriate this for long-running programs like servers where we wish to inspect logged output while the program is still running. Worse, this approach will waste memory storing all logged values until the very end.

The simplest way to solve this is just to modify our computation to take the desired logging function as a parameter:

parametrize :: (String -> IO ()) -> IO ()
parametrize log = do
    log "Printing 1 ..."
    print 1
    log "Printing 2 ..."
    print 2

main = parametrize putStrLn

Now we log output immediately without wasting any memory:

>>> main
Printing 1 ...
Printing 2 ...

However, this approach is still a little brittle. For example, suppose we wish to log these lines to a file. As a basic denial-of-service precaution we might wish to cap the number of logged lines (or put the log file on a separate partition, but humor me). Limiting the logged output would necessitate the use of an IORef to coordinate between logging callbacks:

import Control.Monad (when)
import Data.IORef

main = do
    count <- newIORef 0

    let putStrLn' maxLines string = do
            n <- readIORef count
            when (n < maxLines) $ do
                putStrLn string
                writeIORef count (n + 1)

    parametrize (putStrLn' 1)

Now we have tightly integrated state into our log function and increased our dependence on IO. I prefer to limit unnecessary IO and also avoid callback hell, so I will introduce a third solution:

import Pipes
import Prelude hiding (log)

log :: Monad m => String -> Producer String m ()
log = yield

piped :: Producer String IO ()
piped = do
    log "Printing 1 ..."
    lift $ print 1
    log "Printing 2 ..."
    lift $ print 2

main = runEffect $ for piped (lift . putStrLn)

The piped code is syntactically identical to our original example, but this time we stream values immediately instead of deferring all results to a large list at the end:

>>> main
Printing 1 ...
Printing 2 ...

In fact, the for combinator from Pipes exactly recapitulates the behavior of our parametrized function. (for p f) replaces every yield in p with f, and log is just a synonym for yield, so we can freely substitute log commands using for. It's as if we had directly parametrized our piped function on the logging action:

for piped (lift . putStrLn)

-- Replace each `log`/`yield` with `(lift . putStrLn)`
= do (lift . putStrLn) "Printing 1 ..."
     lift $ print 1
     (lift . putStrLn) "Printing 2 ..."
     lift $ print 2

-- Simplify a little bit
= do lift $ putStrLn $ "Printing 1 ..."
     lift $ print 1
     lift $ putStrLn $ "Printing 2 ..."
     lift $ print 2

-- `lift` is a monad morphism, so we can factor it out
= lift $ do putStrLn $ "Printing 1 ..."
            print 1
            putStrLn $ "Printing 2 ..."
            print 2

... and all that runEffect does is remove the lift:

runEffect (for piped yield)

= runEffect $ lift $ do
     putStrLn $ "Printing 1 ..."
     print 1
     putStrLn $ "Printing 2 ..."
     print 2

-- runEffect (lift m) = m
= do putStrLn $ "Printing 1 ..."
     print 1
     putStrLn $ "Printing 2 ..."
     print 2

However, unlike the parametrized example, piped is more flexible. We can manipulate yields in many more ways than just the for combinator. For example, we can use the take Pipe from Pipes.Prelude to easily limit the number of logged outputs:

import qualified Pipes.Prelude as Pipes

limit :: Monad m => Int -> Pipe String String m r
limit n = do
    Pipes.take n  -- Forward the first `n` outputs
    Pipes.drain   -- Ignore the remaining log statements

main = runEffect $ for (piped >-> limit 1) (lift . putStrLn)

... or for people who prefer (>->) over for, you can write the entire thing as one long pipeline:

main = runEffect $ piped >-> limit 1 >-> Pipes.stdoutLn

This will now only output the first logged value:

>>> main
Printing 1 ...

We get all of this with a strict separation of concerns. All three stages in our pipeline are separable and reusable in the same spirit as Unix pipes.

So the next time you need to log a stream of values, consider using a Producer to stream values immediately instead of building up a large list in memory. Producers preserve a great deal of flexibility with very few dependencies and low syntactic overhead. You can learn more about pipes by reading the tutorial.