IT源码网

Hadoop 系列(三)Java API

developer 2020年02月20日 大数据 694 0

Hadoop 系列(三)Java API

<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-hdfs</artifactId> 
    <version>2.9.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-client</artifactId> 
    <version>2.9.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-common</artifactId> 
    <version>2.9.2</version> 
</dependency>

一、HDFS 操作

@Test 
public void upload() throws Exception { 
 
    Configuration conf = new Configuration();  // (1)  
    //conf.set("fs.defaultFS", "hdfs://master:9000/"); 
 
    Path dst = new Path("hdfs://master:9000/upload/MPSetup4.log"); 
    FileSystem fs = FileSystem.get(new URI("hdfs://master:9000/"), conf, "hadoop"); // (2) 
    FSDataOutputStream os = fs.create(dst); 
    FileInputStream is = new FileInputStream("c:/MPSetup.log"); 
 
    IOUtils.copy(is, os); 
}
  1. Configuration 配置文件默认读取 resources 目录下的 core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml 文件。可以将 Hadoop 安装目录下的这些配制文件直接拷贝过来,也可以直接 conf.set() 设置参数。

  2. FileSystem.get() 必须要以 hadoop 的身份运行,否则会出现权限不足的问题。可以配置 -DHADOOP_USER_NAME=hadoop 参数。

下面提供一个 HdfsUtil 工具类:

public class HdfsUtil { 
    FileSystem fs = null; 
 
    @Before 
    public void init() throws Exception{ 
        System.setProperty("hadoop.home.dir", "D:/Program_Files/apache/hadoop-common-bin/"); 
        //1. 读取classpath下的xxx-site.xml 配置文件,并解析其内容,封装到conf对象中 
        Configuration conf = new Configuration(); 
 
        //2. 也可以在代码中对conf中的配置信息进行手动设置,会覆盖掉配置文件中的读取的值 
        conf.set("fs.defaultFS", "hdfs://master:9000/"); 
 
        //3. 根据配置信息,去获取一个具体文件系统的客户端操作实例对象 
        fs = FileSystem.get(new URI("hdfs://master:9000/"), conf, "hadoop"); 
    } 
 
    /** 上传文件,封装好的写法 */ 
    @Test 
    public void upload2() throws Exception, IOException{ 
        fs.copyFromLocalFile(new Path("c:/MPSetup.log"), 
                new Path("hdfs://master:9000/aaa/bbb/ccc/MPSetup.log")); 
    } 
 
 
    /** 下载文件 */ 
    @Test 
    public void download() throws Exception { 
        fs.copyToLocalFile(new Path("hdfs://master:9000/aaa/bbb/ccc/MPSetup.log"), 
                new Path("d:/MPSetup2.txt")); 
 
    } 
 
    /** 查看文件信息 */ 
    @Test 
    public void listFiles() throws FileNotFoundException, IllegalArgumentException, IOException { 
 
        // listFiles列出的是文件信息,而且提供递归遍历 
        RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/"), true); 
 
        while(files.hasNext()) { 
            LocatedFileStatus file = files.next(); 
            Path filePath = file.getPath(); 
            String fileName = filePath.getName(); 
            System.out.println(fileName); 
        } 
 
        System.out.println("---------------------------------"); 
 
        //listStatus 可以列出文件和文件夹的信息,但是不提供自带的递归遍历 
        FileStatus[] listStatus = fs.listStatus(new Path("/")); 
        for(FileStatus status: listStatus){ 
            String name = status.getPath().getName(); 
            System.out.println(name + (status.isDirectory()?" is dir":" is file")); 
        } 
    } 
 
    /** 创建文件夹 */ 
    @Test 
    public void mkdir() throws IllegalArgumentException, Exception { 
        fs.mkdirs(new Path("/aaa/bbb/ccc")); 
    } 
 
    /** 删除文件或文件夹 */ 
    @Test 
    public void rm() throws IllegalArgumentException, IOException { 
        fs.delete(new Path("/aa"), true); 
    } 
}

二、RPC 调用

(1) LoginServiceInterface 接口

package com.github.binarylei.hadoop.rpc; 
 
public interface LoginServiceInterface { 
     
    public static final long versionID = 1L; 
    public String login(String username, String password); 
 
} 
 
public class LoginServiceImpl implements LoginServiceInterface { 
 
    @Override 
    public String login(String username, String password) {        
        return username + " login in successfully!"; 
    } 
}

(2) RPCServer

// 目前只能上传到 Linux 上运行 ?????? 
public class RPCServer { 
 
    private static String host = "master"; 
    private static int port = 10001; 
 
    public static void main(String[] args) throws HadoopIllegalArgumentException, IOException { 
        Configuration conf = new Configuration(); 
        conf.set("fs.defaultFS", "hdfs://master:9000/"); 
        Builder builder = new Builder(conf); 
         
        builder.setBindAddress("master") 
                .setPort(port) 
                .setProtocol(LoginServiceInterface.class) 
                .setInstance(new LoginServiceImpl()); 
         
        Server server = builder.build(); 
         
        server.start(); 
    } 
}
  1. 将打包后的 hadoop-api-1.0.0.jar 上传到 Linux,启动 RPC 服务,执行

    hadoop jar hadoop-api-1.0.0.jar com.github.binarylei.hadoop.rpc.RPCServer

    2018-05-13 18:20:16,606 INFO ipc.CallQueueManager: Using callQueue: class java.util.concurrent.LinkedBlockingQueue queueCapacity: 100 scheduler: class org.apache.hadoop.ipc.DefaultRpcScheduler
    2018-05-13 18:20:17,631 INFO ipc.Server: Starting Socket Reader #1 for port 10001
    2018-05-13 18:20:19,613 INFO ipc.Server: IPC Server Responder: starting
    2018-05-13 18:20:19,618 INFO ipc.Server: IPC Server listener on 10001: starting

