
A multi-delimiter solution for Hive

Posted on 2021-7-15 03:39:31

Preface:

  I've recently been working on a big-data project for a large bank. While processing their unstructured data, I found that it doesn't meet what Hive and Pig expect: each row needs more than one delimiter to be parsed cleanly. I spent a whole afternoon without finding a clean solution; re-examining the whole process today, it's clear the Hive command line alone can't handle it. So the only way out is code.

     

1. Re-implement Hive's InputFormat. Here's the code:

package hiveStream;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

// A TextInputFormat that hands each split to our custom RecordReader,
// which normalizes the delimiters before Hive ever sees the row.
public class MyHiveInputFormat extends TextInputFormat implements JobConfigurable {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter)
            throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new MyRecordReader((FileSplit) genericSplit, job);
    }
}
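Before Hive can use this class, the compiled classes have to be on the session's classpath. A minimal sketch of the setup, assuming the two classes in this post are packaged into a jar; the jar name and path here are hypothetical:

    add jar /path/to/hiveStream.jar;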

2. Now look carefully at the methods below. The real work happens in next(), where each raw line is rewritten before being handed to Hive.

package hiveStream;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader implements RecordReader<LongWritable, Text> {

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    public MyRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }

    public MyRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt(
                "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

    // Constructor used by MyHiveInputFormat: opens the split and positions
    // the reader at the first complete line.
    public MyRecordReader(FileSplit inputSplit, Configuration job)
            throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        // Look up a compression codec for this file, if any.
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        // Open the underlying file.
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;

        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                // This split starts mid-line: back up one byte and skip the
                // partial first line (the previous split will read it).
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }

        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
    public void close() throws IOException {
        if (lineReader != null)
            lineReader.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            if (newSize == 0)
                return false;
            // Collapse every run of whitespace into Hive's default field
            // delimiter \001, so a default-delimited table can parse the row.
            String strReplace = value.toString().replaceAll("\\s+", "\001");
            Text txtReplace = new Text();
            txtReplace.set(strReplace);
            value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
            pos += newSize;
            if (newSize < maxLineLength)
                return true;
        }
        return false;
    }
}
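The whole multi-delimiter trick lives in that one replaceAll call in next(): however many spaces or tabs separate two fields, the run is collapsed into a single \001, which is exactly the field terminator a default Hive table expects. A minimal standalone sketch of just that transformation; the class name and sample row are made up for illustration:

    public class DelimiterDemo {
        public static void main(String[] args) {
            // A sample row with inconsistent whitespace between fields.
            String row = "12 afd   fewf   fewfe  we";
            // Same regex as in MyRecordReader.next(): any run of whitespace
            // becomes one \001, Hive's default field delimiter.
            String normalized = row.replaceAll("\\s+", "\001");
            // \001 is unprintable, so show the boundaries with '|' instead.
            System.out.println(normalized.replace('\001', '|'));
            // prints: 12|afd|fewf|fewfe|we
        }
    }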

3. A worked example:

Data to be processed (note that the whitespace between fields is inconsistent):

    12 afd   fewf   fewfe  we
    76 vee   ppt    wfew  wefw
    83 tyutr   ppt  wfew  wefw
    45 vbe   ppt    wfew  wefw
    565 wee   ppt   wfew  wefw
    12 sde   ppt    wfew  wefw

1. Create the table:
    create table micmiu_blog(author int, category string, url string, town string, oop string) stored as inputformat 'hiveStream.MyHiveInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
Note: only the input side is overridden; the stock output format is kept. And since the RecordReader rewrites every row to use \001, Hive's default field delimiter, no ROW FORMAT clause is needed.

2. Load the data:
    LOAD DATA LOCAL INPATH '/mnt/test' OVERWRITE INTO TABLE micmiu_blog;

3. Look at the result:
    select * from micmiu_blog;

Try it yourself.
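A quick way to confirm the split worked is to project the individual columns; if the RecordReader did its job, each declared column holds exactly one field no matter how many spaces separated it in the file. A sketch, with the expected first row derived by reading the code against the sample data above, not from a captured run:

    select author, category, url, town, oop from micmiu_blog limit 1;
    -- expected: 12    afd    fewf    fewfe    we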

     

Whew... exhausting day. That's my check-in for today.