AspectJ
在Hystrix的官网上我们可以看到hystrix-javanica引入了aspectj。
HystrixCommandAspect
private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
static {
META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}
// HystrixCommand切点
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
// HystrixCollapser切点
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
// HystrixCommand和HystrixCollapser环绕执行
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
// 获取Method
Method method = getMethodFromTarget(joinPoint);
// 校验方法是否为空
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
// 判断方法上是否有HystrixCommand和HystrixCollapser两个注解,有的话直接抛出异常
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
// 从Map中取出来MetaHolderFactory
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
// 构建MetaHolder
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
// 获取HystrixCommand实例
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
// 获取执行类型
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
// 执行结果
Object result;
try {
if (!metaHolder.isObservable()) {
// Command执行
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
HystrixCommandFactory.getInstance().create
public HystrixInvokable create(MetaHolder metaHolder) {
HystrixInvokable executable;
if (metaHolder.isCollapserAnnotationPresent()) {
executable = new CommandCollapser(metaHolder);
} else if (metaHolder.isObservable()) {
executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
} else {
// 创建一个标准的执行命令
executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
}
return executable;
}
从上面代码可以看出,工厂类生成的Command是GenericCommand,下面我们看下GenericCommand的继承关系。
从继承关系可以看出GenericCommand是HystrixCommand的一个子类。
public class GenericCommand extends AbstractHystrixCommand<Object> {
private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);
public GenericCommand(HystrixCommandBuilder builder) {
super(builder);
}
@Override
protected Object run() throws Exception {
LOGGER.debug("execute command: {}", getCommandKey().name());
return process(new Action() {
@Override
Object execute() {
return getCommandAction().execute(getExecutionType());
}
});
}
@Override
protected Object getFallback() {
final CommandAction commandAction = getFallbackAction();
if (commandAction != null) {
try {
return process(new Action() {
@Override
Object execute() {
MetaHolder metaHolder = commandAction.getMetaHolder();
Object[] args = createArgsForFallback(metaHolder, getExecutionException());
return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
}
});
} catch (Throwable e) {
LOGGER.error(FallbackErrorMessageBuilder.create()
.append(commandAction, e).build());
throw new FallbackInvocationException(unwrapCause(e));
}
} else {
return super.getFallback();
}
}
}
HystrixCommandBuilder类信息
// setter配置信息对应的也就是HystrixCommand.Setter中的配置信息
private final GenericSetterBuilder setterBuilder;
// 执行方法和降级方法信息
private final CommandActions commandActions;
// 上下文缓存结果
private final CacheInvocationContext<CacheResult> cacheResultInvocationContext;
// 上下文缓存移除
private final CacheInvocationContext<CacheRemove> cacheRemoveInvocationContext;
// 合并请求
private final Collection<HystrixCollapser.CollapsedRequest<Object, Object>> collapsedRequests;
// 忽略异常
private final List<Class<? extends Throwable>> ignoreExceptions;
// 执行类型
private final ExecutionType executionType;
public HystrixCommandBuilder(Builder builder) {
this.setterBuilder = builder.setterBuilder;
this.commandActions = builder.commandActions;
this.cacheResultInvocationContext = builder.cacheResultInvocationContext;
this.cacheRemoveInvocationContext = builder.cacheRemoveInvocationContext;
this.collapsedRequests = builder.collapsedRequests;
this.ignoreExceptions = builder.ignoreExceptions;
this.executionType = builder.executionType;
}
GenericSetterBuilder类信息
// 组key
private String groupKey;
// CommandKey
private String commandKey;
// 线程池key
private String threadPoolKey;
// 合并执行key
private String collapserKey;
private HystrixCollapser.Scope scope;
// Command配置
private List<HystrixProperty> commandProperties = Collections.emptyList();
private List<HystrixProperty> collapserProperties = Collections.emptyList();
// 线程池配置
private List<HystrixProperty> threadPoolProperties = Collections.emptyList();
public GenericSetterBuilder(Builder builder) {
this.groupKey = builder.groupKey;
this.commandKey = builder.commandKey;
this.threadPoolKey = builder.threadPoolKey;
this.collapserKey = builder.collapserKey;
this.scope = builder.scope;
this.commandProperties = builder.commandProperties;
this.collapserProperties = builder.collapserProperties;
this.threadPoolProperties = builder.threadPoolProperties;
}
// build方法
public HystrixCommand.Setter build() throws HystrixPropertyException {
// HystrixCommand.Setter构建
HystrixCommand.Setter setter = HystrixCommand.Setter
// groupKey
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
// commandKey
.andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
// 判断线程池key是否为空
if (StringUtils.isNotBlank(threadPoolKey)) {
setter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey));
}
try {
// 配置线程池默认信息
setter.andThreadPoolPropertiesDefaults(HystrixPropertiesManager.initializeThreadPoolProperties(threadPoolProperties));
} catch (IllegalArgumentException e) {
throw new HystrixPropertyException("Failed to set Thread Pool properties. " + getInfo(), e);
}
try {
// 配置Command信息
setter.andCommandPropertiesDefaults(HystrixPropertiesManager.initializeCommandProperties(commandProperties));
} catch (IllegalArgumentException e) {
throw new HystrixPropertyException("Failed to set Command properties. " + getInfo(), e);
}
return setter;
}
HystrixPropertiesManager的配置信息处理类
// command 执行配置属性
public static final String EXECUTION_ISOLATION_STRATEGY = "execution.isolation.strategy";
public static final String EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS = "execution.isolation.thread.timeoutInMilliseconds";
public static final String EXECUTION_TIMEOUT_ENABLED = "execution.timeout.enabled";
public static final String EXECUTION_ISOLATION_THREAD_INTERRUPT_ON_TIMEOUT = "execution.isolation.thread.interruptOnTimeout";
public static final String EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS = "execution.isolation.semaphore.maxConcurrentRequests";
// command 执行降级属性
public static final String FALLBACK_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS = "fallback.isolation.semaphore.maxConcurrentRequests";
public static final String FALLBACK_ENABLED = "fallback.enabled";
// command断路器属性
public static final String CIRCUIT_BREAKER_ENABLED = "circuitBreaker.enabled";
public static final String CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD = "circuitBreaker.requestVolumeThreshold";
public static final String CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS = "circuitBreaker.sleepWindowInMilliseconds";
public static final String CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE = "circuitBreaker.errorThresholdPercentage";
public static final String CIRCUIT_BREAKER_FORCE_OPEN = "circuitBreaker.forceOpen";
public static final String CIRCUIT_BREAKER_FORCE_CLOSED = "circuitBreaker.forceClosed";
// command 流量配置属性
public static final String METRICS_ROLLING_PERCENTILE_ENABLED = "metrics.rollingPercentile.enabled";
public static final String METRICS_ROLLING_PERCENTILE_TIME_IN_MILLISECONDS = "metrics.rollingPercentile.timeInMilliseconds";
public static final String METRICS_ROLLING_PERCENTILE_NUM_BUCKETS = "metrics.rollingPercentile.numBuckets";
public static final String METRICS_ROLLING_PERCENTILE_BUCKET_SIZE = "metrics.rollingPercentile.bucketSize";
public static final String METRICS_HEALTH_SNAPSHOT_INTERVAL_IN_MILLISECONDS = "metrics.healthSnapshot.intervalInMilliseconds";
// command 请求上下文属性
public static final String REQUEST_CACHE_ENABLED = "requestCache.enabled";
public static final String REQUEST_LOG_ENABLED = "requestLog.enabled";
// 线程池属性
public static final String MAX_QUEUE_SIZE = "maxQueueSize";
public static final String CORE_SIZE = "coreSize";
public static final String KEEP_ALIVE_TIME_MINUTES = "keepAliveTimeMinutes";
public static final String QUEUE_SIZE_REJECTION_THRESHOLD = "queueSizeRejectionThreshold";
public static final String METRICS_ROLLING_STATS_NUM_BUCKETS = "metrics.rollingStats.numBuckets";
public static final String METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS = "metrics.rollingStats.timeInMilliseconds";
// 合并配置属性
public static final String MAX_REQUESTS_IN_BATCH = "maxRequestsInBatch";
public static final String TIMER_DELAY_IN_MILLISECONDS = "timerDelayInMilliseconds";
// command配置
public static HystrixCommandProperties.Setter initializeCommandProperties(List<HystrixProperty> properties) throws IllegalArgumentException {
return initializeProperties(HystrixCommandProperties.Setter(), properties, CMD_PROP_MAP, "command");
}
// 线程池配置
public static HystrixThreadPoolProperties.Setter initializeThreadPoolProperties(List<HystrixProperty> properties) throws IllegalArgumentException {
return initializeProperties(HystrixThreadPoolProperties.Setter(), properties, TP_PROP_MAP, "thread pool");
}
// 合并线程配置
public static HystrixCollapserProperties.Setter initializeCollapserProperties(List<HystrixProperty> properties) {
return initializeProperties(HystrixCollapserProperties.Setter(), properties, COLLAPSER_PROP_MAP, "collapser");
}
private static <S> S initializeProperties(S setter, List<HystrixProperty> properties, Map<String, PropSetter<S, String>> propMap, String type) {
if (properties != null && properties.size() > 0) {
for (HystrixProperty property : properties) {
validate(property);
if (!propMap.containsKey(property.name())) {
throw new IllegalArgumentException("unknown " + type + " property: " + property.name());
}
propMap.get(property.name()).set(setter, property.value());
}
}
return setter;
}
// 校验配置信息是否为空
private static void validate(HystrixProperty hystrixProperty) throws IllegalArgumentException {
Validate.notBlank(hystrixProperty.name(), "hystrix property name cannot be null or blank");
}
// 执行配置Map
private static final Map<String, PropSetter<HystrixCommandProperties.Setter, String>> CMD_PROP_MAP =
ImmutableMap.<String, PropSetter<HystrixCommandProperties.Setter, String>>builder()
.put(EXECUTION_ISOLATION_STRATEGY, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withExecutionIsolationStrategy(toEnum(EXECUTION_ISOLATION_STRATEGY, value, HystrixCommandProperties.ExecutionIsolationStrategy.class,
HystrixCommandProperties.ExecutionIsolationStrategy.values()));
}
})
.put(EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withExecutionTimeoutInMilliseconds(toInt(EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS, value));
}
})
.put(EXECUTION_TIMEOUT_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withExecutionTimeoutEnabled(toBoolean(value));
}
})
.put(EXECUTION_ISOLATION_THREAD_INTERRUPT_ON_TIMEOUT, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withExecutionIsolationThreadInterruptOnTimeout(toBoolean(value));
}
})
.put(EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withExecutionIsolationSemaphoreMaxConcurrentRequests(toInt(EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, value));
}
})
.put(FALLBACK_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withFallbackIsolationSemaphoreMaxConcurrentRequests(toInt(FALLBACK_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, value));
}
})
.put(FALLBACK_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withFallbackEnabled(toBoolean(value));
}
})
.put(CIRCUIT_BREAKER_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withCircuitBreakerEnabled(toBoolean(value));
}
})
.put(CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withCircuitBreakerRequestVolumeThreshold(toInt(CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD, value));
}
})
.put(CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withCircuitBreakerSleepWindowInMilliseconds(toInt(CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS, value));
}
})
.put(CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withCircuitBreakerErrorThresholdPercentage(toInt(CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE, value));
}
})
.put(CIRCUIT_BREAKER_FORCE_OPEN, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withCircuitBreakerForceOpen(toBoolean(value));
}
})
.put(CIRCUIT_BREAKER_FORCE_CLOSED, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withCircuitBreakerForceClosed(toBoolean(value));
}
})
.put(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingStatisticalWindowInMilliseconds(toInt(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, value));
}
})
.put(METRICS_ROLLING_STATS_NUM_BUCKETS, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingStatisticalWindowBuckets(toInt(METRICS_ROLLING_STATS_NUM_BUCKETS, value));
}
})
.put(METRICS_ROLLING_PERCENTILE_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingPercentileEnabled(toBoolean(value));
}
})
.put(METRICS_ROLLING_PERCENTILE_TIME_IN_MILLISECONDS, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingPercentileWindowInMilliseconds(toInt(METRICS_ROLLING_PERCENTILE_TIME_IN_MILLISECONDS, value));
}
})
.put(METRICS_ROLLING_PERCENTILE_NUM_BUCKETS, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingPercentileWindowBuckets(toInt(METRICS_ROLLING_PERCENTILE_NUM_BUCKETS, value));
}
})
.put(METRICS_ROLLING_PERCENTILE_BUCKET_SIZE, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingPercentileBucketSize(toInt(METRICS_ROLLING_PERCENTILE_BUCKET_SIZE, value));
}
})
.put(METRICS_HEALTH_SNAPSHOT_INTERVAL_IN_MILLISECONDS, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsHealthSnapshotIntervalInMilliseconds(toInt(METRICS_HEALTH_SNAPSHOT_INTERVAL_IN_MILLISECONDS, value));
}
})
.put(REQUEST_CACHE_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withRequestCacheEnabled(toBoolean(value));
}
})
.put(REQUEST_LOG_ENABLED, new PropSetter<HystrixCommandProperties.Setter, String>() {
@Override
public void set(HystrixCommandProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withRequestLogEnabled(toBoolean(value));
}
})
.build();
// 线程池配置信息
private static final Map<String, PropSetter<HystrixThreadPoolProperties.Setter, String>> TP_PROP_MAP =
ImmutableMap.<String, PropSetter<HystrixThreadPoolProperties.Setter, String>>builder()
.put(MAX_QUEUE_SIZE, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
@Override
public void set(HystrixThreadPoolProperties.Setter setter, String value) {
setter.withMaxQueueSize(toInt(MAX_QUEUE_SIZE, value));
}
})
.put(CORE_SIZE, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
@Override
public void set(HystrixThreadPoolProperties.Setter setter, String value) {
setter.withCoreSize(toInt(CORE_SIZE, value));
}
}
)
.put(KEEP_ALIVE_TIME_MINUTES, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
@Override
public void set(HystrixThreadPoolProperties.Setter setter, String value) {
setter.withKeepAliveTimeMinutes(toInt(KEEP_ALIVE_TIME_MINUTES, value));
}
}
)
.put(QUEUE_SIZE_REJECTION_THRESHOLD, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
@Override
public void set(HystrixThreadPoolProperties.Setter setter, String value) {
setter.withQueueSizeRejectionThreshold(toInt(QUEUE_SIZE_REJECTION_THRESHOLD, value));
}
}
)
.put(METRICS_ROLLING_STATS_NUM_BUCKETS, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
@Override
public void set(HystrixThreadPoolProperties.Setter setter, String value) {
setter.withMetricsRollingStatisticalWindowBuckets(toInt(METRICS_ROLLING_STATS_NUM_BUCKETS, value));
}
}
)
.put(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, new PropSetter<HystrixThreadPoolProperties.Setter, String>() {
@Override
public void set(HystrixThreadPoolProperties.Setter setter, String value) {
setter.withMetricsRollingStatisticalWindowInMilliseconds(toInt(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, value));
}
}
)
.build();
// 合并请求配置信息
private static final Map<String, PropSetter<HystrixCollapserProperties.Setter, String>> COLLAPSER_PROP_MAP =
ImmutableMap.<String, PropSetter<HystrixCollapserProperties.Setter, String>>builder()
.put(TIMER_DELAY_IN_MILLISECONDS, new PropSetter<HystrixCollapserProperties.Setter, String>() {
@Override
public void set(HystrixCollapserProperties.Setter setter, String value) {
setter.withTimerDelayInMilliseconds(toInt(TIMER_DELAY_IN_MILLISECONDS, value));
}
})
.put(MAX_REQUESTS_IN_BATCH, new PropSetter<HystrixCollapserProperties.Setter, String>() {
@Override
public void set(HystrixCollapserProperties.Setter setter, String value) {
setter.withMaxRequestsInBatch(toInt(MAX_REQUESTS_IN_BATCH, value));
}
}
)
.put(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, new PropSetter<HystrixCollapserProperties.Setter, String>() {
@Override
public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingStatisticalWindowInMilliseconds(toInt(METRICS_ROLLING_STATS_TIME_IN_MILLISECONDS, value));
}
})
.put(METRICS_ROLLING_STATS_NUM_BUCKETS, new PropSetter<HystrixCollapserProperties.Setter, String>() {
@Override
public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingStatisticalWindowBuckets(toInt(METRICS_ROLLING_STATS_NUM_BUCKETS, value));
}
})
.put(METRICS_ROLLING_PERCENTILE_ENABLED, new PropSetter<HystrixCollapserProperties.Setter, String>() {
@Override
public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingPercentileEnabled(toBoolean(value));
}
})
.put(METRICS_ROLLING_PERCENTILE_TIME_IN_MILLISECONDS, new PropSetter<HystrixCollapserProperties.Setter, String>() {
@Override
public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingPercentileWindowInMilliseconds(toInt(METRICS_ROLLING_PERCENTILE_TIME_IN_MILLISECONDS, value));
}
})
.put(METRICS_ROLLING_PERCENTILE_NUM_BUCKETS, new PropSetter<HystrixCollapserProperties.Setter, String>() {
@Override
public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingPercentileWindowBuckets(toInt(METRICS_ROLLING_PERCENTILE_NUM_BUCKETS, value));
}
})
.put(METRICS_ROLLING_PERCENTILE_BUCKET_SIZE, new PropSetter<HystrixCollapserProperties.Setter, String>() {
@Override
public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withMetricsRollingPercentileBucketSize(toInt(METRICS_ROLLING_PERCENTILE_BUCKET_SIZE, value));
}
})
.put(REQUEST_CACHE_ENABLED, new PropSetter<HystrixCollapserProperties.Setter, String>() {
@Override
public void set(HystrixCollapserProperties.Setter setter, String value) throws IllegalArgumentException {
setter.withRequestCacheEnabled(toBoolean(value));
}
})
.build();
private interface PropSetter<S, V> {
void set(S setter, V value) throws IllegalArgumentException;
}
private static <E extends Enum<E>> E toEnum(String propName, String propValue, Class<E> enumType, E... values) throws IllegalArgumentException {
try {
return Enum.valueOf(enumType, propValue);
} catch (NullPointerException npe) {
throw createBadEnumError(propName, propValue, values);
} catch (IllegalArgumentException e) {
throw createBadEnumError(propName, propValue, values);
}
}
private static int toInt(String propName, String propValue) throws IllegalArgumentException {
try {
return Integer.parseInt(propValue);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("bad property value. property name '" + propName + "'. Expected int value, actual = " + propValue);
}
}
private static boolean toBoolean(String propValue) {
return Boolean.valueOf(propValue);
}
private static IllegalArgumentException createBadEnumError(String propName, String propValue, Enum... values) {
throw new IllegalArgumentException("bad property value. property name '" + propName + "'. Expected correct enum value, one of the [" + Arrays.toString(values) + "] , actual = " + propValue);
}
CommandExecutor
public class CommandExecutor {
// 执行
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
// 校验执行器是否为空
Validate.notNull(invokable);
Validate.notNull(metaHolder);
// 选择执行类型
switch (executionType) {
// 同步执行
case SYNCHRONOUS: {
// 执行
return castToExecutable(invokable, executionType).execute();
}
// 异步执行
case ASYNCHRONOUS: {
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
// observ方式执行
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
// 强制转换为HystrixExecutable
private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
if (invokable instanceof HystrixExecutable) {
return (HystrixExecutable) invokable;
}
throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}
private static HystrixObservable castToObservable(HystrixInvokable invokable) {
if (invokable instanceof HystrixObservable) {
return (HystrixObservable) invokable;
}
throw new RuntimeException("Command should implement " + HystrixObservable.class.getCanonicalName() + " interface to execute in observable mode");
}
}