All Questions
Tagged with scala akka-stream
1,196
questions
0
votes
0
answers
8
views
Create Flow[ByteString, ByteString, NotUsed] by piping through InputStream
I need decompress data using compression not supported by akka, but supported by other library which provided InputStream interface.
To make it work with akka stream, I need to implement function:
def ...
1
vote
0
answers
34
views
HTTP connection closed unexpectedly issue in akka-stream application
In my akka-streams(scala) application, I am using google pubsub as the source, querying from a BigQuery table and then publishing an outcome to a kafka topic and later acknowledging the pubsub
message....
0
votes
0
answers
66
views
java.lang.IllegalStateException Sink.asPublisher(fanout = false) only supports one subscriber when using custom Playframework body parser as stream
After looking around stackoverflow and github issues, it seems this happened several times already in different contexts but never received any real help nor solutions so I try my luck once again.
We ...
1
vote
1
answer
55
views
Timeout and errors "This publisher only supports one subscriber" with Akka Stream
We sometimes face weird timeout errors in client code using Akka Stream which also manifests itself with some kind of bottlenecks/deadlocks and seeing calls fails from the client point-of-view even ...
0
votes
1
answer
54
views
How to create akka source that is materialized to ActorRef in which the incoming messages know the sender
val ref = Source.actorRef[String](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
100000,
OverflowStrategy.dropNew
).to(Sink.foreachAsync(1){ ...
0
votes
1
answer
38
views
Using Akka GraphDSL with Zip stages
Consider the following code:
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val in = Source(0 to 10)
val fanOut = builder.add(Broadcast[Int](2))
val toString = builder....
0
votes
0
answers
25
views
akka PubSub not working across distributed system
I have a system that is clustered deploy on k8s, it will have multiple instances when it's deployed.
My code sample like below
import akka.actor.typed.pubsub.Topic
import akka.actor.typed.scaladsl....
0
votes
0
answers
22
views
akka stream merge data from multiple replicas system
I have two systems, let's refer to them as SystemA and SystemB, both deployed on a Kubernetes (k8s) cluster. SystemA is deployed with two replicas, namely SystemA-1 and SystemA-2, while SystemB has ...
0
votes
2
answers
106
views
Akka Streams: How to construct a Source of Sources with GraphDSL?
Here's a simple scenario.
Let's begin with a single Akka Source: let's say, of rows retrieved from a database. Based on a partitioning function, different rows need to be diverted into different ...
0
votes
1
answer
48
views
Skip flow on failure akka streams
I wan't to skip a flow without losing the data that was sent in case of failure. But I can't found a way to do that. Here is an example code that I being using to test.
val decider: Supervision....
0
votes
0
answers
41
views
Akka Streams, how to 'mock' source based on configuration
I'm looking for suggestion for this scenario: I have multiple sources that I want to merge at some point and I would like to enable or disable them according to configuration. For instance, in the ...
0
votes
3
answers
63
views
Akka Streaming - Redistribute chunks into max_permissible_chunk_size Scala
My code is uploading binary to s3 using Akka streaming as shown below:
source
.via(distributeChunks(MAX_BYTES_PER_CHUNK))
.throttle(maxRequestsPerSecond, 1.second, ...
1
vote
1
answer
71
views
Akka Source from Iterator with blocking actions
The Akka documentation on Source.fromIterator (https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html) says:
If the iterator perform[s] blocking operations, make sure to run ...
0
votes
1
answer
57
views
Restricting stream based on URL or local file
I have an akka http API where users sends S3 URL to the server. Server then starts stream from AWS server and performs future action on the source. However I would like to validate the size of the ...
0
votes
0
answers
60
views
How to ensure valid path object is returned in Java
I am going through the quickstart guide in the Akka Streams docs for Scala and in one of the examples they are using Akka Streams to write to a file.
import java.nio.file.Paths
import scala....