HSF的基本结构

HSF作为远程服务框架,具备客户端负载均衡、服务发现以及去中心化等多种分布式特性。HSF内部领域分为三个维度:规则、地址和服务,三者相互“独立”且又“关联”。

规则:

规则是HSF(服务端和客户端)在运作的时候状态信息的体现,比如:服务端发布的服务在某些IP端将会以哪个分组进行发布。这些信息在应用启动时能够结合应用的特性针对应用或者某个服务接口进行个性化配置,是HSF“状态”的体现,目前HSF是使用Diamond这个产品完成规则的存储与通知的。

地址:

地址是调用的载体,它表示提供服务的一台机器(ip),服务端需要将地址发送到注册中心让客户端能够进行服务发现,客户端需要通过注册中心订阅一个服务的地址。服务与地址的对应关系是多对多的关系,一个服务可以由多个地址提供,一个地址可以提供多种服务。当一个服务有了新地址(机器)或者减少了地址(机器)时,注册中心会通知这个服务的订阅方将地址增加或者减少,这个注册中心就是configserver,它负责存储地址信息以及地址变更的推送。

服务:

服务是调用方和提供方交流的依凭,一般是一个接口,表示一个业务行为以及相关的数据含义。HSF提供了服务提供方的暴露以及调用方Client的生成。通过使用HSFApiProviderBean能够暴露一个服务,将机器的地址注册到configserver,并且能够通过12200端口进行服务提供,通过HSFApiConsumerBean能够包装出一个客户端,它是服务接口的一个代理,并且它从configserver上订阅了服务的地址列表,能够在这个列表上完成随机调用,做到负载均衡与HA。

http://www.sohu.com/a/141490021_268033

图 3-5 HSF 服务框架工作原理示意图

http://www.cnblogs.com/dongqingswt/archive/2013/01/22/2872068.html

1、HSF单元测试环境的启动:

HSF组开发同事提供了一个HSFEasyStarter,支持HSF环境的快速启动。它的原理是: 从淘宝内网下载一个taobao-hsf.sar目录,这个目录下面有META-INF,lib,plugins三个目录. 其中lib目录下存放了hsf容器的jar (hsf.container-1.4.8.7.jar)和随着eclipse一起发布的osgi标准的实现(org.eclipse.osgi-framework-3.4.2.R34x_v20080826.jar)

hsf.container-1.4.8.7.jar hsf.thirdcontainer.jboss-1.4.8.7.jar hsf.thirdcontainer.tomcat-1.4.8.7.jar org.eclipse.osgi-framework-3.4.2.R34x_v20080826.jar

HSF容器启动的时候,会基于equinox的Declare Service的方式进行Service的定义,另外,plugins目录下的plugin(理解为一个bundle)里面打包了一个properties文件, 里面指明了bundle中需要导出给外部应用使用的HSF的一些Class,并且HSFEasyStarter在启动时使用的一个HSFMiniContainer对双亲委派的Classloader机制进行了改写,在App classloader和ext classloader之间插入了一个urlclassloader,这个Urlclassloader就从前面说的bundle导出的类去findClass,代码片段如下:

 * 在SystemAppClassLoader和ExtClassLoader链之间插入一个优先于classpath参数的URLClassLoader

