package hiveStream;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

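/**
 * Line-oriented RecordReader for the old mapred API. Each record's key is the
 * byte offset of the line within the split, and its value is the line with
 * every run of whitespace collapsed into the \001 control character, Hive's
 * default field delimiter.
 */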
public class MyRecordReader implements RecordReader<LongWritable, Text> {

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    public MyRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }

    public MyRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt(
                "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

    // Constructor used by the InputFormat: build a reader for one FileSplit.
    public MyRecordReader(FileSplit inputSplit, Configuration job)
            throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        // Look up a compression codec for this file, if one is registered.
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        // Open the file on its file system.
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;

        if (codec != null) {
            // Compressed input is not splittable: read the whole stream.
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                // Not the first split: back up one byte and skip the partial
                // line, which belongs to the previous split.
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }

        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
    public void close() throws IOException {
        if (lineReader != null)
            lineReader.close();
    }

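    // Factory methods the old mapred API uses to create the reusable key and
    // value objects that next() fills in.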
    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

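    // Read the next line into 'value', rewrite its field delimiters, and
    // advance 'pos'. Returning true means a record was produced; lines longer
    // than maxLineLength are skipped and the loop tries the next one.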
    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            if (newSize == 0)
                return false;
            // Collapse runs of whitespace into \001, Hive's default field
            // delimiter, so Hive can split the line into columns.
            String strReplace = value.toString().replaceAll("\\s+", "\001");
            Text txtReplace = new Text();
            txtReplace.set(strReplace);
            value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
            pos += newSize;
            if (newSize < maxLineLength)
                return true;
        }
        return false;
    }
}
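
// The reader above is typically returned from a matching old-API InputFormat,
// which Hive can then be pointed at via STORED AS INPUTFORMAT. Below is a
// minimal sketch of such a companion class in a separate file; the name
// MyHiveInputFormat is hypothetical and not part of the original source.

// --- MyHiveInputFormat.java (separate file) ---
package hiveStream;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class MyHiveInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        // Hand each split to the custom reader defined above.
        return new MyRecordReader((FileSplit) split, job);
    }
}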