Feign-Hystrix

王守钰 2020-05-26 11:05:40

Feign-Hystrix

GitHub地址:https://github.com/OpenFeign/feign/tree/master/hystrix

HelloWorld

/**
 * Feign client contract used by the examples in this article.
 */
interface Organization{
    /**
     * Declared as {@code GET /all}, relative to the base URL the client is built with.
     *
     * @return the response body as a raw string
     */
    @RequestLine("GET /all")
    String all();
}

/**
 * Builds a Hystrix-backed Feign client without any fallback and logs the
 * result of a single {@code all()} call.
 */
@Test
public void test() {
    final String baseUrl = "http://localhost:9100/system/organization";
    Organization client = HystrixFeign.builder().target(Organization.class, baseUrl);
    // The remote call happens here, before the log statement is evaluated.
    String organizations = client.all();
    log.info("organizations:{}", organizations);
}

正常运行结果

11:13:24.507 [main] INFO com.wangshouyu.feign.FeignHystrixTest - organizations:{"data":[{"id":1,"parentId":0,"code":"root","name":"京东汽车商城","orderNum":0,"description":"京东汽车商城","parentIds":"","isEnabled":true}],"code":200,"msg":null}

模拟超时返回结果

com.netflix.hystrix.exception.HystrixRuntimeException: Organization#all() timed-out and no fallback available.

	at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822)
	at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807)
	at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472)
	at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at rx.observers.Subscribers$5.onError(Subscribers.java:230)
	at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)
	at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:142)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$1.run(AbstractCommand.java:1142)
	at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable$1.call(HystrixContextRunnable.java:41)
	at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable$1.call(HystrixContextRunnable.java:37)
	at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable.run(HystrixContextRunnable.java:57)
	at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$2.tick(AbstractCommand.java:1159)
	at com.netflix.hystrix.util.HystrixTimer$1.run(HystrixTimer.java:99)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException
	at com.netflix.hystrix.AbstractCommand.handleTimeoutViaFallback(AbstractCommand.java:997)
	at com.netflix.hystrix.AbstractCommand.access$500(AbstractCommand.java:60)
	at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:609)
	at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:601)
	at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
	... 16 more

降级处理

// Static fallback implementation: all() always answers with the literal "fallback".
Organization fallback = () -> "fallback";
/**
 * Builds the client with the static {@code fallback} instance and logs the
 * result of a single {@code all()} call.
 */
@Test
public void test() {
    final String baseUrl = "http://localhost:9100/system/organization";
    Organization client = HystrixFeign.builder().target(Organization.class, baseUrl, fallback);
    String organizations = client.all();
    log.info("organizations:{}", organizations);
}

执行结果

11:19:48.310 [main] INFO com.wangshouyu.feign.FeignHystrixTest - organizations:fallback

FallbackFactory方式处理

// Factory that receives the failure cause and produces a fallback whose all()
// answers with the literal "fallbackFactory"; the cause itself is not used here.
FallbackFactory<Organization> fallbackFactory = cause -> () -> "fallbackFactory";

/**
 * Builds the client with {@code fallbackFactory} and logs the result of a
 * single {@code all()} call.
 */
@Test
public void test() {
    final String baseUrl = "http://localhost:9100/system/organization";
    Organization client =
            HystrixFeign.builder().target(Organization.class, baseUrl, fallbackFactory);
    String organizations = client.all();
    log.info("organizations:{}", organizations);
}

执行结果

11:22:23.617 [main] INFO com.wangshouyu.feign.FeignHystrixTest - organizations:fallbackFactory

SetterFactory配置

class MySetterFactory implements SetterFactory{

    @Override
    public HystrixCommand.Setter create(Target<?> target, Method method) {
        String groupKey = target.name();
        String commandKey = Feign.configKey(target.type(), method);
        return HystrixCommand.Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey))
                .andCommandPropertiesDefaults(HystrixCommandProperties.defaultSetter()
                        .withExecutionTimeoutInMilliseconds(30 * 1000));
    }
}

/**
 * Builds the client with the custom {@code MySetterFactory} plus
 * {@code fallbackFactory} and logs the result of a single {@code all()} call.
 */
