Skip to content

P2

T1-统计违约情况

代码

  • map java public static class MergeMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if(key.get()==0) return; Stream<String> stream = Stream.of(value.toString().split(",")); Optional<String> lastElement = stream.reduce((first, second) -> second); if(lastElement.isPresent()) { int res = Integer.parseInt(lastElement.get()); if(res == 1) context.write(new Text("dis keep"), new LongWritable(1)); else context.write(new Text("keep"), new LongWritable(1)); } } }
  • 按行读取(key为0说明是第一行,直接丢弃)
  • 按照,对每一行进行划分得到字符串数组
  • 取得最后一个元素,如果是1就表示违约,用key=dis keep表示,值为1

    • 如果为0就表示没有违约,用keep表示
  • reduce java public static class MergeReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { LongWritable res = new LongWritable(0); for(LongWritable value : values) { res.set(res.get() + value.get()); } context.write(key, res); } }

  • 数据分类两个key,对于每种key,只需要读取值中的每一项并求和就可以得到结果

运行结果

  • image-20231114222712186
  dis keep  24825
  keep  282686

T2-统计贷款交易数

代码

  • map
    public static class MergeMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if(key.get()==0)
                return;
            Stream<String> stream = Stream.of(value.toString().split(","));
            Optional<String> lastElement = stream.skip(25).findFirst();
            if(lastElement.isPresent()) {
                context.write(new Text(lastElement.get()), new LongWritable(1));
            }
        }
    }
    
  • 以星期几为键,统计数目

  • reduce

    public static class MergeReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        public static class Pair {
            private Text key;
            private LongWritable value;
    
            public Pair(Text key, LongWritable value) {
                this.key = key;
                this.value = value;
            }
    
            public Text getKey() {
                return key;
            }
    
            public LongWritable getValue() {
                return value;
            }
        }
        private List<Pair> sortedData = new ArrayList<>();
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
            LongWritable res = new LongWritable(0);
            for(LongWritable value : values) {
                res.set(res.get() + value.get());
            }
            sortedData.add(new Pair(new Text(key), res));
        }
    
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            Collections.sort(sortedData, new Comparator<Pair>() {
                @Override
                public int compare(Pair pair1, Pair pair2) {
                    return pair2.getValue().compareTo(pair1.getValue());
                }
            });
    
            // 输出排序后的结果
            for (Pair pair : sortedData) {
                context.write(pair.getKey(), pair.getValue());
            }
        }
    }
    

  • 自定义一个pair类,用于对每天的统计结果暂存
  • 再reduce种先将结果存储到ArrayList进行暂存
  • 在cleanup自定义比较器并对暂存的结果进行排序,再写入到结果

运行结果

  • image-20231114233846775
TUESDAY 53901
WEDNESDAY   51934
MONDAY  50714
THURSDAY    50591
FRIDAY  50338
SATURDAY    33852
SUNDAY  16181

T3-贷款违约检测模型

代码

  • 使用knn最邻近分类算法
  • 将整个过程如下划分:
  • init:求出每一列的最大最小值,为后续的归一化做准备
  • maxmin:根据init求出的值,对非字符数据进行01归一化操作
  • div:8:2划分数据集得到训练集和测试集
  • knn:根据训练集对测试集进行预测得到结果
  • score:对预测结果进行分析并计算accuracy值

Init

  • map
  • 统计非字符列的所有取值

    //掠过第一行
    if (key.get() == 0)
        return;
    List<String> line = new ArrayList<>(Arrays.asList(value.toString().split(",")));
    for (int i = 1; i < line.size() - 1; i++) {// 第一列是id,最后一列是结果,不处理
        try {
            double value1 = Double.parseDouble(line.get(i));
            context.write(new Text(i + ""), new DoubleWritable(value1));// 汇总每一列的所有取值
        } catch (NumberFormatException e) {
            ;// 字符数据,不处理
        }
    }
    

  • reduce

  • 从可能的取值中找出最大最小值
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
        throws IOException, InterruptedException {
        double max = Double.MIN_VALUE;
        double min = Double.MAX_VALUE;
        for (DoubleWritable value : values) {
            max = Math.max(max, value.get());
            min = Math.min(min, value.get());
        }
        context.write(key, new Text(min + "," + max));
    }
    

