Tuesday, March 5, 2019

Caching Data in Spark

// Cache symvol with the default storage level (MEMORY_ONLY for RDDs).
symvol.cache()

// Alternative storage levels, chosen explicitly via persist().
// NOTE(review): these three lines illustrate the available options — pick ONE per RDD;
// Spark presumably rejects changing the storage level of an already-persisted RDD, so
// do not run them back-to-back as written. Confirm against the Spark persistence docs.
symvol.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
symvol.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
symvol.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)

// Drop the cached copies and release the storage used by this RDD.
symvol.unpersist()

-- Example: caching the symvol RDD before an aggregation

// Load the raw stocks dataset from HDFS; each line is one CSV record.
val stockLines = sc.textFile("hdfs://ip-10-10-10-10.ec2.internal:8020/user/deepakdubey/input/stocks")

// Break each CSV record into its fields.
val fields = stockLines.map(line => line.split(","))

// Key each record by symbol (column 1) with its traded volume (column 7) as an Int.
// NOTE(review): assumes every row has >= 8 columns and a numeric volume field —
// a header or malformed row would throw here; confirm against the input data.
val symvol = fields.map { cols =>
  (cols(1), cols(7).toInt)
}

// Cache symvol so the upcoming aggregation does not re-read and re-parse the file.
symvol.cache()

// Per symbol, keep the largest volume seen.
val maxvol = symvol.reduceByKey { (left, right) =>
  if (left > right) left else right
}

// Pull the results to the driver and print each (symbol, maxVolume) pair.
maxvol.collect().foreach(println)