Hystrix-javanica的HystrixCommand实现原理【源码解析】

王守钰 2020-05-20 14:05:14

AspectJ

在Hystrix的官网上我们可以看到hystrix-javanica引入了aspectj。

HystrixCommandAspect


private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

static {
    META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
            .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
            .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
            .build();
}

// HystrixCommand切点
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}

// HystrixCollapser切点
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}

// HystrixCommand和HystrixCollapser环绕执行
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    // 获取Method
    Method method = getMethodFromTarget(joinPoint);
    // 校验方法是否为空
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    // 判断方法上是否有HystrixCommand和HystrixCollapser两个注解,有的话直接抛出异常
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                "annotations at the same time");
    }
    // 从Map中取出来MetaHolderFactory
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    // 构建MetaHolder
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    // 获取HystrixCommand实例
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    // 获取执行类型
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
    // 执行结果
    Object result;
    try {
        if (!metaHolder.isObservable()) {
            // Command执行
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause();
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}

HystrixCommandFactory.getInstance().create

public HystrixInvokable create(MetaHolder metaHolder) {
    HystrixInvokable executable;
    if (metaHolder.isCollapserAnnotationPresent()) {
        executable = new CommandCollapser(metaHolder);
    } else if (metaHolder.isObservable()) {
        executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    } else {
        // 创建一个标准的执行命令
        executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    }
    return executable;
}

从上面代码可以看出,工厂类生成的Command是GenericCommand,下面我们看下GenericCommand的继承关系。
image
从继承关系可以看出GenericCommand是HystrixCommand的一个子类。

public class GenericCommand extends AbstractHystrixCommand<Object> {
    private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);

    public GenericCommand(HystrixCommandBuilder builder) {
        super(builder);
    }
    
    @Override
    protected Object run() throws Exception {
        LOGGER.debug("execute command: {}", getCommandKey().name());
        return process(new Action() {
            @Override
            Object execute() {
                return getCommandAction().execute(getExecutionType());
            }
        });
    }
    
    @Override
    protected Object getFallback() {
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }
}

HystrixCommandBuilder类信息

// setter配置信息对应的也就是HystrixCommand.Setter中的配置信息
private final GenericSetterBuilder setterBuilder;
// 执行方法和降级方法信息
private final CommandActions commandActions;
// 上下文缓存结果
private final CacheInvocationContext<CacheResult> cacheResultInvocationContext;
// 上下文缓存移除
private final CacheInvocationContext<CacheRemove> cacheRemoveInvocationContext;
// 合并请求
private final Collection<HystrixCollapser.CollapsedRequest<Object, Object>> collapsedRequests;
// 忽略异常
private final List<Class<? extends Throwable>> ignoreExceptions;
// 执行类型
private final ExecutionType executionType;

public HystrixCommandBuilder(Builder builder) {
    this.setterBuilder = builder.setterBuilder;
    this.commandActions = builder.commandActions;
    this.cacheResultInvocationContext = builder.cacheResultInvocationContext;
    this.cacheRemoveInvocationContext = builder.cacheRemoveInvocationContext;
    this.collapsedRequests = builder.collapsedRequests;
    this.ignoreExceptions = builder.ignoreExceptions;
    this.executionType = builder.executionType;
}

GenericSetterBuilder类信息

// 组key
private String groupKey;
// CommandKey
private String commandKey;
// 线程池key
private String threadPoolKey;
// 合并执行key
private String collapserKey;
private HystrixCollapser.Scope scope;
// Command配置
private List<HystrixProperty> commandProperties = Collections.emptyList();
private List<HystrixProperty> collapserProperties = Collections.emptyList();
// 线程池配置
private List<HystrixProperty> threadPoolProperties = Collections.emptyList();

public GenericSetterBuilder(Builder builder) {
    this.groupKey = builder.groupKey;
    this.commandKey = builder.commandKey;
    this.threadPoolKey = builder.threadPoolKey;
    this.collapserKey = builder.collapserKey;
    this.scope = builder.scope;
    this.commandProperties = builder.commandProperties;
    this.collapserProperties = builder.collapserProperties;
    this.threadPoolProperties = builder.threadPoolProperties;
}

