Thursday, July 16, 2020

RxJava PublishSubject Class - The Simplest Hot Observable

Introduction
The explanation is as follows:
PublishSubject emits all the items at the point of subscription. This is the most basic form of Subject.
Example
Suppose we have the following code:
PublishSubject<Integer> pSubject = PublishSubject.create();
pSubject.onNext(0);


pSubject.subscribe(it -> System.out.println("Observer 1 onNext: " + it),
                  (Throwable error) -> { }, () -> {},
                  on1 -> System.out.println("Observer 1 onSubscribe"));

pSubject.onNext(1);
pSubject.onNext(2);


pSubject.subscribe(it -> System.out.println("Observer 2 onNext: " + it),
                  (Throwable error) -> { }, () -> {},
                  on1 -> System.out.println("Observer 2 onSubscribe"));

pSubject.onNext(3);
pSubject.onNext(4);
This produces the following output. The value 0 is not printed because no observer is subscribed yet. Likewise, the second observer does not see 1 and 2, which were emitted before it subscribed.
Observer 1 onSubscribe
Observer 1 onNext: 1
Observer 1 onNext: 2
Observer 2 onSubscribe
Observer 1 onNext: 3
Observer 2 onNext: 3
Observer 1 onNext: 4
Observer 2 onNext: 4
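The output above follows from a simple rule: a publish-style subject forwards each item only to the observers registered at that moment and keeps no history. Below is a minimal sketch of that rule in plain Java. TinyPublishSubject and its demo() method are hypothetical names used for illustration; this is not how RxJava implements PublishSubject (there is no error or completion handling, no thread safety, and no onSubscribe callback).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal model of a publish-style subject; not RxJava's implementation.
class TinyPublishSubject<T> {
  // Observers that are subscribed right now.
  private final List<Consumer<T>> observers = new ArrayList<>();

  // Registers an observer; nothing emitted earlier is replayed to it.
  void subscribe(Consumer<T> observer) {
    observers.add(observer);
  }

  // Forwards the item to every current observer; with no observers it is simply lost.
  void onNext(T item) {
    for (Consumer<T> observer : observers) {
      observer.accept(item);
    }
  }

  // Replays the scenario from the example above and returns the onNext lines it produces.
  static List<String> demo() {
    List<String> log = new ArrayList<>();
    TinyPublishSubject<Integer> subject = new TinyPublishSubject<>();
    subject.onNext(0); // no observer yet, so 0 is lost
    subject.subscribe(it -> log.add("Observer 1 onNext: " + it));
    subject.onNext(1);
    subject.onNext(2);
    subject.subscribe(it -> log.add("Observer 2 onNext: " + it));
    subject.onNext(3);
    subject.onNext(4);
    return log;
  }
}
```

Calling TinyPublishSubject.demo() returns the same onNext lines, in the same order, as the output listed above.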
Backpressure Support
Because it is a hot Observable, it does not support backpressure on its own. Backpressure has to be handled explicitly in code. The explanation is as follows:
A hot Observable begins generating items and emits them immediately when they are created. It is contrary to a Cold Observables pull model of processing. Hot Observable emits items at its own pace, and it is up to its observers to keep up.
To deal with backpressure, methods such as buffer(), window(), sample(), the throttle operators (for example throttleFirst()), onBackpressureBuffer(), and onBackpressureDrop() can be used.

buffer method
We do it like this:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
        
source.buffer(1024)
  .observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute, Throwable::printStackTrace);
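With buffer(1024), the downstream observer receives lists of up to 1024 items instead of one item at a time, so it is called far less often. The sketch below mimics that count-based grouping on a plain list. BatchSketch and batch() are hypothetical names, and the code only illustrates the grouping itself, not RxJava's asynchronous delivery.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mimics count-based batching on a plain list.
class BatchSketch {
  // Splits items into consecutive, non-overlapping batches of at most `size` elements.
  // The final batch may be smaller when the item count is not a multiple of `size`.
  static <T> List<List<T>> batch(List<T> items, int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < items.size(); start += size) {
      int end = Math.min(start + size, items.size());
      batches.add(new ArrayList<>(items.subList(start, end)));
    }
    return batches;
  }
}
```

For example, batching the five items 1 to 5 with a size of 2 yields three batches: [1, 2], [3, 4], and [5].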
onBackpressureBuffer method
We do it like this:
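// RxJava 1.x API (rx.Observable, rx.BackpressureOverflow).
// In RxJava 2 and later, onBackpressureBuffer lives on Flowable and takes a BackpressureOverflowStrategy.
Observable.range(1, 1_000_000)
  .onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
  .observeOn(Schedulers.computation())
  .subscribe(e -> {}, Throwable::printStackTrace);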
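The drop-oldest overflow strategy used above can be pictured as a bounded queue that evicts its oldest element whenever a new item arrives while the queue is full. The following is a minimal single-threaded sketch of that idea. DropOldestBuffer is a hypothetical class, not part of RxJava, and it ignores the concurrency and request accounting a real operator needs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical bounded buffer illustrating a drop-oldest overflow policy (not thread-safe).
class DropOldestBuffer<T> {
  private final int capacity;
  private final Deque<T> queue = new ArrayDeque<>();
  private int dropped = 0;

  DropOldestBuffer(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacity = capacity;
  }

  // Producer side: never blocks; evicts the oldest item when the buffer is full.
  void offer(T item) {
    if (queue.size() == capacity) {
      queue.pollFirst();
      dropped++;
    }
    queue.addLast(item);
  }

  // Consumer side: removes and returns everything currently buffered, oldest first.
  List<T> drain() {
    List<T> out = new ArrayList<>(queue);
    queue.clear();
    return out;
  }

  // Number of items evicted so far because the buffer was full.
  int droppedCount() {
    return dropped;
  }
}
```

With a capacity of 3, offering the values 1 to 5 before the consumer drains leaves 3, 4, and 5 in the buffer, and two items are counted as dropped.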
debounce method
Example
Suppose we have the following code:
public class RxClickObservable {

  public static Observable<String> fromView(View view, String pokemonName) {

    final PublishSubject<String> subject = PublishSubject.create();

    view.setOnClickListener(new View.OnClickListener() {
      @Override
      public void onClick(View v) {
        subject.onNext(pokemonName);
      }
    });

    return subject;
  }

}
We do it like this:
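RxClickObservable.fromView(binding.button, "pokemonName")
  .subscribeOn(pokemonSchedulers.background())
  .observeOn(pokemonSchedulers.ui())
  .debounce(300, TimeUnit.MILLISECONDS)
  // Assumption: the original chain omitted the operator here. flatMap is used on the
  // assumption that getPokemonDetailByName returns an Observable.
  .flatMap(name -> pokemonDetailInteractor.getPokemonDetailByName(name))
  .subscribe(... );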
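The effect of debounce(300, TimeUnit.MILLISECONDS) on a burst of clicks can be reasoned about without timers: a click survives only if no other click follows it within the quiet window. The sketch below applies that rule to a list of pre-recorded click timestamps. DebounceSketch and debounce() are hypothetical names, and treating a gap exactly equal to the window as "quiet enough" is an assumption of this sketch. A real debounce operator delivers the surviving item after the quiet period has elapsed, whereas this model only reports which clicks survive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, timer-free model of debouncing over pre-recorded click timestamps.
class DebounceSketch {
  // Returns the timestamps (in ms) of the clicks that survive debouncing:
  // a click is kept only if the next click arrives at least `windowMs` later,
  // or if it is the last click. `clickTimesMs` is assumed to be sorted ascending.
  static List<Long> debounce(List<Long> clickTimesMs, long windowMs) {
    List<Long> kept = new ArrayList<>();
    for (int i = 0; i < clickTimesMs.size(); i++) {
      boolean isLast = i == clickTimesMs.size() - 1;
      if (isLast || clickTimesMs.get(i + 1) - clickTimesMs.get(i) >= windowMs) {
        kept.add(clickTimesMs.get(i));
      }
    }
    return kept;
  }
}
```

For clicks recorded at 0, 100, 200, 1000, and 1100 ms with a 300 ms window, only the clicks at 200 and 1100 survive, so two rapid bursts of clicks lead to two downstream requests instead of five.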
