P4

  • The experiments were completed with IDEA + Scala
  • All output goes to the output folder under the project's spark folder

T1 Task 1

Code

  • Read data from the CSV into an RDD
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("Spark Pi").setMaster("local")
    val sc = new SparkContext(conf)
    val fileRDD = sc.textFile("C:\\Users\\MSI\\OneDrive\\study\\作业\\金融大数据\\application_data.csv")
    // drop the header row, then split each line and precompute the
    // difference between columns 7 and 6 (used for sorting in subtask 2)
    val header = fileRDD.first()
    val dataRDD = fileRDD.filter(row => row != header)
    val data = dataRDD.map { line =>
        val parts = line.split(",")
        val difference = parts(7).toDouble - parts(6).toDouble
        (parts, difference)
    }
    
  • Remove the header row and add an extra difference column (used for sorting in subtask 2)

  • Subtask 1

    val loanAmounts = data.map(x => x._1(7).toDouble)
    // map an amount to its [lower, upper) bucket of width 10,000
    def getRange(amount: Double): (Double, Double) = {
        val lower = (amount / 10000).toInt * 10000
        val upper = lower + 10000
        (lower, upper)
    }
    val rangeCounts = loanAmounts.map(amount => (getRange(amount), 1)).reduceByKey(_ + _)
    val sortedCounts = rangeCounts.sortBy(_._1._1)
    val outputPath = "C:\\Users\\MSI\\OneDrive\\study\\作业\\金融大数据\\spark\\output\\task1_1"
    sortedCounts.saveAsTextFile(outputPath)
    

  • Extract the loan amounts, map each to its range bucket, and aggregate the counts with reduceByKey
  • Sort the result by the left endpoint of each range, then save it to a file
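  • As a quick sanity check of the bucketing, getRange maps an amount to its 10,000-wide interval (the values below are hypothetical, not taken from the dataset):

    getRange(125000.0)  // (120000.0, 130000.0)
    getRange(9999.0)    // (0.0, 10000.0)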

  • Subtask 2

    // sort ascending by the precomputed difference column
    val res = data.sortBy(_._2, ascending = true)
    // take the 10 largest and 10 smallest differences
    val max10 = res.sortBy(x => x._2, ascending = false).take(10)
    val min10 = res.take(10)
    val max10RDD = sc.parallelize(max10)
    val min10RDD = sc.parallelize(min10)
    val res2 = max10RDD.union(min10RDD)
    val res3 = res2.map(t => t._1(0) + " " + t._1(1) + " " + t._1(7) + " " + t._1(6) + ", " + t._2)
    val outputPath2 = "C:\\Users\\MSI\\OneDrive\\study\\作业\\金融大数据\\spark\\output\\task1_2"
    res3.saveAsTextFile(outputPath2)
    

  • Sort by the extra column added when the RDD was created, take the largest and smallest items, convert those arrays back into RDDs, and union them
  • Save the result to a file
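  • The same extremes can also be taken without two full sorts by using takeOrdered with a custom Ordering (a sketch against the same (Array[String], Double) tuples, not the code used above):

    val byDiff = Ordering.by[(Array[String], Double), Double](_._2)
    val min10Alt = data.takeOrdered(10)(byDiff)           // 10 smallest differences
    val max10Alt = data.takeOrdered(10)(byDiff.reverse)   // 10 largest differences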

Run screenshots

  • image-20231219172533350

  • Job run results viewed in the Web UI

  • image-20231219172258513
  • image-20231219172240843

  • Partial output of subtask 1

  • image-20231219132725347

  • Output of subtask 2

  • Top 10 items
    • image-20231221163232607
  • Bottom 10 items
    • image-20231221163242326

T2 Task 2

Code

  • Read the file into a DataFrame

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("Spark").master("local").getOrCreate()
    val df = spark.read.option("header", "true").option("inferSchema", "true").csv("C:\\Users\\MSI\\OneDrive\\study\\作业\\金融大数据\\application_data.csv")
    

  • Subtask 1

    // count the male applicants, then compute each child-count group's share
    val dfM = df.filter(df("CODE_GENDER") === "M")
    val sum = dfM.count()
    val dfG = dfM.groupBy("CNT_CHILDREN").count()
    val res = dfG.select(dfG("CNT_CHILDREN"), (dfG("count") / sum).as("count"))
    val path = "C:\\Users\\MSI\\OneDrive\\study\\作业\\金融大数据\\spark\\output\\task2_1"
    res.write.csv(path)
    

  • First filter down to all rows where the gender is male, then count the number of rows
  • Group by the number of children and aggregate with count
  • Select the number of children along with its share of the total into a new table and write it to a file
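  • For reference, the same computation can be written as a single chain with the DataFrame functions API (a sketch; sum is the male row count computed above):

    import org.apache.spark.sql.functions._
    val resAlt = df.filter(col("CODE_GENDER") === "M")
        .groupBy("CNT_CHILDREN")
        .count()
        .withColumn("count", col("count") / lit(sum))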

  • Subtask 2

    // DAYS_BIRTH is a negative day count, so -DAYS_BIRTH gives the age in days
    val dfI = df.select(df("SK_ID_CURR"), (df("AMT_INCOME_TOTAL") / (-df("DAYS_BIRTH"))).as("avg_income"))
    val dfB = dfI.filter(dfI("avg_income") > 1)
    val res2 = dfB.sort(dfB("avg_income").desc)
    val path2 = "C:\\Users\\MSI\\OneDrive\\study\\作业\\金融大数据\\spark\\output\\task2_2"
    res2.write.csv(path2)
    

  • First select the ID and the average daily income into a new table
  • Filter out rows with avg_income <= 1 (keeping avg_income > 1) and sort in descending order
  • Write the result
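  • Equivalently, with the column functions API (a sketch of the same pipeline, not additional code from the run):

    import org.apache.spark.sql.functions.col
    val res2Alt = df
        .select(col("SK_ID_CURR"), (col("AMT_INCOME_TOTAL") / -col("DAYS_BIRTH")).as("avg_income"))
        .where(col("avg_income") > 1)
        .orderBy(col("avg_income").desc)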

Run screenshots

  • image-20231219182254537

  • Web UI

  • image-20231219182202119
  • Subtask 1 results
  • image-20231219182216657
  • Partial results of subtask 2
  • image-20231219182230611

T3 Task 3

Code

  • Read the file

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("Spark").master("local").getOrCreate()
    val df = spark.read.option("header", "true").option("inferSchema", "true").csv("C:\\Users\\MSI\\OneDrive\\study\\作业\\金融大数据\\application_data.csv")
    

  • Prepare the dataset

    import org.apache.spark.ml.feature.VectorAssembler

    val assembler = new VectorAssembler().setInputCols(Array(
        "CNT_CHILDREN", ... "FLAG_DOCUMENT_21"
    )).setOutputCol("features").setHandleInvalid("skip")
    val assemblerDf = assembler.transform(df)
    

  • Set the columns to use as inputs, combine them into a features column, and configure the assembler to skip rows containing nulls
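  • For reference, setHandleInvalid on VectorAssembler accepts three modes ("skip" is the one used above; the others are shown for comparison):

    assembler.setHandleInvalid("skip")   // drop rows containing invalid (null) values
    assembler.setHandleInvalid("error")  // the default: fail on invalid values
    assembler.setHandleInvalid("keep")   // keep such rows, emitting NaN in the assembled vector
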
  • Split the dataset

    val Array(trainingData, testData) = assemblerDf.randomSplit(Array(0.8, 0.2))

  • LogisticRegression model

    val classifier1 = new org.apache.spark.ml.classification.LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.3)
        .setElasticNetParam(0.8)
        .setLabelCol("TARGET")
        .setFeaturesCol("features")
    val model1 = classifier1.fit(trainingData)
    val predictions1 = model1.transform(testData)
    val evaluator1 = new org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator()
        .setLabelCol("TARGET")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")
    val accuracy1 = evaluator1.evaluate(predictions1)

  • First configure classifier1: set its hyperparameters, the label column TARGET, and the features column
  • Train it on the training data with the fit method
  • Then use the trained model to predict on the test data
  • Finally create evaluator1, evaluate the predictions, and compute the accuracy
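
  • Accuracy alone is hard to interpret when the classes are imbalanced; area under the ROC curve is a common complementary metric. A sketch (not part of the original run) using BinaryClassificationEvaluator:

    val aucEvaluator = new org.apache.spark.ml.evaluation.BinaryClassificationEvaluator()
        .setLabelCol("TARGET")
        .setRawPredictionCol("rawPrediction")
        .setMetricName("areaUnderROC")
    val auc1 = aucEvaluator.evaluate(predictions1)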

  • Decision tree model

    val classifier2 = new org.apache.spark.ml.classification.DecisionTreeClassifier()
        .setLabelCol("TARGET")
        .setFeaturesCol("features")
    val model2 = classifier2.fit(trainingData)
    val predictions2 = model2.transform(testData)
    val evaluator2 = new org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator()
        .setLabelCol("TARGET")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")
    val accuracy2 = evaluator2.evaluate(predictions2)
    

  • Save the results

    val path1 = "C:\\Users\\MSI\\OneDrive\\study\\作业\\金融大数据\\spark\\output\\task3_logistic"
    val path2 = "C:\\Users\\MSI\\OneDrive\\study\\作业\\金融大数据\\spark\\output\\task3_decisiontree"
    val res1 = predictions1.select("SK_ID_CURR", "prediction")
    val res2 = predictions2.select("SK_ID_CURR", "prediction")
    res1.write.csv(path1)
    res2.write.csv(path2)
    println("LogisticRegression accuracy = " + accuracy1)
    println("DecisionTreeClassifier accuracy = " + accuracy2)
    

  • Select the ID column and the TARGET prediction column and save them to files
  • Finally print the computed accuracies

Run screenshots

  • image-20231219204116184
  • Because the dataset's distribution of default vs. non-default is extremely imbalanced, both models actually predict quite poorly; adjusting the models and their parameters did not improve this (one possible mitigation is sketched after this list)
  • Web UI
  • image-20231219204224800
  • LogisticRegression classification results
  • image-20231219204432595
  • DecisionTreeClassifier classification results
  • image-20231219204445678
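
  • As a possible mitigation for the imbalance noted above, LogisticRegression supports per-row class weights via a weight column (a sketch under the assumption that TARGET is binary; the classWeight column name is made up here):

    import org.apache.spark.sql.functions.{col, lit, when}
    // weight each row inversely to its class frequency
    val numPositive = trainingData.filter(col("TARGET") === 1).count()
    val ratio = numPositive.toDouble / trainingData.count()
    val weighted = trainingData.withColumn("classWeight",
        when(col("TARGET") === 1, lit(1 - ratio)).otherwise(lit(ratio)))
    val weightedLr = new org.apache.spark.ml.classification.LogisticRegression()
        .setLabelCol("TARGET")
        .setFeaturesCol("features")
        .setWeightCol("classWeight")
    val weightedModel = weightedLr.fit(weighted)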

Problems encountered

  • Array elements inside an RDD cannot be written to a file directly (by default the array's reference is written out, not its actual contents); they need to be converted first: val res3 = res2.map(t => t._1.mkString(","))
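  • For illustration, the failing and working save paths side by side (a minimal sketch; the paths are hypothetical):

    // without conversion, each Array is written via its default toString,
    // producing output like ([Ljava.lang.String;@1a2b3c4d, 123.0)
    res2.saveAsTextFile("badPath")
    // converting with mkString writes the actual field values
    res2.map(t => t._1.mkString(",") + "," + t._2).saveAsTextFile("goodPath")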