// build方法
public HystrixCommand.Setter build() throws HystrixPropertyException {
    // HystrixCommand.Setter构建
    HystrixCommand.Setter setter = HystrixCommand.Setter
            // groupKey
            .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
            // commandKey
            .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
    // 判断线程池key是否为空
    if (StringUtils.isNotBlank(threadPoolKey)) {
        setter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey));
    }
    try {
        // 配置线程池默认信息
        setter.andThreadPoolPropertiesDefaults(HystrixPropertiesManager.initializeThreadPoolProperties(threadPoolProperties));
    } catch (IllegalArgumentException e) {
        throw new HystrixPropertyException("Failed to set Thread Pool properties. " + getInfo(), e);
    }
    try {
        // 配置Command信息
        setter.andCommandPropertiesDefaults(HystrixPropertiesManager.initializeCommandProperties(commandProperties));
    } catch (IllegalArgumentException e) {
        throw new HystrixPropertyException("Failed to set Command properties. " + getInfo(), e);
    }
    return setter;
}

HystrixPropertiesManager的配置信息处理类

// command 执行配置属性
public static final String EXECUTION_ISOLATION_STRATEGY = "execution.isolation.strategy";
public static final String EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS = "execution.isolation.thread.timeoutInMilliseconds";
public static final String EXECUTION_TIMEOUT_ENABLED = "execution.timeout.enabled";
public static final String EXECUTION_ISOLATION_THREAD_INTERRUPT_ON_TIMEOUT = "execution.isolation.thread.interruptOnTimeout";
public static final String EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS = "execution.isolation.semaphore.maxConcurrentRequests";

// command 执行降级属性
public static final String FALLBACK_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS = "fallback.isolation.semaphore.maxConcurrentRequests";
public static final String FALLBACK_ENABLED = "fallback.enabled";

// command断路器属性
public static final String CIRCUIT_BREAKER_ENABLED = "circuitBreaker.enabled";
public static final String CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD = "circuitBreaker.requestVolumeThreshold";
public static final String CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS = "circuitBreaker.sleepWindowInMilliseconds";
public static final String CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE = "circuitBreaker.errorThresholdPercentage";
public static final String CIRCUIT_BREAKER_FORCE_OPEN = "circuitBreaker.forceOpen";
public static final String CIRCUIT_BREAKER_FORCE_CLOSED = "circuitBreaker.forceClosed";

// command 流量配置属性 
public static final String METRICS_ROLLING_PERCENTILE_ENABLED = "metrics.rollingPercentile.enabled";
public static final String METRICS_ROLLING_PERCENTILE_TIME_IN_MILLISECONDS = "metrics.rollingPercentile.timeInMilliseconds";
public static final String METRICS_ROLLING_PERCENTILE_NUM_BUCKETS = "metrics.rollingPercentile.numBuckets";
public static final String METRICS_ROLLING_PERCENTILE_BUCKET_SIZE = "metrics.rollingPercentile.bucketSize";
public static final String METRICS_HEALTH_SNAPSHOT_INTERVAL_IN_MILLISECONDS = "metrics.healthSnapshot.intervalInMilliseconds";

// command 请求上下文属性
public static final String REQUEST_CACHE_ENABLED = "requestCache.enabled";
public static final String REQUEST_LOG_ENABLED = "requestLog.enabled";

// 线程池属性 
public static final String MAX_QUEUE_SIZE = "maxQueueSize";
public static final String CORE_SIZE = "coreSize";
public static final String KEEP_ALIVE_TIME_MINUTES = "keepAliveTimeMinutes";
public static final String QUEUE_SIZE_REJECTION_THRESHOLD = "queueSizeRejectionThreshold";
public static final String METRICS_ROLLING_STATS_NUM_BUCKETS = "metrics.rollingStats.numBuckets";
public static final String METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS = "metrics.rollingStats.timeInMilliseconds";

