Feign-Hystrix
GitHub地址:https://github.com/OpenFeign/feign/tree/master/hystrix
HelloWorld
// Feign client interface for the organization endpoint used by the example tests in this article.
interface Organization{
// Declares the request line "GET /all"; the raw response body is returned as a String.
// NOTE(review): the path is presumably resolved against the base URL passed to target(...) - confirm.
@RequestLine("GET /all")
String all();
}
/**
 * Minimal example: builds a Hystrix-backed Feign client with no fallback,
 * calls {@code all()} and logs the returned body.
 */
@Test
public void test(){
    HystrixFeign.Builder clientBuilder = HystrixFeign.builder();
    Organization client =
        clientBuilder.target(Organization.class, "http://localhost:9100/system/organization");
    String response = client.all();
    log.info("organizations:{}", response);
}
正常运行结果
11:13:24.507 [main] INFO com.wangshouyu.feign.FeignHystrixTest - organizations:{"data":[{"id":1,"parentId":0,"code":"root","name":"京东汽车商城","orderNum":0,"description":"京东汽车商城","parentIds":"","isEnabled":true}],"code":200,"msg":null}
模拟超时返回结果
com.netflix.hystrix.exception.HystrixRuntimeException: Organization#all() timed-out and no fallback available.
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822)
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807)
at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472)
at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
at rx.observers.Subscribers$5.onError(Subscribers.java:230)
at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)
at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:142)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$1.run(AbstractCommand.java:1142)
at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable$1.call(HystrixContextRunnable.java:41)
at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable$1.call(HystrixContextRunnable.java:37)
at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable.run(HystrixContextRunnable.java:57)
at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$2.tick(AbstractCommand.java:1159)
at com.netflix.hystrix.util.HystrixTimer$1.run(HystrixTimer.java:99)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException
at com.netflix.hystrix.AbstractCommand.handleTimeoutViaFallback(AbstractCommand.java:997)
at com.netflix.hystrix.AbstractCommand.access$500(AbstractCommand.java:60)
at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:609)
at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:601)
at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
... 16 more
降级处理
// Static fallback implementation of Organization: all() always yields the literal "fallback".
Organization fallback = () -> "fallback";
/**
 * Fallback example: builds the client with the {@code fallback} instance as the
 * third argument of {@code target(...)}, then calls {@code all()} and logs the result.
 */
@Test
public void test(){
    HystrixFeign.Builder clientBuilder = HystrixFeign.builder();
    Organization client = clientBuilder.target(
        Organization.class, "http://localhost:9100/system/organization", fallback);
    String response = client.all();
    log.info("organizations:{}", response);
}
执行结果
11:19:48.310 [main] INFO com.wangshouyu.feign.FeignHystrixTest - organizations:fallback
FallbackFactory方式处理
// Factory that ignores the failure cause and returns an Organization whose all()
// always yields the literal "fallbackFactory".
FallbackFactory<Organization> fallbackFactory = cause -> () -> "fallbackFactory";
/**
 * FallbackFactory example: builds the client with {@code fallbackFactory} as the
 * third argument of {@code target(...)}, then calls {@code all()} and logs the result.
 */
@Test
public void test(){
    HystrixFeign.Builder clientBuilder = HystrixFeign.builder();
    Organization client = clientBuilder.target(
        Organization.class, "http://localhost:9100/system/organization", fallbackFactory);
    String response = client.all();
    log.info("organizations:{}", response);
}
执行结果
11:22:23.617 [main] INFO com.wangshouyu.feign.FeignHystrixTest - organizations:fallbackFactory
SetterFactory配置
class MySetterFactory implements SetterFactory{
@Override
public HystrixCommand.Setter create(Target<?> target, Method method) {
String groupKey = target.name();
String commandKey = Feign.configKey(target.type(), method);
return HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
.andCommandKey(HystrixCommandKey.Factory.asKey(commandKey))
.andCommandPropertiesDefaults(HystrixCommandProperties.defaultSetter()
.withExecutionTimeoutInMilliseconds(30 * 1000));
}
}
/**
 * SetterFactory example: installs {@code MySetterFactory} on the builder before
 * creating the client with {@code fallbackFactory}, then calls {@code all()} and logs the result.
 */
