Sunday, November 23, 2014

How to build library-agnostic streaming sources

The Haskell ecosystem has numerous libraries for effectful stream programming, including, but not limited to:

  • List
  • conduit
  • enumerator
  • io-streams
  • iterIO
  • iteratee
  • list-t
  • logict
  • machines
  • pipes

Unfortunately, this poses a problem for library writers. Which streaming library should they pick when providing effectful streaming components?

Sometimes the correct answer is: none of the above! We can often build streaming and effectful generators using only the base and transformers libraries!

The trick is to build polymorphic generators using only the MonadPlus and MonadIO type classes. These generators can then be consumed by any library that provides a ListT implementation with MonadPlus and MonadIO instances.

MonadPlus

I like to think of MonadPlus as the "list building" type class. This is because you can assemble a list using return, mzero, and mplus:

>>> import Control.Monad (MonadPlus(..))
>>> mzero :: [Int]
[]
>>> return 1 :: [Int]
[1]
>>> return 1 `mplus` return 2 :: [Int]  -- [1] ++ [2]
[1,2]

In other words, mzero is analogous to [], mplus is analogous to (++), and return builds a singleton list.

However, many things other than lists implement MonadPlus, including every implementation of ListT in the wild. Therefore, if we build collections using MonadPlus operations these collections will type-check as ListT streams as well.

Let's provide the following helper function to convert a list to this more general MonadPlus type:

select :: MonadPlus m => [a] -> m a
select  []    = mzero
select (x:xs) = return x `mplus` select xs

-- or: select = foldr (\x m -> return x `mplus` m) mzero
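
To make the polymorphism concrete, here is a small sketch that instantiates select at two MonadPlus instances from base. The names asList and asMaybe are only for illustration:

```haskell
import Control.Monad (MonadPlus(..))

select :: MonadPlus m => [a] -> m a
select  []    = mzero
select (x:xs) = return x `mplus` select xs

-- At the list type, `select` is just the identity
asList :: [Int]
asList = select [1, 2, 3]

-- At `Maybe`, `mplus` keeps the first success, so only the head survives
asMaybe :: Maybe Int
asMaybe = select [1, 2, 3]
```

Here asList evaluates to [1,2,3] and asMaybe evaluates to Just 1. The same definition type-checks against a ListT from a streaming library without any changes.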

Note that this select function has some nice mathematical properties:

select (xs `mplus` ys) = (select xs) `mplus` (select ys)
select  mzero          = mzero

-- This assumes the distributivity law for `MonadPlus`
select . (f >=> g) = (select . f) >=> (select . g)
select . return    = return
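
The distributivity caveat matters in practice. As a sketch, the following compares both sides of the Kleisli composition law at the list type, which satisfies distributivity, and at Maybe, which does not. The functions f and g and the names lhs and rhs are made up for this illustration:

```haskell
import Control.Monad (MonadPlus(..), (>=>))

select :: MonadPlus m => [a] -> m a
select = foldr (\x m -> return x `mplus` m) mzero

f :: Int -> [Int]
f x = [x, x + 1]

-- Keep only even numbers
g :: Int -> [Int]
g y = if even y then [y] else []

-- The two sides of: select . (f >=> g) = (select . f) >=> (select . g)
lhs, rhs :: MonadPlus m => Int -> m Int
lhs = select . (f >=> g)
rhs = (select . f) >=> (select . g)
```

At the list type, lhs 1 and rhs 1 both give [2]. At Maybe, lhs 1 gives Just 2 but rhs 1 gives Nothing, because select (f 1) commits to Just 1 before g gets a chance to reject it.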

MonadIO

Using select and liftIO (from MonadIO), we can build list comprehensions that interleave effects, like this:

example :: (MonadIO m, MonadPlus m) => m ()
example = do
    x <- select [1, 2, 3]
    liftIO (putStrLn ("x = " ++ show x))
    y <- select [4, 5, 6]
    liftIO (putStrLn ("y = " ++ show y))

You can read the above code as saying:

  • Let x range from 1 to 3
  • Print x every time it selects a new value
  • For each value of x, let y range from 4 to 6
  • Print y every time it selects a new value

Notice how example doesn't depend on any particular streaming library, so we can run it with diverse ListT implementations, all of which implement MonadPlus and MonadIO:

>>> Pipes.runListT example  -- This requires `pipes-4.1.4`
x = 1
y = 4
y = 5
y = 6
x = 2
y = 4
y = 5
y = 6
x = 3
y = 4
y = 5
y = 6
>>> _ <- Control.Monad.Logic.observeAllT example
<Exact same output>
>>> _ <- ListT.toList example
<Exact same output>
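
If you would rather not install a streaming library just to try example, a bare-bones ListT can be sketched using base alone. Everything below is hypothetical (the names ListT, Step, runListT, and collect are mine, not any library's API), and it assumes base-4.9 or later, where Control.Monad.IO.Class lives in base:

```haskell
import Control.Applicative (Alternative(..))
import Control.Monad (MonadPlus(..), ap, liftM)
import Control.Monad.IO.Class (MonadIO(..))

-- Each step runs an effect in `m` and yields either the end of the
-- stream or one element plus the rest of the stream
newtype ListT m a = ListT { next :: m (Step m a) }

data Step m a = Nil | Cons a (ListT m a)

instance Monad m => Functor (ListT m) where
    fmap = liftM

instance Monad m => Applicative (ListT m) where
    pure x = ListT (return (Cons x empty))
    (<*>)  = ap

instance Monad m => Monad (ListT m) where
    l >>= f = ListT $ do
        step <- next l
        case step of
            Nil       -> return Nil
            Cons x xs -> next (f x <|> (xs >>= f))

instance Monad m => Alternative (ListT m) where
    empty   = ListT (return Nil)
    l <|> r = ListT $ do
        step <- next l
        case step of
            Nil       -> next r
            Cons x xs -> return (Cons x (xs <|> r))

-- The defaults are `mzero = empty` and `mplus = (<|>)`
instance Monad m => MonadPlus (ListT m)

instance MonadIO m => MonadIO (ListT m) where
    liftIO io = ListT (liftIO io >>= \x -> return (Cons x empty))

-- Run all the effects, discarding the elements
runListT :: Monad m => ListT m a -> m ()
runListT l = do
    step <- next l
    case step of
        Nil       -> return ()
        Cons _ xs -> runListT xs

-- Run all the effects, collecting the elements (this one is not streaming)
collect :: Monad m => ListT m a -> m [a]
collect l = do
    step <- next l
    case step of
        Nil       -> return []
        Cons x xs -> fmap (x :) (collect xs)

select :: MonadPlus m => [a] -> m a
select = foldr (\x m -> return x `mplus` m) mzero

example :: (MonadIO m, MonadPlus m) => m ()
example = do
    x <- select [1, 2, 3 :: Int]
    liftIO (putStrLn ("x = " ++ show x))
    y <- select [4, 5, 6 :: Int]
    liftIO (putStrLn ("y = " ++ show y))
```

With these definitions, runListT example in GHCi should interleave the x and y lines in the same order as the runs above. This is only a sketch for experimenting; a real library's ListT will be better tested and optimized.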

However, we can use this trick for more than just list comprehensions. We can build arbitrary lazy and effectful streams this way!

Generators

Here's an example of a generator that lazily emits lines read from standard input:

import Control.Monad (MonadPlus(..))
import Control.Monad.IO.Class (MonadIO(..))
import System.IO (isEOF)

stdinLn :: (MonadIO m, MonadPlus m) => m String
stdinLn = do
    eof <- liftIO isEOF
    if eof
        then mzero
        else liftIO getLine `mplus` stdinLn

You can read the above code as saying:

  • Check if we are at the end of the file
  • If we're done, then return an empty list
  • If we're not done, prepend a getLine onto a recursive call to stdinLn
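
The "check for the end, otherwise emit one element and recurse" shape above can be factored out. Here is a sketch of a hypothetical unfold helper (my name, not part of any library) along with stdinLn rewritten in terms of it, assuming a base where Control.Monad.IO.Class is available:

```haskell
import Control.Monad (MonadPlus(..))
import Control.Monad.IO.Class (MonadIO(..))
import System.IO (isEOF)

-- Repeatedly run `step` on the current state:
--   `Nothing`      ends the stream
--   `Just (x, s')` emits `x` and continues from state `s'`
unfold :: MonadPlus m => (s -> m (Maybe (a, s))) -> s -> m a
unfold step s = do
    r <- step s
    case r of
        Nothing      -> mzero
        Just (x, s') -> return x `mplus` unfold step s'

-- A pure generator: works at any `MonadPlus`, including plain lists
countTo3 :: MonadPlus m => m Int
countTo3 = unfold (\n -> return (if n > 3 then Nothing else Just (n, n + 1))) 1

-- The standard input generator, with `()` as a dummy state
stdinLn' :: (MonadIO m, MonadPlus m) => m String
stdinLn' = unfold step ()
  where
    step () = liftIO $ do
        eof <- isEOF
        if eof
            then return Nothing
            else fmap (\line -> Just (line, ())) getLine
```

At the list type, countTo3 evaluates to [1,2,3]. Whether the extra helper is worth it is a matter of taste; the direct recursion in stdinLn is already short.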

We can prove this works by writing a program that forwards these lines to standard output:

echo :: (MonadIO m, MonadPlus m) => m ()
echo = do
    str <- stdinLn
    liftIO (putStrLn str)

Now we can run echo using any of our favorite ListT implementations and it will do the right thing, streaming lines lazily from standard input to standard output in constant space:

>>> Pipes.runListT echo
Test<Enter>
Test
ABC<Enter>
ABC
42<Enter>
42
<Ctrl-D>
>>>

The exception is the transformers library, whose ListT implementation does not stream or run in constant space.

Combinators

We can also implement lazy variations on Control.Monad combinators using this interface.

For example, we can implement a lazy variation on replicateM using just select and replicate:

replicateM' :: MonadPlus m => Int -> m a -> m a
replicateM' n m = do
    m' <- select (replicate n m)
    m'

-- or: replicateM' n = join . select . replicate n
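
To see how replicateM' differs from the replicateM in Control.Monad, here is a sketch that runs both at the plain list type. The names collected and streamed are only for illustration:

```haskell
import Control.Monad (MonadPlus(..), join, replicateM)

select :: MonadPlus m => [a] -> m a
select = foldr (\x m -> return x `mplus` m) mzero

replicateM' :: MonadPlus m => Int -> m a -> m a
replicateM' n = join . select . replicate n

-- `replicateM` gathers the results of each run into one list per outcome
collected :: [[Int]]
collected = replicateM 2 [1, 2]

-- `replicateM'` emits each result as its own stream element
streamed :: [Int]
streamed = replicateM' 2 [1, 2]
```

Here collected is [[1,1],[1,2],[2,1],[2,2]], while streamed is [1,2,1,2]. The result type tells the story: m a instead of m [a], so a consumer can process each element as soon as it is produced.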

We can use this lazy replicateM' to lazily echo 10 lines from standard input to standard output:

example :: (MonadIO m, MonadPlus m) => m ()
example = do
    str <- replicateM' 10 (liftIO getLine)
    liftIO (putStrLn str)

We can implement lazy versions of mapM and forM, too, except that their implementations are so trivial that they don't even deserve their own functions:

mapM' :: Monad m => (a -> m b) -> m a -> m b
mapM' = (=<<)

forM' :: Monad m => m a -> (a -> m b) -> m b
forM' = (>>=)

example :: (MonadIO m, MonadPlus m) => m ()
example = mapM' (liftIO . print) (replicateM' 10 (liftIO getLine))

Similarly, a lazy sequence just becomes join.
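
As a sketch, here is what that looks like when spelled out and run at the list type. The names sequence', flattened, and mapped are only for illustration:

```haskell
import Control.Monad (MonadPlus(..), join)

select :: MonadPlus m => [a] -> m a
select = foldr (\x m -> return x `mplus` m) mzero

-- A stream of actions is flattened into a stream of their results
sequence' :: Monad m => m (m a) -> m a
sequence' = join

mapM' :: Monad m => (a -> m b) -> m a -> m b
mapM' = (=<<)

flattened :: [Int]
flattened = sequence' (select [[1, 2], [3]])

mapped :: [Int]
mapped = mapM' (\x -> [x, x * 10]) (select [1, 2])
```

Here flattened is [1,2,3] and mapped is [1,10,2,20].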

Conclusion

The following streaming libraries already provide their own implementation of ListT compatible with the above trick:

  • List
  • list-t
  • logict
  • pipes

The other streaming libraries do not currently provide a ListT implementation, but there is no reason why they couldn't! For example, conduit could implement an internal ListT type of its own and use that as an intermediate type for converting the above abstraction to the public Source type:

convert :: Monad m
        => Data.Conduit.Internal.ListT m a -> Source m a

This polymorphic API obviously does not capture all possible streaming patterns. For example, you cannot implement something like take using this API. However, I suspect that a significant number of streaming components can be written using this dependency-free interface.

Edit: Thank you to Twan van Laarhoven, who pointed out that you can sometimes use MonadIO instead of MonadTrans, which produces nicer constraints. I updated this article to incorporate his suggestion.