diff --git a/util-core/src/main/scala/com/twitter/concurrent/AsyncStream.scala b/util-core/src/main/scala/com/twitter/concurrent/AsyncStream.scala index a26def1ace..770c9b3758 100644 --- a/util-core/src/main/scala/com/twitter/concurrent/AsyncStream.scala +++ b/util-core/src/main/scala/com/twitter/concurrent/AsyncStream.scala @@ -647,7 +647,7 @@ object AsyncStream { def fromSeq[A](seq: Seq[A]): AsyncStream[A] = seq match { case Nil => empty case _ if SeqUtil.hasKnownSize(seq) && seq.tail.isEmpty => of(seq.head) - case _ => seq.head +:: fromSeq(seq.tail) + case _ => of(seq.head).flatMap(_ +:: fromSeq(seq.tail)) } /** diff --git a/util-core/src/test/scala/com/twitter/concurrent/AsyncStreamTest.scala b/util-core/src/test/scala/com/twitter/concurrent/AsyncStreamTest.scala index d24f1fcb7f..a60ed91ee8 100644 --- a/util-core/src/test/scala/com/twitter/concurrent/AsyncStreamTest.scala +++ b/util-core/src/test/scala/com/twitter/concurrent/AsyncStreamTest.scala @@ -748,6 +748,20 @@ class AsyncStreamTest extends AnyFunSuite with ScalaCheckDrivenPropertyChecks { assert(Await.result(stream.toSeq()) == Seq(n)) } + + test(s"$impl: fromSeq is stack-safe") { + val n = 100000 + val longSeq = (0 until n).toSeq + val stream = AsyncStream.fromSeq(longSeq) + .filter(_ > 10) + .filter(_ > 100) + .filter(_ > 1000) + .filter(_ > 10000) + .filter(_ > 100000) + .take(Int.MaxValue) + + assert(Await.result(stream.toSeq.liftToTry).isReturn) + } } } @@ -805,4 +819,4 @@ private object AsyncStreamTest { } def seqImpl: Seq[FromSeq] = Seq(Cons, EmbeddedCons, OfFuture, EmbeddedFuture) -} \ No newline at end of file +}