Overall Architecture
- ConfigServer: the server registers its HSF services with ConfigServer, and the client subscribes to HSF service information from ConfigServer; ConfigServer then pushes the relevant HSF service information (such as provider addresses) to the client (a toy sketch of this push model follows the list)
- Redis: both client and server periodically push information to Redis, which serves as the data source for the HSF service governance platform
- Diamond: stores the configuration information that client and server applications need at startup
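To make the ConfigServer role concrete, here is a toy, self-contained sketch of the register/subscribe/push interaction described above. ToyConfigServer and its methods are purely illustrative and are not the real ConfigServer or HSF API.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy registry playing the ConfigServer role: providers register addresses,
// consumers subscribe and are pushed the full address list on every change.
class ToyConfigServer {
    private final Map<String, List<String>> addresses = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<List<String>>>> listeners = new ConcurrentHashMap<>();

    // Server side: register one provider address under a service key (interface:version:group).
    void register(String serviceKey, String providerAddress) {
        addresses.computeIfAbsent(serviceKey, k -> new CopyOnWriteArrayList<>()).add(providerAddress);
        push(serviceKey);
    }

    // Client side: subscribe to a service key and immediately receive the current snapshot.
    void subscribe(String serviceKey, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(serviceKey, k -> new CopyOnWriteArrayList<>()).add(listener);
        listener.accept(addresses.getOrDefault(serviceKey, List.of()));
    }

    // Push the latest address list to every subscriber of the service.
    private void push(String serviceKey) {
        List<String> current = addresses.getOrDefault(serviceKey, List.of());
        for (Consumer<List<String>> listener : listeners.getOrDefault(serviceKey, List.of())) {
            listener.accept(current);
        }
    }
}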
Code Analysis
1. Service Publishing
<bean id="allocateProportionReadService" class="com.cainiao.ofp.fdcdump.service.api.AllocateProportionReadServiceImpl"/>
<bean class="com.taobao.hsf.app.spring.util.HSFSpringProviderBean" init-method="init">
<property name="target" ref="allocateProportionReadService"/>
<property name="serviceInterface" value="com.cainiao.ofp.fdcdump.common.api.AllocateProportionReadService"/>
<property name="serviceVersion" value="${fdc.dump.provide.service.version}"/>
<property name="serviceGroup" value="HSF"/>
<property name="serviceName" value="allocateProportionReadService"/>
<property name="delayedPublish" value="true"/>
<property name="clientTimeout" value="5000"/>
</bean>
HSF service publishing configuration:
- target: the bean implementing the HSF service interface, passed as a ref
- serviceInterface: the HSF service interface class
- serviceVersion: the HSF service version, which usually differs across the daily, staging, and production environments
- serviceGroup: the service group
- delayedPublish: delays publishing (registering) the service; defaults to true
- clientTimeout: the timeout for HSF service invocations, in milliseconds
The configuration makes it clear that HSFSpringProviderBean is the entry class for service publishing.
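HSFSpringProviderBean is only the Spring adapter; the same service can be published programmatically. A rough sketch, assuming HSFApiProviderBean exposes setters that mirror the XML properties above (the package path and the version value are assumptions, not taken from the source):

import com.taobao.hsf.app.api.util.HSFApiProviderBean;                        // package path assumed
import com.cainiao.ofp.fdcdump.service.api.AllocateProportionReadServiceImpl; // implementation from the XML above

public class AllocateProportionProviderBootstrap {
    public static void main(String[] args) throws Exception {
        HSFApiProviderBean provider = new HSFApiProviderBean();
        provider.setTarget(new AllocateProportionReadServiceImpl());          // the service implementation
        provider.setServiceInterface("com.cainiao.ofp.fdcdump.common.api.AllocateProportionReadService");
        provider.setServiceVersion("1.0.0");                                  // placeholder version
        provider.setServiceGroup("HSF");
        provider.init();                                                      // same entry point as init-method="init"
    }
}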
Partial code of HSFSpringProviderBean:
private final HSFApiProviderBean providerBean = new HSFApiProviderBean();
public void init() throws Exception {
if (this.providerBean.getInited().compareAndSet(false, true)) {
LoggerInit.initHSFLog();
AppInfoUtils.initAppName(this.providerBean.getMetadata());
SpasInit.initSpas();
this.providerBean.checkConfig();
this.publishIfNotInSpringContainer();
}
}
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ContextRefreshedEvent) {
this.providerBean.publish();
this.setAppInitedStatus();
} else if (event instanceof ContextClosedEvent && AppInfoUtils.appRunning.compareAndSet(true, false)) {
try {
this.providerBean.shutdownHSFServer();
} catch (HSFException var3) {
LOGGER.error(LoggerHelper.getErrorCodeStr("hsf", "HSF-0037", "环境问题", ""), "Spring容器关闭,销毁HSF相关资源失败!", var3);
}
LOGGER.info("Spring容器关闭,设置应用初始化状态为未初始化!");
}
}
The init() method first performs an idempotency check on initialization, then initializes logging, validates the configuration, and checks whether the bean is running inside a Spring container; outside a Spring container it publishes the service immediately.
onApplicationEvent(ApplicationEvent event) comes from the ApplicationListener interface that HSFSpringProviderBean implements and is called once the container has finished initializing. As the code shows, HSFSpringProviderBean delegates the actual publishing to providerBean, an instance of HSFApiProviderBean: since HSF services do not only run in Spring environments, the Spring-specific bean simply hands its configuration to HSFApiProviderBean, which performs the operations uniformly for all environments.
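Stripped of the HSF specifics, the trigger mechanism is the standard Spring ApplicationListener contract. A minimal sketch, where the print statements stand in for the publish/shutdown logic:

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;

public class PublishOnRefreshListener implements ApplicationListener<ApplicationEvent> {
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            // all beans are ready -> safe to expose the service to the registry
            System.out.println("context refreshed, publish services here");
        } else if (event instanceof ContextClosedEvent) {
            // container is shutting down -> unregister and release resources
            System.out.println("context closed, shut down the server here");
        }
    }
}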
Partial code of HSFApiProviderBean:
private final ServiceMetadata metadata = new ServiceMetadata(true);
public void publish() {
if (this.isPublished.compareAndSet(false, true)) {
try {
((ProcessService)HSFServiceContainer.getInstance(ProcessService.class)).publish(this.metadata);
} catch (Exception var2) {
LOGGER.error("", "接口[" + this.metadata.getInterfaceName() + "]版本[" + this.metadata.getVersion() + "]发布为HSF服务失败", var2);
throw new RuntimeException(var2);
}
}
}
As shown, publish() hands service publishing over to ProcessService, which is analyzed below. HSFApiProviderBean holds a ServiceMetadata field that stores the HSF service's configuration; this metadata is later uploaded to ConfigServer and Redis.
Partial code of ProcessComponent, the implementation of ProcessService:
private final MetadataService metadataService = (MetadataService)HSFServiceContainer.getInstance(MetadataService.class);
private final MetadataInfoStoreService metadataInfoStoreService;
private final List<ProcessHookService> hookServices = HSFServiceContainer.getInstances(ProcessHookService.class);
public void publish(ServiceMetadata metadata) throws HSFException {
try {
this.rpcProtocolService.registerProvider(metadata);
} catch (HSFException var4) {
LOGGER.error("", "RPC协议:方式发布HSF服务时出现错误,请确认服务:" + metadata.getUniqueName() + "的rpc属性的配置!");
throw var4;
}
Iterator i$ = this.hookServices.iterator();
ProcessHookService hookService;
while(i$.hasNext()) {
hookService = (ProcessHookService)i$.next();
hookService.prePublish(metadata);
}
if (metadata.isReadyToPublish()) {
if (!metadata.getGroup().equalsIgnoreCase(metadata.getDefaultGroup())) {
this.metadataService.unregister(metadata);
}
if (this.metadataService.publish(metadata)) {
LOGGER.info("接口[" + metadata.getUniqueName() + "]组别[" + metadata.getGroup() + "]发布为HSF服务成功!");
}
} else {
LOGGER.info("接口[" + metadata.getUniqueName() + "]组别[" + metadata.getGroup() + "]服务使用了延迟发布功能,服务未发布!");
}
i$ = this.hookServices.iterator();
while(i$.hasNext()) {
hookService = (ProcessHookService)i$.next();
hookService.afterPublish(metadata);
}
this.metadataInfoStoreService.store(metadata);
}
As the code shows, the provider is first registered through rpcProtocolService (the RPC protocol layer); if the metadata is ready to publish, the service is then published through metadataService.
ProcessComponent's list of ProcessHookService instances lets users run extra logic at each stage of HSF service publishing and consumption (a sample hook is sketched after the interface below).
public interface ProcessHookService {
void afterConsume(ServiceMetadata var1);
void afterPublish(ServiceMetadata var1);
void preConsume(ServiceMetadata var1);
void prePublish(ServiceMetadata var1);
}
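A hypothetical hook implementation showing how this extension point could be used; the class name and log output are made up, the import package paths are assumed (they are not shown in the excerpt), and registration presumably goes through HSF's internal container, as suggested by the HSFServiceContainer.getInstances(ProcessHookService.class) field above.

import com.taobao.hsf.model.metadata.ServiceMetadata;      // package path assumed
import com.taobao.hsf.process.service.ProcessHookService;  // package path assumed

// Hypothetical hook that simply traces each phase of publishing/consuming.
public class TracingProcessHook implements ProcessHookService {
    public void prePublish(ServiceMetadata metadata) {
        System.out.println("about to publish " + metadata.getUniqueName());
    }
    public void afterPublish(ServiceMetadata metadata) {
        System.out.println("published " + metadata.getUniqueName());
    }
    public void preConsume(ServiceMetadata metadata) {
        System.out.println("about to subscribe " + metadata.getUniqueName());
    }
    public void afterConsume(ServiceMetadata metadata) {
        System.out.println("subscribed " + metadata.getUniqueName());
    }
}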
ProcessComponent's MetadataService uploads the ServiceMetadata to ConfigServer after the service is published. Its implementation is MetadataComponent, which delegates the actual upload to CsMetadataAddressService.
Core upload code of CsMetadataAddressService:
public void publish(ServiceMetadata metadata) {
String serviceUniqueName = metadata.getUniqueName() + metadata.getConfigStyle();
synchronized(this.lock) {
List<String> centers = metadata.getConfigserverCenter();
if (centers != null && centers.size() > 0) {
boolean isPub = false;
Iterator i$ = centers.iterator();
while(i$.hasNext()) {
String center = (String)i$.next();
Map<String, Publisher<String>> centerPublishers = (Map)this.publishers.get(center);
if (centerPublishers == null) {
centerPublishers = new HashMap();
this.registrations.put(center, new HashMap());
this.publishers.put(center, centerPublishers);
}
if (this.httpPublish()) {
Map<String, Publisher<String>> centerHttpPublishers = (Map)this.httpPublishers.get(center);
if (centerHttpPublishers == null) {
centerHttpPublishers = new HashMap();
this.httpRegistrations.put(center, new HashMap());
this.httpPublishers.put(center, centerHttpPublishers);
}
}
if (!((Map)centerPublishers).containsKey(serviceUniqueName)) {
isPub = true;
Publisher<String> publisher = this.doPublish(metadata, center);
((Map)centerPublishers).put(serviceUniqueName, publisher);
if (this.httpPublish()) {
Publisher<String> httpPublisher = this.doHttpPublish(metadata, center);
((Map)this.httpPublishers.get(center)).put(serviceUniqueName, httpPublisher);
}
}
}
if (isPub) {
this.metadatas.add(metadata);
}
} else {
Map<String, Publisher<String>> centerPublishers = (Map)this.publishers.get("DEFAULT");
if (centerPublishers == null) {
centerPublishers = new HashMap();
this.registrations.put("DEFAULT", new HashMap());
this.publishers.put("DEFAULT", centerPublishers);
}
if (this.httpPublish()) {
Map<String, Publisher<String>> centerHttpPublishers = (Map)this.httpPublishers.get("DEFAULT");
if (centerHttpPublishers == null) {
centerHttpPublishers = new HashMap();
this.httpRegistrations.put("DEFAULT", new HashMap());
this.httpPublishers.put("DEFAULT", centerHttpPublishers);
}
}
if (!((Map)centerPublishers).containsKey(serviceUniqueName)) {
this.metadatas.add(metadata);
Publisher<String> publisher = this.doPublish(metadata);
((Map)centerPublishers).put(serviceUniqueName, publisher);
if (this.httpPublish()) {
Publisher<String> httpPublisher = this.doHttpPublish(metadata);
((Map)this.httpPublishers.get("DEFAULT")).put(serviceUniqueName, httpPublisher);
}
}
}
}
}
The upload logic first reads the list of ConfigServer centers from the metadata; if the list is empty, the "DEFAULT" center is used instead. For each center it looks up (creating it if missing) the per-center publisher map, and, when HTTP publishing is enabled, the per-center HTTP publisher map as well. If the service has not yet been published to that center, a Publisher is created via doPublish (and doHttpPublish for the HTTP case), stored in the corresponding map, and the data upload starts. A condensed sketch of this publish-once-per-center pattern follows.
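The branch above can be restated as a small self-contained sketch of the pattern: one publisher per (center, serviceUniqueName), created at most once, under a single lock. PerCenterRegistry and registerOnce are illustrative names, not HSF classes.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class PerCenterRegistry<P> {
    private final Object lock = new Object();
    private final Map<String, Map<String, P>> publishers = new HashMap<>();

    // Returns true only when a new publisher was actually created for this center/service pair.
    boolean registerOnce(String center, String serviceUniqueName, Supplier<P> publisherFactory) {
        synchronized (lock) {
            Map<String, P> byService = publishers.computeIfAbsent(center, c -> new HashMap<>());
            if (byService.containsKey(serviceUniqueName)) {
                return false;                       // already published to this center, skip
            }
            byService.put(serviceUniqueName, publisherFactory.get());
            return true;
        }
    }
}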
ProcessComponent's MetadataInfoStoreService periodically uploads the ServiceMetadata to Redis, where the hsfops platform uses it for service governance.
Upload code of MetadataInfoStoreServiceRedis, the MetadataInfoStoreService implementation:
public void store(ServiceMetadata metadata) {
if (metadata.isProvider()) {
HSFRuntimeInfoPublisherScheduler.scheduleForOnce(new MetadataInfoStoreServiceRedis.ClassInfoRunner(metadata), 10L, TimeUnit.SECONDS);
}
if (processed.compareAndSet(false, true)) {
HSFRuntimeInfoPublisherScheduler.scheduleAtFixedRate(new MetadataInfoStoreServiceRedis.RuntimeInfoPublisher(), 10L, 1440L, TimeUnit.MINUTES);
LOGGER.info("HSF-RuntimeInfo-Publisher started.");
}
}
As the scheduling parameters show, class information is uploaded once, 10 seconds after publishing, and runtime information is uploaded to Redis every 1440 minutes (once every 24 hours) after an initial 10-minute delay.
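The same scheduling pattern expressed with the JDK's ScheduledExecutorService for clarity; HSFRuntimeInfoPublisherScheduler itself is an internal HSF wrapper, and the demo class and print statements here are made up.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RuntimeInfoPublisherDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // one-shot task 10 seconds after publish (the ClassInfoRunner role)
        scheduler.schedule(() -> System.out.println("upload class info once"), 10, TimeUnit.SECONDS);
        // recurring task: first run after 10 minutes, then every 1440 minutes (24 hours)
        scheduler.scheduleAtFixedRate(() -> System.out.println("upload runtime info"), 10, 1440, TimeUnit.MINUTES);
    }
}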
2. Service Subscription
XML configuration for service subscription:
<bean id="dicRcRouteService" class="com.taobao.hsf.app.spring.util.HSFSpringConsumerBean"
init-method="init">
<property name="interfaceName">
<value>com.cainiao.dicrc.api.service.DicrcRouteService</value>
</property>
<property name="version" >
<value>${fdc.consume.dicrc.service.version}</value>
</property>
<property name="clientTimeout">
<value>${fdc.consume.dicrc.service.timeout}</value>
</property>
</bean>
Service subscription mirrors the structure of service publishing, handing the work over to HSFApiConsumerBean:
public void init() throws Exception {
if (!this.inited.compareAndSet(false, true)) {
LOGGER.warn(LoggerHelper.getErrorCodeStr("hsf", "HSF-0020", "业务问题", "HSF服务:" + this.metadata.getUniqueName() + " 重复初始化!"));
} else {
AppInfoUtils.initAppName(this.metadata);
LoggerInit.initHSFLog();
if (HSFServiceTargetUtil.isGeneric(this.metadata.getGeneric())) {
this.metadata.setIfClazz(GenericService.class);
} else if (this.metadata.getIfClazz() == null) {
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("ConsumerBean中指定的接口类不存在[");
errorMsg.append(this.metadata.getInterfaceName()).append("].");
throw new IllegalArgumentException(errorMsg.toString());
}
if (this.asyncallMethods != null) {
Iterator i$ = this.asyncallMethods.iterator();
while(i$.hasNext()) {
String desc = (String)i$.next();
this.parseAsyncFunc(desc);
}
}
this.metadata.initUniqueName();
ProcessService processService = (ProcessService)HSFServiceContainer.getInstance(ProcessService.class);
try {
this.metadata.setTarget(processService.consume(this.metadata));
LOGGER.warn("成功生成对接口为[" + this.metadata.getInterfaceName() + "]版本为[" + this.metadata.getVersion() + "]的HSF服务调用的代理!");
} catch (Exception var5) {
LOGGER.error("", "生成对接口为[" + this.metadata.getInterfaceName() + "]版本为[" + this.metadata.getVersion() + "]的HSF服务调用的代理失败", var5);
throw var5;
}
int waitTime = this.metadata.getMaxWaitTimeForCsAddress();
if (waitTime > 0) {
try {
this.metadata.getCsAddressCountDownLatch().await((long)waitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException var4) {
;
}
}
}
}
The core call here is processService.consume(this.metadata), which returns the proxy object for the HSF service.
Partial consume code of ProcessComponent, the ProcessService implementation:
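From the application's point of view the proxy is obtained and used roughly as follows. This sketch assumes HSFApiConsumerBean exposes setters matching the XML properties above plus a getObject() accessor for the generated proxy; the package path and version value are assumptions.

import com.taobao.hsf.app.api.util.HSFApiConsumerBean;     // package path assumed
import com.cainiao.dicrc.api.service.DicrcRouteService;    // interface from the XML above

public class DicrcRouteConsumerBootstrap {
    public static void main(String[] args) throws Exception {
        HSFApiConsumerBean consumer = new HSFApiConsumerBean();
        consumer.setInterfaceName("com.cainiao.dicrc.api.service.DicrcRouteService");
        consumer.setVersion("1.0.0");                      // placeholder version
        consumer.init();                                   // triggers ProcessService.consume(...) internally
        // the object returned is the dynamic proxy built by HSFServiceProxy
        DicrcRouteService routeService = (DicrcRouteService) consumer.getObject();
    }
}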
public Object consume(ServiceMetadata metadata) throws HSFException {
if (ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName()) != null) {
return ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName()).getProxyObject();
} else {
Iterator i$ = this.hookServices.iterator();
while(i$.hasNext()) {
ProcessHookService hookService = (ProcessHookService)i$.next();
hookService.preConsume(metadata);
}
List<Class<?>> interfaces = new ArrayList(3);
if (metadata.getIfClazz() != null) {
interfaces.add(metadata.getIfClazz());
}
if (metadata.isSupportEcho()) {
interfaces.add(EchoService.class);
}
if (!GenericService.class.equals(metadata.getIfClazz())) {
interfaces.add(GenericService.class);
}
Class<?>[] interfacesArray = new Class[interfaces.size()];
interfaces.toArray(interfacesArray);
ProcessComponent.HSFServiceProxy proxy = new ProcessComponent.HSFServiceProxy(metadata, interfacesArray, !"javassist".equalsIgnoreCase(metadata.getProxyStyle()));
Object proxyObj = proxy.getInstance();
this.metadataService.subscribe(metadata);
i$ = this.hookServices.iterator();
while(i$.hasNext()) {
ProcessHookService hookService = (ProcessHookService)i$.next();
hookService.afterConsume(metadata);
}
this.metadataInfoStoreService.store(metadata);
return proxyObj;
}
}
public static class HSFServiceProxy implements InvocationHandler {
private static final RPCProtocolTemplateService rpcProtocolService = (RPCProtocolTemplateService)HSFServiceContainer.getInstance(RPCProtocolTemplateService.class);
private final ServiceMetadata serviceConsumerMetadata;
private final ConsumerServiceModel serviceModel;
private final Invoker<?> invokerChain;
private final Object instance;
private final Method equalsMethod;
private final Method toStringMethod;
private final Method hashCodeMethod;
public HSFServiceProxy(final ServiceMetadata metadata, Class<?>[] classes, boolean isJava) {
this.serviceConsumerMetadata = metadata;
Method equalsMethod1 = null;
Method toStringMethod1 = null;
Method hashCodeMethod1 = null;
if (isJava) {
this.instance = Proxy.newProxyInstance(this.serviceConsumerMetadata.getIfClazz().getClassLoader(), classes, this);
try {
Field hashCodeFeild = this.instance.getClass().getDeclaredField("m0");
hashCodeFeild.setAccessible(true);
hashCodeMethod1 = (Method)hashCodeFeild.get(this.instance);
Field equalsFeild = this.instance.getClass().getDeclaredField("m1");
equalsFeild.setAccessible(true);
equalsMethod1 = (Method)equalsFeild.get(this.instance);
Field toStringFeild = this.instance.getClass().getDeclaredField("m2");
toStringFeild.setAccessible(true);
toStringMethod1 = (Method)toStringFeild.get(this.instance);
} catch (Exception var10) {
ProcessComponent.LOGGER.warn(var10.getMessage(), new Object[]{var10});
}
} else {
this.instance = JavassistProxy.getProxy(classes).newInstance(this);
}
ApplicationModel.instance().initConsumerService(metadata.getUniqueName(), new ConsumerServiceModel(metadata, this.instance, isJava));
this.serviceModel = ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName());
this.hashCodeMethod = hashCodeMethod1;
this.toStringMethod = toStringMethod1;
this.equalsMethod = equalsMethod1;
if (StringUtils.isNotBlank(metadata.getFilter())) {
ProcessComponent.LOGGER.info("[Filter Enable:]" + metadata + "###" + metadata.getFilter());
Invoker<?> trueRpcInvoke = new Invoker() {
public URL getUrl() {
return URL.valueOf("127.0.0.1?reference.filter=" + metadata.getFilter());
}
public boolean isAvailable() {
return true;
}
public void destroy() {
}
public Class<?> getInterface() {
return HSFServiceProxy.this.serviceConsumerMetadata.getIfClazz();
}
public Result invoke(Invocation invocation) throws RpcException {
try {
RpcContext.getContext().getAttachments().putAll(invocation.getAttachments());
Object result = HSFServiceProxy.this.trueInvoke(HSFServiceProxy.this.serviceModel.getMethodModel(((ExtendRpcInvocation)invocation).getMethod()), invocation.getArguments());
return new RpcResult(result);
} catch (Throwable var3) {
return new RpcResult(var3);
}
}
};
this.invokerChain = this.buildInvokerChain(trueRpcInvoke, "reference.filter", "ONLYTHEONE");
} else {
this.invokerChain = null;
}
}
}
ApplicationModel stores the subscribed HSF services as ConsumerServiceModel entries in a ConcurrentHashMap keyed by the metadata's uniqueName. consume() first looks up the uniqueName; if the service has already been consumed, its existing proxy object is returned directly. Otherwise proxy creation is delegated to the inner class HSFServiceProxy. HSFServiceProxy checks the proxy style: if it is not "javassist" (isJava is true), the proxy is created with the JDK's Proxy.newProxyInstance, using the class loader of the interface class held in the metadata; otherwise JavassistProxy generates it. The resulting proxy is wrapped in a ConsumerServiceModel and registered with ApplicationModel, and, when a filter is configured on the metadata, an Invoker chain is built around the real invocation so that filters can intercept each call of the HSF service methods.
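The JDK branch relies on the standard dynamic-proxy mechanism; a minimal, self-contained illustration (JdkProxyDemo and Greeting are made-up names, not HSF types):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class JdkProxyDemo {
    interface Greeting { String hello(String name); }

    public static void main(String[] args) {
        InvocationHandler handler = (proxy, method, methodArgs) -> {
            // in HSF this is where the call would be turned into a remote invocation
            return "intercepted " + method.getName() + "(" + methodArgs[0] + ")";
        };
        Greeting g = (Greeting) Proxy.newProxyInstance(
                Greeting.class.getClassLoader(), new Class<?>[]{Greeting.class}, handler);
        System.out.println(g.hello("hsf"));   // prints: intercepted hello(hsf)
    }
}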
3. Service Invocation
As mentioned above, the final stage of service subscription produces the Invoker responsible for service invocation.
Service invocation code (from HSFServiceProxy, nested in ProcessComponent):
public Result invoke(Invocation invocation) throws RpcException {
try {
RpcContext.getContext().getAttachments().putAll(invocation.getAttachments());
Object result = HSFServiceProxy.this.trueInvoke(HSFServiceProxy.this.serviceModel.getMethodModel(((ExtendRpcInvocation)invocation).getMethod()), invocation.getArguments());
return new RpcResult(result);
} catch (Throwable var3) {
return new RpcResult(var3);
}
}
private Object trueInvoke(ConsumerMethodModel methodModel, Object[] args) throws HSFException, Throwable {
AtomicInteger maxPoolSize = this.serviceConsumerMetadata.getCurConsumerMaxPoolSize();
if (maxPoolSize == null) {
return rpcProtocolService.invokeWithMethodObject(methodModel, args);
} else {
int currentSize = maxPoolSize.decrementAndGet();
Object var5;
try {
if (currentSize < 0) {
String errorMsg = MessageFormat.format("消费端线程池已满,service[{0}],consumerMaxPoolSize[{1}]", this.serviceConsumerMetadata.getUniqueName(), this.serviceConsumerMetadata.getConsumerMaxPoolSize());
ProcessComponent.LOGGER.warn(errorMsg);
throw new RuntimeException(new HSFException(errorMsg));
}
var5 = rpcProtocolService.invokeWithMethodObject(methodModel, args);
} finally {
maxPoolSize.incrementAndGet();
}
return var5;
}
}
The Invoker's invoke method first obtains the method and arguments of the call and then calls trueInvoke. If the consumer side does not limit its pool size (maxPoolSize is null), the call goes through directly; otherwise the maximum pool counter is decremented before each call and incremented again once the call finishes. maxPoolSize is an AtomicInteger, which keeps this counting thread-safe.
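The concurrency-limiting pattern in isolation, as a small self-contained sketch (CallPermitLimiter is an illustrative name, not an HSF class):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class CallPermitLimiter {
    private final AtomicInteger permits;

    CallPermitLimiter(int maxConcurrentCalls) {
        this.permits = new AtomicInteger(maxConcurrentCalls);
    }

    <T> T call(Supplier<T> invocation) {
        if (permits.decrementAndGet() < 0) {
            permits.incrementAndGet();              // put the permit back before failing
            throw new RuntimeException("consumer-side pool is full");
        }
        try {
            return invocation.get();                // the actual remote call
        } finally {
            permits.incrementAndGet();              // release the permit
        }
    }
}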
Hessian Protocol
If you use the HSF service governance platform regularly, you will notice that the basic information of an HSF service lists hessian as the serialization format.
With Hessian serialization, a simple value such as Integer a = 20 is serialized into a compact byte stream like I20, where I denotes the data type and 20 is the data. For the fields of a complex object, Hessian uses Java reflection and serializes the fields much like a map of name/value pairs. If the same object appears several times during serialization, it is written once and the other occurrences simply reference the already-serialized object. Java serialization, by contrast, writes out all of an object's information, including everything tied to its inheritance hierarchy. Hessian is therefore faster and produces a smaller byte stream than Java serialization, while Java serialization carries complete object information and is somewhat more reliable.
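A rough way to see the size difference is to serialize the same object both ways. This sketch assumes the com.caucho hessian artifact is on the classpath; the Order class and its fields are made up for illustration.

import com.caucho.hessian.io.Hessian2Output;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSizeDemo {
    static class Order implements Serializable {
        String orderId = "LP123456789";
        Integer quantity = 20;
    }

    public static void main(String[] args) throws Exception {
        Order order = new Order();

        // Hessian: type tags + field values, repeated objects written once and then referenced
        ByteArrayOutputStream hessianBytes = new ByteArrayOutputStream();
        Hessian2Output hessianOut = new Hessian2Output(hessianBytes);
        hessianOut.writeObject(order);
        hessianOut.flush();

        // JDK serialization: full class descriptor, inheritance chain, etc.
        ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
        try (ObjectOutputStream javaOut = new ObjectOutputStream(javaBytes)) {
            javaOut.writeObject(order);
        }

        System.out.println("hessian bytes: " + hessianBytes.size());
        System.out.println("java    bytes: " + javaBytes.size());
    }
}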