Nacos服务注册源码分析


源码构建

版本:1.4.1

源码地址

git 下载

git clone -b 1.4.1 https://github.com/alibaba/nacos.git

maven编译

mvn -Prelease-nacos -Dmaven.test.skip=true -Drat.skip=true clean install -U

启动nacos

启动类 com.alibaba.nacos.Nacos

修改启动参数

-Dnacos.standalone=true -Dnacos.home=[源码地址]

配置MySQL

conf\application.properties

spring.datasource.platform=mysql

### Count of DB:
db.num=1

### Connect URL of DB:
db.url.0=jdbc:mysql://localhost:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user.0=nacos
db.password.0=nacos

服务注册

首先我们要知道整个服务注册发现的流程,服务注册是在客户端这边实现的。还记得我们项目要想使用nacos首先要引入一个jar包spring‐cloud‐starter‐alibaba‐nacos‐discovery。我们来到这个jar包,

202111131636792096832

看到这,我们发现是个springboot项目,遵循着springboot自动装配规则,进入spring.factories看到

202111131636792195848

通过命名我们来到NacosServiceRegistryAutoConfiguration(Nacos服务注册自动装配类)这里就是nacos服务注册的代码入口。

分析NacosServiceRegistryAutoConfiguration这里的代码发现核心在NacosAutoServiceRegistration,其余2个Bean注入之后都往这个Bean里面注入。

202111131636792877915

NacosServiceRegistryAutoConfiguration通过类图发信其父类实现了ApplicationListener<WebServerInitializedEvent>,通过spring我们知道实现了ApplicationListener必然会有一个onApplicationEvent在spring容器启动的时候调用

public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}

@Deprecated
public void bind(WebServerInitializedEvent event) {
  ApplicationContext context = event.getApplicationContext();
  if (context instanceof ConfigurableWebServerApplicationContext) {
    if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                            .getServerNamespace())) {
      return;
    }
  }
  this.port.compareAndSet(0, event.getWebServer().getPort());
  this.start();
}

找到这我们终于发现了nacos服务注册真正的入口所在

一直往里面跟bind()->start()->register()->serviceRegistry.register()->namingService.registerInstance()

开启心跳任务并注册实例

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        // 判断是不是临时实例,进去看默认是true
    if (instance.isEphemeral()) {
          // 构建一个心跳实例,将服务实例一些信息存进去
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
          // 获取心跳间隔,这里也就是一个定时任务的间隔,默认5s 可配置--preserved.heart.beat.interval
        long instanceInterval = instance.getInstanceHeartBeatInterval();
          // 容错,如果配置是0 使用默认值
        beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
                // 跟进去我们会发现会开启一个ScheduledThreadPoolExecutor执行周期性任务,任务是BeatTask,beatTask里面就存了刚刚封装的beanInfo实例。这一块的心跳代码我会放在后面的  服务心跳目录
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
        // 调用服务端服务注册接口
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

调用服务端接口注册服务

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
        namespaceId, serviceName, instance);

      // 将实例信息封装
    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));
        // 发起请求API  /nacos/v1/ns/instance -- post  去官网发现这个接口确实是服务注册的接口
    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}
202111131636794995832

现在nacos客户端发起一个http请求调用服务端,那么我们为了追求逻辑的连贯性,我们要去看nacos服务端是怎么处理请求的

这里的代码入口很简单,既然服务端要接受请求直接搜请求即可。

Nacos 服务端处理服务注册请求

202111131636795244559
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // 获取发送请求带来的参数并解析
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
   
    final Instance instance = parseInstance(request);
    // 开始注册
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // 创建内存注册表结构 这里面其实挺重要的
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
          // 省略代码
        // 将实例放进空服务中去
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

createEmptyService->putServiceAndInit(健康检查)

private void putServiceAndInit(Service service) throws NacosException {
      // 看这里面其实就可以看到我们nacos内存注册表结构到底长什么样了
    putService(service);
      // 开启健康检查定时线程  ClientBeatCheckTask  这个也会在后面的健康检查目录
    service.init();
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

// 内存注册表结构,serviceMap == Map<String,Map<String,Service>
// 转化为实际存储 -- Map<namespace,Mao<group::serviceName,Service>>
public void putService(Service service) {
  if (!serviceMap.containsKey(service.getNamespaceId())) {
    synchronized (putServiceLock) {
      if (!serviceMap.containsKey(service.getNamespaceId())) {
        serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
      }
    }
  }
  serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}

addInstance

将实例放进注册表结构中

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // 构建实例key , 通过实例key可区分临时节点还是持久节点
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // 获取服务
    Service service = getService(namespaceId, serviceName);
    // 将实例丢进服务里
    synchronized (service) {
          // 获取原来的服务实例列表然后将新实例放进去
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // 这里调的是DelegateConsistencyServiceImpl  因为spring注入的就是它
          // 核心方法,这里会根据生成的key是否是临时实例调用不同的服务同步方法
        consistencyService.put(key, instances);
    }
}

临时实例服务同步

consistencyService.put(key, instances)->mapConsistencyService(key).put(key, value);

临时实例服务–ephemeralConsistencyService
持久实例服务–persistentConsistencyService

这里我们仅仅先看临时实例是怎么做的,持久实例CP架构较为复杂

临时实例这里调用的是DistroConsistencyServiceImpl#put()

DistroConsistencyServiceImpl#put()

