All Questions

0 votes
1 answer
27 views

How to read or retry processing of an uncommitted message in Kafka

Let's say I am getting messages 1 to 15 from Kafka. I process and commit them; if a message does satisfy the condition, I leave that particular message uncommitted. Now, my question is how to ...
BALAJI's user avatar
  • 1
1 vote
0 answers
65 views

How to Codec Case Class using Vulcan in FS2 Kafka

I have the following case classes. I am trying to produce an Avro message to Kafka, using FS2 and Vulcan for the codec. case class People(name: String, address: Seq[Address]) case class Address(`type`: ...
Chen Guo's user avatar
0 votes
0 answers
66 views

Error while running tests java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/AlterPartitionRequest$Builder

I am using Spring Boot 2.7.18 with the spring-kafka-test and Scala library dependencies listed below. artifact = "spring-kafka-test", group = "org.springframework.kafka", version = 2.9.3, ...
Aditya Setia's user avatar
0 votes
0 answers
62 views

How to deserialize Confluent Kafka Avro messages in Spark Scala without using ABRiS or removing magic bytes

I am trying to deserialize messages written in Confluent Avro format. I know we can use ABRiS or remove the magic bytes, but is there a way to do it without either of these two methods? Below is what I am ...
mt_leo's user avatar
  • 67
0 votes
1 answer
32 views

Spark Streaming and Kafka integration dependency problem

I am trying to write a script that reads data from Kafka with Spark Streaming, but when I run "sbt compile" I get this error: sbt.librarymanagement.ResolveException: Error downloading org....
noureddine's user avatar
0 votes
1 answer
266 views

Exposing Kafka internal metrics via Micrometer

I believe the kafka-client code maintains various internal metrics. I am wondering how to expose those metrics in Scala. Can you give me a hint how ...
Vimal's user avatar
  • 1
0 votes
0 answers
73 views

Gatling Scala Kafka Avro: schema.registry.url ' were supplied but are not used yet

I am using a script to serialize and deserialize Avro messages using Gatling. I have taken this script from @tinkoff on GitHub. While running it, I get the following error: schema.registry.url ' were ...
Santhosh's user avatar
0 votes
0 answers
41 views

Why am I missing some of the Kafka messages?

I have a Kafka topic with 3 partitions and only one consumer subscribed to that topic. Somehow I am missing some of the Kafka messages. val prop = new Properties() prop.setProperty(...
Gansun's user avatar
  • 301
0 votes
0 answers
76 views

Write a Spark UDF which will return a String value

I have been playing around with Apache Spark in Scala to see how I can deserialize Avro records dynamically based on the Avro record. Instead of passing the Avro schema directly to the from_avro function, I wrote ...
pacman's user avatar
  • 815
0 votes
0 answers
149 views

How does TopicRecordNameStrategy work with Kafka and Schema Registry?

I was trying to implement TopicRecordNameStrategy in Apache Kafka. I have written a producer that will produce the record using this strategy. I used a consumer to just read the record and try to ...
pacman's user avatar
  • 815
0 votes
0 answers
65 views

Handling Real-time Data Stream Processing with Kafka and Spark

I am working on a project where I need to implement a real-time data stream processing pipeline using Apache Kafka and Apache Spark. The goal is to ingest, process, and analyze high-velocity data ...
Sam's user avatar
  • 156
1 vote
1 answer
40 views

Scala, how to simplify or reuse side-effecting pattern matching logic?

I am looking for a way to refactor this code and make it cleaner. I am certain that there is a way but haven't been able to figure it out. I am working with avro4s, I need to enable serialisation for ...
ticofab's user avatar
  • 7,685
0 votes
0 answers
161 views

Resetting offset for partition after (Re-)joining group

Here is the problem: due to some issues (probably network), our application is losing its connection to 3 of our 5 Kafka brokers: Error sending fetch request (sessionId=2108009367, epoch=INITIAL) to node 4: ...
Zixel's user avatar
  • 421
0 votes
0 answers
126 views

Kafka producer client is not able to connect to schema registry

I am trying to run a job on Google Dataproc which pushes a record into a Kafka topic. It uses a schema registry for schema validation and serialization. Representative code below: case class ...
Tarique's user avatar
  • 609
0 votes
1 answer
133 views

Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer

I am doing Spark Structured Streaming with Kafka and Cassandra. When I run the command below, I get an error: spark-submit --class StreamHandler --master local[*] --packages "org.apache.spark:...
UTHAYAKUMAR M's user avatar
