Skip to content

Commit

Permalink
Fix C# SwitchS and Split to not create node cycles.
Browse files Browse the repository at this point in the history
  • Loading branch information
jam40jeff committed Sep 7, 2017
1 parent 06a0d2c commit fb73c43
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 16 deletions.
82 changes: 68 additions & 14 deletions c#/src/Sodium/Sodium.Tests/CellTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -301,26 +301,40 @@ public void TestDiscreteCellLoopThrowsException()
}

[Test]
public void TestStreamLoop()
public void TestDiscreteCellLoopSwitchS()
{
StreamSink<int> streamSink = new StreamSink<int>();
Stream<int> s = Transaction.Run(() =>
StreamSink<TestObject> addStreamSink = new StreamSink<TestObject>();
DiscreteCell<IReadOnlyList<TestObject>> cell = Transaction.Run(() =>
{
StreamLoop<int> sl = new StreamLoop<int>();
DiscreteCell<int> c = sl.Map(v => v + 2).Hold(0);
Stream<int> s2 = streamSink.Snapshot(c, (x, y) => x + y);
sl.Loop(s2);
return s2;
DiscreteCellLoop<IReadOnlyList<TestObject>> cellLoop = new DiscreteCellLoop<IReadOnlyList<TestObject>>();
DiscreteCell<IReadOnlyList<TestObject>> cellLocal =
cellLoop.Map(oo => oo.Select(o => o.RemoveStream.MapTo(new[] { o })).Merge((x, y) => x.Concat(y).ToArray())).SwitchS().Map<Func<IReadOnlyList<TestObject>, IReadOnlyList<TestObject>>>(o => c => c.Except(o).ToArray())
.Merge(addStreamSink.Map<Func<IReadOnlyList<TestObject>, IReadOnlyList<TestObject>>>(o => c => c.Concat(new[] { o }).ToArray()), (f, g) => c => g(f(c)))
.Snapshot(cellLoop, (f, c) => f(c))
.Hold(new TestObject[0]);
cellLoop.Loop(cellLocal);
return cellLocal;
});
List<int> @out = new List<int>();
IListener l = s.Listen(@out.Add);
streamSink.Send(3);
streamSink.Send(4);
streamSink.Send(7);
streamSink.Send(8);
IListener l = cell.Listen(c => @out.Add(c.Count));
TestObject t1 = new TestObject(1, 1);
addStreamSink.Send(t1);
TestObject t2 = new TestObject(2, 2);
addStreamSink.Send(t2);
TestObject t3 = new TestObject(3, 3);
addStreamSink.Send(t3);
t2.Remove();
TestObject t4 = new TestObject(4, 4);
Transaction.RunVoid(() =>
{
addStreamSink.Send(t4);
t3.Remove();
});
TestObject t5 = new TestObject(5, 5);
addStreamSink.Send(t5);
l.Unlisten();

CollectionAssert.AreEqual(new[] { 3, 9, 18, 28 }, @out);
CollectionAssert.AreEqual(new[] { 0, 1, 2, 3, 2, 2, 3 }, @out);
}

[Test]
Expand Down Expand Up @@ -1138,6 +1152,46 @@ public void SwitchSCatchFirst()
CollectionAssert.AreEqual(new[] { 2, 13, 14, 5 }, output);
}

[Test]
public void SwitchSCatchFirstBefore()
{
List<int> output = new List<int>();

ValueTuple<Stream<int>, StreamSink<int>, StreamSink<int>, CellSink<Stream<int>>, IListener> t = Transaction.Run(() =>
{
StreamSink<int> c1 = Stream.CreateSink<int>();
StreamSink<int> c2 = Stream.CreateSink<int>();
CellSink<Stream<int>> s = Cell.CreateSink(c1.AsStream());
c1.Send(2);
c2.Send(12);
s.Send(c2);
Stream<int> c = s.SwitchS();
IListener l = c.Listen(output.Add);
return ValueTuple.Create(c, c1, c2, s, l);
});

t.Item2.Send(3);
t.Item3.Send(13);

Transaction.RunVoid(() =>
{
t.Item2.Send(4);
t.Item3.Send(14);
t.Item4.Send(t.Item2);
});

t.Item2.Send(5);
t.Item3.Send(15);

t.Item5.Unlisten();

CollectionAssert.AreEqual(new[] { 2, 13, 14, 5 }, output);
}

[Test]
public void SwitchEarlySCatchFirst()
{
Expand Down
42 changes: 42 additions & 0 deletions c#/src/Sodium/Sodium.Tests/StreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -778,5 +778,47 @@ public async Task TestListenAsync()
l2.Unlisten();
resultsAndCalled.Item3.Unlisten();
}

[Test]
public void TestStreamLoop()
{
StreamSink<int> streamSink = new StreamSink<int>();
Stream<int> s = Transaction.Run(() =>
{
StreamLoop<int> sl = new StreamLoop<int>();
DiscreteCell<int> c = sl.Map(v => v + 2).Hold(0);
Stream<int> s2 = streamSink.Snapshot(c, (x, y) => x + y);
sl.Loop(s2);
return s2;
});
List<int> @out = new List<int>();
IListener l = s.Listen(@out.Add);
streamSink.Send(3);
streamSink.Send(4);
streamSink.Send(7);
streamSink.Send(8);
l.Unlisten();

CollectionAssert.AreEqual(new[] { 3, 9, 18, 28 }, @out);
}

[Test]
public void TestStreamLoopDefer()
{
StreamSink<int> streamSink = new StreamSink<int>();
Stream<int> stream = Transaction.Run(() =>
{
StreamLoop<int> streamLoop = new StreamLoop<int>();
Stream<int> streamLocal = Operational.Defer(streamSink.OrElse(streamLoop).Filter(v => v < 5).Map(v => v + 1));
streamLoop.Loop(streamLocal);
return streamLocal;
});
List<int> @out = new List<int>();
IListener l = stream.Listen(@out.Add);
streamSink.Send(2);
l.Unlisten();

CollectionAssert.AreEqual(new[] { 3, 4, 5 }, @out);
}
}
}
2 changes: 1 addition & 1 deletion c#/src/Sodium/Sodium/CellExtensionMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static Stream<T> SwitchS<T>(this Cell<Stream<T>> csa)
});
};
trans1.Prioritized(new Node<T>(), trans2 => hInitial(trans2, csa.SampleNoTransaction()));
IListener l1 = csa.Updates(trans1).Listen(@out.Node, trans1, h, false);
IListener l1 = csa.Updates(trans1).Listen(new Node<T>(), trans1, h, false);
return @out.UnsafeAttachListener(l1).UnsafeAttachListener(currentListener);
}, false);
}
Expand Down
2 changes: 1 addition & 1 deletion c#/src/Sodium/Sodium/Operational.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static Stream<T> Defer<T>(Stream<T> s)
public static Stream<T> Split<T, TCollection>(Stream<TCollection> s) where TCollection : IEnumerable<T>
{
Stream<T> @out = new Stream<T>(s.KeepListenersAlive);
IListener l1 = s.Listen(@out.Node, (trans, aa) =>
IListener l1 = s.Listen(new Node<T>(), (trans, aa) =>
{
int childIx = 0;
foreach (T a in aa)
Expand Down

0 comments on commit fb73c43

Please sign in to comment.