整体架构图

  1. configServer: Server在configServer注册hsf服务,Client在ConfigServer订阅hsf服务信息,configServer会将hsf服务相关信息推送给client
  2. Redis:Client和Server会定时推送信息给redis,作为HSF服务治理平台的数据
  3. Diamond:存储Client和Server应用启动时所需要的配置信息

代码分析

1. 服务发布

<bean id="allocateProportionReadService" class="com.cainiao.ofp.fdcdump.service.api.AllocateProportionReadServiceImpl"/>    
    <bean class="com.taobao.hsf.app.spring.util.HSFSpringProviderBean" init-method="init">
        <property name="target" ref="allocateProportionReadService"/>
        <property name="serviceInterface" value="com.cainiao.ofp.fdcdump.common.api.AllocateProportionReadService"/>
        <property name="serviceVersion" value="${fdc.dump.provide.service.version}"/>
        <property name="serviceGroup" value="HSF"/>
        <property name="serviceName" value="allocateProportionReadService"/>
        <property name="delayedPublish" value="true"/>
        <property name="clientTimeout" value="5000"/>
    </bean>

hsf服务发布配置信息:

  1. target:hsf服务接口实现类,为ref类型
  2. serviceInterface:hsf服务接口类
  3. serviceVersion:hsf服务版本 ,分日常,预发,线上
  4. serviceGroup:服务组别
  5. delayedPublish:延迟注册服务,默认为true
  6. clientTimeout:hsf服务超时时间

从配置信息可以看出 HSFSpringProviderBean 为服务发布入口类

HSFSpringProviderBean部分代码

private final HSFApiProviderBean providerBean = new HSFApiProviderBean();
    public void init() throws Exception {
        if (this.providerBean.getInited().compareAndSet(false, true)) {
            LoggerInit.initHSFLog();
            AppInfoUtils.initAppName(this.providerBean.getMetadata());
            SpasInit.initSpas();
            this.providerBean.checkConfig();
            this.publishIfNotInSpringContainer();
        }
    }
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            this.providerBean.publish();
            this.setAppInitedStatus();
        } else if (event instanceof ContextClosedEvent && AppInfoUtils.appRunning.compareAndSet(true, false)) {
            try {
                this.providerBean.shutdownHSFServer();
            } catch (HSFException var3) {
                LOGGER.error(LoggerHelper.getErrorCodeStr("hsf", "HSF-0037", "环境问题", ""), "Spring容器关闭,销毁HSF相关资源失败!", var3);
            }

            LOGGER.info("Spring容器关闭,设置应用初始化状态为未初始化!");
        }

    }

public void init()方法做了初始化幂等校验,然后初始化日志,校验配置信息,并判断服务是否在spring容器中,如果非spring容器直接发布服务。

public void onApplicationEvent(ApplicationEvent event)是HSFSpringProviderBean实现了ApplicationListener接口的方法,当容器初始化完成后,会调用该方法,可以从代码看到HSFSpringProviderBean通过providerBean进行服务发布,providerBean的类是HSFApiProviderBean,因为hsf服务的环境不止是spring环境,还可能会有其他环境,所以需要把信息给HSFApiProviderBean,由它统一进行操作。

HSFApiProviderBean部分代码

    private final ServiceMetadata metadata = new ServiceMetadata(true);
    public void publish() {
        if (this.isPublished.compareAndSet(false, true)) {
            try {
                ((ProcessService)HSFServiceContainer.getInstance(ProcessService.class)).publish(this.metadata);
            } catch (Exception var2) {
                LOGGER.error("", "接口[" + this.metadata.getInterfaceName() + "]版本[" + this.metadata.getVersion() + "]发布为HSF服务失败", var2);
                throw new RuntimeException(var2);
            }
        }
    }

可以看到publish()把服务发布交给了ProcessService进行处理,这个下面再分析ProcessService。HSFApiProviderBean有个ServiceMetadata变量,保存HSF服务的配置信息,配置信息会上传到configServer和Redis中。

ProcessService的实现类ProcessComponent部分代码

