I am using the Scala API (Scala "2.12.15") of Spark "3.5.0", and I am trying to use a data stream to update the rows of my Postgres table.
My data stream and my table both have the same columns.
I want to use the "ip" column as the key and update the other columns using the values in the data stream.
Unfortunately, Spark's modes for writing a data stream don't support "update".
The main problem is that for each row of my data stream I need to update the Postgres table, and that doesn't seem possible.
Can anyone help me?
I have tried this:
// Synthetic input: Spark's built-in "rate" source, configured to emit one row per second.
val rateStream = {
  val rateReader = spark.readStream.format("rate")
  rateReader.option("rowsPerSecond", 1).load()
}
// Adds the table's columns on top of the rate source's own columns.
// The columns are applied in the order listed, one withColumn call each.
val processedStream = {
  val eventTime = col("timestamp")
  val derivedColumns = Seq(
    "user" -> lit("user1"), // Add your logic to extract/generate user
    "ip" -> lit("192.168.1.1"), // Add your logic to extract/generate IP
    "start_time" -> eventTime,
    "end_time" -> (eventTime + expr("interval 1 hour")),
    "in_use" -> lit(true),
    "last_tap" -> eventTime
  )
  derivedColumns.foldLeft(rateStream) { case (frame, (columnName, expression)) =>
    frame.withColumn(columnName, expression)
  }
}
// Connection settings for the target PostgreSQL database (placeholder values).
val jdbcUrl: String = "jdbc:postgresql://localhost:5432/your_database"
val user: String = "your_username"
val password: String = "your_password"
/**
 * ForeachWriter that upserts every streamed row into the Postgres table `ip_table`,
 * using `ip` as the conflict key.
 *
 * A single JDBC connection and a single prepared statement are created in `open`,
 * reused for every row passed to `process`, and committed (or rolled back when the
 * partition failed) in `close`.
 *
 * @param url  JDBC connection URL
 * @param user database user name
 * @param pwd  database password
 */
class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  var connection: Connection = _
  // Kept so existing readers of this field still work; it aliases the prepared upsert statement.
  var statement: java.sql.Statement = _
  private var upsert: java.sql.PreparedStatement = _

  // Row values are bound as `?` parameters instead of being spliced into the SQL text,
  // so quotes inside a value cannot break or alter the statement and nulls are stored as NULL.
  // ON CONFLICT (ip) relies on a unique index or primary key on `ip` in the target table.
  private val upsertSql: String =
    """
      |INSERT INTO ip_table ("user", ip, start_time, end_time, in_use, last_tap)
      |VALUES (?, ?, ?, ?, ?, ?)
      |ON CONFLICT (ip) DO UPDATE SET
      |  "user" = EXCLUDED."user",
      |  start_time = EXCLUDED.start_time,
      |  end_time = EXCLUDED.end_time,
      |  in_use = EXCLUDED.in_use,
      |  last_tap = EXCLUDED.last_tap
      |""".stripMargin

  /**
   * Opens the connection (auto-commit off) and prepares the upsert statement.
   *
   * @param partitionId partition being written
   * @param epochId     epoch being written
   * @return always `true`, i.e. every partition/epoch is processed
   */
  def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(url, user, pwd)
    try {
      connection.setAutoCommit(false)
      upsert = connection.prepareStatement(upsertSql)
      statement = upsert
      true
    } catch {
      case scala.util.control.NonFatal(e) =>
        // close() is not invoked when open() throws, so release the connection here.
        connection.close()
        connection = null
        throw e
    }
  }

  /**
   * Binds the row's columns to the prepared statement and executes the upsert.
   *
   * @param value row containing the columns user, ip, start_time, end_time, in_use, last_tap
   */
  def process(value: org.apache.spark.sql.Row): Unit = {
    upsert.setString(1, value.getAs[String]("user"))
    upsert.setString(2, value.getAs[String]("ip"))
    upsert.setTimestamp(3, value.getAs[java.sql.Timestamp]("start_time"))
    upsert.setTimestamp(4, value.getAs[java.sql.Timestamp]("end_time"))
    upsert.setBoolean(5, value.getAs[Boolean]("in_use"))
    upsert.setTimestamp(6, value.getAs[java.sql.Timestamp]("last_tap"))
    upsert.executeUpdate()
  }

  /**
   * Commits the transaction when the partition succeeded, rolls it back otherwise,
   * and always releases the statement and the connection.
   *
   * @param errorOrNull the failure that ended processing, or `null` on success
   */
  def close(errorOrNull: Throwable): Unit = {
    if (connection != null) {
      try {
        if (errorOrNull == null) connection.commit() else connection.rollback()
      } finally {
        // Nested finally: the connection is closed even if closing the statement fails.
        try {
          if (upsert != null) upsert.close()
        } finally {
          connection.close()
        }
      }
    }
  }
}
// Start the streaming query: every row of processedStream is handed to a JDBCSink,
// which performs the upsert against PostgreSQL.
val query = {
  val upsertSink = new JDBCSink(jdbcUrl, user, password)
  processedStream.writeStream
    .outputMode("update") // Use "append" if you don't need to reprocess updates
    .foreach(upsertSink)
    .start()
}
// Block the calling thread until the query stops or fails.
query.awaitTermination()
}
}
This wasn't working at all, and I don't think the solution should be that complicated (with so much code)!
That code was generated using ChatGPT.