使用ConfigServer发布/订阅数据
当服务编写好了,就可以通过HSFApiProviderBean将服务发布出去,让其他客户端调用到当前机器。
回想一下HSF的基本结构,服务消费方和服务提供方二者之间是通过保持长连接来完成调用的,在二者建立连接之前是不知道相互的存在的,而完成二者发现的关键在与服务注册中心,当一个消费方需要订阅一个服务时,注册中心会将这个服务的服务地址推送给消费方,这样消费方就能够知道有哪些地址可以调用了。
服务注册中心
:目前HSF主要使用ConfigServer作为服务注册中心的实现
服务发布
:指的是服务提供方将本机信息与本机提供的服务发布给注册中心(只有这样客户端才能够看到地址)
ConfigServer通过客户端将数据发送到服务端同时也通过服务端将这些数据变更推送到所有的客户端。因此ConfigServer的客户端具备了发布和订阅这两种特性。
可以看到ConfigServer进行通知的依据是
KEY
,在
KEY
的约束下,发布方发布数据,而订阅方能够收到针对于这个
KEY
的所有数据。
使用Publisher发布服务
使用ConfigServer客户端进行数据发布。
public class PublisherMain {
public static void main(String[] args) {
// 注册登记表
// 发布者名称与DataId,其中DataId是标识与订阅方之间关系的Key,这个是关键
PublisherRegistration<String> registration = new PublisherRegistration<String>("weipeng2k-publisher", "com.alibaba.OrderService:1.0.0");
registration.setGroup("HSF");
// 创建一个发布者
Publisher<String> register = PublisherRegistrar.register(registration);
Scanner scanner = new Scanner(System.in);
String line;
while ((line = scanner.nextLine()) != null) {
if (line.equals("quit")) {
break;
} else {
// 发布数据
System.out.println("Publish:" + line);
register.publish(line);
}
}
// ConfigServer客户端无法停止下来,只能如此
System.exit(1);
}
}
可以看到使用configserver注册一个PublisherRegistration,并通过PublisherRegistrar得到一个Publisher,并使用Publisher发布数据。
使用Subscriber订阅数据
使用ConfigServer客户端进行数据订阅。
public class SubscriberMain {
public static void main(String[] args) {
// 数据订阅表
// 订阅人的Id
SubscriberRegistration subscriberRegistration = new SubscriberRegistration("weipeng2k-subscriber", "com.alibaba.OrderService:1.0.0");
subscriberRegistration.setGroup("HSF");
Subscriber subscriber = SubscriberRegistrar.register(subscriberRegistration);
subscriber.setDataObserver(new SubscriberDataObserver() {
@Override
public void handleData(String dataId, List<Object> data) {
System.out.println("收到dataId:" + dataId);
System.out.println("数据:" + data);
System.out.println("运行在:" + Thread.currentThread());
}
});
}
}
该调用示例与数据发布十分类似,唯一的区别是需要注册一个
SubscriberDataObserver
,这个数据观察者将会把需要订阅的数据通知到这个观察者。
注册中心默认实现
HSF框架默认的注册中心实现是ConfigServer,对于ConfigServer的使用可以参考《使用ConfigServer发布/订阅数据》对应的章节。
这里简单的回顾一下ConfigServer客户端的基本接口:
PublisherRegistration
用来构造发布项,通过将
PublisherRegistration
注册给
PublisherRegistrar
返回
Publisher
,然后通过
Publisher
发布数据。
SubscriberRegistration
用来构造订阅项,通过将
SubscriberRegistration
注册给
SubscriberRegistrar
返回
Subscriber
,然后将
SubscriberDataObserver
(一个监听器)设置给
Subscriber
来监听数据变更。
发布和取消发布地址
发布地址的过程就是将数据发送到注册中心,首先看
Registry
的定义:void register(ServiceMetadata metadata, List<ServiceURL>serviceURLs);
。默认实现会将服务全限定名作为Key,生成PublisherRegistration
,随后将地址信息ServiceURL
发送到注册中心,同时会保存服务权限名到Publisher
的关系。
取消发布地址的过程比较简单,定义为
void unregister(ServiceMetadata metadata);
,当进行取消发布时,会根据发布时记录的关系,通过服务全限定名获取
Publisher
,然后调用
PublisherRegistrar.unregister(publisher)
来注销。
订阅和取消订阅服务
订阅服务的过程是通过服务全限定名通过注册中心客户端向注册中心订阅服务地址,定义为
void subscribe(Protocol protocol, ServiceMetadata metadata, RawAddressListener listener);
,这里出现了地址更新监听器
RawAddressListener
,原因在于地址通知的过程是异步的,所以需要注册一个监听器给注册中心客户端,让其通知。当服务全限定名对应的地址有变动时,会将全量数据推送到客户端,此时将会拿着数据通知注册的地址更新监听器
RawAddressListener
。
取消订阅服务的过程是根据服务全限定名找寻到地址更新监听器
RawAddressListener
将其状态设置成为不可用,注册中心客户端的定义为
void unsubscribe(Protocol protocol, ServiceMetadata metadata, RawAddressListener listener);
,取消订阅只是将地址更新监听器的状态变为不可用,这样地址就算有推送,也不会在HSF调用端生效。
自己看代码里面看到的是Client会进行
public class DefaultPublisher<T extends Serializable> extends DefaultDataClient implements Publisher<T> {
@Override
public void publish(T datum) {
if (null == datum) {
throw new IllegalArgumentException("cannot publish null");
}
if (isDisable())
throw new IllegalStateException("Unregistered publisher cannot be reused.");
try {
new ObjectOutputStream(new OutputStream() {
@Override
public void write(int b) throws IOException {}
}).writeObject(datum);
} catch (IOException e) {
throw new RuntimeException("Not serializable datum", e);
}
//发布数据长度控制
if (datum.getClass() == String.class) {
String tmp = (String) datum;
if (tmp != null && tmp.length() > ConfigClientPerfCtrl.dataContentLengthMax) {
throw new IllegalArgumentException("publisher content length bigger than " + ConfigClientPerfCtrl.dataContentLengthMax + " :" + tmp);
}
}
doPublish(datum);
}
private synchronized void doPublish(Object datum) {
data = datum;
revision = revision.getUpdatedRevision();
String dataId = getDataId();
String publisherId = getClientId();
if (datum == null) {
log.info("[Publish] " + dataId + " (rev." + revision + ", by " + publisherId + "): (NULL)");
} else {
log.info("[Publish] dataId=" + dataId + ", clientId=" + publisherId + ", datumId=" + registration.getDatumId() + ", rev=" + revision.getRevision() + ", env=" + env);
}
signalWorker();
}
同时启动项目的时候HSFSpringProviderBean也会注册Bean到ConfigServer
重点类:
ConfigServerRegistry
PublisherRegistrar
ConfigServer(本地有个不知道是不是,不过存的都是本地Map,也没看到有与远程通信)
配置中心Config Server
是一种基于订阅发布的通讯组件,发布者和订阅者通过groupId和dataId(统称为数据标识)联系,只有在groupId和dataId相同时,双方才能通讯。
配置中心是SOFA框架的一部分,paycore中没有对应的代码
Confreg Server 2.0结构
支付宝使用的是2.0结构。2.0结构相比1.0,最大不同之处是将confreg进行的垂直拆分,将confreg拆分成confregsession和confregdata两个角色
- confregsession是一种无状态、对等的集群,用长连接联系客户端和confregdata;
- confregdata通过dataid分片存储数据(即不对等),confregdata之间是没有业务关联的
- 发布者通过长连接向confregsession发送数据,confregsession先缓存如proxycache后,根据dataId将数据发送至对应的confregdata存储;confregdata将新的数据通过confregsession发送给订阅者
- 为解决2带来的整体故障率,当单台confregsession故障后,confregdata集群感知后销毁来自该confregsession的数据,客户端从地址列表发起重新连接,新的confregsession通过相同的hash将数据发往对应的confregdata,这样既分担了压力,有提高了稳定性
寻址过程
正常注册于收发过程
异常流程
- confregsession故障
- confregdata监听到confregsession断开,立即销毁该confregsession对应的sub和pub,尝试将新的数据推送给sub(见b)
- sub和pub监听到断开,立即尝试连接新的confregsession,开始新的注册、订阅;注意新的数据还会落入原来对应的confregdata中
- 数据恢复
- 故障的confregsession恢复连接后,sub、pub会在多次重启之后慢慢连接到该confregsession
- Confregdata故障
- 短时可以恢复的情况下,confregsession在感知到连接断开,会不断尝试重新连接,故障期间sub收不到pub消息,当连接恢复后,将proxycache的数据回放至cachedata,在有必要的情况下,将cachedata的数据全量推送给sub
- 长时间不恢复,启动failover,所有confregsession中属于该confregdata的数据全部送入failover的DB,此时failover充当发生故障的confregdata,其他confregdata的数据传送不变,待故障恢复,failover中的数据回放至原confregdata,failover关闭
Confreg Server 3.0
3.0中加入了LDC,在物理机房内分隔出多个逻辑zone,业务系统部署到每个zone内,每个zone部署一个配置中心,逻辑zone内的用户请求限制在zone内部。为了避免服务器浪费,Rzone部署关键业务,Gzone部署所有业务,包括关键业务;软负载限制了RPC只能在本地zone,为了实现zone之间的RPC调用,SOFA会优先在本zone内寻找,没有则去同IDC内的其他zone寻找
Q&A
Q: 为何需要2.0架构,而不使用1.0 A: 1.0架构中,存在容量瓶颈,主要体现在客户端连接数、服务器内存容量上;单点风险,单机的配置方式形成单点,配置中心宕机会影响整站业务
Q: 1.0结构也支持水平扩展,支付宝为何发展2.0 A: 1.0结构虽然支持水平扩展,但同步机制复杂,集群虽然分担了客户端连接压力,但服务器仍存储了全量数据;支付宝SOA化程度很高,相应需要cachedata很多,这是1.0无法解决的问题