相信每个Java程序员都知道Classpath是什么。Java的类加载器(Classloader)是一种分层结构,如下图所示,分为引导类加载器(Bootstrap Class Loader),扩展类加载器(Extension Class Loader),系统类加载器(System Class Loader)以及用户定义的类加载器(User-defined Class Loader)。引导类加载器在JVM时负责加载rt.jar里面的类,扩展类加载器负责加载在扩展目录下的jar文件中的类,系统类加载器则在Classpath上面搜索类加载器,用户定义的类加载器则从用户指定的路径(比如一个网络URI)加载类。在该类加载体系中,一个类加载器总是先去上层类加载器加载类,一层一层迭代,当无法找到需要的类时在自己加载。

  在这种类加载机制中,存在以下几个问题:1)类版本冲突:当类路径上存在同一个类的不同版本时,如果类加载器找到一个版本,则不再搜索加载下一个版本;2)无法确定jar之间的依赖关系:现有的JAR标准中缺乏对与Jar文件之间依赖关系的定义支持,因此只有在运行时间无法找到所需的类时,才会打出java.lang.ClassNotFoundException,但这通常不能有效帮助开发人员解决问题;3)信息隐藏:如果一个jar在类路径上并且被加载,那么所有该jar中的公共类(public class)都会被加载,无法避免某些类被隐藏从而不被加载。尽管在J2EE中改进了类加载机制,可以支持以war或者ear应用为单元进行加载,但是这些问题还是没有被很好地解决,并且热部署效果让人忧心。

  OSGi就是为了克服这些问题而生,却又不局限与这些问题,对Java开发人员而言的确是好多顿大餐。OSGi是一个动态的Java模块(Module)系统,它规定了如何定义一个Module以及这些模块之间如何交互。每个OSGi的Java模块被称为一个bundle。每个bundle都有自己的类路径,可以精确规定哪些Java包和类可以被导出,需要导入哪些其它bundle的哪些类和包,并从而指明bundle之间的依赖关系。另外bundle可以被在运行时间安装,更新,卸载并且不影响整个应用。通过这种方式,分层的类加载机制变成了网状的类加载机制。在应用程序启动之前,OSGi就可以检测出来是否所有的依赖关系被满足,并在不满足时精确报出是哪些依赖关系没被满足。

当然,OSGi不止这些。我会在后续的系列文章中系统介绍什么是OSGi以及其内部的秘密。

2.生产者消费者bean

昨天讲到了HSF容器的启动 。 HSF容器启动以后,通过osgi的bundleContext拿到了一组需要暴露给hsf容器外部使用的类,以及基于这些类的urlclassLoader. 这里面有两个关键的类:HSFSpringConsumerBean 和HSFSpringProviderBean ,

HSFSpringProviderBean负责启动RPC服务器 ,并把HSF服务信息发布到配置中心(淘宝的config server) .

HSFConsumerBean负责从配置中心获取HSF接口的服务器列表,生成接口的代理类,由这个代理类承担RPC请求。

整个系统结构图如下 :

整个过程描述如下:

1、HSFSpringProviderBean启动HSF服务器, 采用了Apache Mina做网络通信框架(之前的blog中有描述),淘宝的tbremoting基于apache mina,提供更接近业务层的网络通信服务。

    /**
     * 一般来说只允许在延迟发布时候,调用; 有潜在的并发问题(Diamond通知线程与spring容器启动线程的并发发布metadata)。
     */
    public void register() {
        // 是否允许重复的不停调用。 如果不允許,需要用compareAndSet readyToPublish
        if (metadata.setReadyToPublish(true)) {
            metadata.exportSync();
        }
    }

2、发HSF服务的元信息,包括服务器的ip,端口,是否是异步调用等信息发布到淘宝的配置中心(ConfigServer)。

