4 Haziran 2021 Cuma

RxJava Observable.create metodu - Vertx İle Birlikte Kullanılabilir

Giriş
ObservableOnSubscribe nesnesi alır. Bu bir emitter'dır. İş bitince emitter nesnesinin onNext() veya onError() metodu çağrılır. İşin bittiğini belirtmek için de onComplete() çağrılır.

Örnek
Şöyle yaparız.
Observable.create(new ObservableOnSubscribe<Integer>() {
  @Override
  public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
    if (!observableEmitter.isDisposed())
      observableEmitter.onComplete();
  }
}).subscribe (...);
Örnek
Şöyle yaparız
Vertx vertx = Vertx.vertx();
WebClient webClient = WebClient.create(vertx);

Observable<Object> google = hitURL("www.google.com", webClient);
Observable<Object> yahoo = hitURL("www.yahoo.com", webClient);

for (int i = 0; i < 100; i++) {
  google.repeat(100).subscribe(timeTaken -> {
    if ((Long) timeTaken > 10000) {
      System.out.println(timeTaken);
    }
  }, error -> {System.out.println(error.getMessage());});
  yahoo.repeat(100).subscribe(timeTaken -> {
    if ((Long) timeTaken > 10000) {
      System.out.println(timeTaken);
    }
  }, error -> {System.out.println(error.getMessage());});
  }
}

public static Observable<Object> hitURL(String url, WebClient webClient) {
  return Observable.create(emitter -> {
    Long l1 = System.currentTimeMillis();
    webClient.get(80, url, "").send(ar -> {
      if (ar.succeeded()) {
        Long elapsedTime = (System.currentTimeMillis() - l1);
        emitter.onNext(elapsedTime);
      } else {
        emitter.onError(ar.cause());
      }
      emitter.onComplete();
    });
  });
}

Hiç yorum yok:

Yorum Gönder