private final MetadataService metadataService = (MetadataService)HSFServiceContainer.getInstance(MetadataService.class);
    private final MetadataInfoStoreService metadataInfoStoreService;
    private final List<ProcessHookService> hookServices = HSFServiceContainer.getInstances(ProcessHookService.class);
    public void publish(ServiceMetadata metadata) throws HSFException {
        try {
            this.rpcProtocolService.registerProvider(metadata);
        } catch (HSFException var4) {
            LOGGER.error("", "RPC协议:方式发布HSF服务时出现错误,请确认服务:" + metadata.getUniqueName() + "的rpc属性的配置!");
            throw var4;
        }

        Iterator i$ = this.hookServices.iterator();

        ProcessHookService hookService;
        while(i$.hasNext()) {
            hookService = (ProcessHookService)i$.next();
            hookService.prePublish(metadata);
        }

        if (metadata.isReadyToPublish()) {
            if (!metadata.getGroup().equalsIgnoreCase(metadata.getDefaultGroup())) {
                this.metadataService.unregister(metadata);
            }

            if (this.metadataService.publish(metadata)) {
                LOGGER.info("接口[" + metadata.getUniqueName() + "]组别[" + metadata.getGroup() + "]发布为HSF服务成功!");
            }
        } else {
            LOGGER.info("接口[" + metadata.getUniqueName() + "]组别[" + metadata.getGroup() + "]服务使用了延迟发布功能,服务未发布!");
        }

        i$ = this.hookServices.iterator();

        while(i$.hasNext()) {
            hookService = (ProcessHookService)i$.next();
            hookService.afterPublish(metadata);
        }

        this.metadataInfoStoreService.store(metadata);
    }

从代码可以看出通过rpcProtocolService进行服务发布

ProcessComponent的ProcessHookService列表主要是让用户在HSF服务各个阶段进行额外操作

public interface ProcessHookService {
    void afterConsume(ServiceMetadata var1);

    void afterPublish(ServiceMetadata var1);

    void preConsume(ServiceMetadata var1);

    void prePublish(ServiceMetadata var1);
}

ProcessComponent的MetadataService用于在发布服务后将ServiceMetadata信息上传至configServer,实现类为MetadataComponent,MetadataComponent类把信息上传交给类CsMetadataAddressService

CsMetadataAddressService信息上传核心代码

public void publish(ServiceMetadata metadata) {
        String serviceUniqueName = metadata.getUniqueName() + metadata.getConfigStyle();
        Object var3 = this.lock;
        synchronized(this.lock) {
            List<String> centers = metadata.getConfigserverCenter();
            if (centers != null && centers.size() > 0) {
                boolean isPub = false;
                Iterator i$ = centers.iterator();

                while(i$.hasNext()) {
                    String center = (String)i$.next();
                    Map<String, Publisher<String>> centerPublishers = (Map)this.publishers.get(center);
                    if (centerPublishers == null) {
                        centerPublishers = new HashMap();
                        this.registrations.put(center, new HashMap());
                        this.publishers.put(center, centerPublishers);
                    }

                    if (this.httpPublish()) {
                        Map<String, Publisher<String>> centerHttpPublishers = (Map)this.httpPublishers.get(center);
                        if (centerHttpPublishers == null) {
                            Map<String, Publisher<String>> centerHttpPublishers = new HashMap();
                            this.httpRegistrations.put(center, new HashMap());
                            this.httpPublishers.put(center, centerHttpPublishers);
                        }
                    }

                    if (!((Map)centerPublishers).containsKey(serviceUniqueName)) {
                        isPub = true;
                        Publisher<String> publisher = this.doPublish(metadata, center);
                        ((Map)centerPublishers).put(serviceUniqueName, publisher);
                        if (this.httpPublish()) {
                            Publisher<String> httpPublisher = this.doHttpPublish(metadata, center);
                            ((Map)this.httpPublishers.get(center)).put(serviceUniqueName, httpPublisher);
                        }
                    }
                }

                if (isPub) {
                    this.metadatas.add(metadata);
                }
            } else {
                Map<String, Publisher<String>> centerPublishers = (Map)this.publishers.get("DEFAULT");
                if (centerPublishers == null) {
                    centerPublishers = new HashMap();
                    this.registrations.put("DEFAULT", new HashMap());
                    this.publishers.put("DEFAULT", centerPublishers);
                }

                if (this.httpPublish()) {
                    Map<String, Publisher<String>> centerHttpPublishers = (Map)this.httpPublishers.get("DEFAULT");
                    if (centerHttpPublishers == null) {
                        Map<String, Publisher<String>> centerHttpPublishers = new HashMap();
                        this.httpRegistrations.put("DEFAULT", new HashMap());
                        this.httpPublishers.put("DEFAULT", centerHttpPublishers);
                    }
                }

                if (!((Map)centerPublishers).containsKey(serviceUniqueName)) {
                    this.metadatas.add(metadata);
                    Publisher<String> publisher = this.doPublish(metadata);
                    ((Map)centerPublishers).put(serviceUniqueName, publisher);
                    if (this.httpPublish()) {
                        Publisher<String> httpPublisher = this.doHttpPublish(metadata);
                        ((Map)this.httpPublishers.get("DEFAULT")).put(serviceUniqueName, httpPublisher);
                    }
                }
            }

        }
    }