// 合并配置属性 
public static final String MAX_REQUESTS_IN_BATCH = "maxRequestsInBatch";
public static final String TIMER_DELAY_IN_MILLISECONDS = "timerDelayInMilliseconds";

// command配置 
public static HystrixCommandProperties.Setter initializeCommandProperties(List<HystrixProperty> properties) throws IllegalArgumentException {
    return initializeProperties(HystrixCommandProperties.Setter(), properties, CMD_PROP_MAP, "command");
}

// 线程池配置 
public static HystrixThreadPoolProperties.Setter initializeThreadPoolProperties(List<HystrixProperty> properties) throws IllegalArgumentException {
    return initializeProperties(HystrixThreadPoolProperties.Setter(), properties, TP_PROP_MAP, "thread pool");
}

// 合并线程配置 
public static HystrixCollapserProperties.Setter initializeCollapserProperties(List<HystrixProperty> properties) {
    return initializeProperties(HystrixCollapserProperties.Setter(), properties, COLLAPSER_PROP_MAP, "collapser");
}

private static <S> S initializeProperties(S setter, List<HystrixProperty> properties, Map<String, PropSetter<S, String>> propMap, String type) {
    if (properties != null && properties.size() > 0) {
        for (HystrixProperty property : properties) {
            validate(property);
            if (!propMap.containsKey(property.name())) {
                throw new IllegalArgumentException("unknown " + type + " property: " + property.name());
            }

            propMap.get(property.name()).set(setter, property.value());
        }
    }
    return setter;
}

// 校验配置信息是否为空
private static void validate(HystrixProperty hystrixProperty) throws IllegalArgumentException {
    Validate.notBlank(hystrixProperty.name(), "hystrix property name cannot be null or blank");
}

// 执行配置Map
private static final Map<String, PropSetter<HystrixCommandProperties.Setter, String>> CMD_PROP_MAP =
        ImmutableMap.<String, PropSetter<HystrixCommandProperties.Setter, String>>builder()
                .put(EXECUTION_ISOLATION_STRATEGY, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withExecutionIsolationStrategy(toEnum(EXECUTION_ISOLATION_STRATEGY, value, HystrixCommandProperties.ExecutionIsolationStrategy.class,
                                HystrixCommandProperties.ExecutionIsolationStrategy.values()));
                    }
                })
                .put(EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withExecutionTimeoutInMilliseconds(toInt(EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS, value));
                    }
                })
                .put(EXECUTION_TIMEOUT_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withExecutionTimeoutEnabled(toBoolean(value));
                    }
                })
                .put(EXECUTION_ISOLATION_THREAD_INTERRUPT_ON_TIMEOUT, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withExecutionIsolationThreadInterruptOnTimeout(toBoolean(value));
                    }
                })
                .put(EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withExecutionIsolationSemaphoreMaxConcurrentRequests(toInt(EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, value));
                    }
                })
                .put(FALLBACK_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withFallbackIsolationSemaphoreMaxConcurrentRequests(toInt(FALLBACK_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, value));
                    }
                })
                .put(FALLBACK_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withFallbackEnabled(toBoolean(value));
                    }
                })
                .put(CIRCUIT_BREAKER_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withCircuitBreakerEnabled(toBoolean(value));
                    }
                })
                .put(CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withCircuitBreakerRequestVolumeThreshold(toInt(CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD, value));
                    }
                })
                .put(CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withCircuitBreakerSleepWindowInMilliseconds(toInt(CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS, value));
                    }
                })
                .put(CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withCircuitBreakerErrorThresholdPercentage(toInt(CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE, value));
                    }
                })
                .put(CIRCUIT_BREAKER_FORCE_OPEN, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withCircuitBreakerForceOpen(toBoolean(value));
                    }
                })
                .put(CIRCUIT_BREAKER_FORCE_CLOSED, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withCircuitBreakerForceClosed(toBoolean(value));
                    }
                })
                .put(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingStatisticalWindowInMilliseconds(toInt(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, value));
                    }
                })
                .put(METRICS_ROLLING_STATS_NUM_BUCKETS, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingStatisticalWindowBuckets(toInt(METRICS_ROLLING_STATS_NUM_BUCKETS, value));
                    }
                })
                .put(METRICS_ROLLING_PERCENTILE_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingPercentileEnabled(toBoolean(value));
                    }
                })
                .put(METRICS_ROLLING_PERCENTILE_TIME_IN_MILLISECONDS, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingPercentileWindowInMilliseconds(toInt(METRICS_ROLLING_PERCENTILE_TIME_IN_MILLISECONDS, value));
                    }
                })
                .put(METRICS_ROLLING_PERCENTILE_NUM_BUCKETS, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingPercentileWindowBuckets(toInt(METRICS_ROLLING_PERCENTILE_NUM_BUCKETS, value));
                    }
                })
                .put(METRICS_ROLLING_PERCENTILE_BUCKET_SIZE, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingPercentileBucketSize(toInt(METRICS_ROLLING_PERCENTILE_BUCKET_SIZE, value));
                    }
                })
                .put(METRICS_HEALTH_SNAPSHOT_INTERVAL_IN_MILLISECONDS, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsHealthSnapshotIntervalInMilliseconds(toInt(METRICS_HEALTH_SNAPSHOT_INTERVAL_IN_MILLISECONDS, value));
                    }
                })
                .put(REQUEST_CACHE_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withRequestCacheEnabled(toBoolean(value));
                    }
                })
                .put(REQUEST_LOG_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withRequestLogEnabled(toBoolean(value));
                    }
                })
                .build();

