I created an fs2.Stream with:

topic      <- Stream.eval(fs2.concurrent.Topic[F, Either[Throwable, AmqpMessage[Array[Byte]]]])
messages   <- Stream.resource(topic.subscribeAwait(innerQueueSize))

The subscribeAwait resource returns before the subscriber starts pulling elements, so when I use the resulting stream it does not seem to start pulling on its own. I run it as a background process with s.compile.drain.background.
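To make the behaviour concrete, here is a minimal sketch assuming fs2 3.x and cats-effect 3, with `IO` standing in for `F` and `Int` for the message type (both are illustrative choices, as are the names `SubscribeAwaitDemo` and `buffered`). As far as I understand it, the subscription is registered as soon as the resource is acquired, so elements published before anyone pulls should be held in the subscriber's bounded buffer, and publishing only blocks once that buffer is full.

```scala
import cats.effect.{IO, IOApp}
import fs2.concurrent.Topic

object SubscribeAwaitDemo extends IOApp.Simple {

  // Publishes two elements after subscribing but before pulling,
  // then pulls them from the subscription stream.
  val buffered: IO[List[Int]] =
    Topic[IO, Int].flatMap { topic =>
      // maxQueued = 4 is an arbitrary buffer size for this sketch
      topic.subscribeAwait(4).use { messages =>
        for {
          _   <- topic.publish1(1) // nobody is pulling `messages` yet
          _   <- topic.publish1(2)
          got <- messages.take(2).compile.toList // pulling starts here
        } yield got
      }
    }

  val run: IO[Unit] = buffered.flatMap(IO.println)
}
```

If more than `maxQueued` elements are published before the stream is pulled, `publish1` would be expected to wait, which may be what looks like the stream "not pulling" when the producer is a blocking callback.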

One way to force pulling is to create another queue:

for {
  q <- Resource.eval(cats.effect.std.Queue.unbounded[F, T])
} yield fs2.Stream.fromQueueUnterminated(q)
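For completeness, this is roughly how I imagine the full workaround: a background fiber drains the subscription into the queue, and consumers read from the queue instead. It is only a sketch under the assumption of fs2 3.x and cats-effect 3; `EagerBuffer`, `eagerlyBuffered` and `source` are hypothetical names, not part of any library.

```scala
import cats.effect.{Concurrent, Resource}
import cats.effect.std.Queue
import cats.effect.syntax.all._
import fs2.Stream

object EagerBuffer {

  // Starts pulling `source` as soon as the resource is acquired and
  // exposes the buffered elements as a new stream.
  def eagerlyBuffered[F[_]: Concurrent, T](source: Stream[F, T]): Resource[F, Stream[F, T]] =
    for {
      q <- Resource.eval(Queue.unbounded[F, T])
      // background fiber: copies every element into the queue;
      // it is cancelled when the resource is released
      _ <- source.evalMap(q.offer).compile.drain.background
    } yield Stream.fromQueueUnterminated(q)
}
```

It would be used as `topic.subscribeAwait(innerQueueSize).flatMap(EagerBuffer.eagerlyBuffered(_))`. Note that the unbounded queue removes backpressure on the publisher, and errors raised by `source` stay inside the background fiber rather than reaching the consumer.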

Is there any way to start pulling without using an intermediate queue? Under the hood, the stream is populated by blocking Java code (elements are put into the topic from blocking callbacks).
