
We hold our documents in RocksDB and will be syncing the RocksDB SST files to S3. I would like to create a DataFrame over the SST files and later run SQL on it. When I googled, I could not find any relevant connector for this.

What is the best way to do this? Can we write a custom DataFrame implementation for this? We use Spark 3.1.0 with Scala 2.12.

(Converting RocksDB to JSON, then to Parquet, and then reading a DataFrame out of the Parquet files is very time-consuming and resource-intensive, roughly 120 one-core node minutes, so I can't use this approach.)

  • You can implement a connector or just write plain Scala code to read the files and create a DataFrame from them, yes. I have no idea how an SST file is structured, but I guess that if you're able to convert it to JSON it should be easy enough to read into a DataFrame. Also note that you can read JSON directly with Spark; there is no strict need to convert to Parquet.
    – Gaël J
    Commented Jul 8 at 8:47
  • We can read SST files using SSTFileReader; can you give any pointers on how to do this? (A sketch follows these comments.) Currently Spark reads the JSON and converts it to Parquet because the downstream jobs read Parquet; we assume table joins are more efficient with Parquet (predicate pushdown).
    – chendu
    Commented Jul 8 at 8:58
  • @GaëlJ The _id keys in RocksDB are sorted. If we create a DataFrame directly from the SST files (as we would with JSON files), it may lack predicate pushdown. I need predicate pushdown on _id because we use _id in joins downstream.
    – chendu
    Commented Jul 15 at 9:03
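
For reference, a minimal sketch of reading the key/value entries of a local SST file with RocksJava's SstFileReader (this assumes the org.rocksdb:rocksdbjni dependency is on the classpath; exact API details may differ between RocksDB versions):

  import org.rocksdb.{Options, ReadOptions, RocksDB, SstFileReader}

  // Sketch only: iterate every key/value pair in a local SST file
  def readSstEntries(localPath: String): Seq[(Array[Byte], Array[Byte])] = {
    RocksDB.loadLibrary()
    val options = new Options()
    val reader = new SstFileReader(options)
    try {
      reader.open(localPath)
      val it = reader.newIterator(new ReadOptions())
      it.seekToFirst()
      val entries = scala.collection.mutable.ArrayBuffer.empty[(Array[Byte], Array[Byte])]
      while (it.isValid) {
        entries += ((it.key(), it.value()))
        it.next()
      }
      it.close()
      entries.toSeq
    } finally {
      reader.close()
      options.close()
    }
  }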

1 Answer


If you know how to parse the files using plain Scala code, you just need Spark to distribute the work across executors:

  1. List the file names in your S3 bucket; this yields a Seq[String].

  2. Transform them into a Dataset/DataFrame (spark.createDataset).

  3. Apply a mapPartitions operation.

  4. Inside mapPartitions, initialize an S3 client and use it to read the file contents. Parse them with plain Scala code and output a case class.

  5. The output of mapPartitions will be a Dataset of parsed files.

Update: here is a code sample.

I do not know what RocksDB files are or how to read them with plain Scala code. Assuming you can read them with plain Scala, without Spark, the following code sample can be used to parallelize the read/parse with Spark:

  import java.nio.charset.StandardCharsets

  import scala.collection.JavaConverters._

  import com.amazonaws.services.s3.AmazonS3ClientBuilder
  import com.amazonaws.services.s3.model.ListObjectsV2Request
  import org.apache.commons.io.IOUtils
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.SparkSession

  val session: SparkSession = SparkSession.builder().config(new SparkConf().setMaster("local")).getOrCreate()

  import session.implicits._

  // List object keys under the given prefix, skipping "directory" placeholder keys
  def listS3Files(bucket: String, prefix: String): Seq[String] = {
    val s3Client = AmazonS3ClientBuilder.standard.build // add credentials, region if needed
    val listObjectsRequest = (new ListObjectsV2Request)
      .withBucketName(bucket)
      .withPrefix(prefix)
    s3Client.listObjectsV2(listObjectsRequest).getObjectSummaries.asScala.map(_.getKey).filter(p => !p.endsWith("/"))
  }

  val bucket = "your-bucket"
  val files = listS3Files(bucket, "test")

  val result = files
    .toDS()
    .mapPartitions { fs =>
      // The client is created inside mapPartitions so it is instantiated on each executor
      val s3Client = AmazonS3ClientBuilder.standard.build // add credentials, region if needed
      // Here the content is read as a plain string; in your case read the RocksDB files and parse them into a case class
      fs.map(f => IOUtils.toString(s3Client.getObject(bucket, f).getObjectContent, StandardCharsets.UTF_8))
    }.cache()
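
To get one row per key/value entry instead of one string per file (step 4's "parse to a case class"), the mapPartitions body could download each SST file to a temp file and parse it with RocksJava's SstFileReader. A rough sketch building on the listing code above, assuming the org.rocksdb:rocksdbjni dependency is available and that keys and values are UTF-8 strings (the Record case class is hypothetical):

  import java.nio.charset.StandardCharsets
  import java.nio.file.Files

  import com.amazonaws.services.s3.model.GetObjectRequest
  import org.rocksdb.{Options, ReadOptions, RocksDB, SstFileReader}

  // Hypothetical row type; adjust to how your keys and values are actually encoded
  case class Record(id: String, value: String)

  val rows = files
    .toDS()
    .mapPartitions { fs =>
      RocksDB.loadLibrary()
      val s3Client = AmazonS3ClientBuilder.standard.build // add credentials, region if needed
      fs.flatMap { key =>
        // SstFileReader needs a local path, so download each object to a temp file first
        val tmp = Files.createTempFile("sst-", ".sst")
        s3Client.getObject(new GetObjectRequest(bucket, key), tmp.toFile)
        val reader = new SstFileReader(new Options())
        reader.open(tmp.toString)
        val it = reader.newIterator(new ReadOptions())
        it.seekToFirst()
        val records = scala.collection.mutable.ArrayBuffer.empty[Record]
        while (it.isValid) {
          records += Record(new String(it.key(), StandardCharsets.UTF_8),
                            new String(it.value(), StandardCharsets.UTF_8))
          it.next()
        }
        it.close(); reader.close(); Files.delete(tmp)
        records
      }
    }

Note that this still reads every entry of every file on the executors; it does not by itself provide Parquet-style predicate pushdown on _id.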
  • RocksDB indexes all data on _id in ascending order. With the above approach, would predicate pushdown work, or would the performance be the same as creating a DataFrame over the equivalent JSON files?
    – chendu
    Commented Jul 10 at 8:34
  • I am thinking we should implement a DataFrame over the SST files, like Spark provides a DataFrame over Parquet files.
    – chendu
    Commented Jul 10 at 8:39
  • Can you give a working example of the above? I am not able to get beyond step 3.
    – chendu
    Commented Jul 12 at 5:02
  • @chendu Updated with a code sample.
    – Grish
    Commented Jul 13 at 19:31
  • I think it generates one row per file URL, but a file may have several rows.
    – chendu
    Commented Jul 15 at 8:58
