StreamT merging and zipping + parsing updates

Pre-release
@louthy louthy released this 18 Aug 22:10
· 106 commits to main since this release

This release follows on from the last release (which featured the new StreamT type): we can now merge and zip multiple streams. There's also an update to the Prelude.parse* functions (like the Option<int> returning parseInt).

Merging

Merging multiple StreamT streams has the following behaviours:

  • async & async stream: the items merge and yield as they happen
  • async & sync stream: as each async item is yielded, a sync item is immediately yielded after
  • sync & async stream: each sync item is yielded immediately before each async item is yielded
  • sync & sync stream: each stream is perfectly interleaved

If one stream finishes first, the other stream keeps yielding its remaining items.
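The sync & sync rule, together with the "keeps yielding" rule, can be pictured with ordinary sequences. This is only a sketch of the described behaviour in plain C#, not how StreamT implements merging; the `InterleaveSketch` class and its `Interleave` method are hypothetical names made up for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class InterleaveSketch
{
    // Hypothetical helper, not part of LanguageExt: alternates items from two
    // synchronous sequences, then drains whichever one still has items left.
    public static IEnumerable<A> Interleave<A>(IEnumerable<A> left, IEnumerable<A> right)
    {
        using var l = left.GetEnumerator();
        using var r = right.GetEnumerator();
        var hasL = l.MoveNext();
        var hasR = r.MoveNext();

        while (hasL || hasR)
        {
            // Take one item from the left side if it has any
            if (hasL) { yield return l.Current; hasL = l.MoveNext(); }

            // Then one item from the right side if it has any
            if (hasR) { yield return r.Current; hasR = r.MoveNext(); }
        }
    }
}
```

Calling `InterleaveSketch.Interleave(new[] { 0, 2, 4 }, new[] { 1 })` alternates while both sides have items and then lets the longer side finish on its own.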

There is an example of merging in the Streams sample:

public static class Merging
{
    public static IO<Unit> run =>
        example(20).Iter().As() >>
        emptyLine;

    static StreamT<IO, Unit> example(int n) =>
        from v in evens(n) & odds(n)
        where false
        select unit;
    
    static StreamT<IO, int> evens(int n) =>
        from x in Range(0, n).AsStream<IO>()
        where isEven(x)
        from _ in magenta >> write($"{x} ")
        select x;

    static StreamT<IO, int> odds(int n) =>
        from x in Range(0, n).AsStream<IO>()
        where isOdd(x)
        from _ in yellow >> write($"{x} ")
        select x;
    
    static bool isOdd(int n) =>
        (n & 1) == 1;

    static bool isEven(int n) =>
        !isOdd(n);
}

This creates two streams, odds and evens, and then merges them into a single stream using:

evens(n) & odds(n)

The output looks like this:

0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

The even numbers are written in magenta and the odd numbers in yellow.

You can merge any number of streams with the & operator, or concatenate streams with the + operator.

Other ways to merge:

var s = stream1.Merge(stream2, ...);
var s = StreamT.merge(stream1, stream2, ...);
var s = merge(stream1, stream2, ...);  // in the Prelude

Zipping

You can zip up to four streams and the result is a stream of tuples.

Obviously, to create a tuple, all of the streams need to have yielded a value, so the zip must wait on each stream. The async streams still run independently, though; they are not blocked while waiting to be tupled.

That also means the length of the tuple stream is clamped to the shortest stream length.

A useful aspect of zipping sync and async streams is that you can pair async events with identifiers.

For example, imagine you have a stream of messages coming from an external source (async):

static StreamT<IO, Message> messages =>
    // create an async message stream

And a stream of natural numbers, playing the role of an identifier (sync):

static StreamT<IO, long> ids =>
    Range(0, long.MaxValue).AsStream<IO>();

Then you can tag each message with a unique identifier like so:

static StreamT<IO, (long Id, Message)> incoming =>
    ids.Zip(messages);
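The same pairing, and the clamping to the shortest stream, can be imitated with plain LINQ for the purely synchronous case. This is a rough analogy only, not the StreamT implementation; `ZipSketch`, `Tag` and `Naturals` are hypothetical names, and `string` stands in for the `Message` type:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ZipSketch
{
    // Pairs each message with the next natural number. The identifier sequence
    // is unbounded, so the result is exactly as long as the message sequence.
    public static List<(long Id, string Message)> Tag(IEnumerable<string> messages) =>
        Naturals().Zip(messages, (id, msg) => (Id: id, Message: msg)).ToList();

    // An unbounded, lazily generated sequence 0, 1, 2, ...
    static IEnumerable<long> Naturals()
    {
        for (long i = 0; ; i++) yield return i;
    }
}
```

Because `Enumerable.Zip` stops as soon as either side runs out, `ZipSketch.Tag(new[] { "a", "b", "c" })` terminates with three tagged items even though the identifier sequence never ends.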

There's also an example in the Streams sample. It's similar to the merging example, except, instead of interleaving the odd and even streams, it tuples them:

public static class Zipping
{
    public static IO<Unit> run =>
        from x in example(10).Iter().As()
        select unit;

    static StreamT<IO, Unit> example(int n) =>
        from v in evens(n).Zip(odds(n))
        from _ in writeLine(v)
        where false
        select unit;

    static StreamT<IO, int> evens(int n) =>
        from x in Range(0, n).AsStream<IO>()
        where isEven(x)
        select x;

    static StreamT<IO, int> odds(int n) =>
        from x in Range(0, n).AsStream<IO>()
        where isOdd(x)
        select x;
    
    static bool isOdd(int n) =>
        (n & 1) == 1;

    static bool isEven(int n) =>
        !isOdd(n);
}

The output looks like this:

(0, 1)
(2, 3)
(4, 5)
(6, 7)
(8, 9)

There are no operators for zipping, because operators don't support generics. These are the options:

var s = stream1.Zip(stream2, .., stream4);
var s = StreamT.zip(stream1, .., stream4);
var s = zip(stream1, .., stream4);  // in the Prelude

Parsing

parseInt and its variants (parseLong, parseGuid, etc.) all return Option<A> where A is the type being generated from the parse. With the advent of the trait-types - in particular the Alternative<M> trait - we can now parse to any type that implements the Alternative<M> trait.

Alternative<M> is like a monoid for higher-kinds. It has an Empty<A>() function that allows us to construct a 'zero' version of the higher-kind (think None in Option, but also Errors.None in types with an alternative value of Error).

The original parse* functions (that return Option) remain unchanged, but there is now an extra overload for each variant that takes the trait-implementation type as a generic parameter.

Here's the original parseInt with the new parseInt<M>:

public static Option<int> parseInt(string value) =>
    Parse<int>(int.TryParse, value);

public static K<M, int> parseInt<M>(string value)
    where M : Alternative<M> =>
    Parse<M, int>(int.TryParse, value);
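To make the role of Empty<A>() concrete, here is a minimal sketch of how a trait-driven parse could be wired up. Everything in it is an assumption for illustration: `K<M, A>` and `Alternative<M>` are cut-down stand-ins (the real traits have more members), the body of `Parse<M, A>` is a guess at the shape rather than the library's code, and `Maybe` is a toy Option-like type. It relies on static abstract interface members, so it needs C# 11 or later:

```csharp
using System;

// Hypothetical, cut-down stand-ins for the library's K<M, A> and Alternative<M>
public interface K<M, A> { }

public interface Alternative<M> where M : Alternative<M>
{
    static abstract K<M, A> Pure<A>(A value);   // lift a successful value
    static abstract K<M, A> Empty<A>();         // the 'zero' of the higher-kind
}

// Same shape as int.TryParse, long.TryParse, Guid.TryParse, ...
public delegate bool TryParse<A>(string text, out A value);

public static class ParseSketch
{
    // Success goes through Pure, failure falls back to the type's Empty
    public static K<M, A> Parse<M, A>(TryParse<A> tryParse, string text)
        where M : Alternative<M> =>
        tryParse(text, out var value) ? M.Pure(value) : M.Empty<A>();

    public static K<M, int> parseInt<M>(string text)
        where M : Alternative<M> =>
        Parse<M, int>(int.TryParse, text);
}

// Toy Option-like data type and its trait implementation
public sealed record Maybe<A>(bool HasValue, A? Value) : K<Maybe, A>;

public sealed class Maybe : Alternative<Maybe>
{
    public static K<Maybe, A> Pure<A>(A value) => new Maybe<A>(true, value);
    public static K<Maybe, A> Empty<A>() => new Maybe<A>(false, default);
}
```

With these definitions, `ParseSketch.parseInt<Maybe>("42")` produces a `Maybe<int>` holding 42, and `ParseSketch.parseInt<Maybe>("nope")` produces the empty case. The caller picks the result type purely through the generic argument.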

To see how this helps, take a look at the run function from the SumOfSquares example:

Before:

public static class SumOfSquares
{
    public static IO<Unit> run =>
        from _ in writeLine("Enter a number to find the sum of squares")
        from s in readLine
        from n in parseInt(s).Match(Some: IO.pure, None: IO.fail<int>("expected a number!"))
        from x in example(n).Iter().As()
        select unit;

    ..
}

After:

public static class SumOfSquares
{
    public static IO<Unit> run =>
        from _ in writeLine("Enter a number to find the sum of squares")
        from s in readLine
        from n in parseInt<IO>(s)
        from x in example(n).Iter().As()
        select unit;
   
    ..
}

We lift directly into the IO monad instead of into Option first (only to have to match on it straight away).

Obviously, the default alternative value might not be right, and so you can then use the | operator to catch the failure:

public static class SumOfSquares
{
    public static IO<Unit> run =>
        from _ in writeLine("Enter a number to find the sum of squares")
        from s in readLine
        from n in parseInt<IO>(s) | IO.fail<int>("expected a number!")
        from x in example(n).Iter().As()
        select unit;

    ..
}

Instead of raising an error, you could also provide a default if the parse fails:

parseInt<IO>(s) | IO.pure(0)

This is nice and elegant and, I think, shows the usefulness of the traits. I wouldn't mind removing the Option-bearing parse* functions, but I don't think it hurts to keep them in.

As always, any questions or comments, please reply below.