@Override
public void put(String key, Record value) throws NacosException {
    onPut(key, value);
      // 集群服务同步节点数据
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

onPut

这里的方法并没有真正的将实例放进服务里面去,而是放进一个阻塞队列里面去

这里这样处理的一个关键原因想必是通过异步操作提升性能,但同时会有一个问题就是不会写入立即生效,会与一定的延迟,但nacos认为这种延迟其实并不是影响很严重,性能要求优先于一致性要求

notifier.addTask(key, DataOperation.CHANGE)

private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
         
public void addTask(String datumKey, DataOperation action) {
    
    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
    if (action == DataOperation.CHANGE) {
        services.put(datumKey, StringUtils.EMPTY);
    }
      // 别被名字误导了这里不是定时任务,而是一个阻塞队列,现在问题是从哪里从队列获取数据并放入实例里面去呢
    tasks.offer(Pair.with(datumKey, action));
}

阻塞队列取数据并放入注册表

在DistroConsistencyServiceImpl类的init方法,我们发现它上面有注解@PostConstruct可知它会在spring容器启动后调用

@PostConstruct
public void init() {
      // 开启一个线程,线程执行Notifier,从阻塞队列取数据并放入内存注册表
    GlobalExecutor.submitDistroNotifyTask(notifier);
}

Notifier#run

@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");
    
    for (; ; ) {
        try {
              // 从阻塞队列取数据
            Pair<String, DataOperation> pair = tasks.take();
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}


private void handle(Pair<String, DataOperation> pair) {
  String datumKey = pair.getValue0();
  DataOperation action = pair.getValue1();

  services.remove(datumKey);

  int count = 0;

  if (!listeners.containsKey(datumKey)) {
    return;
  }

  for (RecordListener listener : listeners.get(datumKey)) {

    count++;

    if (action == DataOperation.CHANGE) {
      // 核心方法 它下面的updateIPs是真正的更新实例的地方
      listener.onChange(datumKey, dataStore.get(datumKey).value);
      continue;
    }

    if (action == DataOperation.DELETE) {
      listener.onDelete(datumKey);
      continue;
    }

  }
}

updateIps

这里运用了CopyOnWrite思想防止并发读写冲突, 把原内存结构复制一份,在复制的结构里面进行操作,操作完成后在合并回真正的注册表内存里去。

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
      // 构建一个空结构
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    // 将实例都放进刚刚构建的ipMap中去
    for (Instance instance : instances) {
      if (StringUtils.isEmpty(instance.getClusterName())) {
        instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
      }

      if (!clusterMap.containsKey(instance.getClusterName())) {
        Cluster cluster = new Cluster(instance.getClusterName(), this);
        cluster.init();
        getClusterMap().put(instance.getClusterName(), cluster);
      }

      List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
      if (clusterIPs == null) {
        clusterIPs = new LinkedList<>();
        ipMap.put(instance.getClusterName(), clusterIPs);
      }

      clusterIPs.add(instance);
    }
    
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
          // 更新ipMap中实例
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }
    
    setLastModifiedMillis(System.currentTimeMillis());
      // 服务变更,发布事件ServiceChangeEvent
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();
    
    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }
    
}

服务心跳

之前我们在客户端注册服务时发现会开启一个BeatTask任务去进行服务心跳

我们直接跳到BeatTask

class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
          // 发送服务心跳请求
        long result = serverProxy.sendBeat(beatInfo);
        long nextTime = result > 0 ? result : beatInfo.getPeriod();
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

public long sendBeat(BeatInfo beatInfo) {
    Map<String, String> params = new HashMap<String, String>(4);
    params.put("beat", JSON.toJSONString(beatInfo));
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
      // 路由 /nacos/v1/ns/instance/beat
    String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);
    JSONObject jsonObject = JSON.parseObject(result);

    if (jsonObject != null) {
      return jsonObject.getLong("clientBeatInterval");
    }
  return 0L;
}
202111131636800357597

健康检查

之前我们在服务端注册服务时发现会开启一个ClientBeatCheckTask任务去进行服务心跳

我们直接跳到ClientBeatCheckTask

@Override
public void run() {
  List<Instance> instances = service.allIPs(true);
  // 遍历所有实例,不满足要求的标记为不健康
  for (Instance instance : instances) {
    // 当前时间-最后心跳时间 > 15秒 可配置 -- preserved.heart.beat.timeout
    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
      if (!instance.isMarked()) {
        if (instance.isHealthy()) {
          instance.setHealthy(false);
          getPushService().serviceChanged(service);
          ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
        }
      }
    }
  }

  if (!getGlobalConfig().isExpireInstance()) {
    return;
  }

  // 遍历所有实例,不满足要求的剔除服务
  for (Instance instance : instances) {  
    // 当前时间-最后心跳时间 > 30秒 可配置 -- preserved.ip.delete.timeout
    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
      // 剔除实例
      deleteIp(instance);
    }
  } 
}

文章作者: dm
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 dm !
评论
 上一篇
Nacos配置中心使用以及源码分析 Nacos配置中心使用以及源码分析
Nacos不仅仅只是用于做注册中心,它还可以做配置中心使用。那么什么是配置中心呢?配置中心就是为分布式系统提供统一的外部配置服务端。 简单使用引入依赖<dependency> <groupId>com.alibaba.c
2023-04-10
下一篇 
Nacos注册中心基本概念和使用 Nacos注册中心基本概念和使用
前言Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。 Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服
2023-03-12
  目录