`

Hadoop RPC使用与实现

 
阅读更多

 

hadoop.ipc和RPC简介

hadoop 和hbase中的大部分服务都是通过hadoop.ipt.RPC这个类来实现的。


hadoop.ipc.RPC 实现了一种远程过程调用的框架,应用可以直接定义过程调用的协议接口和协议的server端实现,就可以直接通过RPC框架获得RPC serverclient端的接口代理。


hadoop.ipc.RPC 的实现利用了 hadoop.ipc.Server 和 hadoop.ipc.Client这两个类, 这两个类实现了网络中非常典型的Request-Response模式服务器和客户端框架。用户可以通过定义一个协议接口并实现出Request和Response类,以及Server端的抽象处理接口(Server.call()) 就可以实现出完整的服务器程序,而客户端程序只需要在创建hadoop.ipc.Client实体时,指定协议接口和网络相关参数,然后调用 call() 就可以发送请求并获取响应。


Hadoop.ipc.RPC作为Hadoop的底层核心组件,在hadoop HDFSMapReduce以及HBase中都有广泛的使用。 HDFSNameNodeDataNode等都是通过实现对应协议的接口,然后利用hadoop.ipc.RPC获取服务器实体的。 HBase中的HBaseRPC采用的也是与hadoop.ipc.RPC类似的实现,其中的Region Server, Master Server 都是通过实现对应的协议接口直接获取服务器实体的。  


hadoop.ipc将应用逻辑与网络消息的处理分离开,并且使得逻辑对象在不同的进程或组件之间有同样的语言接口,无需区分远程对象和本地对象,使得开发者可以关注于应用的处理逻辑。


hadoop.ipc.RPC类中有两个重要的函数getServer和getProxy,getServer通过接口协议实现的实体来获取真正的server,getProxy获取远程访问的本地代理。

 

class RPC {
    public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) 
    public static VersionedProtocol getProxy(
      Class<? extends VersionedProtocol> protocol,
      long clientVersion, InetSocketAddress addr, Configuration conf,
      SocketFactory factory)
    ......
}

 

 

 

实例(远程执行Shell命令)

定义客户端与服务器的协议接口ExecProtocol

import java.lang.String;
import org.apache.hadoop.ipc.VersionedProtocol;
/* 这里不扩展VersionedProtocol 也是可以的 */
public interface ExecProtocol extends VersionedProtocol
{
    public static final long versionID = 1L;
    public String exec(String[] cmd);
}

ExecServer 实现了ExecProtocol,并通过RPC.getServer 获取RPCserver

import java.lang.String;
import java.io.IOException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;
import java.io.BufferedInputStream;
import java.io.InputStream;

public class ExecServer implements ExecProtocol
{
    private String host;
    private int port;
    private RPC.Server server;
    public ExecServer (String host, int port) throws IOException
    {
        this.host = host;
        this.port = port;
        /* 获取server实体 */
        this.server = RPC.getServer (this, host, port, new Configuration ());
    }

    public void run () throws IOException
    {
        /* 运行 */
        this.server.start ();
    }
    /* 实现 VersionedProtocol 接口 */
    public long getProtocolVersion (String s, long v)
    {
        return versionID;
    }
    /* 实现 ExecProtocol.exec接口 */
    public String exec (String[] cmd)
    {
        try {
            Process process = Runtime.getRuntime ().exec (cmd);
            process.waitFor();
            return loadStream (process.getInputStream ())
                   + loadStream (process.getErrorStream ());
        } catch (Exception e) {
            return e.getMessage ();
        }
    }
    /* 用于实现具体的exec接口 */
    private static String loadStream (InputStream stream) throws IOException
    {
        if (stream == null) {
            throw new java.io.IOException ("null stream");
        }
        stream = new java.io.BufferedInputStream (stream);
        int avail = stream.available ();
        byte[]data = new byte[avail];
        int numRead = 0;
        int pos = 0;
        do {
            if (pos + avail > data.length) {
                byte[]newData = new byte[pos + avail];
                System.arraycopy (data, 0, newData, 0, pos);
                data = newData;
            }
            numRead = stream.read (data, pos, avail);
            if (numRead >= 0) {
                pos += numRead;
            }
            avail = stream.available ();
        } while (avail > 0 && numRead >= 0);
        return new String (data, 0, pos, "US-ASCII");
    }

    public static void main (String[]args) throws IOException
    {
        ExecServer s = new ExecServer ("localhost", 1600);
        s.run ();
    }
}

 

client 端通过RPC.getProxy获取本地代理

import java.lang.String;
import java.io.IOException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;
import java.net.InetSocketAddress;
public class ExecClient
{
    public static void main (String[] args) throws IOException
    {
        /* 服务器地址 */
        InetSocketAddress addr = new InetSocketAddress ("localhost", 1600); 
        /* 通过RPC.getProxy 获取客户端代理类的实体 */
        ExecProtocol proxy = (ExecProtocol) RPC.getProxy(ExecProtocol.class,
                                ExecProtocol.versionID, addr,
                                new Configuration ());
        /* 输出 */
        System.out.print (proxy.exec(args));
    }
}

 

 

Hadoop RPC的实现

 

Hadoop.ipc.RPC的实现依赖于ipc.Server 和ipc.Client。Hadoop.ipc.RPC 将过程调用封装成具体的Invocation类,该类封装了调用的方法和参数,并用一个ObjectWritable类来定义返回的数据类型。ipc.Server 和 ipc.Client 负责网络数据的收发。

 

ipc.Server 的实现包含以下几个部分

Listener 监听网络端口,接受网络请求,然后交给Reader处理

Reader 非阻塞收取网络数据,并解析

Handler 调用抽象方法Server.call生成相应数据

Responder 非阻塞发送数据

Server.call() 由具体RPC.Server利用反射和代理实现,处理请求


这是一种典型的流水线结构,采用流水线的原因是Handler的操作可能导致阻塞,必须要有独立的线程或线程组处理Hander,线程之间的切换必不可少。


Listener独立为单独的线程大概是为了Reader之间负载的均衡,新加入的连接按照round robinReader之间进行负载均衡(实际上可能并不均衡,每条连接处理的请求以及持续的时间是不确定的)。同步时需要注意的一点是,Listener在向Reader的Selector中添加链接时,需要设置一个adding标记,并打断Selecter,这样做的目的是避免Reader的select() 操作和register()操作产生竞态。


















































分享到:
评论

相关推荐

    Hadoop Java接口+RPC代码实现

    1.java接口操作Hadoop文件系统(文件上传下载删除创建......2.RPC远程过程调用的java代码实现,便于理解Hadoop的RPC协议,具体使用方法可参考我的博客https://blog.csdn.net/qq_34233510/article/details/88142507

    hadooprpc机制&&将avro引入hadooprpc机制初探

    RPCServer实现了一种抽象的RPC服务,同时提供Call队列。RPCServer作为服务提供者由两个部分组成:接收Call调用和处理Call调用。接收Call调用负责接收来自RPCClient的调用请求,编码成Call对象后放入到Call队列中。这...

    Hadoop 2.X HDFS源码剖析

    《Hadoop 2.X HDFS源码剖析》以Hadoop 2.6.0源码为基础,深入剖析了HDFS 2.X中各个模块的实现细节,包括RPC框架实现、Namenode实现、Datanode实现以及HDFS客户端实现等。《Hadoop 2.X HDFS源码剖析》一共有5章,其中...

    Hadoop技术内幕 深入理解MapReduce架构设计与实现原理 扫描版 带简单书签

    《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》由Hadoop领域资深的实践者亲自执笔,首先介绍了MapReduce的设计理念和编程模型,然后从源代码的角度深入分析了RPC框架、客户端、JobTracker、TaskTracker和...

    hadoop技术内幕 深入解析mapreduce架构设计与实现原理

    《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》由Hadoop领域资深的实践者亲自执笔,首先介绍了MapReduce的设计理念和编程模型,然后从源代码的角度深入分析了RPC框架、客户端、JobTracker、TaskTracker和...

    hadoop技术内幕 深入解析mapreduce架构设计与实现原理.(董西成).全本

    《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》由Hadoop领域资深的实践者亲自执笔,首先介绍了MapReduce的设计理念和编程模型,然后从源代码的角度深入分析了RPC框架、客户端、JobTracker、TaskTracker和...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    06-hadoop中的RPC框架实现机制.avi 07-hadoop中的RPC应用实例demo.avi 08-hdfs下载数据源码跟踪铺 垫.avi 09-hdfs下载数据源码分析-getFileSystem.avi 10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 ...

    HDFS源码剖析带书签目录高清.zip

    《Hadoop 2.X HDFS源码剖析》以Hadoop 2.6.0源码为基础,深入剖析了HDFS 2.X中各个模块的实现细节,包括RPC框架实现、Namenode实现、Datanode实现以及HDFS客户端实现等。《Hadoop 2.X HDFS源码剖析》一共有5章,其中...

    hadoop技术内幕 深入解析mapreduce架构设计与实现原理.(董西成).全本1

    《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》由Hadoop领域资深的实践者亲自执笔,首先介绍了MapReduce的设计理念和编程模型,然后从源代码的角度深入分析了RPC框架、客户端、JobTracker、TaskTracker和...

    hadoop与spring结合

    利用hadoop的rpc包简单与spring结合,实现了一个简单的分布式。

    java_RPC_hadoop.zip

    java模拟hadoop的RPC(Remote Procedure Call)通讯,连接和心跳

    hadoop段海涛老师八天实战视频

    06-hadoop中的RPC框架实现机制.avi 07-hadoop中的RPC应用实例demo.avi 08-hdfs下载数据源码跟踪铺垫.avi 09-hdfs下载数据源码分析-getFileSystem.avi 10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 ...

    《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》

    《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》由Hadoop领域资深的实践者亲自执笔,首先介绍了MapReduce的设计理念和编程模型,然后从源代码的角度深入分析了RPC框架、客户端、JobTracker、TaskTracker和...

    Hadoop技术内幕:深入解析MapReduce架构与实现原理

    深入 剖析 了 Hadoop MapReduce 中 各个 组件 的 实现 细节, 包括 RPC 框架、 JobTracker 实现、 TaskTracker 实现、 Task 实现 和 作业 调度 器 实现 等。 书中 不仅 详细 介绍 了 MapReduce 各个 组件 的 内部 ...

    hdfs源码.zip

    2.3 Hadoop RPC实现 63 2.3.1 RPC类实现 63 2.3.2 Client类实现 64 2.3.3 Server类实现 76 第3章 Namenode(名字节点) 88 3.1 文件系统树 88 3.1.1 INode相关类 89 3.1.2 Feature相关类 102 3.1.3 ...

    hadoop的经典讲义

    hadoop Common – 是hadoop的核心,包括文件系统、远程调用RPC的序列化函数。 HDSF : 提供高吞吐量的可靠分布式文件系统是 GFS的开源实现。 •Hadoop的文件系统。必须通过hadoop fs 命令来读取。支持分布式。 ...

    Hadoop技术内幕_深入解析MapReduce架构设计与实现原理

    首先介绍了MapReduce的设计理念和编程模型,然后从源代码的角度深入分析了RPC框架、客户端、JobTracker、TaskTracker和Task等MapReduce运行时环境的架构设计与实现原理,最后从实际应用的角度深入讲解了Hadoop的性能...

    hadoop技术内幕

    《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》由Hadoop领域资深的实践者亲自执笔,首先介绍了MapReduce的设计理念和编程模型,然后从源代码的角度深入分析了RPC框架、客户端、JobTracker、TaskTracker和...

    Apress - Pro Hadoop

     HDFS通信部分使用org.apache.hadoop.ipc,可以很快使用RPC.Server.start()构造一个节点,具体业务功能还需自己实现。针对HDFS的业务则为数据流的读写,NameNode/DataNode的通信等。  MapReduce主要在org.apache....

Global site tag (gtag.js) - Google Analytics