import org.apache.spark.{SparkConf, SparkContext}
import org.apache.commons.net.util.SubnetUtils
import org.apache.spark.SparkContext.rddToPairRDDFunctions

// Load the networks file and drop the header row. Note that this skips only
// the first line of partition 0; with several input files matched by the
// wildcard, the headers of the later files would survive.
val network = sc.textFile("hdfs://localhost:8020/user/cloudera/networks/*")
val net1 = network.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter
}

// Key each network record by its id (column 1), keeping the network string
// (column 0). The length guard avoids an ArrayIndexOutOfBoundsException on
// malformed lines.
val net = net1.map(line => {
  val splitedLine = line.split(",")
  if (splitedLine.length >= 2 && !splitedLine(0).trim.isEmpty && !splitedLine(1).trim.isEmpty)
    (splitedLine(1).toInt, splitedLine(0))
  else
    (0, "none")
})

// Load the locations file and drop its header row the same way.
val locations = sc.textFile("hdfs://localhost:8020/user/cloudera/locations/*")
val loc1 = locations.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter
}

// Key each location record by its id (column 0), keeping column 5. Reading
// index 5 requires at least 6 fields, so the guard is length >= 6, not >= 5.
val loc = loc1.map(line => {
  val splitedLine = line.split(",")
  if (splitedLine.length >= 6 && !splitedLine(0).trim.isEmpty && !splitedLine(5).trim.isEmpty)
    (splitedLine(0).toInt, splitedLine(5))
  else
    (0, "none")
})

// Join networks with locations on the shared id, keeping only the
// (network, locationName) value pairs.
val netAndLoc = net.join(loc).map(x => x._2)

// Load the 2017 events and key each one by column 4, parsing column 1
// (stripped of quotes) as a Double.
val events = sc.textFile("hdfs://localhost:8020/user/cloudera/events/2017/*")
val eventsMap = events.map(line => {
  val splitedLine = line.split(",")
  (splitedLine(4), splitedLine(1).replaceAll("\"", "").toDouble)
})
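
// The SubnetUtils import above is still unused at this point, which suggests
// the intended next step is matching each event's IP against the CIDR
// networks in netAndLoc. A minimal sketch of that step follows, assuming
// (this is an assumption, not confirmed by the code above) that column 4 of
// an event is an IPv4 address, that the network strings are CIDR blocks, and
// that the (network, location) pairs are small enough to collect and
// broadcast to the executors.
val cidrToLocation = sc.broadcast(netAndLoc.filter(_._1 != "none").collect())
val eventsWithLocation = eventsMap.map { case (ip, amount) =>
  // Linear scan over the broadcast table; fine for a sketch, but a real job
  // would want an interval/trie lookup for large network lists.
  val location = cidrToLocation.value
    .find { case (cidr, _) => new SubnetUtils(cidr).getInfo.isInRange(ip) }
    .map(_._2)
    .getOrElse("unknown")
  (location, amount)
}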