// 线程池配置信息
private static final Map<String, PropSetter<HystrixThreadPoolProperties.Setter, String>> TP_PROP_MAP =
        ImmutableMap.<String, PropSetter<HystrixThreadPoolProperties.Setter, String>>builder()
                .put(MAX_QUEUE_SIZE, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
                    @Override
                    public void set(HystrixThreadPoolProperties.Setter setter, String value) {
                        setter.withMaxQueueSize(toInt(MAX_QUEUE_SIZE, value));
                    }
                })
                .put(CORE_SIZE, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
                            @Override
                            public void set(HystrixThreadPoolProperties.Setter setter, String value) {
                                setter.withCoreSize(toInt(CORE_SIZE, value));
                            }
                        }
                )
                .put(KEEP_ALIVE_TIME_MINUTES, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
                            @Override
                            public void set(HystrixThreadPoolProperties.Setter setter, String value) {
                                setter.withKeepAliveTimeMinutes(toInt(KEEP_ALIVE_TIME_MINUTES, value));
                            }
                        }
                )
                .put(QUEUE_SIZE_REJECTION_THRESHOLD, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
                            @Override
                            public void set(HystrixThreadPoolProperties.Setter setter, String value) {
                                setter.withQueueSizeRejectionThreshold(toInt(QUEUE_SIZE_REJECTION_THRESHOLD, value));
                            }
                        }
                )
                .put(METRICS_ROLLING_STATS_NUM_BUCKETS, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
                            @Override
                            public void set(HystrixThreadPoolProperties.Setter setter, String value) {
                                setter.withMetricsRollingStatisticalWindowBuckets(toInt(METRICS_ROLLING_STATS_NUM_BUCKETS, value));
                            }
                        }
                )
                .put(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
                            @Override
                            public void set(HystrixThreadPoolProperties.Setter setter, String value) {
                                setter.withMetricsRollingStatisticalWindowInMilliseconds(toInt(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, value));
                            }
                        }
                )
                .build();