可以看出上传数据是先获取ConfigserverCenter上传列表,如果列表为空使用默认上传列表,遍历上传列表,判断是centerPublishers还是centerHttpPublishers,然后放入对应类型的Publisher哈希列表,然后开始上传数据。

ProcessComponent的MetadataInfoStoreService用于ServiceMetadata信息定时上传至redis,用于hsfops平台进行服务治理

MetadataInfoStoreService实现类MetadataInfoStoreServiceRedis上传数据代码

    public void store(ServiceMetadata metadata) {
        if (metadata.isProvider()) {
            HSFRuntimeInfoPublisherScheduler.scheduleForOnce(new MetadataInfoStoreServiceRedis.ClassInfoRunner(metadata), 10L, TimeUnit.SECONDS);
        }

        if (processed.compareAndSet(false, true)) {
            HSFRuntimeInfoPublisherScheduler.scheduleAtFixedRate(new MetadataInfoStoreServiceRedis.RuntimeInfoPublisher(), 10L, 1440L, TimeUnit.MINUTES);
            LOGGER.info("HSF-RuntimeInfo-Publisher started.");
        }
    }

可以看出每隔1440s会上传一次信息至redis

2 服务订阅

服务订阅xml配置

<bean id="dicRcRouteService" class="com.taobao.hsf.app.spring.util.HSFSpringConsumerBean"
          init-method="init">
        <property name="interfaceName">
            <value>com.cainiao.dicrc.api.service.DicrcRouteService</value>
        </property>
        <property name="version" >
            <value>${fdc.consume.dicrc.service.version}</value>
        </property>
        <property name="clientTimeout">
            <value>${fdc.consume.dicrc.service.timeout}</value>
        </property>
    </bean>

服务订阅和服务发布架构相似,把操作交给HSFApiConsumerBean,

public void init() throws Exception {
        if (!this.inited.compareAndSet(false, true)) {
            LOGGER.warn(LoggerHelper.getErrorCodeStr("hsf", "HSF-0020", "业务问题", "HSF服务:" + this.metadata.getUniqueName() + " 重复初始化!"));
        } else {
            AppInfoUtils.initAppName(this.metadata);
            LoggerInit.initHSFLog();
            if (HSFServiceTargetUtil.isGeneric(this.metadata.getGeneric())) {
                this.metadata.setIfClazz(GenericService.class);
            } else if (this.metadata.getIfClazz() == null) {
                StringBuilder errorMsg = new StringBuilder();
                errorMsg.append("ConsumerBean中指定的接口类不存在[");
                errorMsg.append(this.metadata.getInterfaceName()).append("].");
                throw new IllegalArgumentException(errorMsg.toString());
            }

            if (this.asyncallMethods != null) {
                Iterator i$ = this.asyncallMethods.iterator();

                while(i$.hasNext()) {
                    String desc = (String)i$.next();
                    this.parseAsyncFunc(desc);
                }
            }

            this.metadata.initUniqueName();
            ProcessService processService = (ProcessService)HSFServiceContainer.getInstance(ProcessService.class);

            try {
                this.metadata.setTarget(processService.consume(this.metadata));
                LOGGER.warn("成功生成对接口为[" + this.metadata.getInterfaceName() + "]版本为[" + this.metadata.getVersion() + "]的HSF服务调用的代理!");
            } catch (Exception var5) {
                LOGGER.error("", "生成对接口为[" + this.metadata.getInterfaceName() + "]版本为[" + this.metadata.getVersion() + "]的HSF服务调用的代理失败", var5);
                throw var5;
            }

            int waitTime = this.metadata.getMaxWaitTimeForCsAddress();
            if (waitTime > 0) {
                try {
                    this.metadata.getCsAddressCountDownLatch().await((long)waitTime, TimeUnit.MILLISECONDS);
                } catch (InterruptedException var4) {
                    ;
                }
            }

        }
    }

