12. Eureka Tutorial: A Close Reading of How EurekaClient Is Created

A Close Reading of How EurekaClient Is Created

Picking up from the previous article, we have arrived at the constructor DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer). Let's walk through it:

if (args != null) {

    this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
    this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
    this.eventListeners.addAll(args.getEventListeners());
    this.preRegistrationHandler = args.preRegistrationHandler;
} else {

    this.healthCheckCallbackProvider = null;
    this.healthCheckHandlerProvider = null;
    this.preRegistrationHandler = null;
}

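The first thing it does is check whether AbstractDiscoveryClientOptionalArgs is null. Judging by the name, these are optional arguments. As covered in the previous article, null is passed here, so the else branch runs and the health check callback provider, the health check handler provider, and the pre-registration handler are all set to null instead of being saved. Let's keep going: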

this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();

clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {

    appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {

    logger.warn("Setting instanceInfo to a passed in null value");
}

this.backupRegistryProvider = backupRegistryProvider;
this.endpointRandomizer = endpointRandomizer;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());

fetchRegistryGeneration = new AtomicLong(0);

remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

This code just stores configuration and initializes a few fields, so a quick skim is enough. Moving on:

if (config.shouldFetchRegistry()) {

    this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_",
            new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {

    this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}

if (config.shouldRegisterWithEureka()) {

    this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_",
            new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {

    this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}

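Here it checks whether the configuration enables fetching the registry and whether it enables registering this instance with other Eureka servers. The corresponding properties are eureka.client.fetch-registry and eureka.client.register-with-eureka, so by now you should have a feel for what these two settings do. When they are enabled, each one gets its own monitor: one for registry staleness and one for heartbeat staleness. My guess is that the long array lists thresholds in seconds (15, 30, 60, 120 and so on) used to bucket how stale the last update or heartbeat is. Let's take a look at ThresholdLevelsMetric: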

public ThresholdLevelsMetric(Object owner, String prefix, long[] levels) {

    this.levels = levels;
    this.gauges = new LongGauge[levels.length];
    for (int i = 0; i < levels.length; i++) {

        String name = prefix + String.format("%05d", levels[i]);
        MonitorConfig config = new MonitorConfig.Builder(name)
                .withTag("class", owner.getClass().getName())
                .build();
        gauges[i] = new LongGauge(config);

        try {

            DefaultMonitorRegistry.getInstance().register(gauges[i]);
        } catch (Throwable e) {

            logger.warn("Cannot register metric {}", name, e);
        }
    }
}
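To make the role of the long[] levels more concrete, here is a minimal, self-contained sketch of threshold bucketing. It is an illustration only, not Eureka's code: the class ThresholdBuckets and the method matchedIndex are hypothetical names, and the rule it applies (pick the highest level that the elapsed time has already reached) is my assumption about what such a metric is meant to report.

```java
// Hypothetical illustration of threshold bucketing; not part of Eureka.
final class ThresholdBuckets {
    private final long[] levelsSec; // ascending thresholds, in seconds

    ThresholdBuckets(long[] levelsSec) {
        this.levelsSec = levelsSec.clone();
    }

    /** Returns the index of the highest level that delayMs has reached, or -1 if none. */
    int matchedIndex(long delayMs) {
        long delaySec = delayMs / 1000;
        int matched = -1;
        for (int i = 0; i < levelsSec.length; i++) {
            if (levelsSec[i] <= delaySec) {
                matched = i;
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        ThresholdBuckets buckets =
                new ThresholdBuckets(new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        System.out.println(buckets.matchedIndex(5_000));   // 5s: below the first level
        System.out.println(buckets.matchedIndex(45_000));  // 45s: between 30s and 60s
        System.out.println(buckets.matchedIndex(900_000)); // 900s: past the last level
    }
}
```

With one gauge per level, a monitoring dashboard can then show which staleness bucket the client currently sits in.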

With the two monitors covered, let's move on:

if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {

    logger.info("Client configured to neither register nor query for data.");
    scheduler = null;
    heartbeatExecutor = null;
    cacheRefreshExecutor = null;
    eurekaTransport = null;
    instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    initRegistrySize = this.getApplications().size();
    registrySize = initRegistrySize;
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, initRegistrySize);

    return;  // no need to setup up an network tasks and we are done
}

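As you can see, if we neither fetch the registry nor register with another EurekaServer, the constructor returns right here and none of the code after it runs. So if you are new to Eureka, you now know why the application starts without errors once eureka.client.fetch-registry and eureka.client.register-with-eureka are both set to false. Now let's assume both are set to true and continue: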

try {

    // default size of 2 - 1 each for heartbeat and cacheRefresh
    scheduler = Executors.newScheduledThreadPool(2,
            new ThreadFactoryBuilder()
                    .setNameFormat("DiscoveryClient-%d")
                    .setDaemon(true)
                    .build());

    heartbeatExecutor = new ThreadPoolExecutor(
            1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactoryBuilder()
                    .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                    .setDaemon(true)
                    .build()
    );  // use direct handoff

    cacheRefreshExecutor = new ThreadPoolExecutor(
            1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactoryBuilder()
                    .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                    .setDaemon(true)
                    .build()
    );  // use direct handoff

    eurekaTransport = new EurekaTransport();
    scheduleServerEndpointTask(eurekaTransport, args);

    AzToRegionMapper azToRegionMapper;
    if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {

        azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
    } else {

        azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
    }
    if (null != remoteRegionsToFetch.get()) {

        azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
    }
    instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {

    throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}

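Here it first creates three thread pools. Going by the comment in the source, scheduler is a two-thread scheduled pool, one thread each for the heartbeat and cache refresh timers; heartbeatExecutor runs the heartbeat work and cacheRefreshExecutor runs the registry refresh. I do have to complain about the name of the first one, though: you cannot tell what it is for from "scheduler" alone!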
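The split between a timer pool and worker pools with a SynchronousQueue can be tried out with the JDK alone. The sketch below is not Eureka code: SchedulerHandoffDemo, runOnce and the thread name demo-worker are made up for illustration, and the pool sizes and timeouts are arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only JDK types are used.
final class SchedulerHandoffDemo {

    /** Returns the name of the thread that actually ran the work. */
    static String runOnce() {
        // The scheduler only decides *when* something runs.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        ThreadFactory named = r -> {
            Thread t = new Thread(r, "demo-worker");
            t.setDaemon(true);
            return t;
        };
        // The worker pool does the work. A SynchronousQueue buffers nothing,
        // so every submitted task is handed straight to a worker thread.
        ThreadPoolExecutor worker = new ThreadPoolExecutor(
                1, 2, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), named);
        try {
            Callable<String> work = () -> Thread.currentThread().getName();
            Callable<String> timerTask = () -> {
                Future<String> result = worker.submit(work);
                return result.get(1, TimeUnit.SECONDS); // bounded wait on the worker
            };
            ScheduledFuture<String> timed =
                    scheduler.schedule(timerTask, 10, TimeUnit.MILLISECONDS);
            return timed.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            worker.shutdownNow();
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

The timer thread stays free of long-running work, and a slow worker task can be abandoned after the bounded wait instead of blocking the next tick.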
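Next, eurekaTransport = new EurekaTransport() creates the network communication component, and scheduleServerEndpointTask(eurekaTransport, args) supposedly runs some kind of server endpoint task. Another complaint: from the name I assumed it would run the scheduled tasks set up above, but stepping into it shows that it actually initializes Eureka's network communication component. Let's take a look: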

private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,AbstractDiscoveryClientOptionalArgs args) {

Collection<?> additionalFilters = args == null
        ? Collections.emptyList()
        : args.additionalFilters;

EurekaJerseyClient providedJerseyClient = args == null
        ? null
        : args.eurekaJerseyClient;

TransportClientFactories argsTransportClientFactories = null;
if (args != null && args.getTransportClientFactories() != null) {

    argsTransportClientFactories = args.getTransportClientFactories();
}

// Ignore the raw types warnings since the client filter interface changed between jersey 1/2
@SuppressWarnings("rawtypes")
TransportClientFactories transportClientFactories = argsTransportClientFactories == null
        ? new Jersey1TransportClientFactories()
        : argsTransportClientFactories;

Optional<SSLContext> sslContext = args == null
        ? Optional.empty()
        : args.getSSLContext();
Optional<HostnameVerifier> hostnameVerifier = args == null
        ? Optional.empty()
        : args.getHostnameVerifier();

// If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
eurekaTransport.transportClientFactory = providedJerseyClient == null
        ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier)
        : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);

ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {

    @Override
    public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {

        long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
        long delay = getLastSuccessfulRegistryFetchTimePeriod();
        if (delay > thresholdInMs) {

            logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
                    thresholdInMs, delay);
            return null;
        } else {

            return localRegionApps.get();
        }
    }
};

eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
        clientConfig,
        transportConfig,
        eurekaTransport.transportClientFactory,
        applicationInfoManager.getInfo(),
        applicationsSource,
        endpointRandomizer
);

if (clientConfig.shouldRegisterWithEureka()) {

    EurekaHttpClientFactory newRegistrationClientFactory = null;
    EurekaHttpClient newRegistrationClient = null;
    try {

        newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                eurekaTransport.bootstrapResolver,
                eurekaTransport.transportClientFactory,
                transportConfig
        );
        newRegistrationClient = newRegistrationClientFactory.newClient();
    } catch (Exception e) {

        logger.warn("Transport initialization failure", e);
    }
    eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
    eurekaTransport.registrationClient = newRegistrationClient;
}

// new method (resolve from primary servers for read)
// Configure new transport layer (candidate for injecting in the future)
if (clientConfig.shouldFetchRegistry()) {

    EurekaHttpClientFactory newQueryClientFactory = null;
    EurekaHttpClient newQueryClient = null;
    try {

        newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                eurekaTransport.bootstrapResolver,
                eurekaTransport.transportClientFactory,
                clientConfig,
                transportConfig,
                applicationInfoManager.getInfo(),
                applicationsSource,
                endpointRandomizer
        );
        newQueryClient = newQueryClientFactory.newClient();
    } catch (Exception e) {

        logger.warn("Transport initialization failure", e);
    }
    eurekaTransport.queryClientFactory = newQueryClientFactory;
    eurekaTransport.queryClient = newQueryClient;
}
}

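As you can see, it just builds a pile of objects, nothing to worry about. We'll leave it alone: Eureka uses HTTP for network communication, so a quick glance is enough. Back to the main flow: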

if (clientConfig.shouldFetchRegistry()) {

    try {

        boolean primaryFetchRegistryResult = fetchRegistry(false);
        if (!primaryFetchRegistryResult) {

            logger.info("Initial registry fetch from primary servers failed");
        }
        boolean backupFetchRegistryResult = true;
        if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {

            backupFetchRegistryResult = false;
            logger.info("Initial registry fetch from backup servers failed");
        }
        if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {

            throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
        }
    } catch (Throwable th) {

        logger.error("Fetch registry error at startup: {}", th.getMessage());
        throw new IllegalStateException(th);
    }
}

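Here you can see that if fetching the registry is enabled, it first tries to fetch from the primary servers; if that fails, it falls back to the backup registry. If that fails too and enforcing the fetch at init (shouldEnforceFetchRegistryAtInit) is enabled, it throws an exception. The registry fetch logic will get its own article later, so we won't dwell on it here.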
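The primary, backup, enforce sequence above boils down to a small decision. Here is a minimal sketch of it; InitialFetchPolicy and initialFetch are hypothetical names, and the two BooleanSupplier arguments stand in for fetchRegistry(false) and fetchRegistryFromBackup().

```java
import java.util.function.BooleanSupplier;

// Hypothetical reduction of the startup fetch logic; not Eureka's code.
final class InitialFetchPolicy {

    /**
     * Tries the primary source first and the backup only if the primary fails.
     * Throws only when both fail and the fetch is enforced at init.
     */
    static boolean initialFetch(BooleanSupplier primary, BooleanSupplier backup, boolean enforceAtInit) {
        // || short-circuits, so the backup is consulted only after a primary failure
        boolean fetched = primary.getAsBoolean() || backup.getAsBoolean();
        if (!fetched && enforceAtInit) {
            throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
        }
        return fetched;
    }

    public static void main(String[] args) {
        System.out.println(initialFetch(() -> true, () -> false, true));   // primary succeeds
        System.out.println(initialFetch(() -> false, () -> true, true));   // backup saves the day
        System.out.println(initialFetch(() -> false, () -> false, false)); // both fail, not enforced
    }
}
```

When both sources fail and the fetch is not enforced, startup simply continues with an empty local registry and relies on the periodic refresh.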

With the registry fetch covered, let's continue with the main flow:

if (this.preRegistrationHandler != null) {

    this.preRegistrationHandler.beforeRegistration();
}

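This checks whether there is a pre-registration handler and, if so, runs it. As far as I can tell, neither Eureka's native API nor Spring-Cloud-Netflix-Eureka ships a concrete implementation. Another complaint: the design idea is good, it's just a pity that nothing implements it. Let's continue: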

if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {

    try {

        if (!register() ) {

            throw new IllegalStateException("Registration error at startup. Invalid server response.");
        }
    } catch (Throwable th) {

        logger.error("Registration error at startup: {}", th.getMessage());
        throw new IllegalStateException(th);
    }
}

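Here it checks whether the client should register with the EurekaServer and whether registration should be enforced at init (shouldEnforceRegistrationAtInit); if both are true, it registers right away. Note that the version I am reading is 1.9.12, and this code does not exist in 1.7.x. Here is a screenshot of the 1.7.x version that I took on GitHub: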

[Screenshot: the corresponding DiscoveryClient code in Eureka 1.7.x on GitHub]

On top of that, enforcing registration at init defaults to false. The actual registration happens in the next line of code:

initScheduledTasks();

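Doesn't the name leave you baffled? "Initialize scheduled tasks", and yet by default this is where Eureka puts the client registration code. I really don't know what to say, and it is buried deep, too. Let me walk you through it: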

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        );
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat timer
        heartbeatTask = new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        );
        scheduler.schedule(
                heartbeatTask,
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
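Both timers in the listing above pass an interval and an expBackOffBound into TimedSupervisorTask. My reading, which you should treat as an assumption since that class is not shown in this article, is that the delay doubles after each timeout, is capped at interval times bound, and resets to the interval after a successful run. The hypothetical BackoffDelay class below sketches just that arithmetic.

```java
// Hypothetical sketch of capped exponential backoff; the doubling, capping and
// reset rules are assumptions, not a copy of Eureka's TimedSupervisorTask.
final class BackoffDelay {
    private final long baseMs;
    private final long maxMs;
    private long currentMs;

    BackoffDelay(long baseMs, int expBackOffBound) {
        this.baseMs = baseMs;
        this.maxMs = baseMs * expBackOffBound; // upper bound for the delay
        this.currentMs = baseMs;
    }

    /** Doubles the delay after a timeout, never exceeding the cap. */
    long onTimeout() {
        currentMs = Math.min(maxMs, currentMs * 2);
        return currentMs;
    }

    /** Resets the delay to the base interval after a successful run. */
    long onSuccess() {
        currentMs = baseMs;
        return currentMs;
    }

    public static void main(String[] args) {
        BackoffDelay delay = new BackoffDelay(30_000, 10); // 30s interval, bound of 10
        for (int i = 0; i < 5; i++) {
            System.out.println("after timeout: " + delay.onTimeout() + " ms");
        }
        System.out.println("after success: " + delay.onSuccess() + " ms");
    }
}
```

With a 30 second interval and a bound of 10, repeated timeouts would stretch the delay to 60, 120 and 240 seconds and then hold it at 300 seconds until a run succeeds.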

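First, initScheduledTasks schedules two timed tasks, the cache refresh and the heartbeat. It then creates instanceInfoReplicator, the instance info replicator, which is the key piece here. After that it creates the statusChangeListener status listener and, if shouldOnDemandUpdateStatusChange() is true, registers it on applicationInfoManager. Pay attention to the last line of the method, instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()):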

public void start(int initialDelayMs) {

    if (started.compareAndSet(false, true)) {

        instanceInfo.setIsDirty();  // for initial register
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

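At first glance this looks like nothing special, but this is exactly where the trap is. (Note in passing that, despite the parameter name initialDelayMs, the delay is scheduled with TimeUnit.SECONDS.) The InstanceInfoReplicator class implements the Runnable interface, so we need to look at its run method: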

class InstanceInfoReplicator implements Runnable {

    public void run() {

        try {

            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {

                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {

            logger.warn("There was a problem with the instance info replicator", t);
        } finally {

            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
}
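The core of that run method is a dirty flag: register only when something changed since the last pass. The sketch below isolates that idea. DirtyFlagReplicator is a hypothetical class, a counter stands in for discoveryClient.register(), and it deliberately simplifies two things: it uses a plain boolean where the listing uses a dirty timestamp, and it leaves out the self-rescheduling done in the finally block.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified dirty-flag replicator; not Eureka's implementation.
final class DirtyFlagReplicator implements Runnable {
    private final AtomicBoolean dirty = new AtomicBoolean(false);
    private final AtomicInteger registrations = new AtomicInteger();

    /** Marks the local instance info as changed, like setIsDirty() in the listing. */
    void markDirty() {
        dirty.set(true);
    }

    @Override
    public void run() {
        // compareAndSet clears the flag and reports whether it had been set
        if (dirty.compareAndSet(true, false)) {
            registrations.incrementAndGet(); // stand-in for discoveryClient.register()
        }
    }

    int registrations() {
        return registrations.get();
    }

    public static void main(String[] args) {
        DirtyFlagReplicator replicator = new DirtyFlagReplicator();
        replicator.markDirty(); // start() does this so the first pass registers
        replicator.run();       // registers once
        replicator.run();       // nothing changed, so no second registration
        System.out.println(replicator.registrations());
    }
}
```

This is why start() marks the instance as dirty before scheduling the first run: the very first pass then performs the initial registration.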

You can see that run, in turn, calls discoveryClient's register method:

boolean register() throws Throwable {

    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {

        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {

        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {

        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

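After all that, we are back in discoveryClient. Eureka's original design feels quite flawed to me, hiding the client registration this deep, and in the newer version the register method still isn't called directly by default. I just don't get it. Moving on: here it calls the register method of the registrationClient held in eurekaTransport. Stepping into it lands in the HTTP client built on the Jersey framework (judging by its log message, the listing below is the Jersey 2 variant):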

public EurekaHttpResponse<Void> register(InstanceInfo info) {

    String urlPath = "apps/" + info.getAppName();
    Response response = null;
    try {

        Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
        addExtraProperties(resourceBuilder);
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .accept(MediaType.APPLICATION_JSON)
                .acceptEncoding("gzip")
                .post(Entity.json(info));
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {

        if (logger.isDebugEnabled()) {

            logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {

            response.close();
        }
    }
}
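The two facts that matter in this exchange are the target URL, which is the service URL followed by apps/ and the application name, and the success check from DiscoveryClient.register(), which treats HTTP 204 (No Content) as success. The sketch below shows both with the JDK only. RegisterRequestSketch and its methods are hypothetical, and the service URL http://localhost:8761/eureka/ and the app name DEMO-APP are example values I picked, not something taken from the code above.

```java
import java.net.URI;

// Hypothetical helper that mirrors the URL layout and status check; not Eureka code.
final class RegisterRequestSketch {

    /** Builds the registration URL: <serviceUrl>/apps/<appName>. */
    static URI registerUri(String serviceUrl, String appName) {
        // A trailing slash makes resolve() append to the path instead of replacing its last segment
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return URI.create(base).resolve("apps/" + appName);
    }

    /** Registration counts as successful only for 204 No Content. */
    static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }

    public static void main(String[] args) {
        System.out.println(registerUri("http://localhost:8761/eureka", "DEMO-APP"));
        System.out.println(isRegistered(204));
        System.out.println(isRegistered(500));
    }
}
```

Anything other than 204 makes register() return false, which the enforced startup registration turns into an IllegalStateException.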

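The register method above assembles the URL and the JSON body and then sends an HTTP request to the EurekaServer. The detailed registration logic, like the registry fetch, will be analyzed separately once we have gone through the main initialization flow of EurekaServerContext. With that, the whole EurekaClient creation flow is done. The last bit of cleanup in the main flow is registering the monitors and saving a few fields: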

try {

    Monitors.registerObject(this);
} catch (Throwable e) {

    logger.warn("Cannot register timers", e);
}

// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);

initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
        initTimestampMs, initRegistrySize);

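Summary

Reading the earlier parts of the Eureka source, I thought it was written fairly well: singletons, the builder pattern, all put to use. But when I got to the EurekaClient creation, I honestly wanted to complain. It really is poorly written: everything is crammed together, and the method names are odd. Could this be why Eureka has started to decline? Let's wrap up with a diagram: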

[Figure: summary diagram of the EurekaClient creation flow]