Spring Cloud Consul Integration: Source Code Analysis

1. Introduction

1.1 Consul

Consul is a tool for service discovery and configuration. Consul is distributed, highly available, and extremely scalable.

Consul provides several key features:

  • Service Discovery - Consul makes it simple for services to register themselves and to discover other services via a DNS or HTTP interface. External services such as SaaS providers can be registered as well.

  • Health Checking - Health Checking enables Consul to quickly alert operators about any issues in a cluster. The integration with service discovery prevents routing traffic to unhealthy hosts and enables service level circuit breakers.

  • Key/Value Storage - A flexible key/value store enables storing dynamic configuration, feature flagging, coordination, leader election and more. The simple HTTP API makes it easy to use anywhere.

  • Multi-Datacenter - Consul is built to be datacenter aware, and can support any number of regions without complex configuration.

Consul runs on Linux, Mac OS X, FreeBSD, Solaris, and Windows.

1.2 consul-api

Java client for Consul HTTP API (http://consul.io)

Supports all API endpoints (http://www.consul.io/docs/agent/http.html), all consistency modes and parameters (tags, datacenters etc.)

1.3 Overall architecture

[Figure: overall architecture of the Spring Cloud Consul integration]

2. Source code analysis

Main modules:

2.1 spring-cloud-consul-config

spring.factories

# Auto Configuration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.consul.config.ConsulConfigAutoConfiguration

# Bootstrap Configuration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.consul.config.ConsulConfigBootstrapConfiguration
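
spring.factories is a standard Java properties file, so a key whose value sits on the following line needs a trailing backslash to continue the line. The sketch below uses only java.util.Properties to show the difference; the class SpringFactoriesSketch and its method are hypothetical helpers written for this illustration and are not part of Spring.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

class SpringFactoriesSketch {

    // Parses spring.factories-style text and returns the class names listed under a key.
    static List<String> factoryNames(String text, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty(key, "").trim();
        if (value.isEmpty()) {
            return Collections.emptyList();
        }
        // Multiple implementations are separated by commas.
        return Arrays.asList(value.split("\\s*,\\s*"));
    }

    public static void main(String[] args) {
        String key = "org.springframework.cloud.bootstrap.BootstrapConfiguration";
        String className = "org.springframework.cloud.consul.config.ConsulConfigBootstrapConfiguration";

        // With a trailing backslash the next line is part of the value.
        System.out.println(factoryNames(key + "=\\\n" + className + "\n", key));

        // Without it the key has an empty value and the class name becomes a separate key.
        System.out.println(factoryNames(key + "=\n" + className + "\n", key));
    }
}
```

The first call prints a list containing the configuration class and the second prints an empty list, which is why the continuation backslashes matter in the listing above.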

2.1.1 ConsulConfigAutoConfiguration (auto-configuration)

@Configuration
@ConditionalOnConsulEnabled
@ConditionalOnProperty(name = "spring.cloud.consul.config.enabled", matchIfMissing = true)
public class ConsulConfigAutoConfiguration {

    @Configuration
    @ConditionalOnClass(RefreshEndpoint.class)
    protected static class ConsulRefreshConfiguration {
        @Bean
        @ConditionalOnProperty(name = "spring.cloud.consul.config.watch.enabled", matchIfMissing = true)
        public ConfigWatch configWatch(ConsulConfigProperties properties,
                ConsulPropertySourceLocator locator, ConsulClient consul) {
            return new ConfigWatch(properties, consul, locator.getContextIndexes());
        }
    }
}

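The core of ConfigWatch is the scheduled method below. It polls Consul for each watched context and publishes a RefreshEvent when the Consul index of that context changes: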

    @Scheduled(fixedDelayString = "${spring.cloud.consul.config.watch.delay:1000}")
    public void watchConfigKeyValues() {
        if (this.running.get()) {
            for (String context : this.consulIndexes.keySet()) {

                // turn the context into a Consul folder path (unless the config format is FILES)
                if (properties.getFormat() != FILES && !context.endsWith("/")) {
                    context = context + "/";
                }

                try {
                    Long currentIndex = this.consulIndexes.get(context);
                    if (currentIndex == null) {
                        currentIndex = -1L;
                    }

                    // use the consul ACL token if found
                    String aclToken = properties.getAclToken();
                    if (StringUtils.isEmpty(aclToken)) {
                        aclToken = null;
                    }

                    Response<List<GetValue>> response = this.consul.getKVValues(context, aclToken,
                            new QueryParams(this.properties.getWatch().getWaitTime(),
                                    currentIndex));

                    // if response.value == null, response was a 404, otherwise it was a 200
                    // reducing churn if there wasn't anything
                    if (response.getValue() != null && !response.getValue().isEmpty()) {
                        Long newIndex = response.getConsulIndex();

                        if (newIndex != null && !newIndex.equals(currentIndex)) {
                            // don't publish the same index again, don't publish the first time (-1) so index can be primed
                            if (!this.consulIndexes.containsValue(newIndex) && !currentIndex.equals(-1L)) {
                                RefreshEventData data = new RefreshEventData(context, currentIndex, newIndex);
                                this.publisher.publishEvent(new RefreshEvent(this, data, data.toString()));
                            }
                            this.consulIndexes.put(context, newIndex);
                        }
                    }

                } catch (Exception e) {
                    // only fail fast on the initial query, otherwise just log the error
                    if (firstTime && this.properties.isFailFast()) {
                        log.error("Fail fast is set and there was an error reading configuration from consul.");
                        ReflectionUtils.rethrowRuntimeException(e);
                    } else if (log.isTraceEnabled()) {
                        log.trace("Error querying consul Key/Values for context '" + context + "'", e);
                    } else if (log.isWarnEnabled()) {
                        // simplified one line log message in the event of an agent failure
                        log.warn("Error querying consul Key/Values for context '" + context + "'. Message: " + e.getMessage());
                    }
                }
            }
        }
        firstTime = false;
    }
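
The index bookkeeping in watchConfigKeyValues is easier to follow in isolation. The following is a minimal sketch of that decision logic only, with no Consul client or Spring event publisher involved; the class IndexWatchSketch and the method onPoll are hypothetical names introduced here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class IndexWatchSketch {

    // Last Consul index seen for each context, like consulIndexes in ConfigWatch.
    private final Map<String, Long> indexes = new LinkedHashMap<>();

    // Returns true when a refresh event would be published for this poll result.
    boolean onPoll(String context, boolean hasValues, Long newIndex) {
        Long currentIndex = indexes.get(context);
        if (currentIndex == null) {
            currentIndex = -1L;
        }
        boolean publish = false;
        if (hasValues && newIndex != null && !newIndex.equals(currentIndex)) {
            // Skip the very first index (priming) and indexes that are already recorded.
            publish = !indexes.containsValue(newIndex) && !currentIndex.equals(-1L);
            indexes.put(context, newIndex);
        }
        return publish;
    }

    public static void main(String[] args) {
        IndexWatchSketch watch = new IndexWatchSketch();
        System.out.println(watch.onPoll("config/myapp/", true, 10L)); // false: first index only primes the map
        System.out.println(watch.onPoll("config/myapp/", true, 10L)); // false: index unchanged
        System.out.println(watch.onPoll("config/myapp/", true, 12L)); // true: index advanced
    }
}
```

The context name config/myapp/ is only an example value. The point of the priming step is that the first successful poll records the index without triggering a refresh.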

2.1.2 ConsulConfigBootstrapConfiguration (bootstrap configuration)

Definition:

@Configuration
@ConditionalOnConsulEnabled
public class ConsulConfigBootstrapConfiguration {

    @Configuration
    @EnableConfigurationProperties
    @Import(ConsulAutoConfiguration.class)
    @ConditionalOnProperty(name = "spring.cloud.consul.config.enabled", matchIfMissing = true)
    protected static class ConsulPropertySourceConfiguration {
        @Autowired
        private ConsulClient consul;

        @Bean
        public ConsulConfigProperties consulConfigProperties() {
            return new ConsulConfigProperties();
        }

        @Bean
        public ConsulPropertySourceLocator consulPropertySourceLocator(
                ConsulConfigProperties consulConfigProperties) {
            return new ConsulPropertySourceLocator(consul, consulConfigProperties);
        }
    }
}

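The main flow of ConsulPropertySourceLocator is its locate method, which builds the list of Consul contexts to read and combines the resulting property sources into one composite source: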

    @Override
    @Retryable(interceptor = "consulRetryInterceptor")
    public PropertySource<?> locate(Environment environment) {
        if (environment instanceof ConfigurableEnvironment) {
            ConfigurableEnvironment env = (ConfigurableEnvironment) environment;
            RelaxedPropertyResolver propertyResolver = new RelaxedPropertyResolver(env);

            String appName = properties.getName();

            if (appName == null) {
                appName = propertyResolver.getProperty("spring.application.name");
            }

            List<String> profiles = Arrays.asList(env.getActiveProfiles());

            String prefix = this.properties.getPrefix();

            List<String> suffixes = new ArrayList<>();
            if (this.properties.getFormat() != FILES) {
                suffixes.add("/");
            } else {
                suffixes.add(".yml");
                suffixes.add(".yaml");
                suffixes.add(".properties");
            }

            String defaultContext = prefix + "/" + this.properties.getDefaultContext();
            for (String suffix : suffixes) {
                this.contexts.add(defaultContext + suffix);
            }
            for (String suffix : suffixes) {
                addProfiles(this.contexts, defaultContext, profiles, suffix);
            }

            String baseContext = prefix + "/" + appName;
            for (String suffix : suffixes) {
                this.contexts.add(baseContext + suffix);
            }
            for (String suffix : suffixes) {
                addProfiles(this.contexts, baseContext, profiles, suffix);
            }

            Collections.reverse(this.contexts);

            CompositePropertySource composite = new CompositePropertySource("consul");

            for (String propertySourceContext : this.contexts) {
                try {
                    ConsulPropertySource propertySource = null;
                    if (this.properties.getFormat() == FILES) {
                        Response<GetValue> response = this.consul.getKVValue(propertySourceContext, this.properties.getAclToken());
                        addIndex(propertySourceContext, response.getConsulIndex());
                        if (response.getValue() != null) {
                            ConsulFilesPropertySource filesPropertySource = new ConsulFilesPropertySource(propertySourceContext, this.consul, this.properties);
                            filesPropertySource.init(response.getValue());
                            propertySource = filesPropertySource;
                        }
                    } else {
                        propertySource = create(propertySourceContext, contextIndex);
                    }
                    if (propertySource != null) {
                        composite.addPropertySource(propertySource);
                    }
                } catch (Exception e) {
                    if (this.properties.isFailFast()) {
                        log.error("Fail fast is set and there was an error reading configuration from consul.");
                        ReflectionUtils.rethrowRuntimeException(e);
                    } else {
                        log.warn("Unable to load consul config from "+ propertySourceContext, e);
                    }
                }
            }

            return composite;
        }
        return null;
    }
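
To make the lookup order in locate concrete, the sketch below reproduces only the context-building and reversal steps for the non-FILES case. It assumes that addProfiles appends a separator, the profile name, and the suffix to the base context; the separator and the example values (config, application, myapp, dev) are assumptions passed in as parameters, and ContextOrderSketch is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ContextOrderSketch {

    // Builds the contexts in the same order as locate(), then reverses them.
    static List<String> contexts(String prefix, String defaultContext, String appName,
                                 List<String> profiles, String profileSeparator) {
        String suffix = "/"; // key/value folder formats; FILES would use file extensions instead
        List<String> contexts = new ArrayList<>();
        for (String base : Arrays.asList(prefix + "/" + defaultContext, prefix + "/" + appName)) {
            contexts.add(base + suffix);
            for (String profile : profiles) {
                // Assumed behaviour of addProfiles: base + separator + profile + suffix.
                contexts.add(base + profileSeparator + profile + suffix);
            }
        }
        Collections.reverse(contexts);
        return contexts;
    }

    public static void main(String[] args) {
        System.out.println(contexts("config", "application", "myapp", Arrays.asList("dev"), ","));
        // [config/myapp,dev/, config/myapp/, config/application,dev/, config/application/]
    }
}
```

After the reversal the most specific context (application name plus profile) comes first, so it is added to the composite property source ahead of the shared defaults.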

2.2 spring-cloud-consul-core

spring.factories

# Auto Configuration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.consul.ConsulAutoConfiguration

The implementation is as follows:

@Configuration
@EnableConfigurationProperties
@ConditionalOnConsulEnabled
public class ConsulAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public ConsulProperties consulProperties() {
        return new ConsulProperties();
    }

    @Bean
    @ConditionalOnMissingBean
    public ConsulClient consulClient(ConsulProperties consulProperties) {
        return new ConsulClient(consulProperties.getHost(), consulProperties.getPort());
    }

    @Configuration
    @ConditionalOnClass(Endpoint.class)
    protected static class ConsulHealthConfig {

        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnEnabledEndpoint("consul")
        public ConsulEndpoint consulEndpoint(ConsulClient consulClient) {
            return new ConsulEndpoint(consulClient);
        }

        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnEnabledHealthIndicator("consul")
        public ConsulHealthIndicator consulHealthIndicator(ConsulClient consulClient) {
            return new ConsulHealthIndicator(consulClient);
        }
    }

    @ConditionalOnClass({ Retryable.class, Aspect.class, AopAutoConfiguration.class })
    @Configuration
    @EnableRetry(proxyTargetClass = true)
    @Import(AopAutoConfiguration.class)
    @EnableConfigurationProperties(RetryProperties.class)
    protected static class RetryConfiguration {

        @Bean(name = "consulRetryInterceptor")
        @ConditionalOnMissingBean(name = "consulRetryInterceptor")
        public RetryOperationsInterceptor consulRetryInterceptor(
                RetryProperties properties) {
            return RetryInterceptorBuilder
                    .stateless()
                    .backOffOptions(properties.getInitialInterval(),
                            properties.getMultiplier(), properties.getMaxInterval())
                    .maxAttempts(properties.getMaxAttempts()).build();
        }
    }
}

This class defines the ConsulProperties, ConsulClient, ConsulEndpoint, ConsulHealthIndicator, and RetryOperationsInterceptor beans.
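
The consulRetryInterceptor bean above passes an initial interval, a multiplier, and a maximum interval to backOffOptions, which sets up an exponential back-off between attempts. The sketch below only illustrates how such a schedule grows. It is not Spring Retry's implementation, the class and method names are hypothetical, and the numbers are chosen for illustration rather than taken from RetryProperties.

```java
import java.util.ArrayList;
import java.util.List;

class BackOffSketch {

    // Returns the waits (in milliseconds) between attempts: one fewer than maxAttempts.
    static List<Long> delays(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        long interval = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            // Each wait is capped at maxInterval, then the interval grows by the multiplier.
            result.add(Math.min(interval, maxInterval));
            interval = (long) (interval * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(1000, 2.0, 5000, 5)); // [1000, 2000, 4000, 5000]
    }
}
```

Because the interceptor is stateless, the whole sequence of attempts and waits happens inside a single call to the retried method, such as locate.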

2.3 spring-cloud-consul-discovery

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.consul.discovery.RibbonConsulAutoConfiguration,\
org.springframework.cloud.consul.discovery.configclient.ConsulConfigServerAutoConfiguration,\
org.springframework.cloud.consul.serviceregistry.ConsulAutoServiceRegistrationAutoConfiguration,\
org.springframework.cloud.consul.serviceregistry.ConsulServiceRegistryAutoConfiguration

# Discovery Client Configuration
org.springframework.cloud.client.discovery.EnableDiscoveryClient=\
org.springframework.cloud.consul.discovery.ConsulDiscoveryClientConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.consul.discovery.configclient.ConsulDiscoveryClientConfigServiceBootstrapConfiguration

2.3.1 RibbonConsulAutoConfiguration

@Configuration
@EnableConfigurationProperties
@ConditionalOnConsulEnabled
@ConditionalOnBean(SpringClientFactory.class)
@ConditionalOnProperty(value = "spring.cloud.consul.ribbon.enabled", matchIfMissing = true)
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@RibbonClients(defaultConfiguration = ConsulRibbonClientConfiguration.class)
public class RibbonConsulAutoConfiguration {

}

2.3.2 ConsulConfigServerAutoConfiguration for config server

/**
 * Extra configuration for config server if it happens to be registered with Consul.
 *
 * @author Dave Syer
 */
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass({ ConsulDiscoveryProperties.class, ConsulClient.class,
        ConfigServerProperties.class })
public class ConsulConfigServerAutoConfiguration {

    @Autowired(required = false)
    private ConsulDiscoveryProperties properties;

    @Autowired(required = false)
    private ConfigServerProperties server;

    @PostConstruct
    public void init() {
        if (this.properties == null || this.server == null) {
            return;
        }
        String prefix = this.server.getPrefix();
        if (StringUtils.hasText(prefix)) {
            this.properties.getTags().add("configPath="+prefix);
        }
    }

}
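
The init method above publishes the config server's path prefix as a Consul tag of the form configPath=<prefix>. A minimal sketch of that idea follows, using plain collections instead of ConsulDiscoveryProperties and ConfigServerProperties. The reading side (parseTags) is a hypothetical helper that shows how a consumer could recover the value; the source does not show how the tag is consumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConfigPathTagSketch {

    // Mirrors init(): add a configPath tag only when the prefix contains text.
    static List<String> withConfigPath(List<String> tags, String prefix) {
        List<String> result = new ArrayList<>(tags);
        if (prefix != null && !prefix.trim().isEmpty()) {
            result.add("configPath=" + prefix);
        }
        return result;
    }

    // Hypothetical reader: turns key=value tags into a map, ignoring tags without '='.
    static Map<String, String> parseTags(List<String> tags) {
        Map<String, String> metadata = new LinkedHashMap<>();
        for (String tag : tags) {
            int eq = tag.indexOf('=');
            if (eq > 0) {
                metadata.put(tag.substring(0, eq), tag.substring(eq + 1));
            }
        }
        return metadata;
    }

    public static void main(String[] args) {
        List<String> tags = withConfigPath(Arrays.asList("secure=false"), "/config");
        System.out.println(tags);                              // [secure=false, configPath=/config]
        System.out.println(parseTags(tags).get("configPath")); // /config
    }
}
```

The tag secure=false and the prefix /config are example values only.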

2.3.3 ConsulAutoServiceRegistrationAutoConfiguration

@Configuration
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnMissingBean(type = "org.springframework.cloud.consul.discovery.ConsulLifecycle")
@ConditionalOnConsulEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter(ConsulServiceRegistryAutoConfiguration.class)
public class ConsulAutoServiceRegistrationAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public ConsulAutoServiceRegistration consulAutoServiceRegistration(ConsulServiceRegistry registry, ConsulDiscoveryProperties properties, ConsulAutoRegistration consulRegistration) {
        return new ConsulAutoServiceRegistration(registry, properties, consulRegistration);
    }

    @Bean
    @ConditionalOnMissingBean
    public ConsulAutoRegistration consulRegistration(ConsulDiscoveryProperties properties, ApplicationContext applicationContext,
                                                 ServletContext servletContext, HeartbeatProperties heartbeatProperties) {
        return ConsulAutoRegistration.registration(properties, applicationContext, servletContext, heartbeatProperties);
    }

}

This class defines the ConsulAutoServiceRegistration and ConsulAutoRegistration beans.

2.3.4 ConsulServiceRegistryAutoConfiguration

@Configuration
@ConditionalOnConsulEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.enabled", matchIfMissing = true)
@AutoConfigureBefore(ServiceRegistryAutoConfiguration.class)
public class ConsulServiceRegistryAutoConfiguration {

    @Autowired(required = false)
    private TtlScheduler ttlScheduler;

    @Bean
    @ConditionalOnMissingBean
    public ConsulServiceRegistry consulServiceRegistry(ConsulClient consulClient, ConsulDiscoveryProperties properties,
                                                       HeartbeatProperties heartbeatProperties) {
        return new ConsulServiceRegistry(consulClient, properties, ttlScheduler, heartbeatProperties);
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty("spring.cloud.consul.discovery.heartbeat.enabled")
    public TtlScheduler ttlScheduler(ConsulClient consulClient, HeartbeatProperties heartbeatProperties) {
        return new TtlScheduler(heartbeatProperties, consulClient);
    }

    @Bean
    @ConditionalOnMissingBean
    public HeartbeatProperties heartbeatProperties() {
        return new HeartbeatProperties();
    }

    @Bean
    @ConditionalOnMissingBean
    public ConsulDiscoveryProperties consulDiscoveryProperties(InetUtils inetUtils) {
        return new ConsulDiscoveryProperties(inetUtils);
    }

}

This class defines the ConsulServiceRegistry, TtlScheduler, HeartbeatProperties, and ConsulDiscoveryProperties beans.

2.3.5 ConsulDiscoveryClientConfiguration (client-side discovery)

@Configuration
@ConditionalOnConsulEnabled
@ConditionalOnProperty(value = "spring.cloud.consul.discovery.enabled", matchIfMissing = true)
@EnableConfigurationProperties
public class ConsulDiscoveryClientConfiguration {

    @Autowired
    private ConsulClient consulClient;

    @Autowired(required = false)
    private ServerProperties serverProperties;

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty("spring.cloud.consul.discovery.heartbeat.enabled")
    //TODO: move to service-registry for Edgware
    public TtlScheduler ttlScheduler(HeartbeatProperties heartbeatProperties) {
        return new TtlScheduler(heartbeatProperties, consulClient);
    }

    @Bean
    //TODO: move to service-registry for Edgware
    public HeartbeatProperties heartbeatProperties() {
        return new HeartbeatProperties();
    }

    @Bean
    //TODO: Split appropriate values to service-registry for Edgware
    public ConsulDiscoveryProperties consulDiscoveryProperties(InetUtils inetUtils) {
        return new ConsulDiscoveryProperties(inetUtils);
    }

    @Bean
    @ConditionalOnMissingBean
    public ConsulDiscoveryClient consulDiscoveryClient(ConsulDiscoveryProperties discoveryProperties, final ApplicationContext context) {
        ConsulDiscoveryClient discoveryClient = new ConsulDiscoveryClient(consulClient,
                discoveryProperties, new LifecycleRegistrationResolver(context));
        discoveryClient.setServerProperties(serverProperties); //null ok
        return discoveryClient;
    }

    class LifecycleRegistrationResolver implements ConsulDiscoveryClient.LocalResolver {
        private ApplicationContext context;

        public LifecycleRegistrationResolver(ApplicationContext context) {
            this.context = context;
        }

        @Override
        public String getInstanceId() {
            ConsulRegistration registration = getBean(ConsulRegistration.class);
            if (registration != null) {
                return registration.getInstanceId();
            }
            ConsulLifecycle lifecycle = getBean(ConsulLifecycle.class);
            if (lifecycle != null) {
                return lifecycle.getInstanceId();
            }
            throw new IllegalStateException("Must have one of ConsulRegistration or ConsulLifecycle");
        }

        @Override
        public Integer getPort() {
            ConsulRegistration registration = getBean(ConsulRegistration.class);
            if (registration != null) {
                return registration.getService().getPort();
            }
            ConsulLifecycle lifecycle = getBean(ConsulLifecycle.class);
            if (lifecycle != null) {
                return lifecycle.getConfiguredPort();
            }
            throw new IllegalStateException("Must have one of ConsulRegistration or ConsulLifecycle");
        }

        <T> T getBean(Class<T> type) {
            try {
                return context.getBean(type);
            } catch (NoSuchBeanDefinitionException e) {
                // no bean of this type is registered; fall through and return null
            }
            return null;
        }
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(name = "spring.cloud.consul.discovery.catalogServicesWatch.enabled", matchIfMissing = true)
    public ConsulCatalogWatch consulCatalogWatch(
            ConsulDiscoveryProperties discoveryProperties) {
        return new ConsulCatalogWatch(discoveryProperties, consulClient);
    }
}

2.3.6 ConsulDiscoveryClientConfigServiceBootstrapConfiguration

/**
 * Helper for config client that wants to lookup the config server via discovery.
 *
 * @author Spencer Gibb
 */
@ConditionalOnClass(ConfigServicePropertySourceLocator.class)
@ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled", matchIfMissing = false)
@Configuration
@Import({ ConsulAutoConfiguration.class, ConsulDiscoveryClientConfiguration.class})
public class ConsulDiscoveryClientConfigServiceBootstrapConfiguration {

    @Bean
    public ConsulDiscoveryProperties consulDiscoveryProperties(InetUtils inetUtils) {
        ConsulDiscoveryProperties properties = new ConsulDiscoveryProperties(inetUtils);
        // for bootstrap, lifecycle (and hence registration) is not needed, just discovery client
        properties.getLifecycle().setEnabled(false);
        return properties;
    }
}

References

[1] https://github.com/hashicorp/consul

[2] https://github.com/Ecwid/consul-api

[3] https://www.consul.io/api/index.html