其中核心代码是processService.consume(this.metadata),通过该操作获取到hsf服务代理对象

processService实现类 ProcessComponent部分代码

public Object consume(ServiceMetadata metadata) throws HSFException {
        if (ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName()) != null) {
            return ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName()).getProxyObject();
        } else {
            Iterator i$ = this.hookServices.iterator();

            while(i$.hasNext()) {
                ProcessHookService hookService = (ProcessHookService)i$.next();
                hookService.preConsume(metadata);
            }

            List<Class<?>> interfaces = new ArrayList(3);
            if (metadata.getIfClazz() != null) {
                interfaces.add(metadata.getIfClazz());
            }

            if (metadata.isSupportEcho()) {
                interfaces.add(EchoService.class);
            }

            if (!GenericService.class.equals(metadata.getIfClazz())) {
                interfaces.add(GenericService.class);
            }

            Class<?>[] interfacesArray = new Class[interfaces.size()];
            interfaces.toArray(interfacesArray);
            ProcessComponent.HSFServiceProxy proxy = new ProcessComponent.HSFServiceProxy(metadata, interfacesArray, !"javassist".equalsIgnoreCase(metadata.getProxyStyle()));
            Object proxyObj = proxy.getInstance();
            this.metadataService.subscribe(metadata);
            Iterator i$ = this.hookServices.iterator();

            while(i$.hasNext()) {
                ProcessHookService hookService = (ProcessHookService)i$.next();
                hookService.afterConsume(metadata);
            }

            this.metadataInfoStoreService.store(metadata);
            return proxyObj;
        }
    }
 public static class HSFServiceProxy implements InvocationHandler {
        private static final RPCProtocolTemplateService rpcProtocolService = (RPCProtocolTemplateService)HSFServiceContainer.getInstance(RPCProtocolTemplateService.class);
        private final ServiceMetadata serviceConsumerMetadata;
        private final ConsumerServiceModel serviceModel;
        private final Invoker<?> invokerChain;
        private final Object instance;
        private final Method equalsMethod;
        private final Method toStringMethod;
        private final Method hashCodeMethod;

        public HSFServiceProxy(final ServiceMetadata metadata, Class<?>[] classes, boolean isJava) {
            this.serviceConsumerMetadata = metadata;
            Method equalsMethod1 = null;
            Method toStringMethod1 = null;
            Method hashCodeMethod1 = null;
            if (isJava) {
                this.instance = Proxy.newProxyInstance(this.serviceConsumerMetadata.getIfClazz().getClassLoader(), classes, this);

                try {
                    Field hashCodeFeild = this.instance.getClass().getDeclaredField("m0");
                    hashCodeFeild.setAccessible(true);
                    hashCodeMethod1 = (Method)hashCodeFeild.get(this.instance);
                    Field equalsFeild = this.instance.getClass().getDeclaredField("m1");
                    equalsFeild.setAccessible(true);
                    equalsMethod1 = (Method)equalsFeild.get(this.instance);
                    Field toStringFeild = this.instance.getClass().getDeclaredField("m2");
                    toStringFeild.setAccessible(true);
                    toStringMethod1 = (Method)toStringFeild.get(this.instance);
                } catch (Exception var10) {
                    ProcessComponent.LOGGER.warn(var10.getMessage(), new Object[]{var10});
                }
            } else {
                this.instance = JavassistProxy.getProxy(classes).newInstance(this);
            }

            ApplicationModel.instance().initConsumerService(metadata.getUniqueName(), new ConsumerServiceModel(metadata, this.instance, isJava));
            this.serviceModel = ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName());
            this.hashCodeMethod = hashCodeMethod1;
            this.toStringMethod = toStringMethod1;
            this.equalsMethod = equalsMethod1;
            if (StringUtils.isNotBlank(metadata.getFilter())) {
                ProcessComponent.LOGGER.info("[Filter Enable:]" + metadata + "###" + metadata.getFilter());
                Invoker<?> trueRpcInvoke = new Invoker() {
                    public URL getUrl() {
                        return URL.valueOf("127.0.0.1?reference.filter=" + metadata.getFilter());
                    }

                    public boolean isAvailable() {
                        return true;
                    }

                    public void destroy() {
                    }

                    public Class<?> getInterface() {
                        return HSFServiceProxy.this.serviceConsumerMetadata.getIfClazz();
                    }

                    public Result invoke(Invocation invocation) throws RpcException {
                        try {
                            RpcContext.getContext().getAttachments().putAll(invocation.getAttachments());
                            Object result = HSFServiceProxy.this.trueInvoke(HSFServiceProxy.this.serviceModel.getMethodModel(((ExtendRpcInvocation)invocation).getMethod()), invocation.getArguments());
                            return new RpcResult(result);
                        } catch (Throwable var3) {
                            return new RpcResult(var3);
                        }
                    }
                };
                this.invokerChain = this.buildInvokerChain(trueRpcInvoke, "reference.filter", "ONLYTHEONE");
            } else {
                this.invokerChain = null;
            }

        }
}