@Test
public void test() {
    final String baseUrl = "http://localhost:9100/system/organization";
    HystrixFeign.Builder builder = HystrixFeign.builder().setterFactory(new MySetterFactory());
    Organization client = builder.target(Organization.class, baseUrl, fallbackFactory);
    String organizations = client.all();
    log.info("organizations:{}", organizations);
}

实现原理

Feign的实现原理是通过动态代理最终生成一个HTTP请求来进行处理。HystrixFeign的原理也是一样,只不过是在InvocationHandler中把方法调用包装成HystrixCommand来执行,这样就直接融入了Hystrix。

final class HystrixInvocationHandler implements InvocationHandler {

  // FeignTarget包装对象
  private final Target<?> target;
  // 方法执行器
  private final Map<Method, MethodHandler> dispatch;
  // 执行失败工厂
  private final FallbackFactory<?> fallbackFactory; // Nullable
  private final Map<Method, Method> fallbackMethodMap;
  private final Map<Method, Setter> setterMethodMap;

  HystrixInvocationHandler(Target<?> target, Map<Method, MethodHandler> dispatch,
      SetterFactory setterFactory, FallbackFactory<?> fallbackFactory) {
    this.target = checkNotNull(target, "target");
    this.dispatch = checkNotNull(dispatch, "dispatch");
    this.fallbackFactory = fallbackFactory;
    this.fallbackMethodMap = toFallbackMethod(dispatch);
    this.setterMethodMap = toSetters(setterFactory, target, dispatch.keySet());
  }


  static Map<Method, Method> toFallbackMethod(Map<Method, MethodHandler> dispatch) {
    Map<Method, Method> result = new LinkedHashMap<Method, Method>();
    for (Method method : dispatch.keySet()) {
      method.setAccessible(true);
      result.put(method, method);
    }
    return result;
  }

  static Map<Method, Setter> toSetters(SetterFactory setterFactory,
                                       Target<?> target,
                                       Set<Method> methods) {
    Map<Method, Setter> result = new LinkedHashMap<Method, Setter>();
    for (Method method : methods) {
      method.setAccessible(true);
      result.put(method, setterFactory.create(target, method));
    }
    return result;
  }

  @Override
  public Object invoke(final Object proxy, final Method method, final Object[] args)
      throws Throwable {
    // 比较是否是equals方法
    ReflectiveFeign.FeignInvocationHandler
    if ("equals".equals(method.getName())) {
      try {
        Object otherHandler =
            args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
        return equals(otherHandler);
      } catch (IllegalArgumentException e) {
        return false;
      }
    // 是否是hashcode方法
    } else if ("hashCode".equals(method.getName())) {
      return hashCode();
    // 是否是toString方法
    } else if ("toString".equals(method.getName())) {
      return toString();
    }
    // 创建HystrixCommand对象
    HystrixCommand<Object> hystrixCommand =
        new HystrixCommand<Object>(setterMethodMap.get(method)) {
          @Override
          protected Object run() throws Exception {
            try {
              // 调用方法执行器进行真正发送请求
              return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
            } catch (Exception e) {
              throw e;
            } catch (Throwable t) {
              throw (Error) t;
            }
          }

          // 构造Hystrix的Fallback
          @Override
          protected Object getFallback() {
            if (fallbackFactory == null) {
              return super.getFallback();
            }
            try {
              Object fallback = fallbackFactory.create(getExecutionException());
              Object result = fallbackMethodMap.get(method).invoke(fallback, args);
              if (isReturnsHystrixCommand(method)) {
                return ((HystrixCommand) result).execute();
              } else if (isReturnsObservable(method)) {
                // Create a cold Observable
                return ((Observable) result).toBlocking().first();
              } else if (isReturnsSingle(method)) {
                // Create a cold Observable as a Single
                return ((Single) result).toObservable().toBlocking().first();
              } else if (isReturnsCompletable(method)) {
                ((Completable) result).await();
                return null;
              } else if (isReturnsCompletableFuture(method)) {
                return ((Future) result).get();
              } else {
                return result;
              }
            } catch (IllegalAccessException e) {
              // shouldn't happen as method is public due to being an interface
              throw new AssertionError(e);
            } catch (InvocationTargetException | ExecutionException e) {
              // Exceptions on fallback are tossed by Hystrix
              throw new AssertionError(e.getCause());
            } catch (InterruptedException e) {
              // Exceptions on fallback are tossed by Hystrix
              Thread.currentThread().interrupt();
              throw new AssertionError(e.getCause());
            }
          }
        };

    if (Util.isDefault(method)) {
      return hystrixCommand.execute();
    } else if (isReturnsHystrixCommand(method)) {
      return hystrixCommand;
    } else if (isReturnsObservable(method)) {
      // Create a cold Observable
      return hystrixCommand.toObservable();
    } else if (isReturnsSingle(method)) {
      // Create a cold Observable as a Single
      return hystrixCommand.toObservable().toSingle();
    } else if (isReturnsCompletable(method)) {
      return hystrixCommand.toObservable().toCompletable();
    } else if (isReturnsCompletableFuture(method)) {
      return new ObservableCompletableFuture<>(hystrixCommand);
    }
    return hystrixCommand.execute();
  }

