hadoop学习笔记 联系客服

发布时间 : 星期三 文章hadoop学习笔记更新完毕开始阅读8c6c49ebb90d6c85ed3ac652

Hadoop MapReduce,这里进行简单的总结,主要来自于《Hadoop In Action》。 后续将按照Hadoop处理的顺序整理一些笔记,主要包括: (1)Hadoop预定义数据类型; (2)Hadoop InputFormat; (3)Hadoop Mapper;

(4)Hadoop Partitioner(洗牌); (5)Hadoop Reducer; (6)Hadoop OutputFormt; (7)Hadoop Driver (驱动程序); (8)Hadoop Combiner; (9)Hadoop Pipes; (10)Hadoop Streaming; (11)Aggregate;

其它更高级应用,如数据连接等请自行参阅相关书籍,《Hadoop In Action》、《Hadoop 权威指南》等。

一、Hadoop数据类型

Hadoop预定义了一些类用于实现WritableComparable,主要包括面向基本类型的封装类: BooleanWritable 标准布尔变量的封装 ByteWritable 单字节数的封装 DoubleWritable 双字节数的封装 FloatWritable 浮点数的封装 IntWritable 整数的封装 LongWritable Long的封装

Text 使用UTF8格式的文本封装 NullWritable 无键值时的占位符

键和值可以自定义数据类型,Hadoop提供了Writable和WritableComparable接口,Writable实现的是序列化功能,WritableComparable实现了序列化和比较的功能。Hadoop要求键必须实现WritableComparatable接口,值必须至少实现Writable接口。实现Writable接口的类可以是值,不能作为键,而实现WritableComparable接口的类既可以是值也可以是键。

下面实现一个类,用于表示一个网络的边界,比如代表两个城市之间的航线。Edge类实现了Writable接口的readFields和write方法,它们与java中的DataInput和DataOutput类实现内容的串行化,而Comparable接口实现的是compareTo方法。

public class Edge implements WritableComparable {

private String departureNode ; private String arrivalNode ;

public String getDepartureNode(){ return departureNode; } public String getArrivalNode() { return arrivalNode ; }

@override

public void readFields(DataInput in) throws IOException {

departureNode = in.readUTF() ; arrivalNode = in.readUTF() ; }

@override

public void write(DataOutput out) throws IOException {

out.writeUTF(departureNode) ; out.writeUTF(arrivalNode) ; }

@override

public int compareTo(Edge 0) {

return (departureNode.compareTo(o.depatrueNode)!=0) ?

departureNode.compareTo(o.departureNode): arrivalNode.compareTo(o.arrivalNode) ; } }

通常使用Hadoop,预定义类型基本满足需要,通过Hadoop数据类型的学习,我们可以自定义数据类型,从而根据需求进行扩充。

二、InputFormat

Hadoop分割与读取输入文件的方式被定义为InputFormat接口的一个实现中,TextInputFormat是InputFormat的默认实现。

Hadoop预定义的一些InputFormat类:

TextInputFormat 在文本文件中的每行一个记录,key为一行的字节偏移,值为一行的内容。ey: LongWritable, Value:Text

KeyValueTextInputFormat 文本文件中每行是一个记录,以每行的第一个分隔符为界,分隔符前的为键,分割符后的为值,分隔符由key.value.separator.in.input.line中设定,默认为'\\t'

SequenceFileInputFormat 用于读取序列文件的InputFormat,键值类型有用户定义,序列文件为hadoop专用的压缩二进制格式,专用于一个MapReduce作业和其它MapReduce作业之间传送数据

NLineInputFormat 与TextInputFormat相同,但每个分片一定有N行,N由

mapred.line.input.format.linespermap中设定,默认为1,key: LongWritable, Value:Text

MapReduce输入格式由 conf.setInputFormat(KeyValueTextInputFormat.class) ; 设定。

2. 生成一个定制的InputFormat --- InputSplit和RecordReader

如果Hadoop提供的InputFormat类不能满足需要,则必须编写自定义的InputFormat类,InputFormat主要完成2件事情:

1)确定所有用于输入数据的文件,并将之分割为输入分片,每个map任务分配一个分片; 2)提供一个RecordReader对象,循环提取给定分片中的记录,并解析每个记录为预定义类型的键和值;

public interface InputFormat {

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException; }

FileInputFormat类实现了InputFormat中的getSplits方法,保留getRecordReader抽象让子类实现,所以在创建InputFormat子类时,最好从负责文件分割的FileInputFormat类中继承,其中有一个isSplitable(FileSystem fs, Path filename)方法,检查是否将给定文件分片,默认返回true,正如压缩文件,如果不对文件进行拆分,则返回false。

使用FileInputFormat时,只需要关注RecordReader,它负责把一个输入分片解析为一条一条的记录,转变成键值对。

public interface RecordReader {

bool next(K key, V value) throws IOException ; K createKey() ; V createValue() ;

long getPos() throws IOException ; public void close() throws IOException ;

float getProgress() throws IOException ; }

预定义的RecordReader有:

LineRecordReader用于TextInputFormat中每次读取一行,以字节偏移作为键,行的内容作为值。 KeyValueRecordReader用于KeyValueTextInputFormat

自定义的RecordReader痛处基于现有实现,并把大多数操作放在next()函数中。

public class TimeUrlTextInputFormat extends FileInputFormat {

public RecordReader getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {

return new TimeUrlLineRecordReader(job, (FileSplit)input) ; } }

public class URLWritable implements Writable {

protected URL url ; public URLWritable(){}

public URLWritable(URL url){ this.url = url;}

public void write(DataOutput out) throws IOException {

out.writeUTF(url.toString()) ; }

public void readFields(DataInput in) throws IOException {

url = new URL(in.readUTF()) ; }

public void set(String s) throws MalformadURLException {

url = new URL(s) ; } }

class TimeUrlLineRecordReader implements RecordReader {

private KeyValueLineRecordReader lineReader ; private Text lineKey, lineValue ;

public TimeUrlLineRecordReader(JobConf job, FileSplit split) throws IOException {

lineReader = new KeyValueLineRecordReader(job, split) ; lineKey = lineReader.createKey() ; lineValue = lineReader.createValue() ;