Friday, September 20, 2013

Perfect streaming using `pipes-bytestring`

pipes-bytestring-1.0.0 is complete, providing pipes utilities for reading and writing effectul streams of ByteStrings. Most people who have been waiting on this library were mainly interested in the following four pipes:

stdin :: MonadIO m => Producer' ByteString m ()

stdout :: MonadIO m => Consumer' ByteString m ()

fromHandle :: MonadIO m => Handle -> Producer' ByteString m ()

toHandle :: MonadIO m => Handle -> Consumer' ByteString m r

However, I delayed releasing pipes-bytestring for so long because there was a separate problem I wanted to solve first, which I like to call the Perfect Streaming Problem.

The Problem

Here is the statement of the Perfect Streaming Problem:

How do you partition an effectful stream into logical units while strictly enforcing an upper bound on memory use?

This problem will resonate with anybody who has ever built a streaming web service: how do you group your stream of bytes into logical units like files or messages? If you naively load each file or message into memory then a malicious input that is one extremely long file or message will take down your server by exhausting all available memory. Most web services work around this by imposing artificial limits on file or message length, or by writing brittle and non-compositional code or protocols that are difficult to understand, debug, and maintain.

However, we don't have to invoke web services to illustrate this problem. Just consider this simpler problem: how do you implement the Unix head utility in Haskell so that it runs in constant space on any input, including infinite inputs without any line boundaries like /dev/zero:

$ head < /dev/zero > /dev/null  # This must run in constant space

Let's keep it simple and assume that our head utility only needs to forward exactly 10 lines from stdin and to stdout.

Before pipes-bytestring, the only way to do this was using lazy IO, like this:

takeLines :: String -> String
takeLines n = unlines . take n . lines

main = do
    str <- getContents
    putStr (takeLines 10 str)

However, pipes and other streaming libraries were built to replace lazy IO so that Haskell programmers could easily reason about the order of effects and decouple them from Haskell's evaluation order. Yet, this simple head challenge remains a perennial problem for streaming libraries. To illustrate why, let's consider how conduit does this, using Data.Conduit.Text.lines:

lines :: Monad m => Conduit Text m Text

This conduit receives a stream of Text chunks of arbitrary size and outputs Text chunks one line long. This sounds reasonable until you feed your program /dev/zero. The lines conduit would then attempt to load an infinitely large chunk of zeroes as the first output and exhaust all memory.

Michael Snoyman already realized this problem and added the linesBounded conduit as a temporary workaround:

linesBounded :: MonadThrow m => Int -> Conduit Text m Text

linesBounded throws an exception when given a line that exceeds the specified length. This is the same workaround that web services use: artificially limit your input stream. I wasn't satisfied with this solution, though. How can I claim that pipes is a mature replacement for lazy IO if lazy IO soundly beats pipes on something as simple as head?

The Solution

I will introduce the solution by beginning from the final code:

Note: There was a last minute bug which I introduced in the lines function before release. Use pipes-bytestring-1.0.1, which fixes this bug, to run this example.
-- head.hs

import Pipes
import Pipes.ByteString
import Pipes.Parse (takeFree)
import Prelude hiding (lines, unlines)

    :: (Monad m)
    => Int
    -> Producer ByteString m () -> Producer ByteString m ()
takeLines n = unlines . takeFree n . lines

main = runEffect $ takeLines 10 stdin >-> stdout

Compile and run this to verify that it takes the first ten lines of input for any file:

$ ./head < head.hs
-- head.hs

import Pipes
import Pipes.ByteString
import Pipes.Parse (takeFree)
import Prelude hiding (lines, unlines)

    :: (Monad m)
    => Int

... while still handling infinitely long lines in constant space:

$ ./head < /dev/zero >/dev/null  # Runs forever in constant space

To see why this works, first take a look at the type of Pipes.ByteString.lines:

    :: (Monad m)
    => Producer ByteString m ()
    -> FreeT (Producer ByteString m) m ()

Now compare to that the type of Data.ByteString.Lazy.Char8.lines:

lines :: ByteString -> [ByteString]

pipes treats a Producer of ByteStrings as the effectful analog of a lazy ByteString:

-- '~' means "is analogous to"
Producer ByteString m ()  ~  ByteString

Similarly, pipes also treats a FreeT of Producers as the effectful analog of a list of lazy ByteStrings:

FreeT (Producer ByteString m) m ()  ~  [ByteString]

You can think of FreeT as a "linked list" of zero or more Producers, where each Producer's return value contains either the next Producer or the final return value (() in this case). So if a Producer is analogous to a lazy ByteString then a FreeT-based "linked list" of Producers is analogous to a true linked list of lazy ByteStrings.

Each layer of our FreeT is a Producer that streams one line's worth of chunks. This differs from a single chunk one line long because it's still in Producer form so we haven't actually read anything from the file yet. Also, FreeT is smart and statically enforces that we cannot read lines from the next Producer (i.e. the next line) until we finish the first line.