  private boolean isReturnsCompletable(Method method) {
    return Completable.class.isAssignableFrom(method.getReturnType());
  }

  private boolean isReturnsHystrixCommand(Method method) {
    return HystrixCommand.class.isAssignableFrom(method.getReturnType());
  }

  private boolean isReturnsObservable(Method method) {
    return Observable.class.isAssignableFrom(method.getReturnType());
  }

  private boolean isReturnsCompletableFuture(Method method) {
    return CompletableFuture.class.isAssignableFrom(method.getReturnType());
  }

  private boolean isReturnsSingle(Method method) {
    return Single.class.isAssignableFrom(method.getReturnType());
  }

  @Override
  public boolean equals(Object obj) {
    if (obj instanceof HystrixInvocationHandler) {
      HystrixInvocationHandler other = (HystrixInvocationHandler) obj;
      return target.equals(other.target);
    }
    return false;
  }

  @Override
  public int hashCode() {
    return target.hashCode();
  }

  @Override
  public String toString() {
    return target.toString();
  }
}

HystrixFeign

public final class HystrixFeign {

  // 内部Builder构建起
  public static Builder builder() {
    return new Builder();
  }

  // 内部的Builder类继承了Feign.Builder的构造器
  public static final class Builder extends Feign.Builder {

    private Contract contract = new Contract.Default();
    // 构造Hystrix的Setter的工厂类
    private SetterFactory setterFactory = new SetterFactory.Default();

    public Builder setterFactory(SetterFactory setterFactory) {
      this.setterFactory = setterFactory;
      return this;
    }


    public <T> T target(Target<T> target, T fallback) {
      return build(fallback != null ? new FallbackFactory.Default<T>(fallback) : null)
          .newInstance(target);
    }

    public <T> T target(Target<T> target, FallbackFactory<? extends T> fallbackFactory) {
      return build(fallbackFactory).newInstance(target);
    }


    public <T> T target(Class<T> apiType, String url, T fallback) {
      return target(new Target.HardCodedTarget<T>(apiType, url), fallback);
    }

    public <T> T target(Class<T> apiType,
                        String url,
                        FallbackFactory<? extends T> fallbackFactory) {
      return target(new Target.HardCodedTarget<T>(apiType, url), fallbackFactory);
    }

    @Override
    public Feign.Builder invocationHandlerFactory(InvocationHandlerFactory invocationHandlerFactory) {
      throw new UnsupportedOperationException();
    }

    @Override
    public Builder contract(Contract contract) {
      this.contract = contract;
      return this;
    }

    @Override
    public Feign build() {
      return build(null);
    }

    // Hystrix的build方法,生成最终执行器是哪一个
    Feign build(final FallbackFactory<?> nullableFallbackFactory) {
      super.invocationHandlerFactory(new InvocationHandlerFactory() {
        @Override
        public InvocationHandler create(Target target,
                                        Map<Method, MethodHandler> dispatch) {
          // 创建了一个Hystrix的处理器
          return new HystrixInvocationHandler(target, dispatch, setterFactory,
              nullableFallbackFactory);
        }
      });
      super.contract(new HystrixDelegatingContract(contract));
      return super.build();
    }

    ……builder方法省略
  }
}