Lost in Technopolis

by John Wiegley

# Simpler conduit library based on monadic folds

Posted by John Wiegley on June 6, 2014 with labels: haskell

Recently I was playing around with the core types in the conduit library (attempting to change leftovers so you could only unget values you had read), when I stumbled across a formulation of those types that led to some interesting simplifications.

Before I jump in, let’s review what any effectful streaming library should aim to accomplish. The basics are:

1. Iterate over values within a structure, or produced by a computation.

2. Clean up resources involved in that computation once they are no longer needed.

3. Allow processing to be composed nicely, forming a “pipeline” from the initial source to a final sink.

4. It would be nice if any part of the pipeline could decide when to terminate.

What I discovered during my exploration is that all four of these requirements can be captured using simple, monadic folds, like foldM. Here is the type of foldM:

foldM :: Monad m => (a -> b -> m a) -> a -> [b] -> m a


We can obtain a slightly easier function type for our needs by reversing the arguments:

sourceList :: Monad m => [b] -> a -> (a -> b -> m a) -> m a


This says that given a list of elements of type b, sourceList returns a function that knows how to generate a result of type a from a starting value by folding over every element of that list. We might trivially sum lists of integers as follows:

sourceList [1..10] 0 $ \acc x -> return $ acc + x
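
To make that one-liner runnable, here is a minimal sketch. The post so far gives only the type of sourceList, so the body below (foldM with its arguments reordered) is my assumption about the simplest definition that fits that type; the name pureTotal is hypothetical and exists only for illustration.

```haskell
import Control.Monad (foldM)
import Data.Functor.Identity (runIdentity)

-- Assumed definition: foldM with its arguments reordered so that the
-- list comes first, then the starting state, then the step function.
sourceList :: Monad m => [b] -> a -> (a -> b -> m a) -> m a
sourceList xs z f = foldM f z xs

-- Running the fold in the Identity monad yields a pure result.
pureTotal :: Int
pureTotal = runIdentity $ sourceList [1..10] 0 $ \acc x -> return $ acc + x

main :: IO ()
main = do
  -- The same source can run effects at every step when used in IO.
  total <- sourceList [1..10 :: Int] 0 $ \acc x -> do
    putStrLn ("adding " ++ show x)
    return $ acc + x
  print total
  print pureTotal
```

Both totals come out as 55; the only difference is that the IO version logs each element as it is folded in.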


We can abstract our summing function into a sink that works on any source of integers:

sumC :: (Num a, Monad m)
     => (a -> (a -> a -> m a) -> m a) -> m a
sumC await = await 0 $ \acc x -> return $ acc + x


sumC is a higher-order function that takes a fold closure, such as the one obtained from sourceList [1..10] above. (I call the closure await, although its behavior is a lot closer to a folding variant of the awaitForever function from conduit.) await wants a starting state, and a function to fold that state over the incoming elements.

Both of these are regular, higher-order functions, so we can build a pipeline using nothing more than function application:

sumC (sourceList [1..10])


Notice how close this is to the non-streaming version sum (id [1..10]); and if we execute the pipeline using runIdentity, the two are identical.
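
As a quick check of that claim, the following sketch runs the pipeline in the Identity monad and compares it with the plain list version. The body of sourceList is again an assumed definition (foldM with reordered arguments), and the names streamed and direct are hypothetical.

```haskell
import Control.Monad (foldM)
import Data.Functor.Identity (runIdentity)

-- Assumed definition, as before: a reordered foldM.
sourceList :: Monad m => [b] -> a -> (a -> b -> m a) -> m a
sourceList xs z f = foldM f z xs

-- The sink from the post: start at 0 and add each incoming element.
sumC :: (Num a, Monad m)
     => (a -> (a -> a -> m a) -> m a) -> m a
sumC await = await 0 $ \acc x -> return $ acc + x

-- The streaming pipeline, executed purely via runIdentity.
streamed :: Integer
streamed = runIdentity (sumC (sourceList [1..10]))

-- The non-streaming version mentioned in the text.
direct :: Integer
direct = sum (id [1..10])

main :: IO ()
main = print (streamed == direct)  -- prints True
```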

Since the “fold closure” argument is cumbersome to write out each time, let’s capture it as a type synonym:

type Source m a r = r -> (r -> a -> m r) -> m r


With this synonym, the example source and sink become:

sourceList :: Monad m => [a] -> Source m a r
sumC :: (Num a, Monad m) => Source m a a -> m a


Another pattern we’ll start noticing pretty shortly is that every “sink” is a fold from a Source down to its result type. We can capture this using another type synonym:

type Sink a m r = Source m a r -> m r


It’s not really necessary, but it advertises to the reader that we’re defining a sink. Likewise, a “conduit” is always a mapping from one source to another where the result type is common:

type Conduit a m b r = Source m a r -> Source m b r


In cases where the result types must differ (for example, the dropC function in simple-conduit), we cannot use these type synonyms, but they are handy in the majority of cases.

With these synonyms, the types of our sources and sinks should start looking familiar to users of the regular conduit library (mapC here is based on conduit-combinators):

sourceList :: Monad m => [a] -> Source m a r
mapC :: Monad m => (a -> b) -> Conduit a m b r
sumC :: (Num a, Monad m) => Sink a m a
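
To see the three synonyms working together, here is a small sketch of a source, a conduit and a sink joined by plain function application. The signatures are the ones listed above, but the bodies of sourceList and mapC are my own assumptions about what fits those types, not necessarily what simple-conduit does; doubledTotal is a hypothetical name.

```haskell
import Control.Monad (foldM)
import Data.Functor.Identity (runIdentity)

type Source m a r    = r -> (r -> a -> m r) -> m r
type Sink a m r      = Source m a r -> m r
type Conduit a m b r = Source m a r -> Source m b r

-- Assumed definition: foldM with its arguments reordered.
sourceList :: Monad m => [a] -> Source m a r
sourceList xs z f = foldM f z xs

-- Assumed definition: run the upstream fold, applying f to each element
-- before handing it to the downstream step function.
mapC :: Monad m => (a -> b) -> Conduit a m b r
mapC f await z yield = await z $ \acc x -> yield acc (f x)

sumC :: (Num a, Monad m) => Sink a m a
sumC await = await 0 $ \acc x -> return $ acc + x

-- Double every element, then sum: 2 * (1 + 2 + ... + 10).
doubledTotal :: Integer
doubledTotal = runIdentity $ sumC (mapC (* 2) (sourceList [1..10]))

main :: IO ()
main = print doubledTotal  -- prints 110
```

Note that mapC never touches the accumulator; it only rewrites the step function passed upstream, which is why the result type r can stay common to both sides.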


Conduit has special operators for connecting sources with sinks, and for mapping sources to sources. We don’t need them, since we’re just applying functions to functions, but we can define them as synonyms easily enough:

infixl 1 $=
($=) :: a -> (a -> b) -> b
($=) = flip ($)

infixr 2 =$
(=$) :: (a -> b) -> (b -> c) -> a -> c