Questions tagged [fs2]
FS2: Functional Streams for Scala
160 questions
1 vote, 1 answer, 53 views
Scala Fs2: Aggregate computation on infinite streams
I can't seem to understand how to perform aggregate computations on infinite streams. Taking an infinite stream of elements and performing a computation on each one individually is easy, but collecting ...
0 votes, 0 answers, 42 views
How to force pull in fs2.Stream
I created an fs2.Stream with:
topic <- Stream.eval(fs2.concurrent.Topic[F, Either[Throwable, AmqpMessage[Array[Byte]]]])
messages <- Stream.resource(topic.subscribeAwait(innerQueueSize))
...
1 vote, 1 answer, 33 views
How can I update a variable within my Http4s service every set interval of time?
I am trying to continually refresh a value after a set interval of time within an Http4s service. Up until now it's been defined as just a val that maintains its initial value. I would like to ...
1 vote, 0 answers, 42 views
How to combine multiple ordered streams efficiently?
Given a List[Stream[F[_], A]], where A <: Ordered, and each stream contains an ordered sequence of elements, what is an efficient way to combine these streams into a single, ordered stream of all ...
1 vote, 0 answers, 65 views
How to define a Codec for a case class using Vulcan in FS2 Kafka
I have the following case classes. I am trying to produce an Avro message to Kafka, using FS2 and Vulcan for the codec.
case class People(name: String, address: Seq[Address])
case class Address(`type`: ...
1 vote, 1 answer, 55 views
Handling Exceptions in Scala FS2 Stream Transformation flow
import cats.effect.{IO, IOApp}
import fs2.Pipe
import fs2.Stream
object Test extends IOApp.Simple {
final case class Student(id: Int, name: String)
private val studentData: Map[Int, Student] = ...
1 vote, 1 answer, 73 views
How to make a cancellable timeout callback?
I want the user to be able to run a timer with a callback and to cancel it. Something like this:
def main: F[Unit] =
for
cancel <- runTimer(callback, 5.seconds)
shouldCancel <- askUser
...
0 votes, 2 answers, 96 views
What are some FS2 Error Handling Practices?
I am new to FS2, Cats Effect, etc., but I have been using Scala since 2005 and Akka since 2010...
I tuned up the FS2 example code to play with error-handling ideas, but I wondered if there are better ...
0 votes, 1 answer, 130 views
fs2: How to do something once the stream is started ("doOnSubscribe")?
I am trying to use an impure ("java") API in the context of a cats-effect IO-App. The impure API looks somewhat like this:
import io.reactivex.Flowable
import java.util.concurrent....
0 votes, 0 answers, 92 views
Stream.Eval vs Stream.generate
I am a Scala programmer. For a specific project I need to revert to pure Java.
I have been meaning to learn what could be an equivalent for using fs2 in Java.
I found several candidates, such as Reactor. ...
1 vote, 1 answer, 155 views
Kafka fs2 stream: how to set backpressure
I have an fs2 stream from a Kafka topic. Is it possible to set how much data can be pulled? Currently I have a problem: if a problem occurs in the middle of the stream, it ends up with a memory ...
1 vote, 0 answers, 43 views
Does the Kafka producer reconnect in fs2.Kafka?
The microservice does not always switch to another Kafka node when it loses the connection to the node it was connected to; it simply crashes.
If we create a producer and transfer several ...
0 votes, 0 answers, 81 views
Cancelling fs2 Streams When Encountering an Error in Akka Actor
I have the following Akka Actor that sends itself messages at a fixed time interval. For this purpose, I'm using fs2 streams that send me signals at this fixed interval:
private[this]...
0 votes, 2 answers, 524 views
Handle large file with Stream in Scala
I have a large CSV file with user data. I have an endpoint that gets a user and should return a boolean indicating whether the user exists in the file or not.
To avoid running out of memory I read ...
0 votes, 0 answers, 515 views
Kafka consumer stops listening to messages
My service has a Kafka consumer that listens to messages from a topic. This has been working fine for over a year. But in the last couple of days, my Kafka consumer stopped listening to the Kafka ...