Friday, September 20, 2013

Perfect streaming using `pipes-bytestring`

pipes-bytestring-1.0.0 is complete, providing pipes utilities for reading and writing effectful streams of ByteStrings. Most people who have been waiting on this library were mainly interested in the following four pipes:

stdin :: MonadIO m => Producer' ByteString m ()

stdout :: MonadIO m => Consumer' ByteString m ()

fromHandle :: MonadIO m => Handle -> Producer' ByteString m ()

toHandle :: MonadIO m => Handle -> Consumer' ByteString m r
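
As a minimal sketch of how these pipes fit together, the following program copies one file to another, one chunk at a time. The helper name copyFile' and the file names are placeholders of mine, not part of the library:

```haskell
import Pipes
import qualified Pipes.ByteString as PB
import System.IO (IOMode (..), withFile)

-- Hypothetical helper: stream the contents of src into dst chunk by chunk,
-- so memory use does not grow with the size of the file.
copyFile' :: FilePath -> FilePath -> IO ()
copyFile' src dst =
    withFile src ReadMode $ \hIn ->
    withFile dst WriteMode $ \hOut ->
        runEffect $ PB.fromHandle hIn >-> PB.toHandle hOut

main :: IO ()
main = do
    writeFile "in.txt" "hello\nworld\n"   -- placeholder input
    copyFile' "in.txt" "out.txt"
    readFile "out.txt" >>= putStr
```

Replacing the two handles with PB.stdin and PB.stdout gives the equivalent of cat.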

However, I delayed releasing pipes-bytestring for so long because there was a separate problem I wanted to solve first, which I like to call the Perfect Streaming Problem.

The Problem

Here is the statement of the Perfect Streaming Problem:

How do you partition an effectful stream into logical units while strictly enforcing an upper bound on memory use?

This problem will resonate with anybody who has ever built a streaming web service: how do you group your stream of bytes into logical units like files or messages? If you naively load each file or message into memory then a malicious input that is one extremely long file or message will take down your server by exhausting all available memory. Most web services work around this by imposing artificial limits on file or message length, or by writing brittle and non-compositional code or protocols that are difficult to understand, debug, and maintain.

However, we don't have to invoke web services to illustrate this problem. Just consider this simpler problem: how do you implement the Unix head utility in Haskell so that it runs in constant space on any input, including infinite inputs without any line boundaries like /dev/zero:

$ head < /dev/zero > /dev/null  # This must run in constant space

Let's keep it simple and assume that our head utility only needs to forward exactly 10 lines from stdin to stdout.

Before pipes-bytestring, the only way to do this was using lazy IO, like this:

takeLines :: Int -> String -> String
takeLines n = unlines . take n . lines

main = do
    str <- getContents
    putStr (takeLines 10 str)

However, pipes and other streaming libraries were built to replace lazy IO so that Haskell programmers could easily reason about the order of effects and decouple them from Haskell's evaluation order. Yet, this simple head challenge remains a perennial problem for streaming libraries. To illustrate why, let's consider how conduit does this, using Data.Conduit.Text.lines:

lines :: Monad m => Conduit Text m Text

This conduit receives a stream of Text chunks of arbitrary size and outputs Text chunks one line long. This sounds reasonable until you feed your program /dev/zero. The lines conduit would then attempt to load an infinitely large chunk of zeroes as the first output and exhaust all memory.

Michael Snoyman already recognized this problem and added the linesBounded conduit as a temporary workaround:

linesBounded :: MonadThrow m => Int -> Conduit Text m Text

linesBounded throws an exception when given a line that exceeds the specified length. This is the same workaround that web services use: artificially limit your input stream. I wasn't satisfied with this solution, though. How can I claim that pipes is a mature replacement for lazy IO if lazy IO soundly beats pipes on something as simple as head?

The Solution

I will introduce the solution by beginning from the final code:

Note: There was a last-minute bug which I introduced in the lines function before release. Use pipes-bytestring-1.0.1, which fixes this bug, to run this example.

-- head.hs

import Pipes
import Pipes.ByteString
import Pipes.Parse (takeFree)
import Prelude hiding (lines, unlines)

takeLines
    :: (Monad m)
    => Int
    -> Producer ByteString m () -> Producer ByteString m ()
takeLines n = unlines . takeFree n . lines

main = runEffect $ takeLines 10 stdin >-> stdout

Compile and run this to verify that it takes the first ten lines of input for any file:

$ ./head < head.hs
-- head.hs

import Pipes
import Pipes.ByteString
import Pipes.Parse (takeFree)
import Prelude hiding (lines, unlines)

takeLines
    :: (Monad m)
    => Int

... while still handling infinitely long lines in constant space:

$ ./head < /dev/zero >/dev/null  # Runs forever in constant space

To see why this works, first take a look at the type of Pipes.ByteString.lines:

lines
    :: (Monad m)
    => Producer ByteString m ()
    -> FreeT (Producer ByteString m) m ()

Now compare that to the type of Data.ByteString.Lazy.Char8.lines:

lines :: ByteString -> [ByteString]

pipes treats a Producer of ByteStrings as the effectful analog of a lazy ByteString:

-- '~' means "is analogous to"
Producer ByteString m ()  ~  ByteString

Similarly, pipes also treats a FreeT of Producers as the effectful analog of a list of lazy ByteStrings:

FreeT (Producer ByteString m) m ()  ~  [ByteString]

You can think of FreeT as a "linked list" of zero or more Producers, where each Producer's return value contains either the next Producer or the final return value (() in this case). So if a Producer is analogous to a lazy ByteString then a FreeT-based "linked list" of Producers is analogous to a true linked list of lazy ByteStrings.
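
To make the "linked list" picture concrete, here is a small sketch that builds such a FreeT by hand and then walks it. It assumes FreeT comes from the free package's Control.Monad.Trans.Free, uses String chunks instead of ByteString to stay short, and the names twoLines, toLists and drainLayer are mine. Note that toLists collects everything in memory, so it is only for inspecting small examples, not for streaming:

```haskell
import Control.Monad.Trans.Free (FreeF (..), FreeT (..))
import Data.Functor.Identity (runIdentity)
import Pipes

-- A hand-built "linked list" of two Producers. Each Producer yields the
-- chunks of one line and then returns the rest of the list.
twoLines :: Monad m => FreeT (Producer String m) m ()
twoLines =
    FreeT $ return $ Free $ do
        yield "hel"
        yield "lo"
        return $ FreeT $ return $ Free $ do
            yield "wor"
            yield "ld"
            return (FreeT (return (Pure ())))

-- Walk the list: one inner list of chunks per layer.
toLists :: Monad m => FreeT (Producer a m) m () -> m [[a]]
toLists f = do
    layer <- runFreeT f
    case layer of
        Pure () -> return []
        Free p  -> do
            (chunks, rest) <- drainLayer p
            more <- toLists rest
            return (chunks : more)

-- Run one Producer to completion, keeping its chunks and its return value.
drainLayer :: Monad m => Producer a m r -> m ([a], r)
drainLayer p = do
    step <- next p
    case step of
        Left r        -> return ([], r)
        Right (a, p') -> do
            (as, r) <- drainLayer p'
            return (a : as, r)

main :: IO ()
main = print (runIdentity (toLists twoLines))
-- prints [["hel","lo"],["wor","ld"]]
```

The only way to reach the second Producer is through the return value of the first, which is what forces us to finish one line before starting the next.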

Each layer of our FreeT is a Producer that streams one line's worth of chunks. This differs from a single chunk one line long because it's still in Producer form so we haven't actually read anything from the file yet. Also, FreeT statically enforces that we cannot read from the next Producer (i.e. the next line) until we finish the current one, because the next Producer is only available as the return value of the current one.

FreeT has a very important property which other solutions do not have: we can use FreeT to sub-divide the Producer into logical units while still keeping the original chunking scheme. Once we have these nice logical boundaries we can manipulate the FreeT using high-level list-like functions such as Pipes.Parse.takeFree:

takeFree
    :: (Functor f, Monad m)
    => Int -> FreeT f m () -> FreeT f m ()

takeFree is the FreeT analog of Prelude.take. We keep the first n f layers of the FreeT and discard the rest. In this case our f is (Producer ByteString m) so if each Producer represents one line then we can take a fixed number of lines.

This works because FreeT (Producer ByteString m) m () is just an ordinary Haskell data type. This data type doesn't contain any actual chunks. Instead, it is just a description of how we might traverse our input stream. When we call takeFree on it we are just saying: "Now that I think about it, I don't intend to traverse as much of the input as I had originally planned".
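
Here is one way a function with takeFree's type could be written. This is a sketch under my own name, takeLayers, and not necessarily the library's implementation; it again assumes FreeT from the free package. To keep the example free of IO it uses a toy functor, the pair (,) a, which makes FreeT behave like an ordinary list:

```haskell
import Control.Monad.Trans.Free (FreeF (..), FreeT (..))
import Data.Functor.Identity (Identity, runIdentity)

-- Hypothetical stand-in for takeFree: keep at most n layers and
-- never look at anything beyond them.
takeLayers :: (Functor f, Monad m) => Int -> FreeT f m () -> FreeT f m ()
takeLayers n f
    | n <= 0    = FreeT (return (Pure ()))
    | otherwise = FreeT $ do
        layer <- runFreeT f
        return $ case layer of
            Pure () -> Pure ()
            Free w  -> Free (fmap (takeLayers (n - 1)) w)

-- Toy functor: each layer is a pair of (element, rest of the list).
fromList :: Monad m => [a] -> FreeT ((,) a) m ()
fromList []       = FreeT (return (Pure ()))
fromList (x : xs) = FreeT (return (Free (x, fromList xs)))

layersToList :: FreeT ((,) a) Identity () -> [a]
layersToList f = case runIdentity (runFreeT f) of
    Pure ()        -> []
    Free (x, rest) -> x : layersToList rest

main :: IO ()
main = print (layersToList (takeLayers 3 (fromList [1 ..])))
-- prints [1,2,3] even though the input is infinite
```

Nothing after the third layer is ever inspected, which is the sense in which we simply decline to traverse the rest of the input.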

unlines completes the analogy, collapsing our logical units back down into an unannotated stream of bytes:

unlines
    :: (Monad m)
    => FreeT (Producer ByteString m) m r
    -> Producer ByteString m r
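
A function of this shape can be sketched as follows. This is an illustration with String chunks and my own names (joinLines, fromChunks), assuming FreeT from the free package, and it is not claimed to be the library's code. Each layer's chunks are forwarded untouched, followed by a newline chunk:

```haskell
import Control.Monad.Trans.Free (FreeF (..), FreeT (..))
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical String-based analog of unlines.
joinLines :: Monad m => FreeT (Producer String m) m r -> Producer String m r
joinLines f = do
    layer <- lift (runFreeT f)
    case layer of
        Pure r -> return r
        Free p -> do
            rest <- p      -- stream this line's chunks unchanged
            yield "\n"     -- then terminate the line
            joinLines rest

-- Helper for the demo: one Producer per inner list of chunks.
fromChunks :: Monad m => [[String]] -> FreeT (Producer String m) m ()
fromChunks []       = FreeT (return (Pure ()))
fromChunks (l : ls) = FreeT (return (Free (each l >> return (fromChunks ls))))

main :: IO ()
main = print (P.toList (joinLines (fromChunks [["hel", "lo"], ["wor", "ld"]])))
-- prints ["hel","lo","\n","wor","ld","\n"]
```

The original chunk boundaries survive the round trip; no chunk is ever concatenated with another.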

We can then combine lines, takeFree and unlines to mirror the lazy IO approach:

-- Lazy IO:
takeLines n = unlines . take n . lines

-- pipes:
takeLines n = unlines . takeFree n . lines

The main difference is that the intermediate types are larger because we're moving more information into the type system and, more importantly, moving that same information out of implicit magic.


The next pipes library in development is pipes-text. The main development bottleneck (besides my own graduation) is that the text library does not export functions that decode as much of an incomplete ByteString fragment as possible without throwing errors. If somebody were to write that up it would speed up the release of pipes-text significantly.

As always, you can follow or contribute to pipes development or just ask questions by joining the haskell-pipes mailing list.

Saturday, September 7, 2013

pipes-4.0: Simpler types and API

I'm announcing pipes-4.0 which greatly simplifies the types and API of the pipes ecosystem. For people new to pipes, pipes is a compositional streaming library that decouples streaming programs into simpler, reusable components.

The purpose behind pipes is to simplify effectful stream programming for both library authors and application writers. Library authors can package streaming components into a highly reusable interface, and application writers can easily connect together streaming components to build correct, efficient, and low-memory streaming programs. For example, the following program connects three reusable components to stream lines from standard input to standard output until the user types "quit":

import Pipes
import qualified Pipes.Prelude as P

main = runEffect $
    P.stdinLn >-> P.takeWhile (/= "quit") >-> P.stdoutLn

pipes distinguishes itself from other stream programming libraries in three main ways:

  • Insistence on elegance, symmetry, and theoretical purity
  • Careful attention to correctness, documentation, and detail
  • Emphasis on a high power-to-weight ratio


This release was made possible by the suggestions and contributions of many people, and I want to give special mention to several of them:

  • Renzo Carbonara, who is the largest contributor of downstream libraries, building and maintaining pipes-network, pipes-network-tls, pipes-zlib, pipes-binary, pipes-attoparsec and pipes-aeson. He also provided lots of useful feedback on design proposals because of his experience maintaining these libraries.

  • Ben Gamari, who contributed pipes-vector and pipes-interleave.

  • Oliver Charles, who contributed to the design of the new pipes-parse library in the process of developing the pipes-tar library.

  • Csernik Flaviu Andrei, who contributed the complete benchmark suite for pipes.

  • Merijn Verstraaten, who contributed the new mtl instances for pipes.

  • Gergely Risko, who fixed a concurrency bug in pipes-concurrency.

  • Mihaly Barasz, who contributed the complete test suite for pipes-concurrency.

  • Tony Day, who helped automate the pipes ecosystem and contributed lots of useful feedback on documentation.

  • Aleksey Khudyakov, who first proposed the idea to remove the old proxy transformer system and outsource the same functionality to monad transformers in the base monad. This change alone accounted for an almost 60% reduction in the library size and the greatest simplification of the types.

  • Johan Tibell, who proposed the initial idea to provide a simpler unidirectional subset of the API by default. This removed the warts that bidirectionality introduced.

  • Florian Hofmann, whose work on pipes-eep led to the discovery of an Arrow instance for push-based pipes.

  • Aristid Breitkreuz and Pierre Radermecker, who both caught important bugs in pipes-parse and pipes-safe.

  • Oliver Batchelor, whose work on integrating pipes with Cloud Haskell improved the design of pipes-safe.

I would also like to thank everybody who provided feedback on the library and its documentation or contributed code.

Change Log

People familiar with pipes will notice that the biggest change to the library is the elimination of the proxy transformer system. This was made possible by an insight of Aleksey Khudyakov that the proxy transformers were isomorphic to monad transformers in the base monad if you ignored their ability to be unwrapped before the Proxy layer. I later discovered how to unwrap these base monad transformers while preserving the Proxy layer, which made possible the complete elimination of the proxy transformer system.

This had the largest impact on simplifying the API:

  • The number of exported functions dropped to approximately 1/3 of the original size (from about 300+ to 100+)

  • The number of modules dropped to 1/3 of the original size (from 18 to 6)

  • The p type parameter in type signatures disappeared, along with the Proxy type class, which became the concrete Proxy type (which was the old ProxyFast implementation).

  • No need for runIdentityP any longer
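
To illustrate what "a monad transformer in the base monad" looks like in practice, here is a small sketch in the pipes-4.0 style. The pipe name countElements is mine, and the example uses StateT from transformers as the base monad layer; it is an illustration of the idea rather than a translation of any particular proxy transformer:

```haskell
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical pipe: forward every element and count it in the base monad.
countElements :: Monad m => Pipe a a (StateT Int m) r
countElements = for cat $ \x -> do
    lift (modify (+ 1))   -- the effect lives in StateT, beneath the pipe
    yield x

main :: IO ()
main = do
    n <- execStateT (runEffect (each "abc" >-> countElements >-> P.drain)) 0
    print n
-- prints 3
```

The state layer is unwrapped with the ordinary execStateT after runEffect, with no pipes-specific machinery involved.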

The next most important change was a simplification of the API to a unidirectional subset which is the new default. This fixed several warts of the previous API:

  • No more gratuitous () parameters

  • The pipe monad and category now overlap

  • Polymorphic type synonyms can now be used to simplify the types

The original bidirectional functionality still remains intact within the Pipes.Core module. The only difference is that it is not exported by default.

The next important change was the realization that bind in the respond Category (i.e. (//>)) was exactly equivalent to a for loop, so the unidirectional API now uses for as a synonym for (//>) and produces really beautiful for loops.
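
A short sketch of that equivalence, with (//>) imported from Pipes.Core. The names duplicate, withFor and withBind are mine:

```haskell
import Pipes
import Pipes.Core ((//>))
import qualified Pipes.Prelude as P

-- Loop body: re-yield each element twice.
duplicate :: Monad m => a -> Producer a m ()
duplicate x = yield x >> yield x

-- The same loop written with for and with (//>).
withFor, withBind :: Monad m => Producer Int m ()
withFor  = for (each [1, 2, 3]) duplicate
withBind = each [1, 2, 3] //> duplicate

main :: IO ()
main = do
    print (P.toList withFor)    -- prints [1,1,2,2,3,3]
    print (P.toList withBind)   -- prints [1,1,2,2,3,3]
    -- A loop body can also run effects in the base monad:
    runEffect $ for (each [1, 2, 3 :: Int]) (lift . print)
```

The body of the loop replaces every yield in the Producer, which is why it reads like a for loop in an imperative language.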

Other important syntactic changes:

  • The unidirectional API uses yield instead of respond, as it did back in pipes-1.0

  • The unidirectional API uses await instead of request, as it did back in pipes-1.0

  • runProxy is now runEffect

  • (>->) is the unidirectional pull-based composition, instead of bidirectional composition
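
The names in this list combine as in the following sketch of a hand-written pipe; the pipe name double is a placeholder of mine:

```haskell
import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical pipe: await one element from upstream, yield its double downstream.
double :: Monad m => Pipe Int Int m r
double = forever $ do
    x <- await
    yield (x * 2)

main :: IO ()
main = do
    -- Pure use: collect the output as a list.
    print (P.toList (each [1, 2, 3] >-> double))   -- prints [2,4,6]
    -- Effectful use: runEffect runs a fully connected pipeline.
    runEffect $ each [1, 2, 3] >-> double >-> P.print
```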

Pipes.Prelude has also changed to remove the suffix from all utilities, and it is no longer re-exported from the main Pipes module.

The downstream libraries have been updated as well to use the pipes-4.0 API and several of these now have much simpler APIs, too, particularly pipes-safe. I will discuss these libraries in separate library-specific posts later on.

Future Goals

This release is intended to be the last major version bump. The next development priorities are:

  • Stabilize the core pipes library

  • Improve distribution by packaging up pipes for several package managers

  • Continue to build out the pipes ecosystem, particularly dedicated Text and ByteString libraries

The long-term goal is to get pipes into the Haskell Platform once the API has proven itself stable and the ecosystem matures.

People interested in learning more about pipes or contributing to development can join the official mailing list.