Archive for the ‘haskell’ Category

Monoidal instances for pipes

February 4, 2012

In this post, I’m going to introduce a new class of combinators for pipes, with an interesting categorical interpretation. I will be using the pipe implementation of my previous post.

> {-# LANGUAGE MultiParamTypeClasses #-}
> {-# LANGUAGE FlexibleInstances #-}
> {-# LANGUAGE TypeFamilies #-}
> {-# LANGUAGE GeneralizedNewtypeDeriving #-}
> module Blog.Pipes.MonoidalInstances where
> 
> import Blog.Pipes.Guarded hiding (groupBy)
> import qualified Control.Arrow as A
> import Control.Category
> import Control.Categorical.Bifunctor
> import Control.Category.Associative
> import Control.Category.Braided
> import Control.Category.Monoidal
> import Control.Monad (forever)
> import Control.Monad.Free
> import Data.Maybe
> import Data.Void
> import Prelude hiding ((.), id, filter, until)

When pipes were first released, some people noticed the lack of an Arrow instance. In fact, it is not hard to show that, even identifying pipes modulo some sort of observational equality, there is no Arrow instance that satisfies the arrow laws.

The problem, of course, is with first, because we already have a simple implementation of arr. If we try to implement first we immediately discover that there’s a problem with the Yield case:

first (Yield x c) = yield (x, ???) >> first c

Since ??? can be of any type, the only possible value is bottom, which of course we don’t want to introduce. Alternative definitions of first that alter the structure of a yielding pipe are not possible if we want to satisfy the law:

first p >+> pipe fst == pipe fst >+> p

Concretely, the problem is that the cartesian product in the type of first forces a sort of "synchronization point" that doesn’t necessarily exist. This is better understood if we look at the type of (***), of which first can be thought of as a special case:

(***) :: Arrow k => k a b -> k a' b' -> k (a, a') (b, b')

first = (*** id)

If the two input pipes yield at different times, there is no way to faithfully match their yielded values into a pair. There are hacks around that, but they don’t behave well compositionally, and they exhibit either arbitrarily large space leaks or data loss.
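To make the timing mismatch concrete, consider a pipe that yields twice per input (a sketch of mine, written with the primitives of the previous post):

double :: Monad m => Pipe a a m r
double = forever $ await >>= \x -> yield x >> yield x

Now try to imagine first double: after one input pair, double has produced two outputs while the right-hand components have produced only one, so the second yielded value has nothing to be paired with. It can only be buffered indefinitely or dropped.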

This has been addressed before: stream processors, such as those in the Fudgets library, are very similar to pipes and suffer from the same problem; some resolutions have been proposed, although none of them entirely satisfactory.

Arrows as monoidal categories

It is well known within the Haskell community that Arrows correspond to so-called Freyd categories, i.e. premonoidal categories with some extra structure.

Using the Monoidal class by Edward Kmett (now in the categories package on Hackage), we can try to make this idea precise.

Unfortunately, we have to use a newtype to avoid overlapping instances in the case of the Hask category:

> newtype ACat a b c = ACat { unACat :: a b c }
>   deriving (Category, A.Arrow)

First, cartesian products are a bifunctor in the category determined by an Arrow.

> instance A.Arrow a => PFunctor (,) (ACat a) (ACat a) where
>   first = ACat . A.first . unACat
> instance A.Arrow a => QFunctor (,) (ACat a) (ACat a) where
>   second = ACat . A.second . unACat
> instance A.Arrow a
>       => Bifunctor (,) (ACat a) (ACat a) (ACat a) where
>   bimap (ACat f) (ACat g) = ACat $ f A.*** g

Now we can say that products are associative, using the associativity of products in Hask:

> instance A.Arrow a => Associative (ACat a) (,) where
>   associate = ACat $ A.arr associate
> instance A.Arrow a => Disassociative (ACat a) (,) where
>   disassociate = ACat $ A.arr disassociate

Here the Disassociative instance expresses the inverse of the associator. And finally, the Monoidal instance:

> type instance Id (ACat a) (,) = ()
> instance A.Arrow a => Monoidal (ACat a) (,) where
>   idl = ACat $ A.arr idl
>   idr = ACat $ A.arr idr
> instance A.Arrow a => Comonoidal (ACat a) (,) where
>   coidl = ACat $ A.arr coidl
>   coidr = ACat $ A.arr coidr

Again, the duals are actually inverses. Products are also symmetric:

> instance A.Arrow a => Braided (ACat a) (,) where
>   braid = ACat $ A.arr braid
> instance A.Arrow a => Symmetric (ACat a) (,)

As you see, everything is trivially induced by the cartesian structure on Hask, since A.arr gives us an identity-on-objects functor. Note, however, that the Bifunctor instance is legitimate only if we assume a strong commutativity law for arrows:

first f >>> second g == second g >>> first f

which we will, for the sake of simplicity.

Replacing products with arbitrary monoidal structures

Once we express the Arrow concept in terms of monoidal categories, it is easy to generalize it to arbitrary monoidal structures on Hask.

In particular, coproducts work remarkably well in the category of pipes:

> instance Monad m
>       => PFunctor Either (PipeC m r) (PipeC m r) where
>   first = PipeC . firstP . unPipeC
> 
> firstP :: Monad m => Pipe a b m r
>        -> Pipe (Either a c) (Either b c) m r
> firstP (Pure r) = return r
> firstP (Free (M m)) = lift m >>= firstP

Yielding a sum is now easy: just yield on the left component.

> firstP (Free (Yield x c)) = yield (Left x) >> firstP c

Awaiting is a little bit more involved, but still easy enough: receive Left and Nothing values normally, and act like an identity on Right values.

> firstP (Free (Await k)) = go
>         where
>           go = tryAwait
>            >>= maybe (firstP $ k Nothing)
>                      (either (firstP . k . Just)
>                              (\x -> yield (Right x) >> go))

And of course we have an analogous instance on the right:

> instance Monad m
>       => QFunctor Either (PipeC m r) (PipeC m r) where
>   second = PipeC . secondP . unPipeC
> 
> secondP :: Monad m => Pipe a b m r
>         -> Pipe (Either c a) (Either c b) m r
> secondP (Pure r) = return r
> secondP (Free (M m)) = lift m >>= secondP
> secondP (Free (Yield x c)) = yield (Right x) >> secondP c
> secondP (Free (Await k)) = go
>         where
>           go = tryAwait
>            >>= maybe (secondP $ k Nothing)
>                      (either (\x -> yield (Left x) >> go)
>                              (secondP . k . Just))

And a Bifunctor instance is obtained by composing first and second (in either order, thanks to the commutativity law above):

> instance Monad m
>       => Bifunctor Either (PipeC m r)
>                    (PipeC m r) (PipeC m r) where
>   bimap f g = first f >>> second g

At this point we can go ahead and define the remaining instances in terms of the identity-on-objects functor given by pipe:

> instance Monad m => Associative (PipeC m r) Either where
>   associate = PipeC $ pipe associate
> instance Monad m => Disassociative (PipeC m r) Either where
>   disassociate = PipeC $ pipe disassociate
> 
> type instance Id (PipeC m r) Either = Void
> instance Monad m => Monoidal (PipeC m r) Either where
>   idl = PipeC $ pipe idl
>   idr = PipeC $ pipe idr
> instance Monad m => Comonoidal (PipeC m r) Either where
>   coidl = PipeC $ pipe coidl
>   coidr = PipeC $ pipe coidr
> 
> instance Monad m => Braided (PipeC m r) Either where
>   braid = PipeC $ pipe braid
> instance Monad m => Symmetric (PipeC m r) Either

Multiplicative structures

There is still a little bit of extra structure that we might want to exploit. Since PipeC m r is a monoidal category, it induces a (pointwise) monoidal structure on its endofunctor category, so we can speak of monoid objects there. In particular, if the identity functor is a monoid, it means that we can define a "uniform" monoid structure for all the objects of our category, given in terms of natural transformations (i.e. polymorphic functions).

We can represent this specialized monoid structure with a type class (using kind polymorphism and appropriately generalized category-related type classes, it should be possible to unify this class with Monoid and even Monad, similarly to how it’s done here):

> class Monoidal k p => Multiplicative k p where
>   unit :: k (Id k p) a
>   mult :: k (p a a) a

Dually, we can have a sort of uniform coalgebra:

> class Comonoidal k p => Comultiplicative k p where
>   counit :: k a (Id k p)
>   comult :: k a (p a a)

The laws for those type classes are just the usual laws for a monoid in a (not necessarily strict) monoidal category:

mult . first unit == idl
mult . second unit == idr
mult . first mult == mult . second mult . associate

first counit . comult == coidl
second counit . comult == coidr
first comult . comult == disassociate . second comult . comult

Now, products have a comultiplicative structure on Hask (as in every category with finite products), given by the terminal object and diagonal natural transformation:

> instance Comultiplicative (->) (,) where
>   counit = const ()
>   comult x = (x, x)

while coproducts have a multiplicative structure:

> instance Multiplicative (->) Either where
>   unit = absurd
>   mult = either id id

that we can readily transport to PipeC m r using pipe:

> instance Monad m => Multiplicative (PipeC m r) Either where
>   unit = PipeC $ pipe absurd
>   mult = PipeC $ pipe mult

Somewhat surprisingly, pipes also have a comultiplicative structure of their own:

> instance Monad m => Comultiplicative (PipeC m r) Either where
>   counit = PipeC discard
>   comult = PipeC . forever $ do
>     x <- await
>     yield (Left x)
>     yield (Right x)

Heterogeneous metaprogramming

All the combinators we defined can actually be used in practice, and the division into type classes certainly sheds some light on their structure and properties, but there’s something deeper going on here.

The fact that the standard Arrow class uses (,) as monoidal structure is not coincidental: Hask is a cartesian closed category, so to embed Haskell’s simply typed λ-calculus into some other category structure, we need at the very least a way to transport cartesian products, i.e. a premonoidal functor [1].

However, as long as our monoidal structure is comultiplicative and symmetric, we can always recover a first-order fragment of λ-calculus inside the "guest" category, and we don’t even need an identity-on-objects functor [2].

The idea is that we can use the monoidal structure of the guest category to represent contexts, where weakening is given by counit, contraction by comult, and exchange by swap.
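For example, contraction is what lets us interpret a term that mentions its variable twice. Here is a small sketch of mine (with explicit constraints, since they depend on the exact class hierarchy):

-- interpret \x -> (f x, g x): duplicate the context with comult,
-- then run f and g side by side with bimap
interpPair :: (Category k, Comultiplicative k p, Bifunctor p k k k)
           => k a b -> k a c -> k a (p b c)
interpPair f g = bimap f g . comult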

There is an experimental GHC branch with a preprocessor able to translate expressions written in an arbitrary guest language into Haskell, given instances of appropriate type classes, which correspond exactly to the ones we have defined above.

Examples

This exposition was pretty abstract, so we end with some examples.

We first need to define a few wrappers for our monoidal combinators, so we don’t have to deal with the PipeC newtype:

> split :: Monad m => Pipe a (Either a a) m r
> split = unPipeC comult
> 
> join :: Monad m => Pipe (Either a a) a m r
> join = unPipeC mult
> 
> (*+*) :: Monad m => Pipe a b m r -> Pipe a' b' m r
>       -> Pipe (Either a a') (Either b b') m r
> f *+* g = unPipeC $ bimap (PipeC f) (PipeC g)
> 
> discardL :: Monad m => Pipe (Either Void a) a m r
> discardL = unPipeC idl
> 
> discardR :: Monad m => Pipe (Either a Void) a m r
> discardR = unPipeC idr
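As a quick sanity check, split followed by join should yield every input twice. This sketch assumes consume and sourceList from the previous post, plus runIdentity from Control.Monad.Identity:

exSplit :: Maybe [Int]
exSplit = runIdentity $ sourceList [1,2] >+> split >+> join $$ consume
{- exSplit == Just [1,1,2,2] -}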

Now let’s write a tee combinator, similar to the tee command for shell pipes:

> tee :: Monad m => Pipe a Void m r -> Pipe a a m r
> tee p = split >+> firstP p >+> discardL
> 
> printer :: Show a => Pipe a Void IO r
> printer = forever $ await >>= lift . print
> 
> ex6 :: IO ()
> ex6 = do
>   (sourceList [1..5] >+>
>     tee printer >+>
>     (fold (+) 0 >>= yield) $$
>     printer)
>   return ()
> {- ex6 == mapM_ print [1,2,3,4,5,15] -}

Another interesting exercise is reimplementing the groupBy combinator of the previous post:

> groupBy :: Monad m => (a -> a -> Bool) -> Pipe a [a] m r
> groupBy p =
>    -- split the stream in two
>    split >+>
> 
>    -- yield Nothing whenever (not (p x y))
>    -- for consecutive x y
>   ((consec >+>
>     filter (not . uncurry p) >+>
>     pipe (const Nothing)) *+*
>   
>   -- at the same time, let everything pass through
>   pipe Just) >+>
> 
>   -- now rejoin the two streams
>   join >+>
>   
>   -- then accumulate results until a Nothing is hit
>   forever (until isNothing >+>
>            pipe fromJust >+>
>            (consume >>= yield))
> 
> -- yield consecutive pairs of values
> consec :: Monad m => Pipe a (a, a) m r
> consec = await >>= go
>   where
>     go x = await >>= \y -> yield (x, y) >> go y
> 
> ex7 :: IO ()
> ex7 = do (sourceList [1,1,2,2,2,3,4,4]
>           >+> groupBy (==)
>           >+> pipe head
>            $$ printer)
>          return ()
> {- ex7 == mapM_ print [1,2,3,4] -}

References

[1] J. Power and E. Robinson, “Premonoidal categories and notions of computation,” Mathematical Structures in Computer Science, vol. 7, pp. 453–468, 1997.

[2] A. Megacz, “Multi-Level Languages are Generalized Arrows,” arXiv:1007.2885, 2010.


An introduction to guarded pipes

February 2, 2012

Pipes are a very simple but powerful abstraction which can be used to implement stream-based IO, in a very similar fashion to iteratees and friends, or conduits. In this post, I introduce guarded pipes: a slight generalization of pipes which makes it possible to implement a larger class of combinators.

> {-# LANGUAGE NoMonomorphismRestriction #-}
> module Blog.Pipes.Guarded where
> 
> import Control.Category
> import Control.Monad.Free
> import Control.Monad.Identity
> import Data.Maybe
> import Data.Void
> import Prelude hiding (id, (.), until, filter)

The idea behind pipes is straightforward: fix a base monad m, then construct the free monad over a specific PipeF functor:

> data PipeF a b m x = M (m x)
>                    | Yield b x
>                    | Await (Maybe a -> x)
> 
> instance Monad m => Functor (PipeF a b m) where
>   fmap f (M m) = M $ liftM f m
>   fmap f (Yield x c) = Yield x (f c)
>   fmap f (Await k) = Await (f . k)
> 
> type Pipe a b m r = Free (PipeF a b m) r

Generally speaking, a free monad can be thought of as an embedded language in CPS style: every summand of the base functor (PipeF in this case) is a primitive operation, while the x parameter represents the continuation at each step.

In the case of pipes, M corresponds to an effect in the base monad, Yield produces an output value, and Await blocks until it receives an input value, then passes it to its continuation. You can see that the Await continuation takes a Maybe a type: this is the only thing that distinguishes guarded pipes from regular pipes (as implemented in the pipes package on Hackage). The idea is that Await will receive Nothing whenever the pipe runs out of input values. That will give it a chance to do some cleanup or yield extra outputs. Any additional Await after that point will terminate the pipe immediately.

We can write a simplistic list-based (strict) interpreter formalizing the semantics I just described:

> evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
> evalPipe p xs = go False xs [] p

The boolean parameter is going to be set to True as soon as we execute an Await with an empty input list.

A Pure value means that the pipe has terminated spontaneously, so we return the accumulated output list:

>   where
>     go _ _ ys (Pure r) = return (reverse ys)

Execute inner monadic effects:

>     go t xs ys (Free (M m)) = m >>= go t xs ys

Save yielded values into the accumulator:

>     go t xs ys (Free (Yield y c)) = go t xs (y : ys) c

If we still have values in the input list, feed one to the continuation of an Await statement.

>     go t (x:xs) ys (Free (Await k)) = go t xs ys $ k (Just x)

If we run out of inputs, pass Nothing to the Await continuation…

>     go False [] ys (Free (Await k)) = go True [] ys (k Nothing)

… but only the first time. If the pipe awaits again, terminate it.

>     go True [] ys (Free (Await _)) = return (reverse ys)

To simplify the implementation of actual pipes, we define the following basic combinators:

> tryAwait :: Monad m => Pipe a b m (Maybe a)
> tryAwait = wrap $ Await return
> 
> yield :: Monad m => b -> Pipe a b m ()
> yield x = wrap $ Yield x (return ())
> 
> lift :: Monad m => m r -> Pipe a b m r
> lift = wrap . M . liftM return

and a couple of secondary combinators, very useful in practice. First, a pipe that consumes all input and never produces output:

> discard :: Monad m => Pipe a b m r
> discard = forever tryAwait

then a simplified await primitive that dies as soon as we stop feeding values to it.

> await :: Monad m => Pipe a b m a
> await = tryAwait >>= maybe discard return

now we can write a very simple pipe that sums consecutive pairs of numbers:

> sumPairs :: (Monad m, Num a) => Pipe a a m ()
> sumPairs = forever $ do
>   x <- await
>   y <- await
>   yield $ x + y

we get:

> ex1 :: [Int]
> ex1 = runIdentity $ evalPipe sumPairs [1,2,3,4]
> {- ex1 == [3, 7] -}
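To see the guard in action, here is a small pipe of my own that sums its entire input and flushes the total when the stream ends; the Nothing returned by tryAwait is what gives it the chance to yield one last value:

> total :: (Monad m, Num a) => Pipe a a m ()
> total = go 0
>   where
>     -- on Nothing (end of input), yield the accumulated sum
>     go acc = tryAwait >>= maybe (yield acc) (go . (acc +))
> 
> ex1b :: [Int]
> ex1b = runIdentity $ evalPipe total [1,2,3,4]
> {- ex1b == [10] -}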

Composing pipes

The usefulness of pipes, however, is not limited to being able to express list transformations as monadic computations using the await and yield primitives. In fact, it turns out that two pipes can be composed sequentially to create a new pipe.

> infixl 9 >+>
> (>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
> (>+>) = go False False
>   where

When implementing evalPipe, we needed a boolean parameter to signal upstream input exhaustion. This time, we need two boolean parameters, one for the input of the upstream pipe, and one for its output, i.e. the input of the downstream pipe. First, if downstream does anything other than waiting, we just let the composite pipe execute the same action:

>     go _ _ p1 (Pure r) = return r
>     go t1 t2 p1 (Free (Yield x c)) = yield x >> go t1 t2 p1 c
>     go t1 t2 p1 (Free (M m)) = lift m >>= \p2 -> go t1 t2 p1 p2

then, if upstream is yielding and downstream is waiting, we can feed the yielded value to the downstream pipe and continue from there:

>     go t1 t2 (Free (Yield x c)) (Free (Await k)) =
>       go t1 t2 c $ k (Just x)

if downstream is waiting and upstream is running a monadic computation, just let upstream run and keep downstream waiting:

>     go t1 t2 (Free (M m)) p2@(Free (Await _)) =
>       lift m >>= \p1 -> go t1 t2 p1 p2

if upstream terminates while downstream is waiting, finalize downstream:

>     go t1 False p1@(Pure _) (Free (Await k)) =
>       go t1 True p1 (k Nothing)

but if downstream awaits again, terminate the whole composite pipe:

>     go _ True (Pure r) (Free (Await _)) = return r

now, if both pipes are waiting, we keep the second pipe waiting and we feed whatever input we get to the first pipe. If the input is Nothing, we set the first boolean flag, so that next time the first pipe awaits, we can finalize the downstream pipe.

>     go False t2 (Free (Await k)) p2@(Free (Await _)) =
>       tryAwait >>= \x -> go (isNothing x) t2 (k x) p2
>     go True False p1@(Free (Await _)) (Free (Await k)) =
>       go True True p1 (k Nothing)
>     go True True p1@(Free (Await _)) p2@(Free (Await _)) =
>       tryAwait >>= \_ -> {- unreachable -} go True True p1 p2

This composition can be shown to be associative (in a rather strong sense), with identity given by:

> idP :: Monad m => Pipe a a m r
> idP = forever $ await >>= yield

So we can define a Category instance:

> newtype PipeC m r a b = PipeC { unPipeC :: Pipe a b m r }
> 
> instance Monad m => Category (PipeC m r) where
>   id = PipeC idP
>   (PipeC p2) . (PipeC p1) = PipeC $ p1 >+> p2

Running pipes

A runnable pipe, also called Pipeline, is a pipe that doesn’t yield any value and doesn’t wait for any input. We can formalize this in the types as follows:

> type Pipeline m r = Pipe () Void m r

Disregarding bottom, calling await on such a pipe does not return any useful value, and yielding is impossible. Another way to think of Pipeline is as an arrow (in PipeC) from the terminal object to the initial object of Hask¹.

Running a pipeline is straightforward:

> runPipe :: Monad m => Pipeline m r -> m r
> runPipe (Pure r) = return r
> runPipe (Free (M m)) = m >>= runPipe
> runPipe (Free (Await k)) = runPipe $ k (Just ())
> runPipe (Free (Yield x c)) = absurd x

where the impossibility of the last case is guaranteed by the types, unless of course the pipe introduced a bottom value at some point.

The three primitive operations tryAwait, yield and lift, together with pipe composition and the runPipe function above, are basically all we need to define most pipes and pipe combinators. For example, the simple pipe interpreter evalPipe can be easily rewritten in terms of these primitives:

> evalPipe' :: Monad m => Pipe a b m r -> [a] -> m [b]
> evalPipe' p xs = runPipe $
>   (mapM_ yield xs >> return []) >+>
>   (p >> discard) >+>
>   collect id
>   where
>     collect xs =
>       tryAwait >>= maybe (return $ xs [])
>                          (\x -> collect (xs . (x:)))

Note that we use the discard pipe to turn the original pipe into an infinite one, so that the final return value will be taken from the final pipe.

Extra combinators

The rich structure on pipes (category and monad) makes it really easy to define new higher-level combinators. For example, here are implementations of some of the combinators in Data.Conduit.List, translated to pipes:

> sourceList = mapM_ yield
> sourceNull = return ()
> fold f z = go z
>   where
>     go x = tryAwait >>= maybe (return x) (go . f x)
> consume = fold (\xs x -> xs . (x:)) id >>= \xs -> return (xs [])
> sinkNull = discard
> take n = (isolate n >> return []) >+> consume
> drop n = replicateM n await >> idP
> pipe f = forever $ await >>= yield . f -- called map in conduit
> concatMap f = forever $ await >>= mapM_ yield . f
> until p = go
>   where
>     go = await >>= \x -> if p x then return () else yield x >> go
> groupBy (~=) = p >+>
>   forever (until isNothing >+>
>            pipe fromJust >+>
>            (consume >>= yield))
>   where 
>     -- the pipe p yields Nothing whenever the current item y
>     -- and the previous one x do not satisfy x ~= y, and behaves
>     -- like idP otherwise
>     p = await >>= \x -> yield (Just x) >> go x
>     go x = do
>       y <- await
>       unless (x ~= y) $ yield Nothing
>       yield $ Just y
>       go y
> isolate n = replicateM_ n $ await >>= yield
> filter p = forever $ until (not . p)

To work with the equivalent of sinks, it is useful to define a source to sink composition operator:

> infixr 2 $$
> ($$) :: Monad m => Pipe () a m r' -> Pipe a Void m r -> m (Maybe r)
> p1 $$ p2 = runPipe $ (p1 >> return Nothing) >+> liftM Just p2

which ignores the source return type, and just returns the sink return value, or Nothing if the source happens to terminate first. So we have, for example:

> ex2 :: Maybe [Int]
> ex2 = runIdentity $ sourceList [1..10] >+> isolate 4 $$ consume
> {- ex2 == Just [1,2,3,4] -}
> 
> ex3 :: Maybe [Int]
> ex3 = runIdentity $ sourceList [1..10] $$ discard
> {- ex3 == Nothing -}
> 
> ex4 :: Maybe Int
> ex4 = runIdentity $ sourceList [1,1,2,2,2,3,4,4]
>                 >+> groupBy (==)
>                 >+> pipe head
>                  $$ fold (+) 0
> {- ex4 == Just 10 -}
> 
> ex5 :: Maybe [Int]
> ex5 = runIdentity $ sourceList [1..10]
>                 >+> filter (\x -> x `mod` 3 == 0)
>                  $$ consume
> {- ex5 == Just [3, 6, 9] -}

Pipes in practice

You can find an implementation of guarded pipes in my fork of pipes. There is also a pipes-extra repository where you can find some pipes to deal with chunked ByteStrings and utilities to convert conduits to pipes.

I hope to be able to merge this into the original pipes package once the guarded pipe concept has proven its worth. Without the tryAwait primitive, combinators like fold and consume cannot be implemented, and neither can a simple stateful pipe like one that splits a chunked input into lines. So I think there are enough benefits to justify a little extra complexity in the definition of composition.
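As an illustration, here is a sketch of such a line splitter, using String chunks instead of ByteStrings for simplicity; tryAwait lets it flush the final partial line when the input runs out:

> splitLines :: Monad m => Pipe String String m ()
> splitLines = go ""
>   where
>     go acc = tryAwait >>= maybe (flush acc) (feed acc)
>     -- emit every complete line in the accumulated input
>     feed acc chunk = case break (== '\n') (acc ++ chunk) of
>       (line, _ : rest) -> yield line >> feed "" rest
>       (partial, "")    -> go partial
>     -- when the input ends, emit the leftover partial line, if any
>     flush "" = return ()
>     flush acc = yield acc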


  1. In reality, Hask doesn’t have an initial object, and the terminal object is actually Void, because of non-strict semantics.

Reinversion of control with continuations

January 18, 2012

In my last post I mentioned how it is possible to achieve a form of "reinversion of control" by using (green) threads. Some commenters noted how this is effectively a solved problem, as demonstrated for example by Erlang, as well as the numerous variations on CSP currently gaining a lot of popularity.

I don’t disagree with that, but it’s just not the point of this series of posts. This is about understanding the computational structure of event-driven code, and seeing how it’s possible to transform it into a less awkward form without introducing concurrency (or at least not in the traditional sense of the term).

Using threads to solve what is essentially a control flow problem is cheating. And you pay in terms of increased complexity, and code which is harder to reason about, since you introduced a whole lot of interleaving opportunities and possible race conditions. Using a non-preemptive concurrency abstraction with manual yield directives (like my Python gist does) will solve that, but then you’d have to think of how to schedule your coroutines, so that is also not a complete solution.

Programmable semicolons

To find an alternative to the multitask-based approach, let’s focus on two particular lines of the last example:

    reply = start_request();
    get_data(reply)

where I added an explicit semicolon at the end of the first line. A semicolon is an important component of an imperative program, even though, syntactically, it is often omitted in languages like Python. It corresponds to the sequencing operator: execute the instruction on the left side, then pass the result to the right side and execute that.

If the instruction on the left side corresponds to an asynchronous operation, we want to alter the meaning of sequencing. Given a sequence of statements of the form

    x = A(); B(x)

we want to interpret that as: call A, then return control back to the main loop; when A is finished, bind its result to x, then run B.

So what we want is to be able to override the sequencing operator: we want programmable semicolons.

The continuation monad

Since it is often really useful to look at the types of functions to understand how exactly they fit together, we’ll leave Python and start focusing on Haskell for our running example.

We can make a very important observation immediately by looking at the type of the callback registration function that our framework offers, and try to interpret it in the context of controlled side effects (i.e. the IO monad). For Qt, it could look something like:

    connect :: Object -> String -> (a -> IO ()) -> IO ()

to be used, for example, like this:

    connect httpReply "finished()" $ \_ -> do
        putStrLn "request finished"

so the first argument is the object, the second is the C++ signature of the signal, and the third is a callback that will be invoked by the framework whenever the specified signal is emitted. Now, we can get rid of all the noise of actually connecting to a signal, and define a type representing just the act of registering a callback.

    newtype Event a = Event { on :: (a -> IO ()) -> IO () }

Doesn’t that look familiar? It is exactly the continuation monad transformer applied to the IO monad! The usual monad instance for ContT perfectly captures the semantics we are looking for:

    instance Monad Event where
      return x = Event $ \k -> k x
      e >>= f = Event $ \k ->
        on e $ \x ->
          on (f x) k

The return function simply calls the callback immediately with the provided value, no actual connection is performed. The bind operator represents our custom semicolon: we connect to the first event, and when that fires, we take the value it yielded, apply it to f, and connect to the resulting event.
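In fact, up to the newtype wrapper, Event is literally ContT () IO from the transformers package, so the whole instance above could be obtained for free (a sketch):

    import Control.Monad.Trans.Cont (ContT(..))

    type Event' a = ContT () IO a   -- isomorphic to Event a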

Now we can actually translate the Python code of the previous example to Haskell:

    ex :: Event ()
    ex = forever $ do
      result <- untilRight . replicate 2 $ do
        reply <- startRequest
        either (return . Left) (liftM Right . getData) reply
      either handleError displayData result

    untilRight :: Monad m => [m (Either a b)] -> m (Either a b)
    untilRight [m] = m
    untilRight (m : ms) = m >>= either (const (untilRight ms)) (return . Right)

Again, this could be cleaned up by adding some error reporting functionality into the monad stack.
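For example, the Either plumbing could be pushed into the monad stack. A sketch of mine (ExceptT lives in recent versions of the transformers package, and Event would also need Functor and Applicative instances):

    import Control.Monad.Trans.Except (ExceptT(..), runExceptT)

    -- wrap the Either-returning event once; code built on top of
    -- startRequest' no longer needs the explicit 'either' calls
    startRequest' :: ExceptT String Event Reply
    startRequest' = ExceptT startRequest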

Implementing the missing functions in terms of connect is straightforward. For example, startRequest will look something like this:

    startRequest :: Event (Either String Reply)
    startRequest = Event $ \k -> do
      reply <- AccessManager.get "http://example.net"
      connect reply "finished()" $ \_ -> k (Right reply)
      connect reply "error(QString)" $ \e -> k (Left e)

where I took the liberty of glossing over some irrelevant API details.
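getData would follow the same pattern. A hypothetical sketch, where readyRead(), readAll and the Data type are made-up stand-ins for the actual Qt API:

    getData :: Reply -> Event Data
    getData reply = Event $ \k ->
        connect reply "readyRead()" $ \_ ->
            readAll reply >>= k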

How do we run such a monad? Well, the standard runContT does the job:

    runEvent :: Event () -> IO ()
    runEvent e = on e $ \_ -> return ()

so

    runEvent ex

will run until the first connection, return control to the main loop, resume when an event occurs, and so on.

Conclusion

I love the simplicity and elegance of this approach, but unfortunately, it is far from a complete solution. So far we have only dealt with "one-shot" events, but what happens when an event fires multiple times? Also, as this is still very imperative in nature, can we do better? Is it possible to employ a more functional style, with emphasis on composability?

I’ll leave the (necessarily partial) answers to those questions for a future post.

Monads for Markov chains

October 16, 2008

This is my first literate Haskell post, my first post on Haskell, and actually my first attempt at literate programming. I spent (wasted?) a lot of time trying to make the code segments render nicely on WordPress, with a process that was only partially automated and very tedious. I think I will cook up some script to automate it completely, if I ever get to write another such post.

Suppose you need to model a finite Markov chain in code. There are essentially two ways of doing that: one is to simply run a simulation of the Markov chain, using a random number generator to obtain dice rolls and random cards from the decks; the other is to create a stochastic matrix containing the transition probabilities for each pair of states.

In this post I will show how a single monadic description of the Markov chain dynamics can be used to obtain both a simulator and the transition matrix.

> {-# LANGUAGE MultiParamTypeClasses, 
>     FlexibleInstances, 
>     GeneralizedNewtypeDeriving #-}
>
> import Control.Arrow
> import Control.Monad
> import Control.Monad.State.Strict
> import Data.Array
> import System.Random

Let’s start with an example of a Markov chain and how we would like to implement it in Haskell. Consider a simplified version of the familiar Monopoly game: there are 40 squares (numbered 0 to 39), you throw two 6-sided dice each turn, some special squares have particular effects (see below), and if you get a double roll three times in a row, you go to jail. The special squares are:

30: go to jail
2, 17, 33: Community Chest
7, 22, 36: Chance

Community Chest (CC) and Chance (CH) make you take a card from a deck and move to some other place depending on what’s written on the card. You will find the details in the code, so I won’t explain them here.

This is of course a Markov chain, where the states can be represented by:

> type Square = Int
> data GameState = GS {
>       position :: Square,
>       doubles :: Int } deriving (Eq, Ord, Show)

and a description of the game can be given in a monadic style like this:

> sGO :: Square
> sGO = 0
> 
> sJAIL :: Square
> sJAIL = 10
>
> finalize :: Square -> Game Square
> finalize n
>     | n == 2 || n == 17 || n == 33 = cc n
>     | n == 7 || n == 22 || n == 36 = ch n
>     | n == 30 = return sJAIL
>     | otherwise = return n
> 
> cc :: Square -> Game Square
> cc n = do i <- choose (1 :: Int, 16)
>           return $ case i of
>                      1 -> sGO
>                      2 -> sJAIL
>                      _ -> n
> 
> ch :: Square -> Game Square
> ch n = do i <- choose (1 :: Int, 16)
>           return $ case i of
>                      1 -> sGO
>                      2 -> sJAIL
>                      3 -> 11
>                      4 -> 24
>                      5 -> 39
>                      6 -> 5
>                      7 -> nextR n
>                      8 -> nextR n
>                      9 -> nextU n
>                      10 -> n - 3
>                      _ -> n
>     where
>       nextR n = let n' = n + 5
>                 in n' - (n' `mod` 5)
>       nextU n
>           | n >= 12 && n < 28 = 28
>           | otherwise = 12
> 
> roll :: Game (Int, Int)
> roll = let r1 = choose (1, 6)
>        in liftM2 (,) r1 r1
> 
> markDouble :: Bool -> Game ()
> markDouble True = modify $ \s -> s {
>                     doubles = doubles s + 1 }
> markDouble False = modify $ \s -> s {
>                      doubles = 0
>                    }
>
> goTo :: Square -> Game ()
> goTo n = let n' = n `mod` 40
>          in modify $ \s -> s { position = n' }
> 
> game :: Game ()
> game = do n <- liftM position get
>           (a, b) <- roll
>           markDouble (a == b)
>           d <- liftM doubles get
>           if d == 3
>            then do markDouble False
>                    goTo sJAIL
>            else do let n' = n + a + b
>                    n'' <- finalize n'
>                    goTo n''
> 

As you can see, Game is a state monad, with an additional function choose that gives us a random element of a range:

> class MonadState s m => MonadMC s m where
>     choose :: (Enum a) => (a, a) -> m a

This can be implemented very easily using the (strict) state monad and a random generator:

> newtype MCSim s a = MCSim (State ([s], StdGen) a)
>     deriving Monad
> 
> instance MonadState s (MCSim s) where
>     get = MCSim $ liftM (head . fst) get
>     put x = MCSim . modify $ \(xs, g) -> (x : xs, g)
> 
> instance MonadMC s (MCSim s) where
>     choose (a, b) = MCSim $
>                     do (xs, g) <- get
>                        let bnds = (fromEnum a, fromEnum b)
>                        let (y, g') = randomR bnds g
>                        put (xs, g')
>                        return . toEnum $ y
> 
> -- type Game a = MCSim GameState a
>
> runSim :: StdGen -> Int -> s -> MCSim s () -> [s]
> runSim g n start m = fst $ execState m' ([start], g)
>     where
>       (MCSim m') = foldr (>>) (return ()) $ replicate n m
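For example, here is a hedged usage sketch, assuming the commented-out MCSim alias for Game above and newStdGen from System.Random:

simulate :: Int -> IO [GameState]
simulate n = do
    g <- newStdGen
    -- visited states, most recent first
    return $ runSim g n (GS 0 0) game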

The runSim function runs the simulation and returns the list of visited states. This is already quite nice, but the best thing is that the same code can be used to create the transition matrix, just swapping in a new implementation of the Game type alias:

> newtype MC s a = MC (s -> [(s, Double, a)])
> 
> instance Monad (MC s) where
>     return x = MC $ \s -> return (s, 1.0, x)
>     (MC m) >>= f = MC $ \s ->
>                    do (s', p, x) <- m s
>                       let (MC m') = f x
>                       (s'', q, y) <- m' s'
>                       return (s'', p * q, y)
> 
> instance MonadState s (MC s) where
>     get = MC $ \s -> return (s, 1.0, s)
>     put x = MC $ \s -> return (x, 1.0, ())
> 
> instance MonadMC s (MC s) where
>     choose (a, b) = let r = [a..b]
>                         p = recip . fromIntegral . length $ r
>                     in MC $ \s -> map (\x -> (s, p, x)) r
> 
> type Game a = MC GameState a

The idea is that we keep track of all possible destination states for a given state, with associated conditional probabilities. For those familiar with Eric Kidd’s series on probability monads, this is basically:

type MC s a = StateT s (PerhapsT []) a

Now, how to get a transition matrix from such a monad? Of course, we have to require that the states are indexable:

> markov :: Ix s =>
>           MC s () -> (s, s) -> Array (s, s) Double
> markov (MC m) r = accumArray (+) 0.0 (double r) $
>                   range r >>= transitions
>     where
>       mkAssoc s (s', p, _) = ((s, s'), p)
>       transitions s = map (mkAssoc s) $ m s
>       double (a, b) = ((a, a), (b, b))

So we iterate over all states and use the probability values contained in the monad to fill in the array cells corresponding to the selected state pair.

To actually apply this to our Monopoly example, we need to make GameState indexable:

> nextState :: GameState -> GameState
> nextState (GS p d) = if d == 2
>                      then GS (p + 1) 0
>                      else GS p (d + 1)
> 
> instance Ix GameState where
>     range (s1, s2) = takeWhile (<= s2) .
>                      iterate nextState $ s1
>     index (s1, s2) s =
>         let poss = (position s1, position s2)
>         in index poss (position s) * 3 +
>            doubles s - doubles s1
>     inRange (s1, s2) s = s1 <= s && s <= s2
>     rangeSize (s1, s2) = index (s1, s2) s2 + 1

then finally we can try:

> monopoly :: (GameState, GameState)
> monopoly = (GS 0 0, GS 39 2)
> 
> initialState :: Array GameState Double
> initialState = let n = rangeSize monopoly
>                    p = recip $ fromIntegral n
>                in listArray monopoly $ replicate n p
> 
> statDistr :: Int -> [(GameState, Double)]
> statDistr n = let mat = markov game monopoly
>                   distributions = iterate (.* mat) initialState
>                   st = distributions !! n
>               in assocs st

where .* is a simple vector-matrix multiplication function:

> infixl 5 .*
> (.*) :: (Ix i, Num a) =>
>            Array i a -> Array (i, i) a -> Array i a
> (.*) x y = array resultBounds
>               [(i, sum [x!k * y!(k,i) | k <- range (l,u)])
>                | i <- range (l'',u'') ]
>         where (l, u) = bounds x
>               ((l', l''), (u', u'')) = bounds y
>               resultBounds
>                 | (l,u)==(l',u') = (l'', u'')
>                 | otherwise = error ".*: incompatible bounds"

Calling statDistr 100 will return an association list of states with the corresponding probabilities in an approximation of the stationary distribution, computed by applying the power method to the transition matrix. The number 100 is a pure guess; I don’t know how to estimate the number of iterations necessary for convergence, but that is out of the scope of this post anyway.
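If we wanted to avoid guessing, a naive stopping rule is to iterate until two successive distributions agree within some tolerance. A sketch of mine (which may loop forever if the tolerance is too small):

> statDistr' :: Double -> [(GameState, Double)]
> statDistr' eps = assocs . snd . head . dropWhile far $ pairs
>     where
>       mat = markov game monopoly
>       distrs = iterate (.* mat) initialState
>       pairs = zip distrs (tail distrs)
>       -- stop when the sup-norm distance drops below eps
>       far (x, y) = maximum (zipWith (\a b -> abs (a - b))
>                                     (elems x) (elems y)) >= eps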