ConsumedServiceModel是一个concurrentHashMap,存放着订阅的hsf服务,首先通过metadata的uniqueName去ConsumedServiceModel查询是否存在该服务,如果存在直接返回服务代理对象。如果不存在,则将代理对象的生成交给内部类HSFServiceProxy,HSFServiceProxy先判断是否是java,是的话通过java的Proxy生成对象,类加载器由metadata提供,如果不是java,则交给JavassistProxy。然后将生成的对象放入ConsumedServiceModel,并生成一个Invoker对象,负责hsf服务方法的调用。

1.3 服务的调用

上文提到了服务订阅最后阶段会生成Invoker对象,负责服务的调用,

processService中服务调用代码

public Result invoke(Invocation invocation) throws RpcException {
                        try {
                            RpcContext.getContext().getAttachments().putAll(invocation.getAttachments());
                            Object result = HSFServiceProxy.this.trueInvoke(HSFServiceProxy.this.serviceModel.getMethodModel(((ExtendRpcInvocation)invocation).getMethod()), invocation.getArguments());
                            return new RpcResult(result);
                        } catch (Throwable var3) {
                            return new RpcResult(var3);
                        }
                    }

        private Object trueInvoke(ConsumerMethodModel methodModel, Object[] args) throws HSFException, Throwable {
            AtomicInteger maxPoolSize = this.serviceConsumerMetadata.getCurConsumerMaxPoolSize();
            if (maxPoolSize == null) {
                return rpcProtocolService.invokeWithMethodObject(methodModel, args);
            } else {
                int currentSize = maxPoolSize.decrementAndGet();

                Object var5;
                try {
                    if (currentSize < 0) {
                        String errorMsg = MessageFormat.format("消费端线程池已满,service[{0}],consumerMaxPoolSize[{1}]", this.serviceConsumerMetadata.getUniqueName(), this.serviceConsumerMetadata.getConsumerMaxPoolSize());
                        ProcessComponent.LOGGER.warn(errorMsg);
                        throw new RuntimeException(new HSFException(errorMsg));
                    }

                    var5 = rpcProtocolService.invokeWithMethodObject(methodModel, args);
                } finally {
                    maxPoolSize.incrementAndGet();
                }

                return var5;
            }
        }

invoker对象的invoke方法先获取到调用的方法与参数,然后调用trueInvoke方法,如果消费端线程池不限制线程数,则直接调用,否则每次调用钱线程池最大线程数减一,调用完毕再加一,变量maxPoolSize为AtomicInteger,保证线程安全。

Hessian协议

经常使用hsf服务治理平台,会发现在hsf服务基本信息里面会有一个序列化方式hessian。

hessian序列化 对于简单数据类型,例如Integer a = 20;它会序列化成I20这样的字节流,I代表数据类型,20代表数据。对于复杂对象的属性,通过java反射机制,把属性当成Map来进行序列化。如果在序列化过程中,发现有多个相同的对象,会序列化一个,其他对象直接引用该序列化对象。而java序列化会将对象所有信息都序列化,且继承关系关联的信息也会序列化,所以hessian序列化相对于java序列化速度更快,字节流更少,但是java序列化包含了对象所有信息,所以相对更可靠一些

results matching ""

    No results matching ""