public List<ServiceURL> exportSync() {
        Future<List<ServiceURL>> future = getExportReferExecutorService().submit(new Callable<List<ServiceURL>>() {
            @Override
            public List<ServiceURL> call() throws Exception {
                return export();
            }
        });

        try {
            List<ServiceURL> rawURLs = future.get();
            if (rawURLs != null) {
                LOGGER.info("Interface[" + getUniqueName() + "]Group[" + getGroup() + "]Publish HSF service to ConfigServer success!");
            } else {
                if (!readyToPublish.get()) {
                    LOGGER.info("Interface[" + getUniqueName() + "]Group[" + getGroup() + "]use delay publish,wait for online/onlineSmart command to publish!");
                }
            }
            return rawURLs;
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

3、HSFSpringConsumerBean生成一个HSF接口的代理类, 然后从configserver获取HSF的服务地址列表,从diamond server(淘宝的另一个配置中心,支持配置信息的实时推送) 获取流控和服务器路由信息。

4、在HSF接口调用时通过第3步生成的接口的代理类进行一个RPC调用(序列化方式可以采用java和hessian) ,调用服务器的选择算法目前采用的是随机分配的方式。(多重反射,同步)

/**
 * @author sixie.xyn on 2016/8/12.
 */
@Name("jdk")
public class JdkProxyFactory implements ProxyFactory {

    @Override
    public Object getProxy(ServiceMetadata metadata, Class<?>... interfacesArray) {
        JdkProxyInvocationHandler jdkProxyInvocationHandler = new JdkProxyInvocationHandler(metadata);
        Object instance = Proxy.newProxyInstance(metadata.getIfClazz().getClassLoader(), interfacesArray, jdkProxyInvocationHandler);
        jdkProxyInvocationHandler.init(instance);
        return instance;
    }

最终的Future回调hanlder

后面结合源码详细的展开描述:1、configserver配置信息的发布和订阅。 2、基于tbremoting进行的rpc的详细分析。

netty

http://www.cnblogs.com/dongqingswt/archive/2013/01/28/2880624.html

HSF网络层

如果将一次RPC调用在网络层的交互进行职能分解,那么将会涉及到以下的必不可少的参与者:客户端、服务端、连接、请求对象和响应对象,我们将其分别命名为Client、Server、Connection、Request和Response。一次远程调用就是Client通过Connection将Request发送到Server并等待Response返回的过程。

Client直接将Request发送给Connection,并且将发送的数据记录进行保存,这点最为关键:保存的数据记录为一个`Map`,其中key为请求id,而value为一个Future对象,它等待着Server的数据响应,而数据响应Response一定会携带Client发送的的请求id,这使得请求和响应能够对应起来。

Client对于Request的发送,Server对于Response的发送,也就是进行网络I/O操作,这些操作都是异步非阻塞的,可以按照一个个消息来理解,只是这些消息上有一个特殊的id,来完成Request与Response的对应。

HSF2.2的网络层设计

在开始介绍HSF2.2网络层之前,先看这样一句话,Don't Call Me, I'll Call You,它表述的意思是我会在合适的时间叫你。有类似概念的框架有很多,比如说spring,也有相关的设计模式,比如说reactor模式。

网络层的设计最本质的诉求就是要识别出以下的场景:连接的建立与销毁,数据的发送与接收。业界知名的网络通信框架都是基于reactor模式的,比如:netty,而HSF2.2的网络层设计就是考虑如何将网络框架的事件体系映射到HSF的网络层上,HSF2.2网络层的基本结构如下:

网络层的设计最本质的诉求就是要识别出以下的场景:连接的建立与销毁,数据的发送与接收。业界知名的网络通信框架都是基于reactor模式的,比如:netty,而HSF2.2的网络层设计就是考虑如何将网络框架的事件体系映射到HSF的网络层上,HSF2.2网络层的基本结构如下:

HsfNetwork是网络层的核心抽象,NetworkAdapter负责使用Netty并将网络事件映射会HsfNetwork,框架功能层面HsfNetworkImp通过扩展HsfNetwork加以实现。

网络层核心抽象

网络层最重要的是连接,因此HSF2.2网络层将连接视为一等公民,并且统一了HSF2.1中客户端和服务端的不同定义,只有一个最基础的抽象

Stream

,它代表着一个通道,连接两点之间,使其能够相互通信,完成p2p的工作。

由于客户端需要发起心跳,且写出数据后要得到返回,因此根据客户端和服务端的不同,在

Stream的基础上,抽象了ClientStreamServerStream,HSF2.2的核心抽象如下图所示:

Client应对于客户端,它可以根据注册中心推送的地址ServiceURL,创建一个ClientStream,并通过它发送请求。Server

是服务端,它根据用户指定的ip和端口完成本地绑定,比如绑定本机网卡的12200端口。

事件监听器

有了核心抽象,接下来是网络事件的处理,相比HSF2.1直接基于Netty进行实现,HSF2.2优先围绕网络事件的不同类型、客户端与服务端的不同特性定义了4种监听器,以监听器的方式来实现功能:
监听器名称 功能描述 作用域
ClientStreamLifecycleListener 客户端连接生命周期监听器 1.当前Client连接ClientStream建立失败 2.当前Client建立了一个连接ClientStream3.当前Client关闭了一个连接ClientStream4.当前的Client上的连接ClientStream处于空闲状态 5.当前Client上的连接ClientStream出现了异常 客户端
ClientStreamMessageListener 客户端连接消息监听器 1.客户端Client开始通过ClientStream写出一个数据 2.客户端Client通过ClientStream写出一个数据完毕 3.客户端Client通过ClientStream写出一个数据失败 4.客户端Client通过ClientStream收到一个数据 客户端
ServerStreamLifecycleListener 服务端连接生命周期监听器 1.服务端Server绑定成功 2.服务端Server绑定失败 3.服务端Server关闭 4.当前的Server接受建立了一个连接ServerStream5.当前的Server关闭了一个连接ServerStream6.当前的ServerServerStream处于空闲状态 7.当前的Server上的ServerStream出现了异常 服务端
ServerStreamMessageListener 服务端连接消息监听器 1.服务端Server开始通过ServerStream写出一个数据 2.服务端Server通过ServerStream写出一个数据完毕 3.服务端Server通过ServerStream写出一个数据失败 4.服务端Server通过ServerStream收到一个数据 服务端
经过网络事件监听器的设计,HSF2.2的网络层基本能够应对任何扩展需求。下面以客户端两个监听器为例,看一下具体的方法,这样对监听器的具体功能有一个认识:

简单的介绍一下

ClientStreamLifecycleListener

中的

connectSuccess

方法,它表示当前客户端

Client

与服务端建立成功了一个连接

ClientStream

,而

idle

方法,表示客户端

Client

的一个连接

ClientStream

出现了空闲(一般半分钟左右没有数据往来)。

ClientStreamMessageListener

则代表了客户端

Client

通过

ClientStream

将请求发送到服务端,发送、发送成功以及发送失败都会通过监听器进行告知。

有了完备的网络事件监听器机制,我们就可以轻松的扩展HSF网络层功能,而HSF本身的服务调用,也是基于这些扩展搭建的。

HSF2.2的功能实现

客户端发送心跳

以前HSF2.1为了实现心跳,选择新启动一个线程,不断的遍历客户端所有的连接,发送心跳请求,这就像拽光弹一样,将心跳混在业务请求中一并发送。心跳的本质是维护连接的活性,做到TCP防呆,正常有业务请求发送时,理论是不需要心跳的,因此HSF2.2可以基于客户端连接生命周期监听器的

idle

做到心跳发送。

@Tag("tcp")
@Order(2)
public class SendHeartbeat extends ClientStreamLifecycleListenerAdapter {
    private static final Logger log = LoggerInit.LOGGER;

    @Override
    public void idle(final Client client, final ClientStream stream) {
        //send heartbeat
        try {
            RequestPacket packet = (RequestPacket) Heartbeats.requestOf(stream.connectionID().getServiceURL());
            final ListenableFuture<RPCResult> listenableFuture = stream.write(new PacketRequestWrapper(packet));

            listenableFuture.addListener(new Runnable() {
                @Override
                public void run() {
                    // 略
                }
            });

        } catch (Exception e) {
            log.warn("heartbeat on response exception: " + e.getMessage() + stream);
        }
    }


}
 可以看到心跳发送的功能,只需要实现ClientStreamLifecycleListener的idle方法即可,也就是说当Client上的一个连接ClientStream出现了空闲(30秒无数据往来),将会通知到实现了idle的监听器,这样就可以再这里完成心跳的发送了。服务端接口客户端的请求    服务端接收到客户端传递的请求,会将请求派发到业务线程池中进行执行,并将结果通过ServerStream写回。在HSF2.2的架构下,这个异步的过程实现起来更加简单。
@Tag({"http", "tcp"})
@Order(20)
public class HandleRequest extends ServerStreamMessageListenerAdapter {

    @Override
    public void received(Server server, ServerStream stream, RequestPacket requestPacket) {
        ServerHandler serverHandler = ServerHandlerSelector.getInstance().select(requestPacket.protocolType());
        serverHandler.process(requestPacket, stream);
    }
}
 可以看到实现了ServerStreamMessageListener的received方法后,当有请求事件通过ServerStream发送到Server,就会通知该监听器,这样就只需要对请求进行处理写回即可。
 依靠HSF2.2网络层的新架构,HSF现有的功能,都已监听器的方式进行了实现,每个功能只需要考虑如何适配监听器体系,相互之间不会影响,功能更加的专一高效。

HSF调用层设计与扩展

HSF调用层的基本工作是理解用户调用的上下文(调用的接口、方法、参数以及特定需要携带的调用信息),将上下文发送给远端,然后等待远端的返回数据。这个过程涉及到以下几个点:
1.调用与响应,HSF是如何定义一次调用以及响应的;
2.调用的执行,一次调用如同在一个pipeline上执行一样,HSF是如何定义执行的流程;
3.如何扩展调用流程。

如果是一个基本的RPC调用,从参数的形成,经由网络传输到对端,完成方法调用,再将结果携带回来。这个基本的过程是大部分RPC框架都具备的。但是框架本身会接入很多的扩展,比如:调用记录、流量控制、调用的路由等,这些扩展对于应用的调用方是透明的,但是它们的作用却反映在大RPC概念上,而这些扩展需要无缝的集成到框架中。

HSF2.2支持调用方对于框架的扩展,能够在运行时组装扩展,使得扩展知晓每次调用。

对比一些开源的RPC框架,也支持调用扩展,比如gRPC的interceptor。但是interceptor它需要在构造时进行传递,缺乏主动发现的特性,而且只支持一个,多个interceptor的支持需要依赖其他扩展做到。
扩展结构良好的RPC框架,目前主要是dubbo。

HSF的调用层完成的主要功能包括:
  • 定义请求和响应
  • 定义调用执行
  • 定义调用扩展点
  • 装配调用链
  • 调用的执行

    通过扩展HSF调用层,可以做到以下(但不限)工作:

  • AOP拦截调用端的所有请求和响应

  • AOP拦截服务端的所有请求和响应

  • 针对请求和响应的内容做自定义处理

调用层基本介绍

请求Invocation

HSF同其他RPC框架一样,对于请求也有自己的定义,请求定义:

响应RPCResult

当调用发送到服务端后,服务端完成处理,就会生成响应,当然也会将该响应通过网络写回给客户端。在本节最开始的例子中,返回的

result

就是响应,但是由于涉及到了远程调用,仅仅是返回值是不足够的。

HSF对于响应的定义如下:

Future的设计和使用

ListenableFuture和SettableFuture

在HSF中定义了

ListenableFuture

SettableFuture

,可以通过

Futures

工具类方便的使用。一般意义上,

Future

的本质是wait/notify模式,而notify的前提是值被设置,所在

Futures

中使用

createSettableFuture

方法创建一个

SettableFuture

ListenableFuture

接口定义。

public interface ListenableFuture<V> extends Future<V> {

    /**
     * 添加一个{@link Runnable},当future完成时会被调用
     *
     * @param listener
     */
    void addListener(Runnable listener);
}

SettableFuture

    /**
     * <pre>
     * 设置值到当前的Future
     *
     * </pre>
     *
     * @param v
     * @return 如果设置成功返回true,反之当前Future已经设置或者被取消,返回false
     */
    boolean set(V v);
}

MapFuture

ListenableFuture

能够让我们监听

future

被设置的动作,但是无法监听设置的结果。因为一个

future

添加的Listener,这个Listener其实就是一个Runnable,它不能直接获取到

future

设置的值。

具备拦截功能的MapFuture

Futures

还提供了一个

map

方法,用来构建一个

MapFuture

,但是在其Listener执行前将会做一些初始化和清理操作。

/**
     * <pre>
     * 构造一个{@link ListenableFuture}包装,它将计算结果透过func进行处理,而ListenerWrapper将会在监听器执行前后
     * 调用对应的方法{@link WrappedListener.ListenerWrapper#before()}和{@link WrappedListener.ListenerWrapper#after()} ()}
     *
     * </pre>
     *
     * @param source          目标future
     * @param func            转换
     * @param listenerWrapper listener执行拦截
     * @param <T>             参数
     * @param <R>             结果
     * @return 包装后的Future
     */
    public static <T, R> ListenableFuture<R> map(ListenableFuture<T> source, Func1<T, R> func,
                                                 WrappedListener.ListenerWrapper listenerWrapper) {
        //only used in test
        return new DefaultMapFuture<T, R>(MoreExecutors.directExecutor(), source, func, listenerWrapper);
    }

该方法将返回一个

DefaultMapFuture

,它在之前的基础上,在调用func前,将会调用

listenerWrapper

before

方法,当func完成后,就已经设置了

DefaultMapFuture

的结果,最终再调用

listenerWrapper

after

方法。

可以简单的认为listenerWrapper是在结果被设置前的拦截

调用处理器

同步调用处理器

在HSF中保留了同步流水线模式,它对于调用处理器的定义如下:

可以看到

SyncInvocationHandler

中定义了

RPCResult invoke(Invocation invocation)

方法,这里就是典型的输入请求,得到响应的模式。

对于同步调用处理器的扩展方式就要使用

SyncInvocationHandlerInterceptor

,该拦截器能够拦截请求,如果能够处理可以再拦截器中完成请求处理,如果当前拦截器无法处理,则可以选择为派给后端。

异步调用处理器

之前提到HSF是一个异步调用框架,因为HSF在调用过程中,有多个参与者,比如:和I/O线程的交互。HSF与I/O之间的操作是异步的,可以理解为调用请求发送到I/O这个参与者时,网络将请求异步发送出去,然后就等着响应的回来。这时如果从调用者的角度去看,返回的自然就是一个异步计算(Future),而非真实的结果。

HSF对于异步调用处理器的定义如下:

可以从异步调用处理器的结构可以看出,

InvocationHandler

返回的是一个

ListenableFuture

<

RPCResult

>

,当

InvocationHandler

能够进行计算得出结果时,只需要返回

ListenableFuture

即可。

可以使用之前章节提供的
Futures
工具类进行
ListenableFuture
生成

同时异步调用处理器提供了

AbstractInvocationHandlerInterceptor

进行链式结构的支持,开发者直接继承它就可以。

调用层扩展点的基本结构

RPCFilter

需要被整合到调用处理器中,因此每个

RPCFilter

都需要被转换成为

InvocationHandler

RPCFilter

的基本结构如下图所示:

可以看到该结构的特点:
  • 每个 RPCFilter 都会被聚合到 RPCFilterNode 中,由 RPCFilterNode 来触发 RPCFilter 的调用
  • 每个 RPCFilterNode 都具备一个头结点和尾节点,因此由 RPCFilter 组成的调用链是一个双向链表
  • RPCFilterNode 实现了接口 InvocationHandler ,可以通过这个接口整合到调用处理器链中

HSF2.2的调用层实现

服务端初始化阶段调用链的拼接

服务端的调用链在

ServiceMetadata.init()

过程中对各个

AbstractInvocationHandlerInterceptor

节点进行创建和拼接。可以把这个阶段理解为对于

ReflectInvocationHandler

装饰

,需要在一个服务端反射调用处理前增加一些调用处理器拦截器。

在服务端的调用拦截器完成拼装,形成类似如下调用链:

服务端的

FilterInvocationHandler

进行构建时,会将通过应用服务加载器加载

ServerFilter

实例,并拼装起来。这里展示一下HSF常用的

ServerFilter

可以看到,不同的功能都是通过实现

ServerFilter

来完成,比如进行调用统计的

InvocationStatsServerFilter

,通过实现

ServerFilter

来进行服务端的调用统计。

results matching ""

    No results matching ""