// 合并请求配置信息
private static final Map<String, PropSetter<HystrixCollapserProperties.Setter, String>> COLLAPSER_PROP_MAP =
        ImmutableMap.<String, PropSetter<HystrixCollapserProperties.Setter, String>>builder()
                .put(TIMER_DELAY_IN_MILLISECONDS, new PropSetter<HystrixCollapserProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCollapserProperties.Setter setter, String value) {
                        setter.withTimerDelayInMilliseconds(toInt(TIMER_DELAY_IN_MILLISECONDS, value));
                    }
                })
                .put(MAX_REQUESTS_IN_BATCH, new PropSetter<HystrixCollapserProperties.Setter, String>() {
                            @Override
                            public void set(HystrixCollapserProperties.Setter setter, String value) {
                                setter.withMaxRequestsInBatch(toInt(MAX_REQUESTS_IN_BATCH, value));
                            }
                        }
                )
                .put(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, new PropSetter<HystrixCollapserProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingStatisticalWindowInMilliseconds(toInt(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, value));
                    }
                })
                .put(METRICS_ROLLING_STATS_NUM_BUCKETS, new PropSetter<HystrixCollapserProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingStatisticalWindowBuckets(toInt(METRICS_ROLLING_STATS_NUM_BUCKETS, value));
                    }
                })
                .put(METRICS_ROLLING_PERCENTILE_ENABLED, new PropSetter<HystrixCollapserProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingPercentileEnabled(toBoolean(value));
                    }
                })
                .put(METRICS_ROLLING_PERCENTILE_TIME_IN_MILLISECONDS, new PropSetter<HystrixCollapserProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingPercentileWindowInMilliseconds(toInt(METRICS_ROLLING_PERCENTILE_TIME_IN_MILLISECONDS, value));
                    }
                })
                .put(METRICS_ROLLING_PERCENTILE_NUM_BUCKETS, new PropSetter<HystrixCollapserProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingPercentileWindowBuckets(toInt(METRICS_ROLLING_PERCENTILE_NUM_BUCKETS, value));
                    }
                })
                .put(METRICS_ROLLING_PERCENTILE_BUCKET_SIZE, new PropSetter<HystrixCollapserProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withMetricsRollingPercentileBucketSize(toInt(METRICS_ROLLING_PERCENTILE_BUCKET_SIZE, value));
                    }
                })
                .put(REQUEST_CACHE_ENABLED, new PropSetter<HystrixCollapserProperties.Setter, String>() {
                    @Override
                    public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
                        setter.withRequestCacheEnabled(toBoolean(value));
                    }
                })
                .build();


private interface PropSetter<S, V> {
    void set(S setter, V value) throws IllegalArgumentException;
}

private static <E extends Enum<E>> E toEnum(String propName, String propValue, Class<E> enumType, E... values) throws IllegalArgumentException {
    try {
        return Enum.valueOf(enumType, propValue);
    } catch (NullPointerException npe) {
        throw createBadEnumError(propName, propValue, values);
    } catch (IllegalArgumentException e) {
        throw createBadEnumError(propName, propValue, values);
    }
}

private static int toInt(String propName, String propValue) throws IllegalArgumentException {
    try {
        return Integer.parseInt(propValue);
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("bad property value. property name '" + propName + "'. Expected int value, actual = " + propValue);
    }
}

private static boolean toBoolean(String propValue) {
    return Boolean.valueOf(propValue);
}

private static IllegalArgumentException createBadEnumError(String propName, String propValue, Enum... values) {
    throw new IllegalArgumentException("bad property value. property name '" + propName + "'. Expected correct enum value, one of the [" + Arrays.toString(values) + "] , actual = " + propValue);
}

CommandExecutor

public class CommandExecutor {

    // 执行
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        // 校验执行器是否为空
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);
        
        // 选择执行类型
        switch (executionType) {
            // 同步执行
            case SYNCHRONOUS: {
                // 执行
                return castToExecutable(invokable, executionType).execute();
            }
            // 异步执行
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            // observ方式执行
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }

    // 强制转换为HystrixExecutable
    private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
        if (invokable instanceof HystrixExecutable) {
            return (HystrixExecutable) invokable;
        }
        throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
    }

    private static HystrixObservable castToObservable(HystrixInvokable invokable) {
        if (invokable instanceof HystrixObservable) {
            return (HystrixObservable) invokable;
        }
        throw new RuntimeException("Command should implement " + HystrixObservable.class.getCanonicalName() + " interface to execute in observable mode");
    }

}