import com.opencsv.CSVParser
import org.apache.commons.net.util.SubnetUtils
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import scala.collection.mutable.ArrayBuffer

/**
  * Created by tdurakov on 26/05/2017.
  */
object Main {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("topCategories").setMaster("local[2]")
    val sc = new SparkContext(conf)
    topCategories(sc)
  }

  /** Prints the 10 categories with the most events. */
  def topCategories(sc: SparkContext): Unit = {
    val events = sc.sequenceFile[Long, String]("hdfs://192.168.56.101:8020/localhost/events/2017/05/*")
    events
      .map(s => new CSVParser().parseLine(s._2)(3))   // category is the 4th CSV column
      .map(cat => (cat, 1))
      .reduceByKey(_ + _)
      .takeOrdered(10)(Ordering[(Int, String)].on(x => (-x._2, x._1)))
      .foreach(println)
  }

  /** Prints, for every category, the 10 products with the most events. */
  def topProducts(sc: SparkContext): Unit = {
    val events = sc.sequenceFile[Long, String]("hdfs://localhost/events/2017/05/*")
    events
      .map(s => new CSVParser().parseLine(s._2))
      .map(x => ((x(0), x(3)), 1))                    // key: (product, category)
      .reduceByKey(_ + _)
      .map(x => (x._1._2, (x._1._1, x._2)))           // key: category, value: (product, count)
      .aggregateByKey(ArrayBuffer.empty[(String, Int)])(
        (acc: ArrayBuffer[(String, Int)], x: (String, Int)) => {
          acc += x
          // keep only the 10 highest-count products seen so far for this category
          acc.sortBy(p => (-p._2, p._1)).take(10)
        },
        (acc1: ArrayBuffer[(String, Int)], acc2: ArrayBuffer[(String, Int)]) =>
          (acc1 ++ acc2).sortBy(p => (-p._2, p._1)).take(10))
      .collect()
      .foreach(println)
  }

  /** Prints the 10 countries with the highest total purchase amount. */
  def topCountries(sc: SparkContext): Unit = {
    val events = sc.sequenceFile[Long, String]("hdfs://localhost/events/2017/05/*")
      .map(s => new CSVParser().parseLine(s._2))
    val ips = sc.textFile("hdfs:///country_ips/country_blocks.csv")
      .map(s => new CSVParser().parseLine(s))
    val countries = sc.textFile("hdfs:///country_names/country_names.csv")
      .map(s => new CSVParser().parseLine(s))
      .map(s => (s(0), s(5)))                         // (geoname id, country name)
    val topCountries = events
      .cartesian(ips)
      .filter(s => new SubnetUtils(s._2(0)).getInfo().isInRange(s._1(4)))  // event IP falls inside the network block
      .map(s => (s._2(1), s._1(1).toDouble))          // (geoname id, purchase amount)
      .reduceByKey(_ + _)
      .join(countries)
      .cache()
    topCountries
      .map(s => (s._2._2, s._2._1))                   // (country name, total amount)
      .takeOrdered(10)(Ordering[(Double, String)].on(x => (-x._2, x._1)))
      .foreach(println)
  }
}