maxmin

  • main
  • 需要传入上一次的统计结果job_maxmin.addCacheFile(new Path(output_init+"/part-r-00000").toUri());
  • setup(map)
  • 读取文件,建立列号和最大最小值之间的映射

    //HashMap<Integer, Pair> minmax = new HashMap<>();
    URI[] cacheFiles = context.getCacheFiles();
    if (cacheFiles != null && cacheFiles.length > 0) {
        try {
            FileSystem fs = FileSystem.get(context.getConfiguration());
            Path path = new Path(cacheFiles[0].toString());
            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
            String line;//逐行读取
            while ((line = reader.readLine()) != null) {
                String[] split = line.trim().split("\\s+");
                String[] split2 = split[1].split(",");
                minmax.put(Integer.parseInt(split[0]),
                           new Pair(Double.parseDouble(split2[0]), Double.parseDouble(split2[1])));
            }
            reader.close();
        } catch (IOException e) {
            System.err.println("MinMax::Exception reading DistributedCache: " + e);
        }
    }
    

  • map

  • 根据建立的映射对所有非字符列进行归一化
    List<String> line = new ArrayList<>(Arrays.asList(value.toString().split(",")));
    String res = "";
    for (int i = 1; i < line.size() - 1; i++) {
        try {
            double value1 = Double.parseDouble(line.get(i));
            Pair pair = minmax.get(i);
            double min = pair.min;
            double max = pair.max;
            double result = (value1 - min) / (max - min);
            res += "," + result;
        } catch (NumberFormatException e) {
            ;// 字符数据,不处理
        }
    }
    context.write(new Text(line.get(0) + res + "," + line.get(line.size() - 1)), new Text());
    
  • reduce
  • 直接存储map的结果即可context.write(key, new Text());

div

  • map
  • 随机数流static Iterator<Integer> iterator = new Random(47).ints(0, 100).boxed().iterator();
  • 每次取一个随机数,根据值决定key是train还是test,实现近似8:2划分

    Integer random = iterator.next();
    if (random >= 80) {// 训练集
        context.write(new Text("test"), value);
    } else {
        context.write(new Text("train"), value);
    }
    

  • reduce

  • 使用multipleOutputs实现将训练集和测试集分别写入到两个文件分别存储
    static MultipleOutputs<Text, Text> multipleOutputs;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<>(context);
    }
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
        for (Text value : values) {
            multipleOutputs.write(value, new Text(), key.toString());
        }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
    

knn

  • main
  • 传入训练集job_knn.addCacheFile(new Path(output_div+"/train-r-00000").toUri());
  • 训练集需要每个节点一份,用于在分类时进行比对
  • 测试集作为 map 的输入

  • setup(map)

  • 读取并缓存训练集

    FileSystem fs = FileSystem.get(context.getConfiguration());
    Path path = new Path(cacheFiles[0].toString());
    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
    String line;
    while ((line = reader.readLine()) != null) {
        String[] split = line.split(",");
        Double[] temp = new Double[split.length - 1];
        for (int i = 1; i < split.length; i++) {
            temp[i - 1] = Double.parseDouble(split[i]);
        }
        if(data.size()<MaxSize)
            data.add(temp);
    }
    reader.close();
    

  • 优先队列

    private PriorityQueue<Double[]> findNearestNeighbors(Double[] testFeatures) {
        PriorityQueue<Double[]> neighbors = new PriorityQueue<>(k, Comparator.comparingDouble(a -> distance(a, testFeatures)));
        for (Double[] trainData : data) {
            neighbors.offer(trainData);
            if (neighbors.size() > k) {
                neighbors.poll();
            }
        }
        return neighbors;
    }
    

  • 维护与目标最为接近的k个条目

  • map

    //解析数据
    Double[] features = new Double[fields.length - 2];
    for (int i = 1; i < fields.length - 1; i++) {
        features[i - 1] = Double.parseDouble(fields[i]);
    }
    // 计算距离并选取最近的k个邻居
    PriorityQueue<Double[]> neighbors = findNearestNeighbors(features);
    // 进行多数投票
    int count0 = 0, count1 = 0;
    for (int i = 0; i < k; i++) {
        Double[] neighbor = neighbors.poll();
        if (neighbor[neighbor.length - 1] - 0.0 < 1e-6) {
            count0++;//t投票
        } else {
            count1++;
        }
    }
    // 输出结果
    context.write(new Text(id), new Text(count1 > count0 ? "1" : "0"));
    

  • reduce
  • 直接存储map的结果context.write(key,values.iterator().next());

score

  • main
  • 传入含结果的测试集作为答案用于与预测结果进行比对
  • job_score.addCacheFile(new Path(output_div+"/test-r-00000").toUri());

  • setup(map)

  • 同理读取训练集数据,建立id-结果的映射
  • HashMap<Integer,Integer> res = new HashMap<>();

  • map

  • 读取预测结果并于训练集的结果进行比对

    String[] line = value.toString().split("\\s+");
    int id = Integer.parseInt(line[0]);
    int result = Integer.parseInt(line[line.length-1]);
    int right = res.get(id);
    if(result==right)
        context.write(new Text("res"),new Text("1"));
    else
        context.write(new Text("res"),new Text("0"));
    

  • reduce

  • 根据对正确和错误数目的统计计算出准确率
    for(Text value:values){
        if(value.toString().equals("1"))
            sum1++;
        else
            sum2++;
    }
    context.write(new Text("Accuracy"),new Text(sum1/(sum1+sum2)+""));
    

运行结果

  • image-20231115144932948
  • 可见5个任务运行成功
  • 最终统计结果Accuracy 0.9176155283034192
  • 每个步骤的详细输出再output文件夹下
  • image-20231115145839906