All Questions
Tagged with scala apache-kafka
1,261
questions
0
votes
1
answer
27
views
How to read or retry processing for uncommitted messages in Kafka
Let's say I am getting 1 to 15 messages from Kafka. I am processing and committing them; if a message does satisfy the condition, I am uncommitting that particular message.
Now, my question is how to ...
1
vote
0
answers
65
views
How to Codec Case Class using Vulcan in FS2 Kafka
I have the following case classes. I am trying to produce an Avro message to Kafka, using FS2 and Vulcan for the codec.
case class People(name: String, address: Seq[Address])
case class Address(`type`: ...
0
votes
0
answers
66
views
Error while running tests java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/AlterPartitionRequest$Builder
Using Spring Boot 2.7.18 and the spring-kafka-test and Scala library dependencies mentioned below.
artifact = "spring-kafka-test",
group = "org.springframework.kafka",
version = 2.9.3,
...
0
votes
0
answers
62
views
How to deserialize Confluent Kafka Avro messages in Spark Scala without using ABRiS or removing magic bytes
I am trying to deserialize messages written in Confluent Avro format.
I know we can use ABRiS or remove the magic bytes, but is there a way to do it without either of these two methods?
Below is what I am ...
0
votes
1
answer
32
views
Spark Streaming and Kafka integration dependency problem
I am trying to write a script that reads data from Kafka with Spark Streaming, but when I run "sbt compile" I get this error:
sbt.librarymanagement.ResolveException: Error downloading org....
0
votes
1
answer
266
views
Exposing Kafka internal metrics via Micrometer
I believe the kafka-client code has various internal metrics. I am wondering how to expose those internal metrics in Scala.
Can you give me a hint how ...
0
votes
0
answers
73
views
Gatling scala Kafka avro : schema.registry.url ' were supplied but are not used yet
I am using a script to serialize and deserialize Avro messages using Gatling. I have taken this script from @tinkoff on GitHub.
While running it, I get the following error:
schema.registry.url ' were ...
0
votes
0
answers
41
views
Why am I missing some Kafka messages when reading?
I have a Kafka topic with 3 partitions and only one consumer subscribed to that topic. Somehow I am failing to read some of the Kafka messages.
val prop = new Properties()
prop.setProperty("...
0
votes
0
answers
76
views
Write a Spark UDF which will return a String value
I have been playing around with Apache Spark in Scala to see how I can deserialize an Avro record dynamically based on the Avro record. Instead of passing the Avro schema directly to the from_avro function, I wrote ...
0
votes
0
answers
149
views
How does TopicRecordNameStrategy work with Kafka and Schema Registry?
I was trying to implement TopicRecordNameStrategy in Apache Kafka. I have written a producer that will produce the record using this strategy. I used a consumer to just read the record and try to ...
0
votes
0
answers
65
views
Handling Real-time Data Stream Processing with Kafka and Spark
I am working on a project where I need to implement a real-time data stream processing pipeline using Apache Kafka and Apache Spark. The goal is to ingest, process, and analyze high-velocity data ...
1
vote
1
answer
40
views
Scala, how to simplify or reuse side-effecting pattern matching logic?
I am looking for a way to refactor this code and make it cleaner. I am certain that there is a way but haven't been able to figure it out.
I am working with avro4s, I need to enable serialisation for ...
0
votes
0
answers
161
views
Resetting offset for partition after (Re-)joining group
Here is the problem: due to some issues (probably network), our application is losing connection with 3 of our 5 Kafka brokers:
Error sending fetch request (sessionId=2108009367, epoch=INITIAL) to node 4: ...
0
votes
0
answers
126
views
Kafka producer client is not able to connect to schema registry
I am trying to run a job on Google Dataproc that pushes a record into a Kafka topic. It uses a schema registry for schema validation and serialization. Representative code below:
case class ...
0
votes
1
answer
133
views
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
I am doing Spark Structured Streaming with Kafka and Cassandra. While running the command below, I got an error:
spark-submit --class StreamHandler --master local[*] --packages "org.apache.spark:...