-1

I am using the Scala API (Scala 2.12.15) of Spark 3.5.0, and I am trying to use a data stream to update the rows of my PostgreSQL table.
My data stream and my table both have the same columns.
I want to use the "ip" column as the key and update the other columns with the values from the data stream.
Unfortunately, the modes Spark offers for writing a data stream don't support a database "update" (upsert).
The main problem is that for each row of my data stream I need to update the PostgreSQL table, and that doesn't seem possible.
Can anyone help me?

I have tried this:

// Read from Rate Source
    // Test input: Spark's built-in "rate" source, throttled to one row per second.
    val rateStream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()

    // Build the ip_table columns in a single projection. The source's own columns
    // come first (via "*"), followed by the derived ones in table order.
    val processedStream = rateStream.select(
      col("*"),
      lit("user1").as("user"),                                             // placeholder: derive the real user here
      lit("192.168.1.1").as("ip"),                                         // placeholder: derive the real IP here
      col("timestamp").as("start_time"),
      col("timestamp").plus(expr("interval 1 hour")).as("end_time"),
      lit(true).as("in_use"),
      col("timestamp").as("last_tap")
    )

    // Postgres connection settings handed to JDBCSink.
    val jdbcUrl = "jdbc:postgresql://localhost:5432/your_database"
    val user = "your_username"
    val password = "your_password"

    // Define ForeachWriter for Upsert Logic
    /**
     * ForeachWriter that upserts every streaming row into the Postgres table `ip_table`,
     * keyed on the `ip` column: an unseen ip is inserted; an existing ip has its other
     * columns overwritten with the incoming values.
     *
     * A connection and a single prepared statement are created in `open`, reused for
     * every row in `process`, and in `close` the work is committed (or rolled back when
     * Spark reports an error) and both resources are released.
     *
     * Values are bound as JDBC parameters instead of being spliced into the SQL text, so
     * quotes in the data cannot break or inject into the statement, and nulls are written
     * as SQL NULL rather than the string 'null'.
     *
     * NOTE: `ON CONFLICT (ip)` requires a unique index or constraint on `ip_table.ip`.
     * NOTE(review): `user` and `ip` are bound with setString (varchar). If `ip` is an
     * `inet` column, Postgres rejects the implicit cast; write `?::inet` in the SQL or add
     * `stringtype=unspecified` to the JDBC URL. Confirm against the table schema.
     *
     * @param url  JDBC URL of the Postgres database
     * @param user database user name
     * @param pwd  database password
     */
    class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
      // Parameterised upsert; the placeholder order must match the bind calls in process().
      private val upsertSql: String =
        """INSERT INTO ip_table ("user", ip, start_time, end_time, in_use, last_tap)
          |VALUES (?, ?, ?, ?, ?, ?)
          |ON CONFLICT (ip) DO UPDATE SET
          |  "user" = EXCLUDED."user",
          |  start_time = EXCLUDED.start_time,
          |  end_time = EXCLUDED.end_time,
          |  in_use = EXCLUDED.in_use,
          |  last_tap = EXCLUDED.last_tap""".stripMargin

      // Created in open() and released in close(); null until open() has run.
      var connection: Connection = _
      var statement: java.sql.PreparedStatement = _

      /** Opens the connection and prepares the upsert; returns true so every row is processed. */
      def open(partitionId: Long, epochId: Long): Boolean = {
        connection = DriverManager.getConnection(url, user, pwd)
        // Commit once in close() rather than once per row.
        connection.setAutoCommit(false)
        statement = connection.prepareStatement(upsertSql)
        true
      }

      /** Binds the row's columns to the prepared upsert and executes it. */
      def process(value: org.apache.spark.sql.Row): Unit = {
        statement.setString(1, value.getAs[String]("user"))
        statement.setString(2, value.getAs[String]("ip"))
        statement.setTimestamp(3, value.getAs[java.sql.Timestamp]("start_time"))
        statement.setTimestamp(4, value.getAs[java.sql.Timestamp]("end_time"))
        // getAs[Boolean] would silently unbox a null to false; bind a real SQL NULL instead.
        val inUseIdx = value.fieldIndex("in_use")
        if (value.isNullAt(inUseIdx)) statement.setNull(5, java.sql.Types.BOOLEAN)
        else statement.setBoolean(5, value.getBoolean(inUseIdx))
        statement.setTimestamp(6, value.getAs[java.sql.Timestamp]("last_tap"))
        statement.executeUpdate()
      }

      /**
       * Commits this writer's upserts when `errorOrNull` is null and rolls them back otherwise.
       * The statement and connection are closed even if commit/rollback throws.
       */
      def close(errorOrNull: Throwable): Unit = {
        if (connection != null) {
          try {
            if (errorOrNull == null) connection.commit() else connection.rollback()
          } finally {
            try {
              if (statement != null) statement.close()
            } finally {
              connection.close()
            }
          }
        }
      }
    }

    // Each row of every micro-batch is handed to JDBCSink, which performs the Postgres
    // upsert itself. The output mode only decides which result rows reach the sink;
    // since this query has no aggregation, "update" passes along every new row exactly
    // as "append" would.
    val jdbcSink = new JDBCSink(jdbcUrl, user, password)
    val query = processedStream
      .writeStream
      .outputMode("update")
      .foreach(jdbcSink)
      .start()

    // Keep the driver alive until the stream stops (rethrows if the query failed).
    query.awaitTermination()
  }
}

which wasn't working at all, and I don't think the solution should be that complicated (with so much code)!
That code was generated using ChatGPT.

0