An introduction to guarded pipes

Pipes are a very simple but powerful abstraction which can be used to implement stream-based IO, in a very similar fashion to iteratees and friends, or conduits. In this post, I introduce guarded pipes: a slight generalization of pipes which makes it possible to implement a larger class of combinators.

> {-# LANGUAGE NoMonomorphismRestriction #-}
> module Blog.Pipes.Guarded where
> 
> import Control.Category
> import Control.Monad.Free
> import Control.Monad.Identity
> import Data.Maybe
> import Data.Void
> import Prelude hiding (id, (.), until, filter)

The idea behind pipes is straightfoward: fix a base monad m, then construct the free monad over a specific PipeF functor:

> data PipeF a b m x = M (m x)
>                    | Yield b x
>                    | Await (Maybe a -> x)
> 
> instance Monad m => Functor (PipeF a b m) where
>   fmap f (M m) = M $ liftM f m
>   fmap f (Yield x c) = Yield x (f c)
>   fmap f (Await k) = Await (f . k)
> 
> type Pipe a b m r = Free (PipeF a b m) r

Generally speaking, a free monad can be thought of as an embedded language in CPS style: every summand of the base functor (PipeF in this case), is a primitive operation, while the x parameter represents the continuation at each step.

In the case of pipes, M corresponds to an effect in the base monad, Yield produces an output value, and Await blocks until it receives an input value, then passes it to its continuation. You can see that the Await continuation takes a Maybe a type: this is the only thing that distinguishes guarded pipes from regular pipes (as implemented in the pipes package on Hackage). The idea is that Await will receive Nothing whenever the pipe runs out of input values. That will give it a chance to do some cleanup or yield extra outputs. Any additional Await after that point will terminate the pipe immediately.

We can write a simplistic list-based (strict) interpreter formalizing the semantics I just described:

> evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
> evalPipe p xs = go False xs [] p

The boolean parameter is going to be set to True as soon as we execute an Await with an empty input list.

A Pure value means that the pipe has terminated spontaneously, so we return the accumulated output list:

>   where
>     go _ _ ys (Pure r) = return (reverse ys)

Execute inner monadic effects:

>     go t xs ys (Free (M m)) = m >>= go t xs ys

Save yielded values into the accumulator:

>     go t xs ys (Free (Yield y c)) = go t xs (y : ys) c

If we still have values in the input list, feed one to the continuation of an Await statement.

>     go t (x:xs) ys (Free (Await k)) = go t xs ys $ k (Just x)

If we run out of inputs, pass Nothing to the Await continuation…

>     go False [] ys (Free (Await k)) = go True [] ys (k Nothing)

… but only the first time. If the pipe awaits again, terminate it.

>     go True [] ys (Free (Await _)) = return (reverse ys)

To simplify the implementation of actual pipes, we define the following basic combinators:

> tryAwait :: Monad m => Pipe a b m (Maybe a)
> tryAwait = wrap $ Await return
> 
> yield :: Monad m => b -> Pipe a b m ()
> yield x = wrap $ Yield x (return ())
> 
> lift :: Monad m => m r -> Pipe a b m r
> lift = wrap . M . liftM return

and a couple of secondary combinators, very useful in practice. First, a pipe that consumes all input and never produces output:

> discard :: Monad m => Pipe a b m r
> discard = forever tryAwait

then a simplified await primitive, that dies as soon as we stop feeding values to it.

> await :: Monad m => Pipe a b m a
> await = tryAwait >>= maybe discard return

now we can write a very simple pipe that sums consecutive pairs of numbers:

> sumPairs :: (Monad m, Num a) => Pipe a a m ()
> sumPairs = forever $ do
>   x <- await
>   y <- await
>   yield $ x + y

we get:

> ex1 :: [Int]
> ex1 = runIdentity $ evalPipe sumPairs [1,2,3,4]
> {- ex1 == [3, 7] -}

Composing pipes

The usefulness of pipes, however, is not limited to being able to express list transformations as monadic computations using the await and yield primitives. In fact, it turns out that two pipes can be composed sequentially to create a new pipe.

> infixl 9 >+>
> (>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
> (>+>) = go False False
>   where

When implementing evalPipe, we needed a boolean parameter to signal upstream input exhaustion. This time, we need two boolean parameters, one for the input of the upstream pipe, and one for its output, i.e. the input of the downstream pipe. First, if downstream does anything other than waiting, we just let the composite pipe execute the same action:

>     go _ _ p1 (Pure r) = return r
>     go t1 t2 p1 (Free (Yield x c)) = yield x >> go t1 t2 p1 c
>     go t1 t2 p1 (Free (M m)) = lift m >>= \p2 -> go t1 t2 p1 p2

then, if upstream is yielding and downstream is waiting, we can feed the yielded value to the downstream pipe and continue from there:

>     go t1 t2 (Free (Yield x c)) (Free (Await k)) =
>       go t1 t2 c $ k (Just x)

if downstream is waiting and upstream is running a monadic computation, just let upstream run and keep downstream waiting:

>     go t1 t2 (Free (M m)) p2@(Free (Await _)) =
>       lift m >>= \p1 -> go t1 t2 p1 p2

if upstream terminates while downstream is waiting, finalize downstream:

>     go t1 False p1@(Pure _) (Free (Await k)) =
>       go t1 True p1 (k Nothing)

but if downstream awaits again, terminate the whole composite pipe:

>     go _ True (Pure r) (Free (Await _)) = return r

now, if both pipes are waiting, we keep the second pipe waiting and we feed whatever input we get to the first pipe. If the input is Nothing, we set the first boolean flag, so that next time the first pipe awaits, we can finalize the downstream pipe.

>     go False t2 (Free (Await k)) p2@(Free (Await _)) =
>       tryAwait >>= \x -> go (isNothing x) t2 (k x) p2
>     go True False p1@(Free (Await _)) (Free (Await k)) =
>       go True True p1 (k Nothing)
>     go True True p1@(Free (Await _)) p2@(Free (Await _)) =
>       tryAwait >>= \_ -> {- unreachable -} go True True p1 p2

This composition can be shown to be associative (in a rather strong sense), with identity given by:

> idP :: Monad m => Pipe a a m r
> idP = forever $ await >>= yield

So we can define a Category instance:

> newtype PipeC m r a b = PipeC { unPipeC :: Pipe a b m r }
> 
> instance Monad m => Category (PipeC m r) where
>   id = PipeC idP
>   (PipeC p2) . (PipeC p1) = PipeC $ p1 >+> p2

Running pipes

A runnable pipe, also called Pipeline, is a pipe that doesn’t yield any value and doesn’t wait for any input. We can formalize this in the types as follows:

> type Pipeline m r = Pipe () Void m r

Disregarding bottom, calling await on such a pipe does not return any useful value, and yielding is impossible. Another way to think of Pipeline is as an arrow (in PipeC) from the terminal object to the initial object of Hask1.

Running a pipeline is straightforward:

> runPipe :: Monad m => Pipeline m r -> m r
> runPipe (Pure r) = return r
> runPipe (Free (M m)) = m >>= runPipe
> runPipe (Free (Await k)) = runPipe $ k (Just ())
> runPipe (Free (Yield x c)) = absurd x

where the impossibility of the last case is guaranteed by the types, unless of course the pipe introduced a bottom value at some point.

The three primitive operations tryAwait, yield and lift, together with pipe composition and the runPipe function above, are basically all we need to define most pipes and pipe combinators. For example, the simple pipe interpreter evalPipe can be easily rewritten in terms of these primitives:

> evalPipe' :: Monad m => Pipe a b m r -> [a] -> m [b]
> evalPipe' p xs = runPipe $
>   (mapM_ yield xs >> return []) >+>
>   (p >> discard) >+>
>   collect id
>   where
>     collect xs =
>       tryAwait >>= maybe (return $ xs [])
>                          (\x -> collect (xs . (x:)))

Note that we use the discard pipe to turn the original pipe into an infinite one, so that the final return value will be taken from the final pipe.

Extra combinators

The rich structure on pipes (category and monad) makes it really easy to define new higher-level combinators. For example, here are implementations of some of the combinators in Data.Conduit.List, translated to pipes:

> sourceList = mapM_ yield
> sourceNull = return ()
> fold f z = go z
>   where
>     go x = tryAwait >>= maybe (return x) (go . f x)
> consume = fold (\xs x -> xs . (x:)) id >>= \xs -> return (xs [])
> sinkNull = discard
> take n = (isolate n >> return []) >+> consume
> drop n = replicateM n await >> idP
> pipe f = forever $ await >>= yield . f -- called map in conduit
> concatMap f = forever $ await >>= mapM_ yield . f
> until p = go
>   where
>     go = await >>= \x -> if p x then return () else yield x >> go
> groupBy (~=) = p >+>
>   forever (until isNothing >+>
>            pipe fromJust >+>
>            (consume >>= yield))
>   where 
>     -- the pipe p yields Nothing whenever the current item y
>     -- and the previous one x do not satisfy x ~= y, and behaves
>     -- like idP otherwise
>     p = await >>= \x -> yield (Just x) >> go x
>     go x = do
>       y <- await
>       unless (x ~= y) $ yield Nothing
>       yield $ Just y
>       go y
> isolate n = replicateM_ n $ await >>= yield
> filter p = forever $ until (not . p)

To work with the equivalent of sinks, it is useful to define a source to sink composition operator:

> infixr 2 $$
> ($$) :: Monad m => Pipe () a m r' -> Pipe a Void m r -> m (Maybe r)
> p1 $$ p2 = runPipe $ (p1 >> return Nothing) >+> liftM Just p2

which ignores the source return type, and just returns the sink return value, or Nothing if the source happens to terminate first. So we have, for example:

> ex2 :: Maybe [Int]
> ex2 = runIdentity $ sourceList [1..10] >+> isolate 4 $$ consume
> {- ex2 == Just [1,2,3,4] -}
> 
> ex3 :: Maybe [Int]
> ex3 = runIdentity $ sourceList [1..10] $$ discard
> {- ex3 == Nothing -}
> 
> ex4 :: Maybe Int
> ex4 = runIdentity $ sourceList [1,1,2,2,2,3,4,4]
>                 >+> groupBy (==)
>                 >+> pipe head
>                  $$ fold (+) 0
> {- ex4 == Just 10 -}
> 
> ex5 :: Maybe [Int]
> ex5 = runIdentity $ sourceList [1..10]
>                 >+> filter (\x -> x `mod` 3 == 0)
>                  $$ consume
> {- ex5 == Just [3, 6, 9] -}

Pipes in practice

You can find an implementation of guarded pipes in my fork of pipes. There is also a pipes-extra repository where you can find some pipes to deal with chunked ByteStreams and utilities to convert conduits to pipes.

I hope to be able to merge this into the original pipes package once the guarded pipe concept has proven its worth. Without the tryAwait primitive, combinators like fold and consume cannot be implemented, nor even a simple stateful pipe like one to split a chunked input into lines. So I think there are enough benefits to justify a little extra complexity in the definition of composition.


  1. In reality, Hask doesn’t have an initial object, and the terminal object is actually Void, because of non-strict semantics.

About these ads

10 Responses to “An introduction to guarded pipes”

  1. Gabriel Gonzalez Says:

    I think you are on the right track, but you don’t even have to mess with the original Pipe type or composition definitions. You can extend Pipes with this behavior rather cleanly as outlined below.

    First, you just create a type synonym for Pipes that accept Maybe inputs:

    type PipeG a b m r = Pipe (Maybe a) b m r

    The finalization behavior is implemented by the Pipes themselves. For example:

    quitter :: PipeG a a IO ()
    quitter = do
    m yield a >> quitter
    Nothing -> lift $ putStrLn “I’m done!”

    Then the composition instance for PipeG’s is derived from the Lazy composition instance instead of having to hand-write a new composition case statement from scratch:

    f <|< g = f <+> yield Nothing)

    Also, with this way you don’t have to define a separate evalPipe function. Producers are automatically converted to guarded producers using the above composition. I still haven’t proven the above composition satisfies the category laws, but I think it will.

    I agree that the pipes library needs a way to intercept shutdown for resource management. I like your idea and I’ll think about it some more.

  2. Gabriel Gonzalez Says:

    Oops, markdown ate my code. The composition should have been

    f <|> Nothing)
    where compose = (<+<)

  3. Gabriel Gonzalez Says:

    Gah! One last try.

    f `guardCompose` g = f `lazyCompose` (yieldMap Just g >> Nothing)
    where
    guardCompose = (<|<)
    lazyCompose = (<+<)

  4. pcapriotti Says:

    @Gabriel: That was indeed my first approach, too, but it turns out it doesn’t work. The problem with your guardCompose is that the second part is not going to be executed if the pipe awaits again after its upstream terminates (so p . idP != p if p has a finalizer).

    You can work around this by yielding Nothing directly in the “Nothing” branch of the definition of tryAwait, but then you have to remember that you have done so, otherwise you end up yielding Nothing twice (causing downstream finalizers to be called twice).

    You can find a solution along these lines in pipes-extra: https://github.com/pcapriotti/pipes-extra/blob/master/Control/Pipe/Guarded.hs (look at the history of that file for failed attempts).

    The problem is that it’s very slow: every tryAwait directive does a lot of work, and performance is severely affected. While the direct solution is basically on par with conduit in terms of performance (and a lot faster in some cases!), this GPipe implementation is 2 to 3 times slower in simple benchmarks.

    In the end, reusing the simpler original composition would be great, but I’m not sure it’s worth it. I think tryAwait is a pretty basic primitive (on top of which you can build everything that comes to mind), so it makes sense to have it baked in the internals.

  5. Gabriel Gonzalez Says:

    Well, I haven’t tested all the laws, but the example you cite does work in my proposal. You have to realize, though, that the identity pipe is different. Since guarded composition is in the category of PipeG, the identity pipe must be of type PipeG, not Pipe, and I’m pretty sure the correct identity PipeG would be:

    id = do
    x yield a >> id
    Nothing -> return()

    This satisfies the identity laws for guarded composition, as far as I can tell.

    I agree that baking it into the internals is probably faster and I will probably even do that for the Strict composition instance, too. I just want to first reason about them at a high level before encoding them into complex case statements. I’d probably leave the implementations in terms of Lazy composition in a comment alongside the baked-in implementation.

    My overall goal for the 1.1 release was to have a working finalization framework and then 1.2 and 1.3 would be performance and standard utility functions, in some order.

  6. Gabriel Gonzalez Says:

    And can somebody explain WordPress markdown to me? I’ll try to redo the above id example:

    id = do
    x <- await
    case x of
    Just a `then` do
    yield a
    id
    Nothing `then return ()

    If it still doesn't come out right, it basically forwards all Just values and terminates if it receives a Nothing. Remember that the type of this id is:

    PipeG a a m () = Pipe (Maybe a) a m ()

  7. pcapriotti Says:

    Further discussion here: http://www.reddit.com/r/haskell/comments/p8g55/guarded_pipes_or_how_to_write_conduitlist/

  8. Monoidal instances for pipes « Paolo Capriotti Says:

    […] Paolo Capriotti KDE hacking and more « An introduction to guarded pipes […]

  9. SEO Says:

    Hiya! I simply wish to give a huge thumbs up for the nice info you’ve here on this post.
    I can be coming back to your weblog for extra
    soon.

  10. Do guarded pipes behave the same as pipes using await? | Ask Programming & Technology Says:

    […] easily using the primitives await and yield. Paolo Capriotti extended the concept of pipes with guarded pipes, which uses the slightly more complicated tryAwait primitive, which allows a pipe to perform some […]

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s


Follow

Get every new post delivered to your Inbox.

%d bloggers like this: