入口
还记得springboot会注册DubboBootstrapApplicationListener监听事件,在这个事件中onContextRefreshedEvent调用了dubboBootstrap.start()方法。说明了在Spring容器启动的时候执行了服务导出的过程
@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
dubboBootstrap.start();
}
DubboBootstrap#start
public DubboBootstrap start() {
// CAS操作 保证只执行一次,避免并发
if (started.compareAndSet(false, true)) {
ready.set(false);
// 初始化服务发布的相关配置
initialize();
// 1. export Dubbo Services
// 服务导出
exportServices();
// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
//3. Register the local ServiceInstance if required
registerServiceInstance();
}
referServices();
if (asyncExportingFutures.size() > 0) {
new Thread(() -> {
this.awaitFinish();
ready.set(true);
}).start();
} else {
ready.set(true);
}
}
return this;
}
DubboBootstrap#initialize
public void initialize() {
// CAS 操作保证只会执行一次初始化操作
if (!initialized.compareAndSet(false, true)) {
return;
}
// 初始化拓展配置
ApplicationModel.initFrameworkExts();
// 启动配置中心
startConfigCenter();
// 加载远程配置,这里主要是处理RegistryConfig,ProtocolConfig
loadRemoteConfigs();
// 检查全局配置
checkGlobalConfigs();
// @since 2.7.8
// 开启元数据中心,继dubbo注册中心和配置中心后引入了元数据中心(也是起到存储配置的功能)
// 元数据中心和配置中心一样当没有配置的时候会使用注册中心作为元数据中心
startMetadataCenter();
// 初始化元数据服务
initMetadataService();
// 初始化元数据服务导出
initMetadataServiceExports();
// 初始化事件监听器
initEventListener();
}
initFrameworkExts
public static void initFrameworkExts() {
// 通过SPI拿到FrameworkExt接口的实现类,这里可以取到ConfigManager,Environment,ServiceRepository三个实现类
Set<FrameworkExt> exts = ExtensionLoader.getExtensionLoader(FrameworkExt.class).getSupportedExtensionInstances();
// 挨个调用三个实现类的initialize,其中只有Environment有实现initialize方法,其余都是空方法
for (FrameworkExt ext : exts) {
ext.initialize();
}
}
startConfigCenter
启动配置中心
private void startConfigCenter() {
// 如果没有配置配置中心,使用注册中心作为配置中心
useRegistryAsConfigCenterIfNecessary();
// 获取配置中心
Collection<ConfigCenterConfig> configCenters = configManager.getConfigCenters();
// check Config Center
if (CollectionUtils.isEmpty(configCenters)) {
ConfigCenterConfig configCenterConfig = new ConfigCenterConfig();
configCenterConfig.refresh();
if (configCenterConfig.isValid()) {
configManager.addConfigCenter(configCenterConfig);
configCenters = configManager.getConfigCenters();
}
} else {
for (ConfigCenterConfig configCenterConfig : configCenters) {
// ======刷新配置,这个方法很重要首先refresh是AbstractConfig的方法,所以每个Config类都会调用这里=============
configCenterConfig.refresh();
ConfigValidationUtils.validateConfigCenterConfig(configCenterConfig);
}
}
if (CollectionUtils.isNotEmpty(configCenters)) {
CompositeDynamicConfiguration compositeDynamicConfiguration = new CompositeDynamicConfiguration();
for (ConfigCenterConfig configCenter : configCenters) {
// 从远程配置中心获取数据(应用配置,全局配置),==============prepareEnvironment重要==============
compositeDynamicConfiguration.addConfiguration(prepareEnvironment(configCenter));
}
environment.setDynamicConfiguration(compositeDynamicConfiguration);
}
// 刷新所有的XxConfig中的属性,除开ServiceConfig
configManager.refreshAll();
}
prepareEnvironment
远程从配置中心获取配置并根据是全局配置还是应用配置存入不同Map中,并且指定优先级,默认configCenterFirst 为 true
- externalConfigurationMap -> 全局配置
- appExternalConfigurationMap -> 应用配置
private DynamicConfiguration prepareEnvironment(ConfigCenterConfig configCenter) {
if (configCenter.isValid()) {
if (!configCenter.checkOrUpdateInited()) {
return null;
}
// 获取配置中心
DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl());
// 从配置中心获取全局配置,这里需要注意dubbo-admin存储到zk路径是否与dubbo自带获取路径是否一致,存在bug
String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup());
String appGroup = getApplication().getName();
String appConfigContent = null;
if (isNotEmpty(appGroup)) {
// 从配置中心获取应用配置,这里需要注意dubbo-admin存储到zk路径是否与dubbo自带获取路径是否一致,存在bug
appConfigContent = dynamicConfiguration.getProperties
(isNotEmpty(configCenter.getAppConfigFile()) ? configCenter.getAppConfigFile() : configCenter.getConfigFile(),
appGroup
);
}
// 指定优先级,后续配置排序会用到,默认configCenterFirst = true
// 可通过配置dubbo.config-center.highest-priority: true|false指定
environment.setConfigCenterFirst(configCenter.isHighestPriority());
// externalConfigurationMap -> 全局配置
environment.updateExternalConfigurationMap(parseProperties(configContent));
// appExternalConfigurationMap -> 应用配置
environment.updateAppExternalConfigurationMap(parseProperties(appConfigContent));
return dynamicConfiguration;
}
return null;
}
DubboBootstrap#exportServices
服务导出
这里我们直接进入export方法
public synchronized void export() {
if (!shouldExport()) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.initialize();
}
// 检查更新子配置 =======这里面有关于配置优先级的源码,很重要==========================
checkAndUpdateSubConfigs();
//初始化服务元数据信息
serviceMetadata.setVersion(getVersion());
serviceMetadata.setGroup(getGroup());
serviceMetadata.setDefaultGroup(getGroup());
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
// 是否需要延迟加载,需要就启动一个定时器启动
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// ======================重点导出服务==============
doExport();
}
exported();
}
checkAndUpdateSubConfigs
检查和更新配置,是配置变得即全也新
private void checkAndUpdateSubConfigs() {
// ServiceConfig中的某些属性如果是空的,那么就从ProviderConfig中获取,主要是protocols、configCenter、registryIds、protocolIds
completeCompoundConfigs();
checkDefault();
checkProtocol();
// ConfigInitializer接口扩展点 ---初始化配置前调用initServiceConfig
List<ConfigInitializer> configInitializers = ExtensionLoader.getExtensionLoader(ConfigInitializer.class)
.getActivateExtension(URL.valueOf("configInitializer://"), (String[]) null);
configInitializers.forEach(e -> e.initServiceConfig(this));
// 如果protocol不是只有injvm协议,表示服务调用不是只在本机jvm里面调用,那就需要注册中心
if (!isOnlyInJvm()) {
// 没有注册中心,报错
checkRegistry();
}
// 刷新ServiceConfig配置,前面refreshAll的时候是没有刷新ServiceConfig,这里刷新
// 这里面就有配置优先级逻辑
this.refresh();
// 当前服务对应的实现类是GenericService,没有指定接口,泛化接口
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
// 是泛化接口
generic = Boolean.TRUE.toString();
}
} else {
// 加载接口
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
// 刷新MethodConfig,并判断MethodConfig中对应的方法在接口中是否存在
checkInterfaceAndMethods(interfaceClass, getMethods());
// 实现类是不是该接口类型
checkRef();
// 不是泛化接口
generic = Boolean.FALSE.toString();
}
// =================代码太冗长,省略================
// 1.处理local,本地调用
// 2.处理stud,本地存根
// ================================================
checkStubAndLocal(interfaceClass);
ConfigValidationUtils.checkMock(interfaceClass, this);
ConfigValidationUtils.validateServiceConfig(this);
// ConfigPostProcessor接口扩展点 ---初始化配置结束后调用postProcessServiceConfig,可在这修改配置
postProcessConfig();
}
AbstractInterfaceConfig#refresh
Dubbo中的配置类执行刷新方法都要来到这。XxConfig对象的属性需要从其他位置获取属性值,来进行属性的覆盖
覆盖的优先级2种,默认第一种
- 系统变量->环境变量配置->配置中心应用配置->配置中心全局配置->自定义配置->dubbo.properties文件
- 系统变量->环境变量配置->自定义配置->配置中心应用配置->配置中心全局配置->dubbo.properties文件
public void refresh() {
Environment env = ApplicationModel.getEnvironment();
// 获取配置顺序,就是优先级
/*1. 系统变量->环境变量配置->配置中心应用配置->配置中心全局配置->自定义配置->dubbo.properties文件
2. 系统变量->环境变量配置->自定义配置->配置中心应用配置->配置中心全局配置->dubbo.properties文件*/
CompositeConfiguration compositeConfiguration = env.getPrefixedConfiguration(this);
// 获取所有方法
Method[] methods = getClass().getMethods();
for (Method method : methods) {
// 判断方法是不是set方法
if (MethodUtils.isSetter(method)) {
// 从set方法后截取获得属性名,然后遍历上面的配置列表一个一个找,找到就调用set方法设值进去
String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method)));
if (StringUtils.isNotEmpty(value) && ClassUtils.isTypeMatch(method.getParameterTypes()[0], value)) {
method.invoke(this, ClassUtils.convertPrimitive(method.getParameterTypes()[0], value));
}
// 判断是不是setParameters()方法
} else if (isParametersSetter(method)) {
// 获取值
String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method)));
if (StringUtils.isNotEmpty(value)) {
// 调用getParameters()方法
Map<String, String> map = invokeGetParameters(getClass(), this);
map = map == null ? new HashMap<>() : map;
// 解析值取出来put进原来的map中
map.putAll(convert(StringUtils.parseParameters(value), ""));
// 调用setParameters()
invokeSetParameters(getClass(), this, map);
}
}
}
}
doExport->doExportUrls
获取url列表,并注册服务
private void doExportUrls() {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
// 获取注册中心URL列表
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
for (ProtocolConfig protocolConfig : protocols) {
// 构建接口存入注册中心地址路径
// 例:default/com.dm.DemoService:1.0
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
// TODO, uncomment this line once service key is unified
serviceMetadata.setServiceKey(pathKey);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
loadRegistries
获取注册中心URL列表
public static List<URL> loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) {
// check && override if necessary
List<URL> registryList = new ArrayList<URL>();
ApplicationConfig application = interfaceConfig.getApplication();
// 获取注册中心配置
List<RegistryConfig> registries = interfaceConfig.getRegistries();
if (CollectionUtils.isNotEmpty(registries)) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
// 如果注册中心没有配地址,则地址为0.0.0.0
if (StringUtils.isEmpty(address)) {
address = ANYHOST_VALUE;
}
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
// 解析application并把参数存入map
AbstractConfig.appendParameters(map, application);
// 解析注册中心配置并把参数存入map
AbstractConfig.appendParameters(map, config);
map.put(PATH_KEY, RegistryService.class.getName());
AbstractInterfaceConfig.appendRuntimeParameters(map);
// 如果map中如果没有protocol,那么默认为dubbo
if (!map.containsKey(PROTOCOL_KEY)) {
map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
}
// 构造注册中心url
// 例:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider&dubbo=2.0.2&pid=12728&qos.enable=false&release=2.7.8×tamp=1620632029719
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = URLBuilder.from(url)
.addParameter(REGISTRY_KEY, url.getProtocol())
.setProtocol(extractRegistryType(url))
.build();
// 例:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider&dubbo=2.0.2&pid=12728&qos.enable=false®istry=zookeeper&release=2.7.8×tamp=1620632029719
// 如果是服务提供者,获取register的值,如果为false,表示该服务不注册到注册中心
// 如果是服务消费者,获取subscribe的值,如果为false,表示该引入的服务不订阅注册中心中的数据
if ((provider && url.getParameter(REGISTER_KEY, true))
|| (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
doExportUrlsFor1Protocol
这个方法代码太长,便于阅读只贴部分重要代码
这个方法主要做的就是把服务解析为一个URL资源,然后注册到注册中心(注册中心是列表表示,每个注册中心都注册服务)。
所有配置类信息全部放到map中准备
获取到访问该服务的host和port组装成URL服务资源
String host = findConfigedHosts(protocolConfig, registryURLs, map); Integer port = findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);通过ConfiguratorFactory SPI扩展点,处理URL服务资源
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); }向注册中心注册服务
// z注册中心是列表,保证服务会向每一个注册中心注册 for (URL registryURL : registryURLs) { // 协议是injvm就不需要注册到注册中心,它是在JVM内部调用 if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } // 该服务是否是动态,对应zookeeper上表示是否是临时节点,对应dubbo中的功能就是静态服务 url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); // 获取监控中心URL资源 URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } // 服务使用的动态代理机制,默认SPI为javassit String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } // ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); // 我们又发现ProxyFactory#getInvoker方法里面带有URL,getAdaptiveExtension表示会从Url中获取参数proxy来动态获取PROXY_FACTORY // 功能:使用代理生成一个Invoker,Invoker表示服务提供者的代理,Invoker#invoke执行服务 // 生成的invoker包括了服务的实现者、服务接口类、服务的注册地址(参数export指定了当前服务) Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); // 更完整服务提供者,包括了Invoker和服务的配置 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 通过SPI获取到PROTOCOL为RegistryProtocol Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); exporters.add(exporter); }
RegistryProtocol#export
之前都是做一些准备工作,到这才是真正的服务导出
- ⽣成监听器,监听动态配置中⼼此服务的参数数据的变化,⼀旦监听到变化,则重写服务URL,在服务导出时先重写⼀次服务URL
- 拿到重写之后的URL之后,调⽤doLocalExport()进⾏服务导出,通过SPI调⽤DubboProtocol#export导出服务,导出成功后将得到⼀个ExporterChangeableWrapper
- 从Invoker中获取注册中⼼的实现类,⽐如ZookeeperRegistry
- 简化服务URL,省略掉不⽤注册到注册中⼼的参数
- ZookeeperRegistry#registry()注册服务
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 获取注册中心资源,其实就是把protocol换了 registry->parameters.get("registry"),这里我用的是zookeeper为注册中心
// registry://xxx?registry=zookeeper ---> zookeeper://xxx
URL registryUrl = getRegistryUrl(originInvoker);
// 把registryUrl中export后参数弄出来,这就是服务提供者资源
URL providerUrl = getProviderUrl(originInvoker);
// 服务提供者资源基础上,生成一个overrideSubscribeUrl,协议为provider://xxx,增加参数category=configurators&check=false
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// overrideSubscribeUrl->OverrideListener,用来监听变化事件,监听到overrideSubscribeUrl的变化后, OverrideListener就会根据变化进行相应处理,具体处理逻辑看OverrideListener的实现
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 监听到动态配置变化后,重写URL资源,这里面有2个监听
// providerConfigurationListener表示应用级别的动态配置监听器,providerConfigurationListener是RegistyProtocol的一个属性
// serviceConfigurationListener表示服务级别的动态配置监听器,serviceConfigurationListener是在每暴露一个服务时就会生成一个
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// 通过调用originInvoker和providerUrl资源导出服务,这里会启动Netty服务接收请求,============doLocalExport可以研究一波===========
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// ListenerRegistryWrapper->ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 简化服务URL并注册到注册中心
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// 是否需要注册到注册中心
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 注册,这里很简单
// ListenerRegistryWrapper->FailbackRegistry(重试机制)->ZookeeperRegistry#doRegister(这里连接zk创建节点)
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// Deprecated! Subscribe to override rules in 2.6.x or before
// overrideSubscribeUrl和overrideSubscribeListener绑定
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
RegistryProtocol#doLocalExport
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// 通过invokerDelegate中URL参数SPI动态获取protocol协议类,这里的export主要做的事情就是开启Netty(协议Dubbo)或Tomcat(协议http)
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
监听器覆盖规则
不管是OverrideListener,providerConfigurationListener还是serviceConfigurationListener监听器都走了RegistryProtocol#doOverrideIfNecessary。这里就是规则覆写逻辑
public synchronized void doOverrideIfNecessary() {
final Invoker<?> invoker;
if (originInvoker instanceof InvokerDelegate) {
invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
} else {
invoker = originInvoker;
}
// 当前服务的原始服务提供者url
URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<?> exporter = bounds.get(key);
// 当前服务被导出的url
URL currentUrl = exporter.getInvoker().getUrl();
// 根据configurators修改url,configurators是全量的,并不是某个新增的或删除的,所以是基于原始的url进行修改,并不是基于currentUrl
URL newUrl = getConfigedInvokerUrl(configurators, currentUrl);
newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
.getConfigurators(), newUrl);
// 修改过的url如果和目前的url不相同,则重新按newUrl导出
if (!currentUrl.equals(newUrl)) {
// 重新导出
RegistryProtocol.this.reExport(originInvoker, newUrl);
}
}
总结
服务导出流程
- 确定配置(这里有优先级的确定),有系统变量、环境变量配置、配置中心应用配置、配置中心全局配置、自定义配置、dubbo.properties文件
- 构造服务URL资源
- 开启Netty(协议Dubbo)或Tomcat(协议http)服务接收请求
- 将服务URL注册到注册中⼼去
- Dubbo⽀持动态配置服务参数,服务导出时绑定监听器Listener来监听服务的参数是否有修改,如果发现有修改,则需要重新进⾏导出