FreeT has a very important property which other solutions do not have: we can use FreeT to sub-divide the Producer into logical units while still keeping the original chunking scheme. Once we have these nice logical boundaries we can manipulate the FreeT using high-level list-like functions such as Pipes.Parse.takeFree:

    :: (Functor f, Monad m)
    => Int -> FreeT f m () -> FreeT f m ()

takeFree is the FreeT analog of Prelude.take. We keep the first three f layers of the FreeT and discard the rest. In this case our f is (Producer ByteString m) so if each Producer represents one line then we can take a fixed number of lines.

This works because FreeT (Producer ByteString m) m () is just an ordinary Haskell data type. This data type doesn't contain any actual chunks. Instead, it is just a description of how we might traverse our input stream. When we call takeFree on it we are just saying: "Now that I think about it, I don't intend to traverse as much of the input as I had originally planned".

unlines completes the analogy, collapsing our logical units back down into an unannotated stream of bytes:

    :: (Monad m)
    => FreeT (Producer ByteString m) m r
    -> Producer ByteString m r

We can then combine lines, takeFree and unlines to mirror the lazy IO approach:

-- Lazy IO:
takeLines n = unlines . take n . lines

-- pipes:
takeLines n = unlines . takeFree n . lines

The main difference is that the intermediate types are larger because we're moving more information in to the type system and, more importantly, moving that same information out of implicit magic.


The next pipes library in development is pipes-text. The main development bottle-neck (besides my own graduation) is that the text library does not export functions to partially decode incomplete ByteString fragments as much as possible without throwing errors. If somebody were to write that up it would speed up the release of pipes-text significantly.

As always, you can follow or contribute to pipes development or just ask questions by joining the haskell-pipes mailing list.


  1. Gabriel, wouldn't the `FreeT` trick (which is great, by the way, I love the idea) work just as well for any other streaming framework, like Conduit or io-streams?

    1. For `conduit`, yes. For, `io-streams` you would need to modify input streams to return `Either r a` (where `r` is analogous to the `conduit`\`pipes` return value) instead of `Maybe a`, and make them functors over the `r` so that they would work with `FreeT`.

  2. Could you make a type family that turns the type signature of a lazy IO function into the type signature of a pipes function?

    1. You mean like an automated way that new users can use to find out what the equivalent pipe type would be?

    2. Yes, and perhaps to give simplified type signatures to functions.

    3. I'm generally reluctant to add type synonyms or type families unless they are "sort of opaque" (i.e. the user can meaningfully interact with without understanding what the type synonym expands to).

      In some trivial use cases this works (i.e. when consuming simple functions already written for you this way, such as `lines`, `takeFree`, and `unlines`), but in other cases if you don't know that it is using `FreeT` or how that works then you are missing out on a lot of functionality, such as traversing the data type by hand if there is not a `FreeT` recursion scheme for what you have in mind.

      Also, I would like people to grow more comfortable with using `FreeT` directly as I think it is a rather fundamental type, right up there with lists. For me, I would prefer to hide things that are ugly, but there is nothing ugly about `FreeT` in my mind.

  3. "Perfect streaming" is inseparable from "Perfect parsing", it seems. Or at least from "Perfect regex-matching": splitting input at fixed separators/at fixed lengths/at encountering different sort of input symbols.

    By the way, imagine I have a string-producer which produces a potentially infinite string. I want to turn it into "Key-Value pairs" producer, by treating each line as a key-value pair in "key: value" format. So how would be "Producer KeyString" and "Producer ValueString" combined to express this?

    1. So I generally provide a two-tiered approach. For every function of producers I usually provide a lower-level parser that you can use to build it up step by step. The exception is `pipes-bytestring` where I left out most of the parsers because I wanted to get it out, but I plan on adding them later.

      I can use the example you gave: the way I would break down the problem is to first provide two parsers, one for keys and one for values, with these types:

      parseKey :: (Monad m) => StateT (Producer ByteString m r) m Key

      parseValue :: (Monad m) => StateT (Producer ByteString m r) m Value

      Then you could combine those into a parser for a key-value pair:

      parseKeyVal :: (Monad m) => StateT (Producer ByteString m r) (Key, Val)
      parseKeyVal = (,) <$> parseKey <*> parseVal

      Then I would provide a high-level function that applied that parser repeatedly for user convenience:

      keyVals :: (Monad m) => Producer ByteString m r -> Producer (Key, Val) m r

      The idea is that the parsers are more reusable but the functions are more convenient, so the pipes ecosystem usually provides both. For an example of this, see the `pipes-binary` package, which provides the `decode` parser (to decode one element from the stream) and the `decodeMany` function (to decode a stream of elements).

    2. Don't you need to parse the ":" separately ? (and throw it away ?)

    3. Yes, you would. For more sophisticated cases like this you can use `pipes-attoparsec` which turns any `attoparsec` parser into the equivalent `pipes-parse` parser (using the `parse` function). So in practice you could actually do the entire example within `pipes-attoparsec` instead of the way I just suggested.