All Questions

0 votes
0 answers
8 views

Create Flow[ByteString, ByteString, NotUsed] by piping through InputStream

I need to decompress data using a compression format that is not supported by Akka but is supported by another library, which provides an InputStream interface. To make it work with Akka Streams, I need to implement a function: def ...
Vistritium's user avatar
1 vote
0 answers
34 views

HTTP connection closed unexpectedly issue in akka-stream application

In my akka-streams (Scala) application, I use Google Pub/Sub as the source, query a BigQuery table, publish the outcome to a Kafka topic, and later acknowledge the Pub/Sub message....
Nitesh's user avatar
  • 11
0 votes
0 answers
66 views

java.lang.IllegalStateException Sink.asPublisher(fanout = false) only supports one subscriber when using custom Playframework body parser as stream

After looking around Stack Overflow and GitHub issues, it seems this has happened several times already in different contexts but never received any real help or solution, so I am trying my luck once again. We ...
Bill'o's user avatar
  • 514
1 vote
1 answer
55 views

Timeout and errors "This publisher only supports one subscriber" with Akka Stream

We sometimes face weird timeout errors in client code using Akka Stream, which also manifest as some kind of bottleneck/deadlock, with calls failing from the client's point of view even ...
Gaël J's user avatar
  • 14.1k
0 votes
1 answer
54 views

How to create akka source that is materialized to ActorRef in which the incoming messages know the sender

val ref = Source.actorRef[String]( completionMatcher = PartialFunction.empty, failureMatcher = PartialFunction.empty, 100000, OverflowStrategy.dropNew ).to(Sink.foreachAsync(1){ ...
Vistritium's user avatar
0 votes
1 answer
38 views

Using Akka GraphDSL with Zip stages

Consider the following code: GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ val in = Source(0 to 10) val fanOut = builder.add(Broadcast[Int](2)) val toString = builder....
Uko's user avatar
  • 13.3k
0 votes
0 answers
25 views

akka PubSub not working across distributed system

I have a clustered system deployed on k8s; it runs as multiple instances when deployed. My code sample is below: import akka.actor.typed.pubsub.Topic import akka.actor.typed.scaladsl....
somethingW's user avatar
0 votes
0 answers
22 views

akka stream merge data from multiple replicas system

I have two systems, let's refer to them as SystemA and SystemB, both deployed on a Kubernetes (k8s) cluster. SystemA is deployed with two replicas, namely SystemA-1 and SystemA-2, while SystemB has ...
somethingW's user avatar
0 votes
2 answers
106 views

Akka Streams: How to construct a Source of Sources with GraphDSL?

Here's a simple scenario. Let's begin with a single Akka Source: let's say, of rows retrieved from a database. Based on a partitioning function, different rows need to be diverted into different ...
silverberry's user avatar
0 votes
1 answer
48 views

Skip flow on failure akka streams

I want to skip a flow without losing the data that was sent in case of failure, but I can't find a way to do that. Here is the example code that I have been using to test: val decider: Supervision....
David Santiago Gantiva Castro's user avatar
0 votes
0 answers
41 views

Akka Streams, how to 'mock' source based on configuration

I'm looking for suggestions for this scenario: I have multiple sources that I want to merge at some point, and I would like to enable or disable them according to configuration. For instance, in the ...
ticofab's user avatar
  • 7,685
0 votes
3 answers
63 views

Akka Streaming - Redistribute chunks into max_permissible_chunk_size Scala

My code uploads binary data to S3 using Akka Streams as shown below: source .via(distributeChunks(MAX_BYTES_PER_CHUNK)) .throttle(maxRequestsPerSecond, 1.second, ...
Shivam Sahil's user avatar
  • 4,732
1 vote
1 answer
71 views

Akka Source from Iterator with blocking actions

The Akka documentation on Source.fromIterator (https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html) says: If the iterator perform[s] blocking operations, make sure to run ...
silverberry's user avatar
0 votes
1 answer
57 views

Restricting stream based on URL or local file

I have an Akka HTTP API where users send an S3 URL to the server. The server then starts a stream from the AWS server and performs a future action on the source. However, I would like to validate the size of the ...
Sushant Somani's user avatar
0 votes
0 answers
60 views

How to ensure valid path object is returned in Java

I am going through the quickstart guide in the Akka Streams docs for Scala and in one of the examples they are using Akka Streams to write to a file. import java.nio.file.Paths import scala....
T-Braico's user avatar
