27 Mart 2018 Salı

RxJava Observable Sınıfı

Giriş
RxJava, ReactiveX kavramının Java gerçekleştirimi. Şu satırı dahil ederiz.
<dependency>
  <groupId>io.reactivex</groupId>
  <artifactId>rxjava</artifactId>
  <version>1.3.2</version>
</dependency>
Gradle ile şöyle yaparız.
dependencies {
  ...
  compile 'io.reactivex:rxandroid:1.2.1'
  compile 'io.reactivex:rxjava:1.2.1'
}
ReactiveStreams (RxJava 2.0) için şöyle yaparız.
compile('io.reactivex:rxjava-reactive-streams')
Olayı basit tutmak için RxJava'nın amacını asenkron bir işlemi gerçekleştirip bunun sonucunu UI'da göstermek diye düşünelim. Klasik programlama dillerinde asenkron işlem tüm cevabı hazırlayıp gönderir.

RxJava ile cevap peyderpey gönderilebilir. Böylece UI ara ara güncellenerek en son durum görüntülebilir.


Back Pressure
Back Pressure ile ilgili detaylı örnekler burada.

Non-Blocking Back Pressure
ReactiveX ise Reactive Manifesto ile ilgili. Reactive Manifesto'nun bir amacı şunu sunmak.
"a standard for asynchronous stream processing with non-blocking back pressure"
Burada back-pressure kavramını haberleşme protokollerindeki "flow control" kavramı ile aynı şeymiş gibi düşünmek başlamayı kolaylaştırıyor.

Producer'ın hızına yetişemeyen Consumer , bir şekilde Producer'a yavaşlaması gerektiğini bildirir.

"Blocking back-pressure" yönteminde Producer bekler. Asenkron çalışmada Producer'ı bekletme şansımız yok. Dolayısıyla "Non-blocking back-pressure" yöntemi kullanılıyor.

"Non-blocking back-pressure" yönteminde Consumer hazır olduğunda Producer'a kaç tane nesne istediğini söyler. Yani push yöntemi yerine pull yöntemi kullanılır. Producer da elindekileri gönderir.

Java 8 Stream İle Farkı Nedir?
Açıklaması şöyle.
Java 8 Streams are pull based. You iterate over a java 8 stream consuming each item. And it could be an endless stream.

RXJava Observable is by default push based. You subscribe to an Observable and you will get notified when the next item arrives (onNext), or when the stream is completed (onCompleted), or when an error occured (onError).
Observable Benzeri Sınıflar
Single,Completable, Maybe gibi benzer metodları sunan sınıflar da var.

buffer metodu
Satırları 4'er okumak için şöyle yaparız.
Stream<String> stream = Files.lines(Paths.get("file.txt"))

Observable.fromIterable(stream::iterator)
          .buffer(4)                      // Observable<List<String>>
          .map(x -> String.join(", ", x)) // Observable<String>
          .forEach(System.out::println);
create metodu
Şöyle yaparız.
Observable.create(new ObservableOnSubscribe<Integer>() {
  @Override
  public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
    if (!observableEmitter.isDisposed())
      observableEmitter.onComplete();
  }
}).subscribe (...);
doOnNext metodu
Şöyle yaparız.
Observable.just(new Object())
  .doOnNext(o -> { /* /* This method catches the on next but doesn't consume it. */})
  .doOnComplete(() -> { /* test */})
  .doOnError(throwable -> {/* This method catches the error but doesn't consume it. */})
  .subscribe(o ->
             {/* success */}
             ,
             throwable -> {/* error */} // here error will be consumed at the end
  );
Şöyle yaparız.
Observable.just(new Object())
  .doOnNext(o -> { /* /* This method catches the on next but doesn't consume it. */})
  .doOnComplete(() -> { /* test */})
  .doOnError(throwable -> {/* This method catches the error but doesn't consume it. */})
  .subscribe(
    new DefaultObserver<Object>() {
      @Override
      public void onNext(Object o) {

      }

      @Override
      public void onError(Throwable e) {
        // here error will be consumed at the end
      }

      @Override
      public void onComplete() {

      }
    }
  );
