What I'm trying to do is use a window function to get the previous and current rows and do some computation on a couple of the columns with a custom aggregator. I have time-series data with points that are grouped by a track id. I would like to calculate the speed between the previous and current point using the timestamp, latitude, and longitude, then return it as a new column. That way I can calculate the deviation and remove outliers from the data.
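
For reference, the per-pair computation I want to apply is roughly this, in plain Scala (a sketch; I'm assuming epoch-second timestamps and coordinates in degrees, with haversine standing in for the real distance function):

// Rough sketch of the per-pair computation: speed in m/s between two
// points, assuming epoch-second timestamps and degree coordinates.
def speedMps(ts1: Long, lat1: Double, lon1: Double,
             ts2: Long, lat2: Double, lon2: Double): Double = {
  val R = 6371000.0 // mean Earth radius in meters (approximation)
  val dLat = math.toRadians(lat2 - lat1)
  val dLon = math.toRadians(lon2 - lon1)
  val a = math.pow(math.sin(dLat / 2), 2) +
    math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) *
      math.pow(math.sin(dLon / 2), 2)
  val dist = 2 * R * math.asin(math.sqrt(a)) // great-circle distance in meters
  dist / (ts2 - ts1) // meters per second
}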

I think I'm on the right track with the window function and rowsBetween. The main issue is that I do not know how to write a custom aggregator that returns the new column. I've written aggregators before, but when I look at the aggregate functions that come with Spark (like sum, avg, etc.) they extend DeclarativeAggregate, which is nothing like a normal Aggregator class.
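
For context, this is the shape of Aggregator I've written before (a toy sum, just to show the pattern; in Spark 3 it can be wrapped with functions.udaf, but I don't see how to get from here to a multi-column window aggregation):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// A "normal" Aggregator: explicit zero/reduce/merge/finish plus encoders,
// nothing like the DeclarativeAggregate-based built-ins.
object SumAgg extends Aggregator[Double, Double, Double] {
  def zero: Double = 0.0
  def reduce(acc: Double, x: Double): Double = acc + x
  def merge(a: Double, b: Double): Double = a + b
  def finish(acc: Double): Double = acc
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Spark 3.0+: wrap it into a column function usable in expressions.
val sumAggFn = udaf(SumAgg)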

This is what I have so far; I do not know how to approach writing a custom aggregator that will work with it:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

val w = Window
  .partitionBy(trackIdColumn)          // trackIdColumn identifies the track
  .orderBy("timestamp")
  .rowsBetween(-1, Window.currentRow)  // frame = previous row + current row

// myCustomAggregator is the missing piece I'm asking about
retDf = retDf.withColumn("speed",
  myCustomAggregator(col("timestamp"), col("latitude"), col("longitude")).over(w))

If anyone has any ideas, that would be great, because I can't find how to do this by searching.

  • What did you try? See here for an example using UDAFs. It's also possible a generic aggregator like agg_expr would be enough for your use case.
    – Chris
    Commented Jul 3 at 10:48
  • I was able to get close to what I was looking for using org.apache.spark.sql.functions.aggregate after creating an array from the columns with collect_list (see the sketch after these comments). It may work, and I'll report back later if it does. Other than that, I have made UDAFs and extended the Aggregator class before, but I can't visualize how to make that work with what I'm trying to do. I'll check out agg_expr too, thank you for the reply. Commented Jul 3 at 17:50
  • yw, if you have a test case with inputs and outputs, it'd help people answer
    – Chris
    Commented Jul 4 at 8:48
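
A rough sketch of the collect_list workaround from the second comment, assuming the DataFrame is df and the track column is trackId; here the two points are pulled out of the collected array by position with element_at, though folding over the array with functions.aggregate as described should amount to the same thing:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window
  .partitionBy("trackId")
  .orderBy("timestamp")
  .rowsBetween(-1, Window.currentRow)

// Collect (timestamp, latitude, longitude) for the previous and current
// rows into an array of structs.
val withPair = df.withColumn(
  "pair",
  collect_list(struct(col("timestamp"), col("latitude"), col("longitude"))).over(w))

// element_at is 1-based; -1 indexes from the end. On the first row of each
// track the array has a single element, so prev == curr there.
val prev = element_at(col("pair"), 1)
val curr = element_at(col("pair"), -1)

The speed column can then be computed from prev.getField("timestamp"), curr.getField("latitude"), and so on, using the haversine math sketched in the question.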