(3) RPCClient

public class RPCClient { 
 
    private static String host = "master"; 
    private static int port = 10001; 
 
    public static void main(String[] args) throws Exception { 
        System.setProperty("hadoop.home.dir", "D:/Program_Files/apache/hadoop-common-bin/"); 
        Configuration conf = new Configuration(); 
        conf.set("fs.defaultFS", "hdfs://master:9000/"); 
 
        LoginServiceInterface proxy = RPC.getProxy( 
                LoginServiceInterface.class, 
                1L, 
                new InetSocketAddress(host, port), 
                conf); 
         
        String result = proxy.login("hadoop-test", "test"); 
         
        System.out.println(result); 
    } 
}
  1. 直接在 Windows 上运行,结果如下:

    hadoop-test login in successfully!

三、MapReduce

下面模仿 wordcount,写一个 MapReduce

(1) WCMapper

//4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的类型 
//map 和 reduce 的数据输入输出都是以 key-value对的形式封装的 
//默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value 
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 
 
    //mapreduce框架每读一行数据就调用一次该方法 
    @Override 
    protected void map(LongWritable key, Text value,Context context) 
            throws IOException, InterruptedException { 
        //具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中 key-value 
        //key 是这一行数据的起始偏移量     value 是这一行的文本内容 
 
        //将这一行的内容转换成string类型 
        String line = value.toString(); 
 
        //对这一行的文本按特定分隔符切分 
        String[] words = StringUtils.split(line, " "); 
 
        //遍历这个单词数组输出为kv形式  k:单词   v : 1 
        for(String word : words){ 
            context.write(new Text(word), new LongWritable(1)); 
        } 
    } 
}

(2) WCReducer

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 
 
    //框架在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法 
    //<hello,{1,1,1,1,1,1.....}> 
    @Override 
    protected void reduce(Text key, Iterable<LongWritable> values,Context context) 
            throws IOException, InterruptedException { 
 
        long count = 0; 
        //遍历value的list,进行累加求和 
        for(LongWritable value:values){ 
            count += value.get(); 
        } 
 
        //输出这一个单词的统计结果 
 
        context.write(key, new LongWritable(count)); 
    } 
}

(3) WCReducer

/** 
 * 用来描述一个特定的作业 
 * 比如,该作业使用哪个类作为逻辑处理中的map,哪个作为reduce 
 * 还可以指定该作业要处理的数据所在的路径 
 * 还可以指定改作业输出的结果放到哪个路径 
 * .... 
 * @author duanhaitao@itcast.cn 
 */ 
public class WCRunner { 
 
    public static void main(String[] args) throws Exception { 
 
        //System.setProperty("hadoop.home.dir", "D:/Program_Files/apache/hadoop-common-bin/"); 
        Configuration conf = new Configuration(); 
        Job wcjob = Job.getInstance(conf); 
 
        //设置整个job所用的那些类在哪个jar包 
        wcjob.setJarByClass(WCRunner.class); 
 
        //本job使用的mapper和reducer的类 
        wcjob.setMapperClass(WCMapper.class); 
        wcjob.setReducerClass(WCReducer.class); 
 
        //指定reduce的输出数据kv类型 
        wcjob.setOutputKeyClass(Text.class); 
        wcjob.setOutputValueClass(LongWritable.class); 
 
        //指定mapper的输出数据kv类型 
        wcjob.setMapOutputKeyClass(Text.class); 
        wcjob.setMapOutputValueClass(LongWritable.class); 
 
        //指定要处理的输入数据存放路径 
        FileInputFormat.setInputPaths(wcjob, new Path("hdfs://master:9000/wc/input/")); 
 
        //指定处理结果的输出数据存放路径 
        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://master:9000/wc/output5/")); 
 
        //将job提交给集群运行 
        wcjob.waitForCompletion(true); 
    } 
}

四、Hadoop 运行(Windows)

问题 1:缺少 winutils.exe 和 hadoop.dll

# 缺少 winutils.exe 
Could not locate executable null \bin\winutils.exe in the hadoop binaries 
# 缺少 hadoop.dll 
Unable to load native-hadoop library for your platform… using builtin-Java classes where applicable

解决办法:

  1. 下载地址:https://github.com/srccodes/hadoop-common-2.2.0-bin
  2. 解压后将 hadoop-common-2.2.0-bin/bin 目录下的文件全部拷贝到 HADOOP_HOME/bin 目录下,并配置 HADOOP_HOME 环境变量。
  3. 将 hadoop-common-2.2.0-bin/bin/hadoop.dll 拷贝到 C:\Windows\System32 目录下。

问题 2:Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

解决办法:

  1. 首先确保 C:\Windows\System32 目录下已经有 hadoop.dll 文件
  2. 在自己的工程中拷贝一份 org.apache.hadoop.io.nativeio.NativeIO 类,修改如下:

    public static boolean access(String path, AccessRight desiredAccess) 
                    throws IOException { 
        return true; 
        //return access0(path, desiredAccess.accessRight()); 
    }

参考:

  1. 《Hadoop 运行问题》:https://blog.csdn.net/congcong68/article/details/42043093
  2. 《winutils.exe 下载地址》:https://github.com/srccodes/hadoop-common-2.2.0-bin

每天用心记录一点点。内容也许不重要,但习惯很重要!

评论关闭
IT源码网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!