@Test
public void test(){
    HystrixFeign.Builder clientBuilder = HystrixFeign.builder()
        .setterFactory(new MySetterFactory());
    Organization client = clientBuilder.target(
        Organization.class, "http://localhost:9100/system/organization", fallbackFactory);
    String response = client.all();
    log.info("organizations:{}", response);
}
实现原理
Feign的实现原理是通过动态代理拦截接口方法调用,最终生成并发送一个HTTP请求。HystrixFeign的原理与之相同,只是在InvocationHandler中用HystrixCommand包装了请求的执行,从而直接获得Hystrix的超时、降级等能力。
final class HystrixInvocationHandler implements InvocationHandler {
// FeignTarget包装对象
private final Target<?> target;
// 方法执行器
private final Map<Method, MethodHandler> dispatch;
// 执行失败工厂
private final FallbackFactory<?> fallbackFactory; // Nullable
private final Map<Method, Method> fallbackMethodMap;
private final Map<Method, Setter> setterMethodMap;
HystrixInvocationHandler(Target<?> target, Map<Method, MethodHandler> dispatch,
SetterFactory setterFactory, FallbackFactory<?> fallbackFactory) {
this.target = checkNotNull(target, "target");
this.dispatch = checkNotNull(dispatch, "dispatch");
this.fallbackFactory = fallbackFactory;
this.fallbackMethodMap = toFallbackMethod(dispatch);
this.setterMethodMap = toSetters(setterFactory, target, dispatch.keySet());
}
static Map<Method, Method> toFallbackMethod(Map<Method, MethodHandler> dispatch) {
Map<Method, Method> result = new LinkedHashMap<Method, Method>();
for (Method method : dispatch.keySet()) {
method.setAccessible(true);
result.put(method, method);
}
return result;
}
static Map<Method, Setter> toSetters(SetterFactory setterFactory,
Target<?> target,
Set<Method> methods) {
Map<Method, Setter> result = new LinkedHashMap<Method, Setter>();
for (Method method : methods) {
method.setAccessible(true);
result.put(method, setterFactory.create(target, method));
}
return result;
}
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
// 比较是否是equals方法
ReflectiveFeign.FeignInvocationHandler
if ("equals".equals(method.getName())) {
try {
Object otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
// 是否是hashcode方法
} else if ("hashCode".equals(method.getName())) {
return hashCode();
// 是否是toString方法
} else if ("toString".equals(method.getName())) {
return toString();
}
// 创建HystrixCommand对象
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
try {
// 调用方法执行器进行真正发送请求
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw (Error) t;
}
}
// 构造Hystrix的Fallback
@Override
protected Object getFallback() {
if (fallbackFactory == null) {
return super.getFallback();
}
try {
Object fallback = fallbackFactory.create(getExecutionException());
Object result = fallbackMethodMap.get(method).invoke(fallback, args);
if (isReturnsHystrixCommand(method)) {
return ((HystrixCommand) result).execute();
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return ((Observable) result).toBlocking().first();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return ((Single) result).toObservable().toBlocking().first();
} else if (isReturnsCompletable(method)) {
((Completable) result).await();
return null;
} else if (isReturnsCompletableFuture(method)) {
return ((Future) result).get();
} else {
return result;
}
} catch (IllegalAccessException e) {
// shouldn't happen as method is public due to being an interface
throw new AssertionError(e);
} catch (InvocationTargetException | ExecutionException e) {
// Exceptions on fallback are tossed by Hystrix
throw new AssertionError(e.getCause());
} catch (InterruptedException e) {
// Exceptions on fallback are tossed by Hystrix
Thread.currentThread().interrupt();
throw new AssertionError(e.getCause());
}
}
};
if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
} else if (isReturnsCompletableFuture(method)) {
return new ObservableCompletableFuture<>(hystrixCommand);
}
return hystrixCommand.execute();
}
private boolean isReturnsCompletable(Method method) {
return Completable.class.isAssignableFrom(method.getReturnType());
}
private boolean isReturnsHystrixCommand(Method method) {
return HystrixCommand.class.isAssignableFrom(method.getReturnType());
}
private boolean isReturnsObservable(Method method) {
return Observable.class.isAssignableFrom(method.getReturnType());
}
private boolean isReturnsCompletableFuture(Method method) {
return CompletableFuture.class.isAssignableFrom(method.getReturnType());
}
private boolean isReturnsSingle(Method method) {
return Single.class.isAssignableFrom(method.getReturnType());
}
@Override
public boolean equals(Object obj) {
if (obj instanceof HystrixInvocationHandler) {
HystrixInvocationHandler other = (HystrixInvocationHandler) obj;
return target.equals(other.target);
}
return false;
}
@Override
public int hashCode() {
return target.hashCode();
}
@Override
public String toString() {
return target.toString();
}
}
HystrixFeign
public final class HystrixFeign {
// 内部Builder构建起
public static Builder builder() {
return new Builder();
}
// 内部的Builder类继承了Feign.Builder的构造器
public static final class Builder extends Feign.Builder {
private Contract contract = new Contract.Default();
// 构造Hystrix的Setter的工厂类
private SetterFactory setterFactory = new SetterFactory.Default();
public Builder setterFactory(SetterFactory setterFactory) {
this.setterFactory = setterFactory;
return this;
}
public <T> T target(Target<T> target, T fallback) {
return build(fallback != null ? new FallbackFactory.Default<T>(fallback) : null)
.newInstance(target);
}
public <T> T target(Target<T> target, FallbackFactory<? extends T> fallbackFactory) {
return build(fallbackFactory).newInstance(target);
}
public <T> T target(Class<T> apiType, String url, T fallback) {
return target(new Target.HardCodedTarget<T>(apiType, url), fallback);
}
public <T> T target(Class<T> apiType,
String url,
FallbackFactory<? extends T> fallbackFactory) {
return target(new Target.HardCodedTarget<T>(apiType, url), fallbackFactory);
}
@Override
public Feign.Builder invocationHandlerFactory(InvocationHandlerFactory invocationHandlerFactory) {
throw new UnsupportedOperationException();
}
@Override
public Builder contract(Contract contract) {
this.contract = contract;
return this;
}
@Override
public Feign build() {
return build(null);
}
// Hystrix的build方法,生成最终执行器是哪一个
Feign build(final FallbackFactory<?> nullableFallbackFactory) {
super.invocationHandlerFactory(new InvocationHandlerFactory() {
@Override
public InvocationHandler create(Target target,
Map<Method, MethodHandler> dispatch) {
// 创建了一个Hystrix的处理器
return new HystrixInvocationHandler(target, dispatch, setterFactory,
nullableFallbackFactory);
}
});
super.contract(new HystrixDelegatingContract(contract));
return super.build();
}
……builder方法省略
}
}