defer metodu
İmzası şöyle.
public final static <T> Observable<T> defer(Func0<Observable<T>> observableFactory)...
Açıklaması şöyle.
Returns an Observable that calls an Observable factory to create an Observable for each new Observer that subscribes. That is, for each subscriber, the actual Observable that subscriber observes is determined by the factory function.
Bir abone geldiğinde defer() metoduna verilen Func0 çağrılır. Func0'in imzası şöyle.
public interface Func0<R> extends Function, Callable<R> {
    @Override
    public R call();
}
Kolay olsun diye şöyle düşünülebilir.
interface Func0<R> {
    R call();
}
Şöyle yaparız.
Observable.defer(new Func0<Observable<String>>() {
  @Override
  public Observable<String> call() {
    try {
      return Observable.just(Database.readValue());       
    }
    catch(IOException e) {
      return Observable.error(e);     
    }   
  })
  .subscribe(new Action1<String>() {
    @Override
    public void call(String result) {
      resultTextView.setText(result);     
    }   
  }
}
from metodu
Örnek ver.

fromIterable metodu
Şöyle yaparız.
List<Form> form = ...;
Observable.fromIterable(form)
.concatMapIterable(Form::getFormVersions) // get form version list from single form object
.doOnSubscribe(disposable -> AppLogger.i(tag, "Form Versions download is subscribed"))
.filter(this::checkFormVersionToDownloadOrNot) //get those versions to be downloaded
.doOnNext(formVersion -> { // get formVersion object

  AppLogger.i(tag, "download this form ---------> " + formVersion.getFormUrl());
  AppLogger.i(tag, formVersion.getFormUrl());
  String constructURL = formVersion.getFormUrl();
  /* download form gzip process starts from here */
  ...
})
.doOnTerminate(() -> AppLogger.i(tag, "Form Versions terminated"))
.doOnError(Throwable::printStackTrace)
.subscribe(
  formVersion -> AppLogger.i(tag, "Form Versions download completed"),
  throwable -> AppLogger.e(tag, throwable.getMessage(), throwable)
 );
just metodu
Örnek
Tek bir nesne dönmek için şöyle yaparız.
Observable.just("item").subscribe(
  new Observer<String>() {
  @Override
  public void onSubscribe(Disposable d) {
   ...
  }

  @Override
  public void onNext(String s) {
    ...
  }

  @Override
  public void onError(Throwable e) {
    ...
  }

  @Override
  public void onComplete() {
    ...
  }
})
Örnek
Liste kullanmak için şöyle yaparız.
List<Foo> list = ...;
Observable.just(list).flatMapIterable(i -> i).groupBy(f -> f.Id)
Örnek
Dizi kullanmak için şöyle yaparız.
String[] strings = {"Hello", "World"};
Observable<String[]> stringsObservable = Observable.just(strings);
range metodu
Elimizde şöyle bir sınıf olsun.
public class MyClass {
  private int number;

  public MyClass(int number) {
    super();
    this.number = number;
  }

  public int getNumber() {
    return number;
  }

}
Bu sınıftan 50 tane farklı sayı ile ilklendirmek için şöyle yaparız.
 Observable.range(1, 10).map(number -> new MyClass(number)).toList()
  .subscribe(myClasses -> printData(myClasses));
Nesne listesi oluşturulduktan sonra şu metod çağrılır.
static void printData(List<MyClass> mydata) {
  mydata.stream().forEach(myClass -> System.out.println(myClass.getNumber()));
}
subscribe metodu
İmzası şöyle.
public final Subscription subscribe(final Action1<? super T> onNext)...
Açıklaması şöyle.
Subscribes to an Observable and provides a callback to handle the items it emits.
Action1'in imzası şöyle.
public interface Action1<T1> extends Action {
  public void call(T1 t1);
}
Kolay olsun diye şöyle düşünülebilir.
interface Action1<T> {
    void call(T t);
}
Örnek
Şöyle yaparız.
Observable.just(1).subscribe(new Observer<Integer>() {

  Disposable disposable;

  @Override
  public void onSubscribe(Disposable disposable) {
    System.out.println("Subscribed");
    this.disposable = disposable;
  }

  @Override
  public void onNext(Integer integer) {
    System.out.println(integer);
    System.out.println(disposable.isDisposed());
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("Error");
    System.out.println(disposable.isDisposed());
  }

  @Override
  public void onComplete() {
    System.out.println("Complete");
    System.out.println(disposable.isDisposed());
  }
})
Örnek
Bu örnekte Observer yerine Subscriber kullanılıyor. Subscriber da Observer arayüzünden kalıtır.
public abstract class Subscriber<T> implements Observer<T>, Subscription
Şöyle yaparız.
Subscription subscription =  Observable.subscribe(new Subscriber<Type>() {
  @Override
  public void onCompleted() {
  }

  @Override
  public void onError(Throwable e) {
  }

  @Override
  public void onNext(String responseString) {
  }
});
subscribeOn metodu
Observable ve Threading yazsına taşıdım.


Hiç yorum yok:

Yorum Gönder