Recently I was playing around with the core types in the conduit library (attempting to change leftovers so you could only unget values you had read), when I stumbled across a formulation of those types that led to some interesting simplifications.
Before I jump in, let’s review what any effectful streaming library should aim to accomplish. The basics are:
- Iterate over values within a structure, or produced by a computation.
- Clean up resources involved in that computation once they are no longer needed.
- Allow processing to be composed nicely, forming a “pipeline” from the initial source to a final sink.
- It would be nice if any part of the pipeline could decide when to terminate.
What I discovered during my exploration is that all four of these requirements can be captured using simple, monadic folds, like foldM. Here is the type of foldM:
foldM :: Monad m => (a -> b -> m a) -> a -> [b] -> m a
We can obtain a slightly easier function type for our needs by reversing the arguments:
sourceList :: Monad m => [b] -> a -> (a -> b -> m a) -> m a
This says that given a list of elements of type b, sourceList returns a function that knows how to generate a result of type a from a starting value by folding over every element of that list. We might trivially sum lists of integers as follows:
sourceList [1..10] 0 $ \acc x -> return $ acc + x
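Assuming sourceList is nothing more than foldM with its arguments reversed (my reading of the type above, not necessarily the library's exact code), a minimal runnable sketch of this example looks like the following. sumOneToTen is a hypothetical name used only here; running in Identity turns the monadic fold into a plain value.

```haskell
import Control.Monad (foldM)
import Data.Functor.Identity (runIdentity)

-- Assumed definition: foldM with the list first, then the starting
-- state, then the step function.
sourceList :: Monad m => [b] -> a -> (a -> b -> m a) -> m a
sourceList xs z f = foldM f z xs

-- The summing fold from the text, run in Identity so it is a pure value.
sumOneToTen :: Int
sumOneToTen = runIdentity $ sourceList [1..10] 0 $ \acc x -> return $ acc + x
```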
We can abstract our summing function into a sink that works on any source of integers:
sumC :: (Num a, Monad m)
     => (a -> (a -> a -> m a) -> m a) -> m a
sumC await = await 0 $ \acc x -> return $ acc + x
sumC is a higher-order function that takes a fold closure, such as the one obtained from sourceList [1..10] above. (I call the closure await, although its behavior is a lot closer to a folding variant of the awaitForever function from conduit.) await wants a starting state, and a function to fold that state over the incoming elements.
Both of these are regular, higher-order functions, so we can build a pipeline using nothing more than function application:
sumC (sourceList [1..10])
Notice how close this is to the non-streaming version, sum (id [1..10]); and if we execute the pipeline using runIdentity, the two produce identical results.
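To check that claim, here is a small sketch that puts the pieces together, again assuming sourceList is foldM with its arguments reversed; streamed and plain are names made up for the comparison.

```haskell
import Control.Monad (foldM)
import Data.Functor.Identity (runIdentity)

-- Assumed definition of sourceList (a sketch, not the library's code).
sourceList :: Monad m => [b] -> a -> (a -> b -> m a) -> m a
sourceList xs z f = foldM f z xs

-- sumC receives the fold closure ("await") and supplies the initial
-- accumulator plus the step function.
sumC :: (Num a, Monad m) => (a -> (a -> a -> m a) -> m a) -> m a
sumC await = await 0 $ \acc x -> return $ acc + x

-- The streaming pipeline is plain function application.
streamed :: Int
streamed = runIdentity (sumC (sourceList [1..10]))

-- The non-streaming equivalent.
plain :: Int
plain = sum (id [1..10])
```

Both evaluate to the same Int; the only difference is that sumC never sees a list, only a closure that knows how to fold one.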
Adding type synonyms
Since the “fold closure” argument is cumbersome to write out each time, let’s capture it in a type synonym:
type Source m a r = r -> (r -> a -> m r) -> m r
With this synonym, the example source and sink become:
sourceList :: Monad m => [a] -> Source m a r
sumC :: (Num a, Monad m) => Source m a a -> m a
Another pattern we’ll start noticing pretty shortly is that every “sink” is a fold from a Source down to its result type. We can capture this using another type synonym:
type Sink a m r = Source m a r -> m r
It’s not really necessary, but it advertises to the reader that we’re defining a sink. Likewise, a “conduit” is always a mapping from one source to another where the result type is common:
type Conduit a m b r = Source m a r -> Source m b r
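As a sketch of how the synonyms read in practice, the following restates sourceList and sumC with them and adds lengthC, a hypothetical second sink (not from the post or the library) that counts elements. Each sink just chooses an initial state and a step function for whatever Source it is handed.

```haskell
import Control.Monad (foldM)
import Data.Functor.Identity (runIdentity)

type Source m a r = r -> (r -> a -> m r) -> m r
type Sink a m r = Source m a r -> m r

-- Assumed: sourceList is foldM with its arguments reversed.
sourceList :: Monad m => [a] -> Source m a r
sourceList xs z f = foldM f z xs

-- Sum: start at 0 and add each element.
sumC :: (Num a, Monad m) => Sink a m a
sumC await = await 0 $ \acc x -> return $ acc + x

-- Hypothetical sink: start at 0 and bump the count for every element,
-- ignoring the element itself.
lengthC :: Monad m => Sink a m Int
lengthC await = await 0 $ \n _ -> return (n + 1)
```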
In cases where the result types must differ (for example, the dropC function in simple-conduit), we cannot use these type synonyms, but they are handy in the majority of cases.
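To see why dropping breaks the pattern, consider this sketch of my own (it is not simple-conduit's actual dropC; the name and shape are assumptions). Skipping the first n elements means carrying a countdown next to the downstream state, so the upstream source must run at result type (Int, r) while the caller sees plain r, which a single shared r in Conduit a m b r cannot express.

```haskell
import Control.Monad (foldM)
import Data.Functor.Identity (runIdentity)

type Source m a r = r -> (r -> a -> m r) -> m r

-- Assumed: sourceList is foldM with its arguments reversed.
sourceList :: Monad m => [a] -> Source m a r
sourceList xs z f = foldM f z xs

-- Hypothetical dropC: the upstream fold carries (remaining-to-drop, r),
-- so its result type is (Int, r) rather than the r the caller sees.
dropC :: Monad m => Int -> Source m a (Int, r) -> Source m a r
dropC n src z k = fmap snd $ src (n, z) $ \(i, r) x ->
  if i > 0
    then return (i - 1, r)              -- still dropping: r is untouched
    else fmap (\r' -> (0, r')) (k r x)  -- pass x to the downstream step
```

Dropping 3 elements from [1..10] and summing the rest should give 4 + 5 + ... + 10 = 49.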
With these synonyms, the types of our sources and sinks should start looking familiar to users of the regular conduit library (mapC here is based on conduit-combinators):
sourceList :: Monad m => [a] -> Source m a r
mapC :: Monad m => (a -> b) -> Conduit a m b r
sumC :: (Num a, Monad m) => Sink a m a
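mapC itself is never defined in the text. One way to write it against the Conduit synonym (a sketch, assuming the same foldM-based sourceList) is to wrap the downstream step function so each element passes through f before it is folded.

```haskell
import Control.Monad (foldM)
import Data.Functor.Identity (runIdentity)

type Source m a r = r -> (r -> a -> m r) -> m r
type Sink a m r = Source m a r -> m r
type Conduit a m b r = Source m a r -> Source m b r

-- Assumed: sourceList is foldM with its arguments reversed.
sourceList :: Monad m => [a] -> Source m a r
sourceList xs z f = foldM f z xs

-- mapC hands the upstream source a step function that applies f to each
-- element and then calls the downstream step function k.
mapC :: Monad m => (a -> b) -> Conduit a m b r
mapC f src z k = src z $ \r x -> k r (f x)

sumC :: (Num a, Monad m) => Sink a m a
sumC await = await 0 $ \acc x -> return $ acc + x
```

Because mapC only rewrites the step function, it never needs to know what r is, which is why the Conduit synonym can leave r fully polymorphic.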
Conduit has special operators for connecting sources with sinks, and for mapping sources to sources. We don’t need them, since we’re just applying functions to functions, but we can define them as synonyms easily enough:
infixl 1 $=
($=) :: a -> (a -> b) -> b
($=) = flip ($)

infixr 2 =$
(=$) :: (a -> b) -> (b -> c) -> a -> c
(=$) = flip (.)

infixr 0 $$
($$) :: a -> (a -> b) -> b
($$) = flip ($)
We can now express the pipeline in three different ways:
sumC (mapC (+1) (sourceList [1..10]))
sumC $ mapC (+1) $ sourceList [1..10]
sourceList [1..10] $= mapC (+1) $$ sumC
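For completeness, here is a self-contained sketch checking that the three forms agree, using the operator definitions given above together with assumed definitions of sourceList, mapC, and sumC. The fourth variant, which uses =$ to fuse a conduit onto a sink before feeding it a source, is my own addition, since the text defines =$ without using it.

```haskell
import Control.Monad (foldM)
import Data.Functor.Identity (runIdentity)

type Source m a r = r -> (r -> a -> m r) -> m r
type Sink a m r = Source m a r -> m r
type Conduit a m b r = Source m a r -> Source m b r

-- Assumed definitions (sketches, not the library's code).
sourceList :: Monad m => [a] -> Source m a r
sourceList xs z f = foldM f z xs

mapC :: Monad m => (a -> b) -> Conduit a m b r
mapC f src z k = src z $ \r x -> k r (f x)

sumC :: (Num a, Monad m) => Sink a m a
sumC await = await 0 $ \acc x -> return $ acc + x

-- The operators from the text: flipped application and composition.
infixl 1 $=
($=) :: a -> (a -> b) -> b
($=) = flip ($)

infixr 2 =$
(=$) :: (a -> b) -> (b -> c) -> a -> c
(=$) = flip (.)

infixr 0 $$
($$) :: a -> (a -> b) -> b
($$) = flip ($)

viaApplication, viaDollar, viaOperators, viaFusedSink :: Int
viaApplication = runIdentity (sumC (mapC (+1) (sourceList [1..10])))
viaDollar = runIdentity $ sumC $ mapC (+1) $ sourceList [1..10]
viaOperators = runIdentity (sourceList [1..10] $= mapC (+1) $$ sumC)
-- =$ (infixr 2) binds tighter than $$ (infixr 0), so the conduit is fused
-- onto the sink first and the source is then fed into the result.
viaFusedSink = runIdentity (sourceList [1..10] $$ mapC (+1) =$ sumC)
```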
This will perhaps seem more compelling if we use a file:
mapM_C putStrLn (sourceFile "hello.hs")
This action prints the contents of the given file, doing so in constant space and without employing lazy I/O. It handles opening and closing the file for us, and deals properly with cleanup in the case of exceptions.
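The post does not show sourceFile or mapM_C, so the following is only a hypothetical sketch of how they might look in this style. It yields String lines (the real sourceFile may well work in chunks or ByteStrings), and it relies on withFile from System.IO to close the handle both when the fold returns normally and when an exception escapes it.

```haskell
import System.IO (IOMode (ReadMode), hGetLine, hIsEOF, withFile)

type Source m a r = r -> (r -> a -> m r) -> m r

-- Hypothetical sourceFile: folds over the file one line at a time. withFile
-- closes the handle when the fold returns or when an exception escapes it.
sourceFile :: FilePath -> Source IO String r
sourceFile path z k = withFile path ReadMode $ \h ->
  let loop acc = do
        eof <- hIsEOF h
        if eof
          then return acc
          else hGetLine h >>= k acc >>= loop
   in loop z

-- Hypothetical mapM_C: the fold state is (), so only the effect matters.
mapM_C :: Monad m => (a -> m ()) -> Source m a () -> m ()
mapM_C f await = await () $ \() x -> f x
```

Only one line is held at a time, and the handle's lifetime is tied to the fold itself, which is what gives the constant-space, no-lazy-I/O behavior described above.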
Early termination
There is just one detail we haven’t implemented yet, and that is the ability for segments in the pipeline to abort processing early. To encode this, we need some short-circuiting behavior, which sounds like a job for Either:
type Source m a r =
    r -> (r -> a -> m (Either r r)) -> m (Either r r)
Once we start implementing sources and sinks, it will be much more convenient to use EitherT instead of returning an Either value:
type Source m a r =
    r -> (r -> a -> EitherT r m r) -> EitherT r m r
This way EitherT’s monadic bind provides the short-circuiting behavior, rather than our having to encode it explicitly in various places.
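Here is a sketch of the short-circuiting version. It uses ExceptT from the transformers package in place of EitherT (both wrap m (Either e a), so they behave the same here), and resolve and sumUpTo are hypothetical helpers of mine. Because foldM in ExceptT stops at the first throwE, the sink below can end a fold over an infinite list.

```haskell
import Control.Monad (foldM)
import Control.Monad.Trans.Except (ExceptT, runExceptT, throwE)
import Data.Functor.Identity (runIdentity)

-- ExceptT stands in for EitherT; the shape matches the synonym above.
type Source m a r = r -> (r -> a -> ExceptT r m r) -> ExceptT r m r

-- Assumed: still just foldM. Its bind in ExceptT skips the rest of the
-- list once a step throws, so no extra code is needed for early exit.
sourceList :: Monad m => [a] -> Source m a r
sourceList xs z f = foldM f z xs

-- Hypothetical helper: an early (Left) result and a normal (Right) result
-- have the same type, so collapse them.
resolve :: Monad m => ExceptT r m r -> m r
resolve = fmap (either id id) . runExceptT

-- Hypothetical sink: sum elements, stopping before the total exceeds limit.
sumUpTo :: (Num a, Ord a, Monad m) => a -> Source m a a -> m a
sumUpTo limit await = resolve $ await 0 $ \acc x ->
  let acc' = acc + x
   in if acc' > limit then throwE acc else return acc'
```

With a limit of 10 over [1 ..], the running totals are 1, 3, 6, 10, and adding 5 would overshoot, so the sink stops and returns 10 without touching the rest of the infinite list.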
And that’s it! As simple as it is, this set of types is expressive enough to implement many of the combinators from the original conduit library. Of course, it’s not nearly as capable, but it’s leaner, its core types are easier to understand, and it is significantly faster in some situations (simple pipelines computed over Identity ran about 45% faster on my machine).
Consumers and producers
One thing that conduit makes very easy to do is to abstract Sinks and Conduits as Consumers, and Sources and Conduits as Producers. Based on our presentation above, such an abstraction is not possible. However, we can regain some of the generality with a helper function: sinks can be turned into conduits using a new combinator, returnC:
sinkList $ returnC $ sumC $ mapC (+1) $ sourceList [1..10]
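The post does not show returnC or sinkList. Under the simpler, non-Either types, one plausible reading (an assumption, not the library's definition) is that returnC runs the sink's action and hands its single result to the downstream step function, turning it back into a one-element source; sinkList is likewise a hypothetical sink that collects elements in order. That is enough to make the pipeline above typecheck.

```haskell
import Control.Monad (foldM)
import Data.Functor.Identity (runIdentity)

type Source m a r = r -> (r -> a -> m r) -> m r
type Sink a m r = Source m a r -> m r
type Conduit a m b r = Source m a r -> Source m b r

-- Assumed definitions (sketches, not the library's code).
sourceList :: Monad m => [a] -> Source m a r
sourceList xs z f = foldM f z xs

mapC :: Monad m => (a -> b) -> Conduit a m b r
mapC f src z k = src z $ \r x -> k r (f x)

sumC :: (Num a, Monad m) => Sink a m a
sumC await = await 0 $ \acc x -> return $ acc + x

-- Hypothetical sinkList: cons each element, then reverse to restore order.
sinkList :: Monad m => Sink a m [a]
sinkList await = fmap reverse $ await [] $ \acc x -> return (x : acc)

-- Hypothetical returnC: run the sink's action and yield its single result
-- downstream, so the sink's output can feed further stages.
returnC :: Monad m => m a -> Source m a r
returnC action z k = action >>= k z
```

Run over Identity, the pipeline from the text yields a one-element list holding the sum of 2 through 11, that is [65].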