Pipes are a simple but powerful abstraction that can be used to implement stream-based I/O, in much the same way as iteratees and friends, or conduits. In this post, I introduce guarded pipes: a slight generalization of pipes which makes it possible to implement a larger class of combinators.
> {-# LANGUAGE NoMonomorphismRestriction #-}
> module Blog.Pipes.Guarded where
>
> import Control.Category
> import Control.Monad
> import Control.Monad.Free
> import Control.Monad.Identity
> import Data.Maybe
> import Data.Void
> import Prelude hiding (id, (.), until, filter)
The idea behind pipes is straightforward: fix a base monad m, then construct the free monad over a specific PipeF functor:
> data PipeF a b m x = M (m x)
>                    | Yield b x
>                    | Await (Maybe a -> x)
>
> instance Monad m => Functor (PipeF a b m) where
>   fmap f (M m) = M $ liftM f m
>   fmap f (Yield x c) = Yield x (f c)
>   fmap f (Await k) = Await (f . k)
>
> type Pipe a b m r = Free (PipeF a b m) r
Generally speaking, a free monad can be thought of as an embedded language in CPS style: every summand of the base functor (PipeF in this case) is a primitive operation, while the x parameter represents the continuation at each step.
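To make the "embedded language" reading concrete, here is a minimal sketch that is independent of pipes. The functor LogF, the helper logMsg and the interpreter runLog are hypothetical names invented for this illustration, and Free is defined locally (mirroring the type exported by Control.Monad.Free) so that the snippet needs nothing outside base:

```haskell
import Control.Monad (ap)

-- Local stand-in for the Free type from Control.Monad.Free (an assumption
-- made so that the example only depends on base).
data Free f r = Pure r | Free (f (Free f r))

instance Functor f => Functor (Free f) where
  fmap g (Pure r) = Pure (g r)
  fmap g (Free x) = Free (fmap (fmap g) x)

instance Functor f => Applicative (Free f) where
  pure = Pure
  (<*>) = ap

instance Functor f => Monad (Free f) where
  Pure r >>= g = g r
  Free x >>= g = Free (fmap (>>= g) x)

-- Hypothetical one-instruction language: log a message, then continue
-- with the continuation stored in the x slot.
data LogF x = Log String x

instance Functor LogF where
  fmap g (Log s x) = Log s (g x)

-- The primitive operation, with a trivial continuation.
logMsg :: String -> Free LogF ()
logMsg s = Free (Log s (Pure ()))

-- One possible interpreter: collect the messages and the final result.
runLog :: Free LogF r -> ([String], r)
runLog (Pure r) = ([], r)
runLog (Free (Log s k)) = let (ss, r) = runLog k in (s : ss, r)

program :: Free LogF Int
program = do
  logMsg "first"
  logMsg "second"
  return 42
```

Here runLog program evaluates to (["first","second"],42). The program itself is only a data structure; a different interpreter could give the same instructions a different meaning.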
In the case of pipes, M corresponds to an effect in the base monad, Yield produces an output value, and Await blocks until it receives an input value, then passes it to its continuation. You can see that the Await continuation takes a Maybe a type: this is the only thing that distinguishes guarded pipes from regular pipes (as implemented in the pipes package on Hackage). The idea is that Await will receive Nothing whenever the pipe runs out of input values. That will give it a chance to do some cleanup or yield extra outputs. Any additional Await after that point will terminate the pipe immediately.
We can write a simplistic list-based (strict) interpreter formalizing the semantics I just described:
> evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
> evalPipe p xs = go False xs [] p
The boolean parameter is going to be set to True as soon as we execute an Await with an empty input list.
A Pure value means that the pipe has terminated spontaneously, so we return the accumulated output list:
>   where
>     go _ _ ys (Pure r) = return (reverse ys)
Execute inner monadic effects:
>     go t xs ys (Free (M m)) = m >>= go t xs ys
Save yielded values into the accumulator:
>     go t xs ys (Free (Yield y c)) = go t xs (y : ys) c
If we still have values in the input list, feed one to the continuation of an Await statement.
>     go t (x:xs) ys (Free (Await k)) = go t xs ys $ k (Just x)
If we run out of inputs, pass Nothing to the Await continuation…
>     go False [] ys (Free (Await k)) = go True [] ys (k Nothing)
… but only the first time. If the pipe awaits again, terminate it.
>     go True [] ys (Free (Await _)) = return (reverse ys)
To simplify the implementation of actual pipes, we define the following basic combinators:
> tryAwait :: Monad m => Pipe a b m (Maybe a)
> tryAwait = wrap $ Await return
>
> yield :: Monad m => b -> Pipe a b m ()
> yield x = wrap $ Yield x (return ())
>
> lift :: Monad m => m r -> Pipe a b m r
> lift = wrap . M . liftM return
and a couple of secondary combinators, very useful in practice. First, a pipe that consumes all input and never produces output:
> discard :: Monad m => Pipe a b m r
> discard = forever tryAwait
then a simplified await primitive that dies as soon as we stop feeding values to it.
> await :: Monad m => Pipe a b m a
> await = tryAwait >>= maybe discard return
Now we can write a very simple pipe that sums consecutive pairs of numbers:
> sumPairs :: (Monad m, Num a) => Pipe a a m ()
> sumPairs = forever $ do
>   x <- await
>   y <- await
>   yield $ x + y
we get:
> ex1 :: [Int]
> ex1 = runIdentity $ evalPipe sumPairs [1,2,3,4]
> {- ex1 == [3, 7] -}
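To see what the Maybe in the Await continuation buys us, here is a small self-contained sketch. It re-declares the definitions above on top of a locally defined Free (an assumption made so that only base is needed; wrap then becomes the Free constructor), and adds a hypothetical pipe sumAll, not part of the original code, that yields the accumulated total once the input is exhausted:

```haskell
import Control.Monad (ap)
import Data.Functor.Identity (runIdentity)

-- Local stand-in for Control.Monad.Free (assumption: keeps the sketch in base).
data Free f r = Pure r | Free (f (Free f r))

instance Functor f => Functor (Free f) where
  fmap g (Pure r) = Pure (g r)
  fmap g (Free x) = Free (fmap (fmap g) x)

instance Functor f => Applicative (Free f) where
  pure = Pure
  (<*>) = ap

instance Functor f => Monad (Free f) where
  Pure r >>= g = g r
  Free x >>= g = Free (fmap (>>= g) x)

data PipeF a b m x = M (m x) | Yield b x | Await (Maybe a -> x)

instance Functor m => Functor (PipeF a b m) where
  fmap g (M m) = M (fmap g m)
  fmap g (Yield b x) = Yield b (g x)
  fmap g (Await k) = Await (g . k)

type Pipe a b m r = Free (PipeF a b m) r

tryAwait :: Pipe a b m (Maybe a)
tryAwait = Free (Await Pure)

yield :: b -> Pipe a b m ()
yield x = Free (Yield x (Pure ()))

-- The list-based interpreter from above, unchanged in behaviour.
evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
evalPipe p xs = go False xs [] p
  where
    go _ _ ys (Pure _) = return (reverse ys)
    go t is ys (Free (M m)) = m >>= go t is ys
    go t is ys (Free (Yield y c)) = go t is (y : ys) c
    go t (i : is) ys (Free (Await k)) = go t is ys (k (Just i))
    go False [] ys (Free (Await k)) = go True [] ys (k Nothing)
    go True [] ys (Free (Await _)) = return (reverse ys)

-- Hypothetical example pipe: accumulate every input and yield the total
-- when tryAwait reports the end of the input.
sumAll :: (Monad m, Num a) => Pipe a a m ()
sumAll = go 0
  where
    go acc = tryAwait >>= maybe (yield acc) (\x -> go (acc + x))

total :: [Int]
total = runIdentity (evalPipe sumAll [1, 2, 3, 4])
```

Here total evaluates to [10]. The same loop written with await instead of tryAwait would produce no output at all, because await switches to discard as soon as the input ends.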
Composing pipes
The usefulness of pipes, however, is not limited to being able to express list transformations as monadic computations using the await and yield primitives. In fact, it turns out that two pipes can be composed sequentially to create a new pipe.
> infixl 9 >+>
> (>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
> (>+>) = go False False
>   where
When implementing evalPipe, we needed a boolean parameter to signal upstream input exhaustion. This time, we need two boolean parameters, one for the input of the upstream pipe, and one for its output, i.e. the input of the downstream pipe. First, if downstream does anything other than waiting, we just let the composite pipe execute the same action:
>     go _ _ p1 (Pure r) = return r
>     go t1 t2 p1 (Free (Yield x c)) = yield x >> go t1 t2 p1 c
>     go t1 t2 p1 (Free (M m)) = lift m >>= \p2 -> go t1 t2 p1 p2
then, if upstream is yielding and downstream is waiting, we can feed the yielded value to the downstream pipe and continue from there:
>     go t1 t2 (Free (Yield x c)) (Free (Await k)) =
>       go t1 t2 c $ k (Just x)
if downstream is waiting and upstream is running a monadic computation, just let upstream run and keep downstream waiting:
>     go t1 t2 (Free (M m)) p2@(Free (Await _)) =
>       lift m >>= \p1 -> go t1 t2 p1 p2
if upstream terminates while downstream is waiting, finalize downstream:
>     go t1 False p1@(Pure _) (Free (Await k)) =
>       go t1 True p1 (k Nothing)
but if downstream awaits again, terminate the whole composite pipe:
>     go _ True (Pure r) (Free (Await _)) = return r
now, if both pipes are waiting, we keep the second pipe waiting and we feed whatever input we get to the first pipe. If the input is Nothing, we set the first boolean flag, so that next time the first pipe awaits, we can finalize the downstream pipe.
>     go False t2 (Free (Await k)) p2@(Free (Await _)) =
>       tryAwait >>= \x -> go (isNothing x) t2 (k x) p2
>     go True False p1@(Free (Await _)) (Free (Await k)) =
>       go True True p1 (k Nothing)
>     go True True p1@(Free (Await _)) p2@(Free (Await _)) =
>       tryAwait >>= \_ -> {- unreachable -} go True True p1 p2
This composition can be shown to be associative (in a rather strong sense), with identity given by:
> idP :: Monad m => Pipe a a m r
> idP = forever $ await >>= yield
So we can define a Category instance:
> newtype PipeC m r a b = PipeC { unPipeC :: Pipe a b m r }
>
> instance Monad m => Category (PipeC m r) where
>   id = PipeC idP
>   (PipeC p2) . (PipeC p1) = PipeC $ p1 >+> p2
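The identity laws are not proved here, but they can at least be spot-checked on one input. The following sketch repeats the definitions of this post on top of a locally defined Free (an assumption, so that only base is required) and compares a pipe with a finalizer against its compositions with idP. The pipe sumAll and the helper run are hypothetical names introduced only for this check:

```haskell
import Control.Monad (ap, forever)
import Data.Functor.Identity (Identity, runIdentity)
import Data.Maybe (isNothing)

-- Local stand-in for Control.Monad.Free (assumption: keeps the sketch in base).
data Free f r = Pure r | Free (f (Free f r))
instance Functor f => Functor (Free f) where
  fmap g (Pure r) = Pure (g r)
  fmap g (Free x) = Free (fmap (fmap g) x)
instance Functor f => Applicative (Free f) where
  pure = Pure
  (<*>) = ap
instance Functor f => Monad (Free f) where
  Pure r >>= g = g r
  Free x >>= g = Free (fmap (>>= g) x)

data PipeF a b m x = M (m x) | Yield b x | Await (Maybe a -> x)
instance Functor m => Functor (PipeF a b m) where
  fmap g (M m) = M (fmap g m)
  fmap g (Yield b x) = Yield b (g x)
  fmap g (Await k) = Await (g . k)

type Pipe a b m r = Free (PipeF a b m) r

tryAwait :: Pipe a b m (Maybe a)
tryAwait = Free (Await Pure)

yield :: b -> Pipe a b m ()
yield x = Free (Yield x (Pure ()))

lift :: Functor m => m r -> Pipe a b m r
lift = Free . M . fmap Pure

await :: Monad m => Pipe a b m a
await = tryAwait >>= maybe (forever tryAwait) return

idP :: Monad m => Pipe a a m r
idP = forever (await >>= yield)

-- Composition, clause by clause as in the text above.
infixl 9 >+>
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
(>+>) = go False False
  where
    go _ _ _ (Pure r) = return r
    go t1 t2 p1 (Free (Yield x c)) = yield x >> go t1 t2 p1 c
    go t1 t2 p1 (Free (M m)) = lift m >>= go t1 t2 p1
    go t1 t2 (Free (Yield x c)) (Free (Await k)) = go t1 t2 c (k (Just x))
    go t1 t2 (Free (M m)) p2@(Free (Await _)) = lift m >>= \p1 -> go t1 t2 p1 p2
    go t1 False p1@(Pure _) (Free (Await k)) = go t1 True p1 (k Nothing)
    go _ True (Pure r) (Free (Await _)) = return r
    go False t2 (Free (Await k)) p2@(Free (Await _)) =
      tryAwait >>= \x -> go (isNothing x) t2 (k x) p2
    go True False p1@(Free (Await _)) (Free (Await k)) = go True True p1 (k Nothing)
    go True True p1@(Free (Await _)) p2@(Free (Await _)) = tryAwait >> go True True p1 p2

evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
evalPipe p xs = go False xs [] p
  where
    go _ _ ys (Pure _) = return (reverse ys)
    go t is ys (Free (M m)) = m >>= go t is ys
    go t is ys (Free (Yield y c)) = go t is (y : ys) c
    go t (i : is) ys (Free (Await k)) = go t is ys (k (Just i))
    go False [] ys (Free (Await k)) = go True [] ys (k Nothing)
    go True [] ys (Free (Await _)) = return (reverse ys)

-- Hypothetical pipe with a finalizer: yields the total at end of input.
sumAll :: (Monad m, Num a) => Pipe a a m ()
sumAll = go 0
  where
    go acc = tryAwait >>= maybe (yield acc) (\x -> go (acc + x))

run :: Pipe Int Int Identity () -> [Int]
run p = runIdentity (evalPipe p [1, 2, 3])

results :: ([Int], [Int], [Int])
results = (run sumAll, run (idP >+> sumAll), run (sumAll >+> idP))
```

All three components of results are [6], so on this input the finalizer of sumAll runs exactly once whether idP sits upstream or downstream. This is a single test case, not a proof of the category laws.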
Running pipes
A runnable pipe, also called Pipeline, is a pipe that doesn’t yield any value and doesn’t wait for any input. We can formalize this in the types as follows:
> type Pipeline m r = Pipe () Void m r
Disregarding bottom, calling await on such a pipe does not return any useful value, and yielding is impossible. Another way to think of Pipeline is as an arrow (in PipeC) from the terminal object to the initial object of Hask [1].
Running a pipeline is straightforward:
> runPipe :: Monad m => Pipeline m r -> m r
> runPipe (Pure r) = return r
> runPipe (Free (M m)) = m >>= runPipe
> runPipe (Free (Await k)) = runPipe $ k (Just ())
> runPipe (Free (Yield x c)) = absurd x
where the impossibility of the last case is guaranteed by the types, unless of course the pipe introduced a bottom value at some point.
The three primitive operations tryAwait, yield and lift, together with pipe composition and the runPipe function above, are basically all we need to define most pipes and pipe combinators. For example, the simple pipe interpreter evalPipe can be easily rewritten in terms of these primitives:
> evalPipe' :: Monad m => Pipe a b m r -> [a] -> m [b]
> evalPipe' p xs = runPipe $
>   (mapM_ yield xs >> return []) >+>
>   (p >> discard) >+>
>   collect id
>   where
>     collect xs =
>       tryAwait >>= maybe (return $ xs [])
>                          (\x -> collect (xs . (x:)))
Note that we use the discard pipe to turn the original pipe into an infinite one, so that the final return value will be taken from the final pipe.
Extra combinators
The rich structure on pipes (category and monad) makes it really easy to define new higher-level combinators. For example, here are implementations of some of the combinators in Data.Conduit.List, translated to pipes:
> sourceList = mapM_ yield
> sourceNull = return ()
> fold f z = go z
>   where
>     go x = tryAwait >>= maybe (return x) (go . f x)
> consume = fold (\xs x -> xs . (x:)) id >>= \xs -> return (xs [])
> sinkNull = discard
> take n = (isolate n >> return []) >+> consume
> drop n = replicateM n await >> idP
> pipe f = forever $ await >>= yield . f -- called map in conduit
> concatMap f = forever $ await >>= mapM_ yield . f
> until p = go
>   where
>     go = await >>= \x -> if p x then return () else yield x >> go
> groupBy (~=) = p >+>
>   forever (until isNothing >+>
>            pipe fromJust >+>
>            (consume >>= yield))
>   where
>     -- the pipe p yields Nothing whenever the current item y
>     -- and the previous one x do not satisfy x ~= y, and behaves
>     -- like idP otherwise
>     p = await >>= \x -> yield (Just x) >> go x
>     go x = do
>       y <- await
>       unless (x ~= y) $ yield Nothing
>       yield $ Just y
>       go y
> isolate n = replicateM_ n $ await >>= yield
> filter p = forever $ until (not . p)
To work with the equivalent of sinks, it is useful to define a source-to-sink composition operator:
> infixr 2 $$
> ($$) :: Monad m => Pipe () a m r' -> Pipe a Void m r -> m (Maybe r)
> p1 $$ p2 = runPipe $ (p1 >> return Nothing) >+> liftM Just p2
which ignores the source return type, and just returns the sink return value, or Nothing if the source happens to terminate first. So we have, for example:
> ex2 :: Maybe [Int]
> ex2 = runIdentity $ sourceList [1..10] >+> isolate 4 $$ consume
> {- ex2 == Just [1,2,3,4] -}
>
> ex3 :: Maybe [Int]
> ex3 = runIdentity $ sourceList [1..10] $$ discard
> {- ex3 == Nothing -}
>
> ex4 :: Maybe Int
> ex4 = runIdentity $ sourceList [1,1,2,2,2,3,4,4]
>   >+> groupBy (==)
>   >+> pipe head
>   $$ fold (+) 0
> {- ex4 == Just 10 -}
>
> ex5 :: Maybe [Int]
> ex5 = runIdentity $ sourceList [1..10]
>   >+> filter (\x -> x `mod` 3 == 0)
>   $$ consume
> {- ex5 == Just [3, 6, 9] -}
Pipes in practice
You can find an implementation of guarded pipes in my fork of pipes. There is also a pipes-extra repository where you can find some pipes to deal with chunked ByteStrings and utilities to convert conduits to pipes.
I hope to be able to merge this into the original pipes package once the guarded pipe concept has proven its worth. Without the tryAwait primitive, combinators like fold and consume cannot be implemented, nor even a simple stateful pipe like one to split a chunked input into lines. So I think there are enough benefits to justify a little extra complexity in the definition of composition.
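As an illustration of the line-splitting case, here is a minimal sketch of such a stateful pipe. It is not taken from the pipes fork or from pipes-extra: splitLines is a hypothetical name, chunks are plain Strings rather than ByteStrings, and Free is again defined locally so that the example only depends on base. The important part is the Nothing branch, where the last unterminated line is flushed:

```haskell
import Control.Monad (ap, unless)
import Data.Functor.Identity (runIdentity)

-- Local stand-in for Control.Monad.Free (assumption: keeps the sketch in base).
data Free f r = Pure r | Free (f (Free f r))

instance Functor f => Functor (Free f) where
  fmap g (Pure r) = Pure (g r)
  fmap g (Free x) = Free (fmap (fmap g) x)

instance Functor f => Applicative (Free f) where
  pure = Pure
  (<*>) = ap

instance Functor f => Monad (Free f) where
  Pure r >>= g = g r
  Free x >>= g = Free (fmap (>>= g) x)

data PipeF a b m x = M (m x) | Yield b x | Await (Maybe a -> x)

instance Functor m => Functor (PipeF a b m) where
  fmap g (M m) = M (fmap g m)
  fmap g (Yield b x) = Yield b (g x)
  fmap g (Await k) = Await (g . k)

type Pipe a b m r = Free (PipeF a b m) r

tryAwait :: Pipe a b m (Maybe a)
tryAwait = Free (Await Pure)

yield :: b -> Pipe a b m ()
yield x = Free (Yield x (Pure ()))

evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
evalPipe p xs = go False xs [] p
  where
    go _ _ ys (Pure _) = return (reverse ys)
    go t is ys (Free (M m)) = m >>= go t is ys
    go t is ys (Free (Yield y c)) = go t is (y : ys) c
    go t (i : is) ys (Free (Await k)) = go t is ys (k (Just i))
    go False [] ys (Free (Await k)) = go True [] ys (k Nothing)
    go True [] ys (Free (Await _)) = return (reverse ys)

-- Hypothetical pipe: re-chunk arbitrary String chunks into lines.
splitLines :: Monad m => Pipe String String m ()
splitLines = go ""
  where
    -- buf holds the partial line carried over from earlier chunks
    go buf = tryAwait >>= maybe (flush buf) (\chunk -> emit (buf ++ chunk))
    -- yield every complete line, then keep the remainder as the new buffer
    emit s = case break (== '\n') s of
      (line, _ : rest) -> yield line >> emit rest
      (partial, []) -> go partial
    -- end of input: emit the last line if it was not newline-terminated
    flush buf = unless (null buf) (yield buf)

linesOut :: [String]
linesOut = runIdentity (evalPipe splitLines ["ab\ncd", "e\n", "fg"])
```

Here linesOut evaluates to ["ab","cde","fg"]. Without the Nothing case the trailing "fg" would be lost, since the pipe would have no way to learn that no further chunk is coming.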

[1] In reality, Hask doesn’t have an initial object, and the terminal object is actually Void, because of non-strict semantics.
February 3, 2012 at 5:02 am 
I think you are on the right track, but you don’t even have to mess with the original Pipe type or composition definitions. You can extend Pipes with this behavior rather cleanly as outlined below.
First, you just create a type synonym for Pipes that accept Maybe inputs:
type PipeG a b m r = Pipe (Maybe a) b m r
The finalization behavior is implemented by the Pipes themselves. For example:
quitter :: PipeG a a IO ()
quitter = do
    m <- await
    case m of
        Just a -> yield a >> quitter
        Nothing -> lift $ putStrLn "I'm done!"
Then the composition instance for PipeG’s is derived from the Lazy composition instance instead of having to hand-write a new composition case statement from scratch:
f << g = f <+> yield Nothing)
Also, with this way you don’t have to define a separate evalPipe function. Producers are automatically converted to guarded producers using the above composition. I still haven’t proven the above composition satisfies the category laws, but I think it will.
I agree that the pipes library needs a way to intercept shutdown for resource management. I like your idea and I’ll think about it some more.
February 3, 2012 at 5:05 am 
Oops, markdown ate my code. The composition should have been
f <> Nothing)
where compose = (<+<)
February 3, 2012 at 5:06 am 
Gah! One last try.
f `guardCompose` g = f `lazyCompose` (yieldMap Just g >> Nothing)
where
guardCompose = (<<)
lazyCompose = (<+<)
February 3, 2012 at 8:11 am 
@Gabriel: That was indeed my first approach, too, but it turns out it doesn’t work. The problem with your guardCompose is that the second part is not going to be executed if the pipe awaits again after its upstream terminates (so p . idP != p if p has a finalizer).
You can work around this by yielding Nothing directly in the “Nothing” branch of the definition of tryAwait, but then you have to remember that you have done so, otherwise you end up yielding Nothing twice (causing downstream finalizers to be called twice).
You can find a solution along these lines in pipes-extra: https://github.com/pcapriotti/pipesextra/blob/master/Control/Pipe/Guarded.hs (look at the history of that file for failed attempts).
The problem is that it’s very slow: every tryAwait directive does a lot of work, and performance is severely affected. While the direct solution is basically on par with conduit in terms of performance (and a lot faster in some cases!), this GPipe implementation is 2 to 3 times slower in simple benchmarks.
In the end, reusing the simpler original composition would be great, but I’m not sure it’s worth it. I think tryAwait is a pretty basic primitive (on top of which you can build everything that comes to mind), so it makes sense to have it baked in the internals.
February 3, 2012 at 4:34 pm 
Well, I haven’t tested all the laws, but the example you cite does work in my proposal. You have to realize, though, that the identity pipe is different. Since guarded composition is in the category of PipeG, the identity pipe must be of type PipeG, not Pipe, and I’m pretty sure the correct identity PipeG would be:
id = do
    x <- await
    case x of
        Just a -> yield a >> id
        Nothing -> return ()
This satisfies the identity laws for guarded composition, as far as I can tell.
I agree that baking it into the internals is probably faster and I will probably even do that for the Strict composition instance, too. I just want to first reason about them at a high level before encoding them into complex case statements. I’d probably leave the implementations in terms of Lazy composition in a comment alongside the baked-in implementation.
My overall goal for the 1.1 release was to have a working finalization framework and then 1.2 and 1.3 would be performance and standard utility functions, in some order.
February 3, 2012 at 4:37 pm 
And can somebody explain WordPress markdown to me? I’ll try to redo the above id example:
id = do
    x <- await
    case x of
        Just a `then` do
            yield a
            id
        Nothing `then` return ()
If it still doesn't come out right, it basically forwards all Just values and terminates if it receives a Nothing. Remember that the type of this id is:
PipeG a a m () = Pipe (Maybe a) a m ()
February 3, 2012 at 6:14 pm 
Further discussion here: http://www.reddit.com/r/haskell/comments/p8g55/guarded_pipes_or_how_to_write_conduitlist/
February 4, 2012 at 8:26 pm 
[…] Paolo Capriotti KDE hacking and more « An introduction to guarded pipes […]
November 7, 2013 at 6:18 pm 
[…] easily using the primitives await and yield. Paolo Capriotti extended the concept of pipes with guarded pipes, which uses the slightly more complicated tryAwait primitive, which allows a pipe to perform some […]